"""OSINT list and search views driven by per-scope table configuration."""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import date, datetime
from decimal import Decimal, InvalidOperation
from typing import Any, Callable
from urllib.parse import urlencode

from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.exceptions import FieldDoesNotExist
from django.core.paginator import Paginator
from django.db import models
from django.db.models import Q
from django.http import HttpResponseBadRequest
from django.shortcuts import render
from django.urls import reverse
from django.views import View
from mixins.views import ObjectList

from core.models import Group, Manipulation, Persona, Person

# Accepted spellings for boolean search terms (compared case-insensitively).
_TRUTHY = frozenset({"1", "true", "yes", "y", "on", "enabled"})
_FALSY = frozenset({"0", "false", "no", "n", "off", "disabled"})

# Related-model text fields tried, in order, when searching across a relation.
_RELATED_TEXT_FIELDS = ("name", "alias", "title", "identifier", "model")


def _context_type(request_type: str) -> str:
    """Map the ``page`` request type to ``modal``; pass others through."""
    return "modal" if request_type == "page" else request_type


def _parse_bool(raw: str) -> bool | None:
    """Parse a human-entered boolean; return ``None`` when unrecognised."""
    lowered = raw.strip().lower()
    if lowered in _TRUTHY:
        return True
    if lowered in _FALSY:
        return False
    return None


def _preferred_related_text_field(model: type[models.Model]) -> str | None:
    """Return the first preferred Char/Text field name on *model*, if any."""
    model_fields = {f.name: f for f in model._meta.get_fields()}
    for candidate in _RELATED_TEXT_FIELDS:
        candidate_field = model_fields.get(candidate)
        if isinstance(candidate_field, (models.CharField, models.TextField)):
            return candidate
    return None


def _url_with_query(base_url: str, query: dict[str, Any]) -> str:
    """Append *query* to *base_url*, dropping ``None`` and blank values."""
    params = {}
    for key, value in query.items():
        if value is None:
            continue
        value_str = str(value).strip()
        if value_str:
            params[key] = value_str
    if not params:
        return base_url
    return f"{base_url}?{urlencode(params)}"


def _merge_query(current_query: dict[str, Any], **updates: Any) -> dict[str, Any]:
    """Return a copy of *current_query* with *updates* applied.

    An update whose value is ``None`` or blank removes that key instead of
    setting it.
    """
    merged = dict(current_query)
    for key, value in updates.items():
        if value is None or str(value).strip() == "":
            merged.pop(key, None)
        else:
            merged[key] = value
    return merged


@dataclass(frozen=True)
class OsintColumn:
    """Declarative description of one table column."""

    key: str
    label: str
    accessor: Callable[[Any], Any]
    sort_field: str | None = None
    search_lookup: str | None = None
    kind: str = "text"


@dataclass(frozen=True)
class OsintScopeConfig:
    """Declarative description of one OSINT scope (model plus table layout)."""

    key: str
    title: str
    model: type[models.Model]
    list_url_name: str
    update_url_name: str
    delete_url_name: str
    default_sort: str
    columns: tuple[OsintColumn, ...]
    select_related: tuple[str, ...] = ()
    prefetch_related: tuple[str, ...] = ()
    delete_label: Callable[[Any], str] = str
    extra_actions: Callable[[Any, str], list[dict[str, Any]]] = field(
        default_factory=lambda: (lambda _item, _type: [])
    )
    search_lookups: tuple[str, ...] = ()


def _person_extra_actions(item: Person, _request_type: str) -> list[dict[str, Any]]:
    """Return the extra row actions for a person: a link to its identifiers."""
    return [
        {
            "mode": "link",
            "url": reverse(
                "person_identifiers",
                kwargs={"type": "page", "person": item.id},
            ),
            "icon": "fa-solid fa-eye",
            "title": "Identifiers",
        }
    ]


# Every scope starts with the same copyable ID column; the dataclass is frozen,
# so a single instance can be shared safely.
_ID_COLUMN = OsintColumn(
    key="id",
    label="ID",
    accessor=lambda item: item.id,
    sort_field="id",
    search_lookup="id__icontains",
    kind="id_copy",
)


OSINT_SCOPES: dict[str, OsintScopeConfig] = {
    "people": OsintScopeConfig(
        key="people",
        title="People",
        model=Person,
        list_url_name="people",
        update_url_name="person_update",
        delete_url_name="person_delete",
        default_sort="name",
        columns=(
            _ID_COLUMN,
            OsintColumn(
                key="name",
                label="Name",
                accessor=lambda item: item.name,
                sort_field="name",
                search_lookup="name__icontains",
            ),
            OsintColumn(
                key="sentiment",
                label="Sentiment",
                accessor=lambda item: item.sentiment,
                sort_field="sentiment",
            ),
            OsintColumn(
                key="timezone",
                label="Timezone",
                accessor=lambda item: item.timezone,
                sort_field="timezone",
                search_lookup="timezone__icontains",
            ),
            OsintColumn(
                key="last_interaction",
                label="Last Interaction",
                accessor=lambda item: item.last_interaction,
                sort_field="last_interaction",
                kind="datetime",
            ),
        ),
        delete_label=lambda item: item.name,
        extra_actions=_person_extra_actions,
        search_lookups=(
            "id__icontains",
            "name__icontains",
            "summary__icontains",
            "profile__icontains",
            "revealed__icontains",
            "likes__icontains",
            "dislikes__icontains",
            "timezone__icontains",
        ),
    ),
    "groups": OsintScopeConfig(
        key="groups",
        title="Groups",
        model=Group,
        list_url_name="groups",
        update_url_name="group_update",
        delete_url_name="group_delete",
        default_sort="name",
        columns=(
            _ID_COLUMN,
            OsintColumn(
                key="name",
                label="Name",
                accessor=lambda item: item.name,
                sort_field="name",
                search_lookup="name__icontains",
            ),
            OsintColumn(
                key="people_count",
                label="People",
                accessor=lambda item: item.people.count(),
                kind="number",
            ),
        ),
        prefetch_related=("people",),
        delete_label=lambda item: item.name,
        search_lookups=(
            "id__icontains",
            "name__icontains",
            "people__name__icontains",
        ),
    ),
    "personas": OsintScopeConfig(
        key="personas",
        title="Personas",
        model=Persona,
        list_url_name="personas",
        update_url_name="persona_update",
        delete_url_name="persona_delete",
        default_sort="alias",
        columns=(
            _ID_COLUMN,
            OsintColumn(
                key="alias",
                label="Alias",
                accessor=lambda item: item.alias,
                sort_field="alias",
                search_lookup="alias__icontains",
            ),
            OsintColumn(
                key="mbti",
                label="MBTI",
                accessor=lambda item: item.mbti,
                sort_field="mbti",
                search_lookup="mbti__icontains",
            ),
            OsintColumn(
                key="mbti_identity",
                label="MBTI Identity",
                accessor=lambda item: item.mbti_identity,
                sort_field="mbti_identity",
            ),
            OsintColumn(
                key="humor_style",
                label="Humor Style",
                accessor=lambda item: item.humor_style,
                sort_field="humor_style",
                search_lookup="humor_style__icontains",
            ),
            OsintColumn(
                key="tone",
                label="Tone",
                accessor=lambda item: item.tone,
                sort_field="tone",
                search_lookup="tone__icontains",
            ),
            OsintColumn(
                key="trust",
                label="Trust",
                accessor=lambda item: item.trust,
                sort_field="trust",
            ),
            OsintColumn(
                key="adaptability",
                label="Adaptability",
                accessor=lambda item: item.adaptability,
                sort_field="adaptability",
            ),
        ),
        delete_label=lambda item: item.alias or str(item.id),
        search_lookups=(
            "id__icontains",
            "alias__icontains",
            "mbti__icontains",
            "humor_style__icontains",
            "tone__icontains",
            "communication_style__icontains",
            "core_values__icontains",
            "inner_story__icontains",
            "likes__icontains",
            "dislikes__icontains",
        ),
    ),
    "manipulations": OsintScopeConfig(
        key="manipulations",
        title="Manipulations",
        model=Manipulation,
        list_url_name="manipulations",
        update_url_name="manipulation_update",
        delete_url_name="manipulation_delete",
        default_sort="name",
        columns=(
            _ID_COLUMN,
            OsintColumn(
                key="name",
                label="Name",
                accessor=lambda item: item.name,
                sort_field="name",
                search_lookup="name__icontains",
            ),
            OsintColumn(
                key="group",
                label="Group",
                accessor=lambda item: item.group,
                sort_field="group__name",
                search_lookup="group__name__icontains",
            ),
            OsintColumn(
                key="ai",
                label="AI",
                accessor=lambda item: item.ai,
                sort_field="ai__model",
                search_lookup="ai__model__icontains",
            ),
            OsintColumn(
                key="persona",
                label="Persona",
                accessor=lambda item: item.persona,
                sort_field="persona__alias",
                search_lookup="persona__alias__icontains",
            ),
            OsintColumn(
                key="enabled",
                label="Enabled",
                accessor=lambda item: item.enabled,
                sort_field="enabled",
                kind="bool",
            ),
            OsintColumn(
                key="mode",
                label="Mode",
                accessor=lambda item: item.mode,
                sort_field="mode",
                search_lookup="mode__icontains",
            ),
            OsintColumn(
                key="filter_enabled",
                label="Filter",
                accessor=lambda item: item.filter_enabled,
                sort_field="filter_enabled",
                kind="bool",
            ),
        ),
        select_related=("group", "ai", "persona"),
        delete_label=lambda item: item.name,
        search_lookups=(
            "id__icontains",
            "name__icontains",
            "mode__icontains",
            "group__name__icontains",
            "ai__model__icontains",
            "persona__alias__icontains",
        ),
    ),
}


class _OsintTableMixin:
    """Sorting, column-header and pagination context shared by OSINT views.

    The host class must expose the current request as ``self.request``.
    """

    def _active_sort(self, scope: OsintScopeConfig) -> tuple[str, str]:
        """Return the validated ``(sort_field, direction)`` from the query string.

        An unknown sort field falls back to ``scope.default_sort`` and an
        unknown direction falls back to ``"asc"``.
        """
        direction = self.request.GET.get("dir", "asc").lower()
        if direction not in {"asc", "desc"}:
            direction = "asc"
        allowed = {col.sort_field for col in scope.columns if col.sort_field}
        sort_field = self.request.GET.get("sort")
        if sort_field not in allowed:
            sort_field = scope.default_sort
        return sort_field, direction

    def _build_column_context(
        self,
        scope: OsintScopeConfig,
        list_url: str,
        query_state: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Build header dicts for the template, with sort URLs where sortable."""
        sort_field, direction = self._active_sort(scope)
        columns = []
        for column in scope.columns:
            if not column.sort_field:
                columns.append(
                    {
                        "label": column.label,
                        "sortable": False,
                        "kind": column.kind,
                    }
                )
                continue
            is_sorted = sort_field == column.sort_field
            # Clicking the active ascending column flips it to descending;
            # every other click sorts ascending.
            next_direction = "desc" if is_sorted and direction == "asc" else "asc"
            sort_query = _merge_query(
                query_state,
                sort=column.sort_field,
                dir=next_direction,
                page=1,
            )
            columns.append(
                {
                    "label": column.label,
                    "sortable": True,
                    "kind": column.kind,
                    "is_sorted": is_sorted,
                    "is_desc": is_sorted and direction == "desc",
                    "sort_url": _url_with_query(list_url, sort_query),
                }
            )
        return columns

    def _build_pagination(
        self,
        page_obj: Any,
        list_url: str,
        query_state: dict[str, Any],
    ) -> dict[str, Any]:
        """Build the pagination context for *page_obj*.

        Returns ``{"enabled": False}`` when there is no page object.
        """
        if page_obj is None:
            return {"enabled": False}

        paginator = page_obj.paginator

        def page_url(number: int) -> str:
            return _url_with_query(list_url, _merge_query(query_state, page=number))

        pagination = {
            "enabled": paginator.num_pages > 1,
            "count": paginator.count,
            "current": page_obj.number,
            "total": paginator.num_pages,
            "has_previous": page_obj.has_previous(),
            "has_next": page_obj.has_next(),
            "previous_url": None,
            "next_url": None,
            "pages": [],
        }
        if page_obj.has_previous():
            pagination["previous_url"] = page_url(page_obj.previous_page_number())
        if page_obj.has_next():
            pagination["next_url"] = page_url(page_obj.next_page_number())

        for entry in paginator.get_elided_page_range(page_obj.number):
            # Compare against the paginator's own marker rather than a literal:
            # ELLIPSIS is what get_elided_page_range yields for a gap.
            if entry == paginator.ELLIPSIS:
                pagination["pages"].append({"ellipsis": True})
                continue
            pagination["pages"].append(
                {
                    "ellipsis": False,
                    "number": entry,
                    "current": entry == page_obj.number,
                    "url": page_url(entry),
                }
            )
        return pagination


class OSINTListBase(_OsintTableMixin, ObjectList):
    """Base list view rendering one OSINT scope as a sortable, searchable table.

    Subclasses set ``osint_scope`` to a key of ``OSINT_SCOPES``.
    """

    list_template = "partials/osint/list-table.html"
    paginate_by = 20
    osint_scope = ""

    def get_scope(self) -> OsintScopeConfig:
        """Return the scope configuration; raise ``ValueError`` if unknown."""
        try:
            return OSINT_SCOPES[self.osint_scope]
        except KeyError:
            raise ValueError(f"Unknown OSINT scope: {self.osint_scope}") from None

    def _list_url(self) -> str:
        """Reverse the list URL using the URL kwargs named in ``list_url_args``."""
        list_url_args = {
            arg: self.kwargs[arg] for arg in self.list_url_args if arg in self.kwargs
        }
        return reverse(self.list_url_name, kwargs=list_url_args)

    def get_ordering(self):
        """Return the ``order_by`` expression for the active sort, or ``None``."""
        sort_field, direction = self._active_sort(self.get_scope())
        if not sort_field:
            return None
        return f"-{sort_field}" if direction == "desc" else sort_field

    def _search_lookups(self, scope: OsintScopeConfig) -> dict[str, str]:
        """Map column key to search lookup for every searchable column."""
        return {
            column.key: column.search_lookup
            for column in scope.columns
            if column.search_lookup
        }

    def _query_dict(self) -> dict[str, Any]:
        """Return the non-empty GET parameters as a plain dict."""
        return {k: v for k, v in self.request.GET.items() if v not in {"", None}}

    def _apply_list_search(
        self, queryset: models.QuerySet, scope: OsintScopeConfig
    ) -> models.QuerySet:
        """Filter *queryset* by the ``q`` / ``field`` GET parameters.

        A recognised ``field`` restricts the search to that column's lookup.
        Otherwise the scope-wide lookups are OR-ed together, falling back to
        the per-column lookups when the scope defines none.
        """
        query = self.request.GET.get("q", "").strip()
        if not query:
            return queryset

        lookups_by_field = self._search_lookups(scope)
        selected_field = self.request.GET.get("field", "__all__").strip()
        if selected_field in lookups_by_field:
            selected_lookups = [lookups_by_field[selected_field]]
        else:
            selected_lookups = list(scope.search_lookups) or list(
                lookups_by_field.values()
            )
        if not selected_lookups:
            return queryset

        condition = Q()
        for lookup in selected_lookups:
            condition |= Q(**{lookup: query})
        queryset = queryset.filter(condition)
        # NOTE(review): every configured lookup contains "__" (for example
        # "name__icontains"), so this check is always true and distinct() is
        # always applied. Kept as-is to preserve behaviour; confirm whether it
        # was meant to fire only for relation-spanning lookups.
        if any("__" in lookup for lookup in selected_lookups):
            queryset = queryset.distinct()
        return queryset

    def get_queryset(self, **kwargs):
        """Return the parent queryset with scope relations loaded and search applied."""
        queryset = super().get_queryset(**kwargs)
        scope = self.get_scope()
        if scope.select_related:
            queryset = queryset.select_related(*scope.select_related)
        if scope.prefetch_related:
            queryset = queryset.prefetch_related(*scope.prefetch_related)
        return self._apply_list_search(queryset, scope)

    def _build_rows(
        self,
        scope: OsintScopeConfig,
        object_list: list[Any],
        request_type: str,
    ) -> list[dict[str, Any]]:
        """Build one row dict per object: cell values plus edit/delete actions."""
        # Fix: this name was previously used below without ever being defined,
        # which raised NameError on every render.
        context_type = _context_type(request_type)
        rows = []
        for item in object_list:
            cells = [
                {"kind": column.kind, "value": column.accessor(item)}
                for column in scope.columns
            ]
            url_kwargs = {"type": context_type, "pk": item.pk}
            actions = [
                {
                    "mode": "hx-get",
                    "url": reverse(scope.update_url_name, kwargs=url_kwargs),
                    "target": f"#{context_type}s-here",
                    "icon": "fa-solid fa-pencil",
                    "title": "Edit",
                },
                {
                    "mode": "hx-delete",
                    "url": reverse(scope.delete_url_name, kwargs=url_kwargs),
                    "target": "#modals-here",
                    "icon": "fa-solid fa-xmark",
                    "title": "Delete",
                    "confirm": (
                        "Are you sure you wish to delete "
                        f"{scope.delete_label(item)}?"
                    ),
                },
            ]
            actions.extend(scope.extra_actions(item, context_type))
            rows.append({"id": str(item.pk), "cells": cells, "actions": actions})
        return rows

    def get_context_data(self, **kwargs):
        """Extend the parent context with the ``osint_*`` table variables."""
        context = super().get_context_data(**kwargs)
        scope = self.get_scope()
        request_type = self.kwargs.get("type", "modal")
        list_url = self._list_url()
        query_state = self._query_dict()
        query_url = _url_with_query(list_url, query_state)

        field_options = [{"value": "__all__", "label": "All"}]
        field_options.extend(
            {"value": column.key, "label": column.label}
            for column in scope.columns
            if column.search_lookup
        )

        context["osint_scope"] = scope.key
        context["osint_title"] = scope.title
        context["osint_table_id"] = f"{self.context_object_name}-table"
        context["osint_event_name"] = f"{self.context_object_name_singular}Event"
        context["osint_refresh_url"] = query_url or list_url
        context["osint_columns"] = self._build_column_context(
            scope,
            list_url,
            query_state,
        )
        context["osint_rows"] = self._build_rows(
            scope,
            list(context["object_list"]),
            request_type,
        )
        context["osint_pagination"] = self._build_pagination(
            context.get("page_obj"),
            list_url,
            query_state,
        )
        context["osint_search_query"] = self.request.GET.get("q", "")
        context["osint_search_field"] = self.request.GET.get("field", "__all__")
        context["osint_search_fields"] = field_options
        context["osint_show_search"] = True
        context["osint_show_actions"] = True
        context["osint_search_url"] = list_url
        context["osint_result_count"] = context["osint_pagination"].get("count", 0)
        return context


class OSINTSearch(LoginRequiredMixin, _OsintTableMixin, View):
    """Cross-scope search view, rendered as a page, a widget or an HTMX partial."""

    allowed_types = {"page", "widget"}
    page_template = "pages/osint-search.html"
    widget_template = "mixins/wm/widget.html"
    panel_template = "partials/osint/search-panel.html"
    result_template = "partials/results_table.html"
    per_page_default = 20
    per_page_max = 100

    def _field_options(self, model_cls: type[models.Model]) -> list[dict[str, str]]:
        """Return searchable field choices for *model_cls*, sorted by label.

        Auto-created reverse relations are skipped unless many-to-many, and
        the ``user`` field is never offered.
        """
        options = []
        for model_field in model_cls._meta.get_fields():
            if (
                model_field.auto_created
                and not model_field.concrete
                and not model_field.many_to_many
            ):
                continue
            if model_field.name == "user":
                continue
            # Reverse many-to-many relations pass the filter above but have no
            # verbose_name, so fall back to a label derived from the name.
            label_source = getattr(model_field, "verbose_name", None)
            if label_source is None:
                label_source = model_field.name.replace("_", " ")
            options.append(
                {
                    "value": model_field.name,
                    "label": label_source.title(),
                }
            )
        options.sort(key=lambda item: item["label"])
        return options

    def _field_q(
        self,
        model_cls: type[models.Model],
        field_name: str,
        query: str,
    ) -> tuple[Q | None, bool]:
        """Build the filter for searching *field_name* for *query*.

        Returns ``(condition, needs_distinct)``. The condition is ``None``
        when the field does not exist, has an unsupported type, or *query*
        cannot be parsed for that type.
        """
        try:
            model_field = model_cls._meta.get_field(field_name)
        except FieldDoesNotExist:
            return None, False

        if isinstance(
            model_field, (models.CharField, models.TextField, models.UUIDField)
        ):
            return Q(**{f"{field_name}__icontains": query}), False

        if isinstance(model_field, models.BooleanField):
            parsed = _parse_bool(query)
            if parsed is None:
                return None, False
            return Q(**{field_name: parsed}), False

        if isinstance(model_field, models.IntegerField):
            try:
                return Q(**{field_name: int(query)}), False
            except ValueError:
                return None, False

        if isinstance(model_field, (models.FloatField, models.DecimalField)):
            try:
                value = Decimal(query)
            except InvalidOperation:
                return None, False
            return Q(**{field_name: value}), False

        # DateTimeField subclasses DateField, so it has to be tested first;
        # otherwise this branch is never reached.
        if isinstance(model_field, models.DateTimeField):
            # Try date-only input first: datetime.fromisoformat also accepts
            # "YYYY-MM-DD" and would turn it into an exact-midnight match.
            try:
                parsed_date = date.fromisoformat(query)
            except ValueError:
                pass
            else:
                return Q(**{f"{field_name}__date": parsed_date}), False
            try:
                parsed_dt = datetime.fromisoformat(query)
            except ValueError:
                return None, False
            return Q(**{field_name: parsed_dt}), False

        if isinstance(model_field, models.DateField):
            try:
                parsed_date = date.fromisoformat(query)
            except ValueError:
                return None, False
            return Q(**{field_name: parsed_date}), False

        if isinstance(model_field, (models.ForeignKey, models.ManyToManyField)):
            # Only many-to-many joins are flagged as needing distinct().
            use_distinct = isinstance(model_field, models.ManyToManyField)
            related_text_field = _preferred_related_text_field(
                model_field.related_model
            )
            if related_text_field:
                lookup = f"{field_name}__{related_text_field}__icontains"
            else:
                lookup = f"{field_name}__id__icontains"
            return Q(**{lookup: query}), use_distinct

        return None, False

    def _search_queryset(
        self,
        queryset: models.QuerySet,
        model_cls: type[models.Model],
        query: str,
        field_name: str,
        field_options: list[dict[str, str]],
    ) -> models.QuerySet:
        """Filter *queryset* by *query* on one field or on all offered fields.

        Returns an empty queryset when no usable condition can be built.
        """
        if not query:
            return queryset

        if field_name != "__all__":
            field_q, use_distinct = self._field_q(model_cls, field_name, query)
            if field_q is None:
                return queryset.none()
            queryset = queryset.filter(field_q)
            return queryset.distinct() if use_distinct else queryset

        condition = Q()
        use_distinct = False
        for option in field_options:
            field_q, field_distinct = self._field_q(
                model_cls,
                option["value"],
                query,
            )
            if field_q is None:
                continue
            condition |= field_q
            use_distinct = use_distinct or field_distinct
        if not condition.children:
            return queryset.none()
        queryset = queryset.filter(condition)
        return queryset.distinct() if use_distinct else queryset

    def _per_page(self, raw_value: str | None) -> int:
        """Parse the page size, using the default if invalid and capping at the max."""
        if not raw_value:
            return self.per_page_default
        try:
            value = int(raw_value)
        except ValueError:
            return self.per_page_default
        if value < 1:
            return self.per_page_default
        return min(value, self.per_page_max)

    def _scope_key(self, raw_scope: str | None) -> str:
        """Return *raw_scope* when it is a known scope, otherwise ``"people"``."""
        return raw_scope if raw_scope in OSINT_SCOPES else "people"

    def _query_state(self, request) -> dict[str, Any]:
        """Return the non-empty GET parameters as a plain dict."""
        return {k: v for k, v in request.GET.items() if v not in {None, ""}}

    def _build_rows(
        self,
        scope: OsintScopeConfig,
        object_list: list[Any],
        request_type: str,
    ) -> list[dict[str, Any]]:
        """Build read-only row dicts; ``actions`` is always empty here.

        *request_type* is accepted for signature parity with the list view
        and is not used.
        """
        return [
            {
                "id": str(item.pk),
                "cells": [
                    {"kind": column.kind, "value": column.accessor(item)}
                    for column in scope.columns
                ],
                "actions": [],
            }
            for item in object_list
        ]

    def get(self, request, type):
        """Run the search and render the page, widget or results partial.

        Responds with HTTP 400 when *type* is not in ``allowed_types``.
        """
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified.")

        scope = OSINT_SCOPES[self._scope_key(request.GET.get("scope"))]
        field_options = self._field_options(scope.model)
        query = request.GET.get("q", "").strip()

        field_name = request.GET.get("field", "__all__")
        if field_name != "__all__":
            allowed_fields = {option["value"] for option in field_options}
            if field_name not in allowed_fields:
                field_name = "__all__"

        # Results are always restricted to the requesting user's own records.
        queryset = scope.model.objects.filter(user=request.user)
        if scope.select_related:
            queryset = queryset.select_related(*scope.select_related)
        if scope.prefetch_related:
            queryset = queryset.prefetch_related(*scope.prefetch_related)
        queryset = self._search_queryset(
            queryset,
            scope.model,
            query,
            field_name,
            field_options,
        )

        sort_field, direction = self._active_sort(scope)
        if sort_field:
            order_by = sort_field if direction == "asc" else f"-{sort_field}"
            queryset = queryset.order_by(order_by)

        per_page = self._per_page(request.GET.get("per_page"))
        paginator = Paginator(queryset, per_page)
        page_obj = paginator.get_page(request.GET.get("page"))
        object_list = list(page_obj.object_list)

        list_url = reverse("osint_search", kwargs={"type": type})
        query_state = self._query_state(request)
        refresh_url = _url_with_query(list_url, query_state)

        context = {
            "osint_scope": scope.key,
            "osint_title": f"Search {scope.title}",
            "osint_table_id": "osint-search-table",
            "osint_event_name": "",
            "osint_refresh_url": refresh_url,
            "osint_columns": self._build_column_context(
                scope, list_url, query_state
            ),
            "osint_rows": self._build_rows(scope, object_list, type),
            "osint_pagination": self._build_pagination(
                page_obj, list_url, query_state
            ),
            "osint_show_search": False,
            "osint_show_actions": False,
            "osint_result_count": paginator.count,
            "osint_search_url": list_url,
            "scope_options": [
                {"value": key, "label": conf.title}
                for key, conf in OSINT_SCOPES.items()
            ],
            "field_options": field_options,
            "selected_scope": scope.key,
            "selected_field": field_name,
            "search_query": query,
            "selected_per_page": per_page,
            "search_page_url": reverse("osint_search", kwargs={"type": "page"}),
            "search_widget_url": reverse("osint_search", kwargs={"type": "widget"}),
        }

        # HTMX requests aimed at the results area get only the table partial.
        hx_target = request.headers.get("HX-Target")
        if request.htmx and hx_target in {
            "osint-search-results",
            "osint-search-table",
        }:
            response = render(request, self.result_template, context)
            if type == "page":
                response["HX-Replace-Url"] = refresh_url
            return response

        if type == "widget":
            widget_context = {
                "title": "Search",
                "unique": "osint-search-widget",
                "window_content": self.panel_template,
                "widget_options": 'gs-w="8" gs-h="14" gs-x="0" gs-y="0" gs-min-w="5"',
                **context,
            }
            return render(request, self.widget_template, widget_context)

        return render(request, self.page_template, context)