"""Table-style list views and workspace widgets for the OSINT scopes."""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable
from urllib.parse import urlencode

from django.contrib.auth.mixins import LoginRequiredMixin
from django.db import models
from django.db.models import Q
from django.shortcuts import render
from django.urls import reverse
from django.views import View

from core.models import Group, Manipulation, Message, Person, Persona, PersonIdentifier
from mixins.views import ObjectList

# Search text is truncated to this many characters before filtering.
_QUERY_MAX_LEN = 400
# Runs of characters that are allowed to survive in a search query.
_QUERY_ALLOWED_PATTERN = re.compile(r"[\w\s@\-\+\.:,#/]+", re.UNICODE)


def _sanitize_search_query(value: str) -> str:
    """Return *value* trimmed, length-capped and stripped of disallowed chars.

    Falsy input yields an empty string. Characters not matched by
    ``_QUERY_ALLOWED_PATTERN`` are dropped (the allowed runs are concatenated).
    """
    raw = str(value or "").strip()
    if not raw:
        return ""
    trimmed = raw[:_QUERY_MAX_LEN]
    return "".join(_QUERY_ALLOWED_PATTERN.findall(trimmed)).strip()


def _safe_page_number(value: Any) -> int:
    """Coerce *value* to a page number >= 1, falling back to 1 if not an int."""
    try:
        page_value = int(value)
    except (TypeError, ValueError):
        return 1
    return max(1, page_value)


def _sanitize_query_state(raw: dict[str, Any]) -> dict[str, str]:
    """Return a cleaned copy of a query-parameter mapping.

    Keys that are empty or longer than 80 characters are dropped, as are
    entries whose value is empty after cleaning. ``q``/``query`` values go
    through ``_sanitize_search_query``, ``page`` through
    ``_safe_page_number``, and every other value is capped at 200 characters.
    """
    cleaned: dict[str, str] = {}
    for key, value in (raw or {}).items():
        key_text = str(key or "").strip()
        if not key_text or len(key_text) > 80:
            continue
        value_text = str(value or "").strip()
        if not value_text:
            continue
        if key_text in {"q", "query"}:
            value_text = _sanitize_search_query(value_text)
        elif key_text == "page":
            value_text = str(_safe_page_number(value_text))
        else:
            value_text = value_text[:200]
        if value_text:
            cleaned[key_text] = value_text
    return cleaned


def _context_type(request_type: str) -> str:
    """Map the ``page`` request type to ``modal``; return others unchanged."""
    return "modal" if request_type == "page" else request_type


def _column_field_name(column: "OsintColumn") -> str:
    """Return the leading field segment of a column's lookup or sort field.

    Prefers ``search_lookup``, then ``sort_field``; falls back to the key.
    """
    if column.search_lookup:
        return str(column.search_lookup).split("__", 1)[0]
    if column.sort_field:
        return str(column.sort_field).split("__", 1)[0]
    return str(column.key)


def _safe_icon_class(raw: str | None, default: str) -> str:
    """Return the whitespace-separated tokens of *raw* that are safe CSS names.

    A token is kept only if every character is alphanumeric, ``-`` or ``_``.
    If *raw* is empty or no token survives, *default* is returned.
    """
    icon_class = str(raw or "").strip()
    if not icon_class:
        return default
    # str.split() with no argument never yields empty tokens.
    cleaned_parts = [
        part
        for part in icon_class.split()
        if all(ch.isalnum() or ch in {"-", "_"} for ch in part)
    ]
    if not cleaned_parts:
        return default
    return " ".join(cleaned_parts)


def _url_with_query(base_url: str, query: dict[str, Any]) -> str:
    """Append *query* to *base_url*, skipping ``None`` and blank values.

    Returns *base_url* unchanged when no parameter remains.
    """
    params = {}
    for key, value in query.items():
        if value is None:
            continue
        value_str = str(value).strip()
        if value_str == "":
            continue
        params[key] = value_str
    if not params:
        return base_url
    return f"{base_url}?{urlencode(params)}"


def _merge_query(current_query: dict[str, Any], **updates: Any) -> dict[str, Any]:
    """Return a copy of *current_query* with *updates* applied.

    An update whose value is ``None`` or blank removes that key instead.
    The input mapping is not modified.
    """
    merged = dict(current_query)
    for key, value in updates.items():
        if value is None or str(value).strip() == "":
            merged.pop(key, None)
            continue
        merged[key] = value
    return merged


@dataclass(frozen=True)
class OsintColumn:
    """Declarative description of one table column."""

    # Identifier used in the search-field selector and column context.
    key: str
    # Column heading.
    label: str
    # Callable that extracts the cell value from a row object.
    accessor: Callable[[Any], Any]
    # ORM field used for ordering; None makes the column non-sortable.
    sort_field: str | None = None
    # ORM lookup used for per-column search; None excludes it from the selector.
    search_lookup: str | None = None
    # Rendering hint passed through to the template context.
    kind: str = "text"


@dataclass(frozen=True)
class OsintScopeConfig:
    """Declarative configuration for one OSINT list scope."""

    key: str
    title: str
    model: type[models.Model]
    # URL names passed to ``reverse`` for the list, edit and delete routes.
    list_url_name: str
    update_url_name: str
    delete_url_name: str
    # Sort field used when the request does not name an allowed one.
    default_sort: str
    columns: tuple[OsintColumn, ...]
    select_related: tuple[str, ...] = ()
    prefetch_related: tuple[str, ...] = ()
    # Builds the item name interpolated into the delete confirmation.
    delete_label: Callable[[Any], str] = str
    # Returns additional per-row action dicts; defaults to none.
    extra_actions: Callable[[Any, str], list[dict[str, Any]]] = field(
        default_factory=lambda: (lambda _item, _type: [])
    )
    # Lookups OR-ed together when searching across "all" fields.
    search_lookups: tuple[str, ...] = ()


def _person_extra_actions(item: Person, _request_type: str) -> list[dict[str, Any]]:
    """Return the extra row action linking a person to their identifiers."""
    return [
        {
            "mode": "link",
            "url": reverse(
                "person_identifiers",
                kwargs={"type": "page", "person": item.id},
            ),
            "icon": "fa-solid fa-eye",
            "title": "Identifiers",
        }
    ]


# Every scope starts with the same copyable-ID column; the dataclass is
# frozen, so a single instance can be shared safely.
_ID_COLUMN = OsintColumn(
    key="id",
    label="ID",
    accessor=lambda item: item.id,
    sort_field="id",
    search_lookup="id__icontains",
    kind="id_copy",
)

OSINT_SCOPES: dict[str, OsintScopeConfig] = {
    "people": OsintScopeConfig(
        key="people",
        title="People",
        model=Person,
        list_url_name="people",
        update_url_name="person_update",
        delete_url_name="person_delete",
        default_sort="name",
        columns=(
            _ID_COLUMN,
            OsintColumn(
                key="name",
                label="Name",
                accessor=lambda item: item.name,
                sort_field="name",
                search_lookup="name__icontains",
            ),
            OsintColumn(
                key="sentiment",
                label="Sentiment",
                accessor=lambda item: item.sentiment,
                sort_field="sentiment",
            ),
            OsintColumn(
                key="timezone",
                label="Timezone",
                accessor=lambda item: item.timezone,
                sort_field="timezone",
                search_lookup="timezone__icontains",
            ),
            OsintColumn(
                key="last_interaction",
                label="Last Interaction",
                accessor=lambda item: item.last_interaction,
                sort_field="last_interaction",
                kind="datetime",
            ),
        ),
        delete_label=lambda item: item.name,
        extra_actions=_person_extra_actions,
        search_lookups=(
            "id__icontains",
            "name__icontains",
            "summary__icontains",
            "profile__icontains",
            "revealed__icontains",
            "likes__icontains",
            "dislikes__icontains",
            "timezone__icontains",
        ),
    ),
    "groups": OsintScopeConfig(
        key="groups",
        title="Groups",
        model=Group,
        list_url_name="groups",
        update_url_name="group_update",
        delete_url_name="group_delete",
        default_sort="name",
        columns=(
            _ID_COLUMN,
            OsintColumn(
                key="name",
                label="Name",
                accessor=lambda item: item.name,
                sort_field="name",
                search_lookup="name__icontains",
            ),
            OsintColumn(
                key="people_count",
                label="People",
                accessor=lambda item: item.people.count(),
                kind="number",
            ),
        ),
        prefetch_related=("people",),
        delete_label=lambda item: item.name,
        search_lookups=(
            "id__icontains",
            "name__icontains",
            "people__name__icontains",
        ),
    ),
    "personas": OsintScopeConfig(
        key="personas",
        title="Personas",
        model=Persona,
        list_url_name="personas",
        update_url_name="persona_update",
        delete_url_name="persona_delete",
        default_sort="alias",
        columns=(
            _ID_COLUMN,
            OsintColumn(
                key="alias",
                label="Alias",
                accessor=lambda item: item.alias,
                sort_field="alias",
                search_lookup="alias__icontains",
            ),
            OsintColumn(
                key="mbti",
                label="MBTI",
                accessor=lambda item: item.mbti,
                sort_field="mbti",
                search_lookup="mbti__icontains",
            ),
            OsintColumn(
                key="mbti_identity",
                label="MBTI Identity",
                accessor=lambda item: item.mbti_identity,
                sort_field="mbti_identity",
            ),
            OsintColumn(
                key="humor_style",
                label="Humor Style",
                accessor=lambda item: item.humor_style,
                sort_field="humor_style",
                search_lookup="humor_style__icontains",
            ),
            OsintColumn(
                key="tone",
                label="Tone",
                accessor=lambda item: item.tone,
                sort_field="tone",
                search_lookup="tone__icontains",
            ),
            OsintColumn(
                key="trust",
                label="Trust",
                accessor=lambda item: item.trust,
                sort_field="trust",
            ),
            OsintColumn(
                key="adaptability",
                label="Adaptability",
                accessor=lambda item: item.adaptability,
                sort_field="adaptability",
            ),
        ),
        delete_label=lambda item: item.alias or str(item.id),
        search_lookups=(
            "id__icontains",
            "alias__icontains",
            "mbti__icontains",
            "humor_style__icontains",
            "tone__icontains",
            "communication_style__icontains",
            "core_values__icontains",
            "inner_story__icontains",
            "likes__icontains",
            "dislikes__icontains",
        ),
    ),
    "manipulations": OsintScopeConfig(
        key="manipulations",
        title="Manipulations",
        model=Manipulation,
        list_url_name="manipulations",
        update_url_name="manipulation_update",
        delete_url_name="manipulation_delete",
        default_sort="name",
        columns=(
            _ID_COLUMN,
            OsintColumn(
                key="name",
                label="Name",
                accessor=lambda item: item.name,
                sort_field="name",
                search_lookup="name__icontains",
            ),
            OsintColumn(
                key="group",
                label="Group",
                accessor=lambda item: item.group,
                sort_field="group__name",
                search_lookup="group__name__icontains",
            ),
            OsintColumn(
                key="ai",
                label="AI",
                accessor=lambda item: item.ai,
                sort_field="ai__model",
                search_lookup="ai__model__icontains",
            ),
            OsintColumn(
                key="persona",
                label="Persona",
                accessor=lambda item: item.persona,
                sort_field="persona__alias",
                search_lookup="persona__alias__icontains",
            ),
            OsintColumn(
                key="enabled",
                label="Enabled",
                accessor=lambda item: item.enabled,
                sort_field="enabled",
                kind="bool",
            ),
            OsintColumn(
                key="mode",
                label="Mode",
                accessor=lambda item: item.mode,
                sort_field="mode",
                search_lookup="mode__icontains",
            ),
            OsintColumn(
                key="filter_enabled",
                label="Filter",
                accessor=lambda item: item.filter_enabled,
                sort_field="filter_enabled",
                kind="bool",
            ),
        ),
        select_related=("group", "ai", "persona"),
        delete_label=lambda item: item.name,
        search_lookups=(
            "id__icontains",
            "name__icontains",
            "mode__icontains",
            "group__name__icontains",
            "ai__model__icontains",
            "persona__alias__icontains",
        ),
    ),
    "identifiers": OsintScopeConfig(
        key="identifiers",
        title="Identifiers",
        model=PersonIdentifier,
        list_url_name="identifiers",
        update_url_name="identifier_update",
        delete_url_name="identifier_delete",
        default_sort="identifier",
        columns=(
            _ID_COLUMN,
            OsintColumn(
                key="person",
                label="Person",
                accessor=lambda item: item.person.name if item.person_id else "",
                sort_field="person__name",
                search_lookup="person__name__icontains",
            ),
            OsintColumn(
                key="service",
                label="Service",
                accessor=lambda item: item.service,
                sort_field="service",
                search_lookup="service__icontains",
            ),
            OsintColumn(
                key="identifier",
                label="Identifier",
                accessor=lambda item: item.identifier,
                sort_field="identifier",
                search_lookup="identifier__icontains",
            ),
        ),
        select_related=("person",),
        delete_label=lambda item: item.identifier,
        search_lookups=(
            "id__icontains",
            "person__name__icontains",
            "service__icontains",
            "identifier__icontains",
        ),
    ),
    "messages": OsintScopeConfig(
        key="messages",
        title="Messages",
        model=Message,
        list_url_name="sessions",
        update_url_name="session_update",
        delete_url_name="session_delete",
        default_sort="ts",
        columns=(
            _ID_COLUMN,
            OsintColumn(
                key="service",
                label="Service",
                accessor=lambda item: item.source_service or "",
                sort_field="source_service",
                search_lookup="source_service__icontains",
            ),
            OsintColumn(
                key="chat",
                label="Chat",
                accessor=lambda item: {
                    "display": "",
                    "copy": item.source_chat_id or "",
                },
                sort_field="source_chat_id",
                search_lookup="source_chat_id__icontains",
                kind="chat_ref",
            ),
            OsintColumn(
                key="sender",
                label="Sender",
                accessor=lambda item: item.custom_author or item.sender_uuid or "",
                sort_field="sender_uuid",
                search_lookup="sender_uuid__icontains",
            ),
            OsintColumn(
                key="text",
                label="Text",
                accessor=lambda item: item.text or "",
                search_lookup="text__icontains",
            ),
            OsintColumn(
                key="ts",
                label="Timestamp",
                # NOTE(review): divides by 1000, so ts is presumably epoch
                # milliseconds; the result is a naive local datetime - confirm.
                accessor=lambda item: datetime.fromtimestamp(item.ts / 1000.0),
                sort_field="ts",
                kind="datetime",
            ),
        ),
        search_lookups=(
            "id__icontains",
            "text__icontains",
            "source_service__icontains",
            "source_chat_id__icontains",
            "sender_uuid__icontains",
            "custom_author__icontains",
            "source_message_id__icontains",
        ),
    ),
}

OSINT_SCOPE_ICONS: dict[str, str] = {
    "all": "fa-solid fa-globe",
    "people": "fa-solid fa-user-group",
    "groups": "fa-solid fa-users",
    "personas": "fa-solid fa-masks-theater",
    "manipulations": "fa-solid fa-sliders",
    "identifiers": "fa-solid fa-id-card",
    "messages": "fa-solid fa-message",
}


class OSINTListBase(ObjectList):
    """Base list view that renders a scope from ``OSINT_SCOPES`` as a table.

    Subclasses set ``osint_scope`` to a key of ``OSINT_SCOPES``. Search,
    sorting and pagination are driven by the ``q``, ``field``, ``sort``,
    ``dir`` and ``page`` GET parameters.
    """

    list_template = "partials/osint/list-table.html"
    paginate_by = 20
    osint_scope = ""

    def get_scope(self) -> OsintScopeConfig:
        """Return the configured scope.

        Raises:
            ValueError: if ``osint_scope`` is not a key of ``OSINT_SCOPES``.
        """
        if self.osint_scope not in OSINT_SCOPES:
            raise ValueError(f"Unknown OSINT scope: {self.osint_scope}")
        return OSINT_SCOPES[self.osint_scope]

    def _list_url(self) -> str:
        """Reverse the list URL using the URL kwargs named in ``list_url_args``."""
        list_url_args = {
            arg: self.kwargs[arg] for arg in self.list_url_args if arg in self.kwargs
        }
        return reverse(self.list_url_name, kwargs=list_url_args)

    def _active_sort(self, scope: OsintScopeConfig) -> tuple[str, str]:
        """Return ``(sort_field, direction)`` validated against the scope.

        Unknown directions fall back to ``asc``; a sort field that no column
        declares falls back to the scope's default sort.
        """
        direction = self.request.GET.get("dir", "asc").lower()
        if direction not in {"asc", "desc"}:
            direction = "asc"
        allowed = {col.sort_field for col in scope.columns if col.sort_field}
        sort_field = self.request.GET.get("sort")
        if sort_field not in allowed:
            sort_field = scope.default_sort
        return sort_field, direction

    def get_ordering(self):
        """Return the ordering expression for the active sort, or ``None``."""
        scope = self.get_scope()
        sort_field, direction = self._active_sort(scope)
        if not sort_field:
            return None
        if direction == "desc":
            return f"-{sort_field}"
        return sort_field

    def _search_lookups(self, scope: OsintScopeConfig) -> dict[str, str]:
        """Map column key to search lookup for every searchable column."""
        return {
            column.key: column.search_lookup
            for column in scope.columns
            if column.search_lookup
        }

    def _query_dict(self) -> dict[str, Any]:
        """Return the sanitized, non-empty GET parameters of the request."""
        return _sanitize_query_state(
            {k: v for k, v in self.request.GET.items() if v not in {"", None}}
        )

    def _apply_list_search(
        self, queryset: models.QuerySet, scope: OsintScopeConfig
    ) -> models.QuerySet:
        """Filter *queryset* by the sanitized ``q`` parameter.

        When ``field`` names a searchable column only that column's lookup is
        used; otherwise the scope's ``search_lookups`` (or, failing that, all
        column lookups) are OR-ed together.
        """
        query = _sanitize_search_query(self.request.GET.get("q", ""))
        if not query:
            return queryset

        lookups_by_field = self._search_lookups(scope)
        selected_field = self.request.GET.get("field", "__all__").strip()
        if selected_field in lookups_by_field:
            selected_lookups = [lookups_by_field[selected_field]]
        else:
            selected_lookups = list(scope.search_lookups) or list(
                lookups_by_field.values()
            )
        if not selected_lookups:
            return queryset

        condition = Q()
        for lookup in selected_lookups:
            condition |= Q(**{lookup: query})
        queryset = queryset.filter(condition)
        # NOTE(review): every configured lookup contains "__" (e.g.
        # "name__icontains"), so distinct() is applied to all searches, not
        # only relation-spanning ones - confirm whether that is intended.
        if any("__" in lookup for lookup in selected_lookups):
            queryset = queryset.distinct()
        return queryset

    def get_queryset(self, **kwargs):
        """Return the parent queryset with scope joins and search applied."""
        queryset = super().get_queryset(**kwargs)
        scope = self.get_scope()
        if scope.select_related:
            queryset = queryset.select_related(*scope.select_related)
        if scope.prefetch_related:
            queryset = queryset.prefetch_related(*scope.prefetch_related)
        return self._apply_list_search(queryset, scope)

    def _build_column_context(
        self,
        scope: OsintScopeConfig,
        list_url: str,
        query_state: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Build the header context for each column.

        Sortable columns also carry their sort state and a ``sort_url`` that
        toggles the direction and resets to page 1.
        """
        sort_field, direction = self._active_sort(scope)
        columns = []
        for column in scope.columns:
            sortable = bool(column.sort_field)
            entry: dict[str, Any] = {
                "key": column.key,
                "field_name": _column_field_name(column),
                "label": column.label,
                "sortable": sortable,
                "kind": column.kind,
            }
            if sortable:
                is_sorted = sort_field == column.sort_field
                # Clicking the active ascending column flips it to descending;
                # every other case starts ascending.
                next_direction = "desc" if is_sorted and direction == "asc" else "asc"
                sort_query = _merge_query(
                    query_state,
                    sort=column.sort_field,
                    dir=next_direction,
                    page=1,
                )
                entry["is_sorted"] = is_sorted
                entry["is_desc"] = is_sorted and direction == "desc"
                entry["sort_url"] = _url_with_query(list_url, sort_query)
            columns.append(entry)
        return columns

    def _build_rows(
        self,
        scope: OsintScopeConfig,
        object_list: list[Any],
        request_type: str,
    ) -> list[dict[str, Any]]:
        """Build one row dict (id, cells, actions) per object.

        Each row gets an Edit and a Delete action followed by the scope's
        extra actions.
        """
        context_type = _context_type(request_type)
        # Edits requested from a widget open in a window.
        update_type = "window" if request_type == "widget" else context_type
        update_target = (
            "#windows-here" if update_type == "window" else f"#{update_type}s-here"
        )

        rows = []
        for item in object_list:
            cells = [
                {"kind": column.kind, "value": column.accessor(item)}
                for column in scope.columns
            ]
            update_url = reverse(
                scope.update_url_name,
                kwargs={"type": update_type, "pk": item.pk},
            )
            delete_url = reverse(
                scope.delete_url_name,
                kwargs={"type": context_type, "pk": item.pk},
            )
            actions = [
                {
                    "mode": "hx-get",
                    "url": update_url,
                    "target": update_target,
                    "icon": "fa-solid fa-pencil",
                    "title": "Edit",
                },
                {
                    "mode": "hx-delete",
                    "url": delete_url,
                    "target": "#modals-here",
                    "icon": "fa-solid fa-xmark",
                    "title": "Delete",
                    "confirm": (
                        "Are you sure you wish to delete "
                        f"{scope.delete_label(item)}?"
                    ),
                },
            ]
            actions.extend(scope.extra_actions(item, context_type))
            rows.append({"id": str(item.pk), "cells": cells, "actions": actions})
        return rows

    def _build_pagination(
        self,
        page_obj: Any,
        list_url: str,
        query_state: dict[str, Any],
    ) -> dict[str, Any]:
        """Build the pagination context for *page_obj*.

        Returns ``{"enabled": False}`` when there is no page object. All page
        links keep the parameters in *query_state* and override only ``page``,
        so paging does not discard the active search or sort.
        """
        if page_obj is None:
            return {"enabled": False}

        paginator = page_obj.paginator

        def page_url(number: Any) -> str:
            # Keep q/field/sort/dir from the current request; replace page.
            return _url_with_query(
                list_url,
                _merge_query(query_state, page=_safe_page_number(number)),
            )

        pagination = {
            "enabled": paginator.num_pages > 1,
            "count": paginator.count,
            "current": page_obj.number,
            "total": paginator.num_pages,
            "has_previous": page_obj.has_previous(),
            "has_next": page_obj.has_next(),
            "previous_url": None,
            "next_url": None,
            "pages": [],
        }
        if page_obj.has_previous():
            pagination["previous_url"] = page_url(page_obj.previous_page_number())
        if page_obj.has_next():
            pagination["next_url"] = page_url(page_obj.next_page_number())

        # Compare against the paginator's own marker rather than a literal:
        # Django's ELLIPSIS is a translatable string.
        ellipsis = paginator.ELLIPSIS
        for entry in paginator.get_elided_page_range(page_obj.number):
            if entry == ellipsis:
                pagination["pages"].append({"ellipsis": True})
                continue
            pagination["pages"].append(
                {
                    "ellipsis": False,
                    "number": entry,
                    "current": entry == page_obj.number,
                    "url": page_url(entry),
                }
            )
        return pagination

    def get_context_data(self, **kwargs):
        """Extend the parent context with the ``osint_*`` table context."""
        context = super().get_context_data(**kwargs)
        scope = self.get_scope()
        request_type = self.kwargs.get("type", "modal")
        list_url = self._list_url()
        query_state = self._query_dict()
        query_url = _url_with_query(list_url, query_state)

        field_options = [{"value": "__all__", "label": "All"}]
        field_options.extend(
            {"value": column.key, "label": column.label}
            for column in scope.columns
            if column.search_lookup
        )

        context["osint_scope"] = scope.key
        context["osint_title"] = scope.title
        context["osint_table_id"] = f"{self.context_object_name}-table"
        context["osint_event_name"] = f"{self.context_object_name_singular}Event"
        context["osint_refresh_url"] = query_url or list_url
        context["osint_columns"] = self._build_column_context(
            scope,
            list_url,
            query_state,
        )
        context["osint_rows"] = self._build_rows(
            scope,
            list(context["object_list"]),
            request_type,
        )
        context["osint_pagination"] = self._build_pagination(
            context.get("page_obj"),
            list_url,
            query_state,
        )
        context["osint_search_query"] = self.request.GET.get("q", "")
        context["osint_search_field"] = self.request.GET.get("field", "__all__")
        context["osint_search_fields"] = field_options
        context["osint_show_search"] = True
        context["osint_show_actions"] = True
        context["osint_search_url"] = list_url
        context["osint_result_count"] = context["osint_pagination"].get("count", 0)
        context["widget_icon"] = _safe_icon_class(
            self.request.GET.get("widget_icon"),
            OSINT_SCOPE_ICONS.get(scope.key, "fa-solid fa-arrows-minimize"),
        )
        return context


class OSINTWorkspace(LoginRequiredMixin, View):
    """Render the OSINT workspace page."""

    template_name = "pages/osint-workspace.html"

    def get(self, request):
        """Render the page with the URL of the tabs widget."""
        context = {
            "tabs_widget_url": reverse("osint_workspace_tabs_widget"),
        }
        return render(request, self.template_name, context)


class OSINTWorkspaceTabsWidget(LoginRequiredMixin, View):
    """Render the workspace widget holding one tab per listed scope."""

    # Scopes shown as tabs, in display order.
    tab_scope_keys = ("people", "groups", "personas", "manipulations")

    def get(self, request):
        """Render the tabs widget.

        Tab labels, icons and list URLs come from ``OSINT_SCOPES`` and
        ``OSINT_SCOPE_ICONS`` so they cannot drift from the list views.
        """
        tabs = []
        for key in self.tab_scope_keys:
            scope = OSINT_SCOPES[key]
            icon = OSINT_SCOPE_ICONS[key]
            tabs.append(
                {
                    "key": key,
                    "label": scope.title,
                    "icon": icon,
                    "widget_url": _url_with_query(
                        reverse(scope.list_url_name, kwargs={"type": "widget"}),
                        {"widget_icon": icon},
                    ),
                }
            )
        context = {
            "title": "OSINT Workspace",
            "unique": "osint-workspace-tabs",
            "window_content": "partials/osint-workspace-tabs-widget.html",
            "widget_options": 'gs-w="12" gs-h="4" gs-x="0" gs-y="0" gs-min-w="6"',
            "tabs": tabs,
        }
        return render(request, "mixins/wm/widget.html", context)