1130 lines
38 KiB
Python
1130 lines
38 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import date, datetime
|
|
from decimal import Decimal, InvalidOperation
|
|
from typing import Any, Callable
|
|
from urllib.parse import urlencode
|
|
|
|
from django.contrib.auth.mixins import LoginRequiredMixin
|
|
from django.core.exceptions import FieldDoesNotExist
|
|
from django.core.paginator import Paginator
|
|
from django.db import models
|
|
from django.db.models import Q
|
|
from django.http import HttpResponseBadRequest
|
|
from django.shortcuts import render
|
|
from django.urls import reverse
|
|
from django.views import View
|
|
from mixins.views import ObjectList
|
|
|
|
from core.models import Group, Manipulation, Person, Persona
|
|
|
|
|
|
def _context_type(request_type: str) -> str:
|
|
return "modal" if request_type == "page" else request_type
|
|
|
|
|
|
def _parse_bool(raw: str) -> bool | None:
|
|
lowered = raw.strip().lower()
|
|
truthy = {"1", "true", "yes", "y", "on", "enabled"}
|
|
falsy = {"0", "false", "no", "n", "off", "disabled"}
|
|
if lowered in truthy:
|
|
return True
|
|
if lowered in falsy:
|
|
return False
|
|
return None
|
|
|
|
|
|
def _preferred_related_text_field(model: type[models.Model]) -> str | None:
    """Pick the text field on ``model`` best suited for searching a relation.

    Candidate names are tried in a fixed priority order; the first one that
    exists on the model and is a ``CharField`` or ``TextField`` wins. Returns
    ``None`` when no candidate qualifies.
    """
    text_types = (models.CharField, models.TextField)
    fields_by_name = {entry.name: entry for entry in model._meta.get_fields()}
    return next(
        (
            name
            for name in ("name", "alias", "title", "identifier", "model")
            if isinstance(fields_by_name.get(name), text_types)
        ),
        None,
    )
|
|
|
|
|
|
def _column_field_name(column: "OsintColumn") -> str:
|
|
if column.search_lookup:
|
|
return str(column.search_lookup).split("__", 1)[0]
|
|
if column.sort_field:
|
|
return str(column.sort_field).split("__", 1)[0]
|
|
return str(column.key)
|
|
|
|
|
|
def _safe_icon_class(raw: str | None, default: str) -> str:
|
|
icon_class = str(raw or "").strip()
|
|
if not icon_class:
|
|
return default
|
|
cleaned_parts = []
|
|
for part in icon_class.split():
|
|
if not part:
|
|
continue
|
|
if all(ch.isalnum() or ch in {"-", "_"} for ch in part):
|
|
cleaned_parts.append(part)
|
|
if not cleaned_parts:
|
|
return default
|
|
return " ".join(cleaned_parts)
|
|
|
|
|
|
def _url_with_query(base_url: str, query: dict[str, Any]) -> str:
|
|
params = {}
|
|
for key, value in query.items():
|
|
if value is None:
|
|
continue
|
|
value_str = str(value).strip()
|
|
if value_str == "":
|
|
continue
|
|
params[key] = value_str
|
|
if not params:
|
|
return base_url
|
|
return f"{base_url}?{urlencode(params)}"
|
|
|
|
|
|
def _merge_query(current_query: dict[str, Any], **updates: Any) -> dict[str, Any]:
|
|
merged = dict(current_query)
|
|
for key, value in updates.items():
|
|
if value is None or str(value).strip() == "":
|
|
merged.pop(key, None)
|
|
continue
|
|
merged[key] = value
|
|
return merged
|
|
|
|
|
|
@dataclass(frozen=True)
class OsintColumn:
    """Immutable description of one table column in an OSINT list."""

    # Identifier of the column; also used as the search-field selector value.
    key: str
    # Human-readable column header.
    label: str
    # Callable that extracts the cell value from a row object.
    accessor: Callable[[Any], Any]
    # ORM ordering expression for this column; None means not sortable.
    sort_field: str | None = None
    # ORM filter lookup used when searching this column; None means not searchable.
    search_lookup: str | None = None
    # Rendering hint passed through to the template with each cell.
    kind: str = "text"
|
|
|
|
|
|
@dataclass(frozen=True)
class OsintScopeConfig:
    """Immutable configuration describing how one OSINT scope is listed."""

    # Scope identifier (the key under which the config is registered).
    key: str
    # Display title of the scope.
    title: str
    # Django model class backing the scope.
    model: type[models.Model]
    # URL names for the scope's list, update and delete views.
    list_url_name: str
    update_url_name: str
    delete_url_name: str
    # Sort field used when the request does not name an allowed one.
    default_sort: str
    # Columns rendered for each row, in display order.
    columns: tuple[OsintColumn, ...]
    # Relation names passed to select_related() / prefetch_related().
    select_related: tuple[str, ...] = ()
    prefetch_related: tuple[str, ...] = ()
    # Produces the item label interpolated into the delete confirmation text.
    delete_label: Callable[[Any], str] = str
    # Called with (item, context_type); returns additional row action dicts.
    # The default yields no extra actions.
    extra_actions: Callable[[Any, str], list[dict[str, Any]]] = field(
        default_factory=lambda: (lambda _item, _type: [])
    )
    # Lookups OR-ed together for an "all fields" search; when empty the
    # per-column search lookups are used instead.
    search_lookups: tuple[str, ...] = ()
|
|
|
|
|
|
def _person_extra_actions(item: Person, _request_type: str) -> list[dict[str, Any]]:
    """Return the extra row actions for a person.

    Currently a single link to the ``person_identifiers`` view, always
    reversed with ``type="page"``; ``_request_type`` is accepted for signature
    compatibility and is not used.
    """
    identifiers_url = reverse(
        "person_identifiers",
        kwargs={"type": "page", "person": item.id},
    )
    identifiers_action = {
        "mode": "link",
        "url": identifiers_url,
        "icon": "fa-solid fa-eye",
        "title": "Identifiers",
    }
    return [identifiers_action]
|
|
|
|
|
|
# Registry of OSINT scopes, keyed by scope identifier. Each entry declares the
# model, URL names, table columns and search lookups for one list view.
OSINT_SCOPES: dict[str, OsintScopeConfig] = {
    "people": OsintScopeConfig(
        key="people",
        title="People",
        model=Person,
        list_url_name="people",
        update_url_name="person_update",
        delete_url_name="person_delete",
        default_sort="name",
        columns=(
            OsintColumn(
                key="id",
                label="ID",
                accessor=lambda item: item.id,
                sort_field="id",
                search_lookup="id__icontains",
                kind="id_copy",
            ),
            OsintColumn(
                key="name",
                label="Name",
                accessor=lambda item: item.name,
                sort_field="name",
                search_lookup="name__icontains",
            ),
            OsintColumn(
                key="sentiment",
                label="Sentiment",
                accessor=lambda item: item.sentiment,
                sort_field="sentiment",
            ),
            OsintColumn(
                key="timezone",
                label="Timezone",
                accessor=lambda item: item.timezone,
                sort_field="timezone",
                search_lookup="timezone__icontains",
            ),
            OsintColumn(
                key="last_interaction",
                label="Last Interaction",
                accessor=lambda item: item.last_interaction,
                sort_field="last_interaction",
                kind="datetime",
            ),
        ),
        delete_label=lambda item: item.name,
        extra_actions=_person_extra_actions,
        search_lookups=(
            "id__icontains",
            "name__icontains",
            "summary__icontains",
            "profile__icontains",
            "revealed__icontains",
            "likes__icontains",
            "dislikes__icontains",
            "timezone__icontains",
        ),
    ),
    "groups": OsintScopeConfig(
        key="groups",
        title="Groups",
        model=Group,
        list_url_name="groups",
        update_url_name="group_update",
        delete_url_name="group_delete",
        default_sort="name",
        columns=(
            OsintColumn(
                key="id",
                label="ID",
                accessor=lambda item: item.id,
                sort_field="id",
                search_lookup="id__icontains",
                kind="id_copy",
            ),
            OsintColumn(
                key="name",
                label="Name",
                accessor=lambda item: item.name,
                sort_field="name",
                search_lookup="name__icontains",
            ),
            OsintColumn(
                key="people_count",
                label="People",
                # "people" is prefetched for this scope (see prefetch_related
                # below). len(...all()) reads the prefetched rows, whereas
                # .count() on a related manager may issue a query per row.
                accessor=lambda item: len(item.people.all()),
                kind="number",
            ),
        ),
        prefetch_related=("people",),
        delete_label=lambda item: item.name,
        search_lookups=(
            "id__icontains",
            "name__icontains",
            "people__name__icontains",
        ),
    ),
    "personas": OsintScopeConfig(
        key="personas",
        title="Personas",
        model=Persona,
        list_url_name="personas",
        update_url_name="persona_update",
        delete_url_name="persona_delete",
        default_sort="alias",
        columns=(
            OsintColumn(
                key="id",
                label="ID",
                accessor=lambda item: item.id,
                sort_field="id",
                search_lookup="id__icontains",
                kind="id_copy",
            ),
            OsintColumn(
                key="alias",
                label="Alias",
                accessor=lambda item: item.alias,
                sort_field="alias",
                search_lookup="alias__icontains",
            ),
            OsintColumn(
                key="mbti",
                label="MBTI",
                accessor=lambda item: item.mbti,
                sort_field="mbti",
                search_lookup="mbti__icontains",
            ),
            OsintColumn(
                key="mbti_identity",
                label="MBTI Identity",
                accessor=lambda item: item.mbti_identity,
                sort_field="mbti_identity",
            ),
            OsintColumn(
                key="humor_style",
                label="Humor Style",
                accessor=lambda item: item.humor_style,
                sort_field="humor_style",
                search_lookup="humor_style__icontains",
            ),
            OsintColumn(
                key="tone",
                label="Tone",
                accessor=lambda item: item.tone,
                sort_field="tone",
                search_lookup="tone__icontains",
            ),
            OsintColumn(
                key="trust",
                label="Trust",
                accessor=lambda item: item.trust,
                sort_field="trust",
            ),
            OsintColumn(
                key="adaptability",
                label="Adaptability",
                accessor=lambda item: item.adaptability,
                sort_field="adaptability",
            ),
        ),
        # Fall back to the id when the persona has no alias.
        delete_label=lambda item: item.alias or str(item.id),
        search_lookups=(
            "id__icontains",
            "alias__icontains",
            "mbti__icontains",
            "humor_style__icontains",
            "tone__icontains",
            "communication_style__icontains",
            "core_values__icontains",
            "inner_story__icontains",
            "likes__icontains",
            "dislikes__icontains",
        ),
    ),
    "manipulations": OsintScopeConfig(
        key="manipulations",
        title="Manipulations",
        model=Manipulation,
        list_url_name="manipulations",
        update_url_name="manipulation_update",
        delete_url_name="manipulation_delete",
        default_sort="name",
        columns=(
            OsintColumn(
                key="id",
                label="ID",
                accessor=lambda item: item.id,
                sort_field="id",
                search_lookup="id__icontains",
                kind="id_copy",
            ),
            OsintColumn(
                key="name",
                label="Name",
                accessor=lambda item: item.name,
                sort_field="name",
                search_lookup="name__icontains",
            ),
            OsintColumn(
                key="group",
                label="Group",
                accessor=lambda item: item.group,
                sort_field="group__name",
                search_lookup="group__name__icontains",
            ),
            OsintColumn(
                key="ai",
                label="AI",
                accessor=lambda item: item.ai,
                sort_field="ai__model",
                search_lookup="ai__model__icontains",
            ),
            OsintColumn(
                key="persona",
                label="Persona",
                accessor=lambda item: item.persona,
                sort_field="persona__alias",
                search_lookup="persona__alias__icontains",
            ),
            OsintColumn(
                key="enabled",
                label="Enabled",
                accessor=lambda item: item.enabled,
                sort_field="enabled",
                kind="bool",
            ),
            OsintColumn(
                key="mode",
                label="Mode",
                accessor=lambda item: item.mode,
                sort_field="mode",
                search_lookup="mode__icontains",
            ),
            OsintColumn(
                key="filter_enabled",
                label="Filter",
                accessor=lambda item: item.filter_enabled,
                sort_field="filter_enabled",
                kind="bool",
            ),
        ),
        select_related=("group", "ai", "persona"),
        delete_label=lambda item: item.name,
        search_lookups=(
            "id__icontains",
            "name__icontains",
            "mode__icontains",
            "group__name__icontains",
            "ai__model__icontains",
            "persona__alias__icontains",
        ),
    ),
}
|
|
|
|
# Default icon CSS classes per OSINT scope key.
OSINT_SCOPE_ICONS: dict[str, str] = dict(
    people="fa-solid fa-user-group",
    groups="fa-solid fa-users",
    personas="fa-solid fa-masks-theater",
    manipulations="fa-solid fa-sliders",
)
|
|
|
|
|
|
class OSINTListBase(ObjectList):
    """List view that renders one OSINT scope as a sortable, searchable table.

    Subclasses set ``osint_scope`` to a key of ``OSINT_SCOPES``. The view also
    reads ``list_url_name``, ``list_url_args``, ``context_object_name`` and
    ``context_object_name_singular`` from ``self``; those are not defined in
    this class (presumably supplied by ``ObjectList`` or subclasses).
    """

    list_template = "partials/osint/list-table.html"
    paginate_by = 20
    osint_scope = ""

    def get_scope(self) -> OsintScopeConfig:
        """Return the scope configuration selected by ``osint_scope``.

        Raises:
            ValueError: If ``osint_scope`` is not a key of ``OSINT_SCOPES``.
        """
        try:
            return OSINT_SCOPES[self.osint_scope]
        except KeyError:
            raise ValueError(f"Unknown OSINT scope: {self.osint_scope}") from None

    def _list_url(self) -> str:
        """Reverse the list URL using the URL kwargs named in ``list_url_args``."""
        url_kwargs = {
            name: self.kwargs[name]
            for name in self.list_url_args
            if name in self.kwargs
        }
        return reverse(self.list_url_name, kwargs=url_kwargs)

    def _active_sort(self, scope: OsintScopeConfig) -> tuple[str, str]:
        """Return the validated ``(sort_field, direction)`` for this request.

        An unknown direction falls back to ``"asc"``; a sort field that is not
        one of the scope's column sort fields falls back to the scope default.
        """
        direction = self.request.GET.get("dir", "asc").lower()
        if direction not in {"asc", "desc"}:
            direction = "asc"
        allowed = {col.sort_field for col in scope.columns if col.sort_field}
        sort_field = self.request.GET.get("sort")
        if sort_field not in allowed:
            sort_field = scope.default_sort
        return sort_field, direction

    def get_ordering(self):
        """Return the ordering expression for the active sort, or ``None``."""
        sort_field, direction = self._active_sort(self.get_scope())
        if not sort_field:
            return None
        return f"-{sort_field}" if direction == "desc" else sort_field

    def _search_lookups(self, scope: OsintScopeConfig) -> dict[str, str]:
        """Map each searchable column key to its search lookup."""
        return {
            column.key: column.search_lookup
            for column in scope.columns
            if column.search_lookup
        }

    def _query_dict(self) -> dict[str, Any]:
        """Return the request's GET parameters with empty values dropped."""
        return {k: v for k, v in self.request.GET.items() if v not in {"", None}}

    @staticmethod
    def _lookup_spans_relation(model: type[models.Model], lookup: str) -> bool:
        """Tell whether the leading segment of ``lookup`` is a relation on ``model``."""
        head = lookup.split("__", 1)[0]
        try:
            return bool(model._meta.get_field(head).is_relation)
        except FieldDoesNotExist:
            # Unknown leading segment: stay conservative and de-duplicate.
            return True

    def _apply_list_search(
        self, queryset: models.QuerySet, scope: OsintScopeConfig
    ) -> models.QuerySet:
        """Filter ``queryset`` by the ``q`` / ``field`` GET parameters.

        When ``field`` names a searchable column only that column's lookup is
        used; otherwise the scope's ``search_lookups`` (or, if empty, every
        column lookup) are OR-ed together. An empty query or an empty lookup
        set returns the queryset unchanged.
        """
        query = self.request.GET.get("q", "").strip()
        if not query:
            return queryset

        lookups_by_field = self._search_lookups(scope)
        selected_field = self.request.GET.get("field", "__all__").strip()
        if selected_field in lookups_by_field:
            selected_lookups = [lookups_by_field[selected_field]]
        else:
            selected_lookups = list(scope.search_lookups) or list(
                lookups_by_field.values()
            )
        if not selected_lookups:
            return queryset

        condition = Q()
        for lookup in selected_lookups:
            condition |= Q(**{lookup: query})
        queryset = queryset.filter(condition)

        # Only joins across relations can duplicate rows. Every lookup carries
        # a "__icontains" suffix, so testing for "__" alone would always match
        # and force DISTINCT onto plain-column searches as well.
        if any(
            self._lookup_spans_relation(queryset.model, lookup)
            for lookup in selected_lookups
        ):
            queryset = queryset.distinct()
        return queryset

    def get_queryset(self, **kwargs):
        """Return the parent queryset with scope relations loaded and search applied."""
        scope = self.get_scope()
        queryset = super().get_queryset(**kwargs)
        if scope.select_related:
            queryset = queryset.select_related(*scope.select_related)
        if scope.prefetch_related:
            queryset = queryset.prefetch_related(*scope.prefetch_related)
        return self._apply_list_search(queryset, scope)

    def _build_column_context(
        self,
        scope: OsintScopeConfig,
        list_url: str,
        query_state: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Build the header descriptors for the scope's columns.

        Sortable columns additionally carry their sort state and a URL that
        toggles the direction and resets the page to 1.
        """
        sort_field, direction = self._active_sort(scope)
        columns = []
        for column in scope.columns:
            entry = {
                "key": column.key,
                "field_name": _column_field_name(column),
                "label": column.label,
                "sortable": bool(column.sort_field),
                "kind": column.kind,
            }
            if column.sort_field:
                is_sorted = sort_field == column.sort_field
                # Clicking the column that is sorted ascending flips to descending.
                next_direction = "desc" if is_sorted and direction == "asc" else "asc"
                sort_query = _merge_query(
                    query_state,
                    sort=column.sort_field,
                    dir=next_direction,
                    page=1,
                )
                entry["is_sorted"] = is_sorted
                entry["is_desc"] = is_sorted and direction == "desc"
                entry["sort_url"] = _url_with_query(list_url, sort_query)
            columns.append(entry)
        return columns

    def _build_rows(
        self,
        scope: OsintScopeConfig,
        object_list: list[Any],
        request_type: str,
    ) -> list[dict[str, Any]]:
        """Build one row dict (id, cells, actions) per object.

        Every row gets an edit action, a delete action with a confirmation
        message, and whatever ``scope.extra_actions`` returns.
        """
        context_type = _context_type(request_type)
        # Widgets open the edit form in a window; other types use the context type.
        update_type = "window" if request_type == "widget" else context_type
        update_target = (
            "#windows-here" if update_type == "window" else f"#{update_type}s-here"
        )
        rows = []
        for item in object_list:
            cells = [
                {"kind": column.kind, "value": column.accessor(item)}
                for column in scope.columns
            ]
            update_url = reverse(
                scope.update_url_name,
                kwargs={"type": update_type, "pk": item.pk},
            )
            delete_url = reverse(
                scope.delete_url_name,
                kwargs={"type": context_type, "pk": item.pk},
            )
            actions = [
                {
                    "mode": "hx-get",
                    "url": update_url,
                    "target": update_target,
                    "icon": "fa-solid fa-pencil",
                    "title": "Edit",
                },
                {
                    "mode": "hx-delete",
                    "url": delete_url,
                    "target": "#modals-here",
                    "icon": "fa-solid fa-xmark",
                    "title": "Delete",
                    "confirm": (
                        "Are you sure you wish to delete "
                        f"{scope.delete_label(item)}?"
                    ),
                },
            ]
            actions.extend(scope.extra_actions(item, context_type))
            rows.append({"id": str(item.pk), "cells": cells, "actions": actions})
        return rows

    def _build_pagination(
        self,
        page_obj: Any,
        list_url: str,
        query_state: dict[str, Any],
    ) -> dict[str, Any]:
        """Describe the pagination state and page links for the template.

        Returns ``{"enabled": False}`` when there is no page object.
        """
        if page_obj is None:
            return {"enabled": False}

        paginator = page_obj.paginator

        def page_url(number: Any) -> str:
            return _url_with_query(list_url, _merge_query(query_state, page=number))

        pagination = {
            "enabled": paginator.num_pages > 1,
            "count": paginator.count,
            "current": page_obj.number,
            "total": paginator.num_pages,
            "has_previous": page_obj.has_previous(),
            "has_next": page_obj.has_next(),
            "previous_url": None,
            "next_url": None,
            "pages": [],
        }
        if page_obj.has_previous():
            pagination["previous_url"] = page_url(page_obj.previous_page_number())
        if page_obj.has_next():
            pagination["next_url"] = page_url(page_obj.next_page_number())

        # Compare against the paginator's own marker rather than a literal so
        # the check keeps working if the marker is customised or translated.
        ellipsis = getattr(paginator, "ELLIPSIS", "…")
        for entry in paginator.get_elided_page_range(page_obj.number):
            if entry == ellipsis:
                pagination["pages"].append({"ellipsis": True})
                continue
            pagination["pages"].append(
                {
                    "ellipsis": False,
                    "number": entry,
                    "current": entry == page_obj.number,
                    "url": page_url(entry),
                }
            )
        return pagination

    def get_context_data(self, **kwargs):
        """Extend the parent context with the ``osint_*`` table variables."""
        context = super().get_context_data(**kwargs)
        scope = self.get_scope()
        request_type = self.kwargs.get("type", "modal")
        list_url = self._list_url()
        query_state = self._query_dict()
        query_url = _url_with_query(list_url, query_state)

        # "All" first, then one option per searchable column.
        field_options = [{"value": "__all__", "label": "All"}]
        field_options.extend(
            {"value": column.key, "label": column.label}
            for column in scope.columns
            if column.search_lookup
        )

        context["osint_scope"] = scope.key
        context["osint_title"] = scope.title
        context["osint_table_id"] = f"{self.context_object_name}-table"
        context["osint_event_name"] = f"{self.context_object_name_singular}Event"
        context["osint_refresh_url"] = query_url or list_url
        context["osint_columns"] = self._build_column_context(
            scope,
            list_url,
            query_state,
        )
        context["osint_rows"] = self._build_rows(
            scope,
            list(context["object_list"]),
            request_type,
        )
        context["osint_pagination"] = self._build_pagination(
            context.get("page_obj"),
            list_url,
            query_state,
        )
        context["osint_search_query"] = self.request.GET.get("q", "")
        context["osint_search_field"] = self.request.GET.get("field", "__all__")
        context["osint_search_fields"] = field_options
        context["osint_show_search"] = True
        context["osint_show_actions"] = True
        context["osint_search_url"] = list_url
        context["osint_result_count"] = context["osint_pagination"].get("count", 0)
        context["widget_icon"] = _safe_icon_class(
            self.request.GET.get("widget_icon"),
            OSINT_SCOPE_ICONS.get(scope.key, "fa-solid fa-arrows-minimize"),
        )
        return context
|
|
|
|
|
|
class OSINTSearch(LoginRequiredMixin, View):
    """Cross-scope OSINT search, rendered as a full page or as a widget.

    The scope, query, field, sort and pagination settings are all read from
    GET parameters. Results are restricted to rows whose ``user`` is the
    requesting user.
    """

    allowed_types = {"page", "widget"}
    page_template = "pages/osint-search.html"
    widget_template = "mixins/wm/widget.html"
    panel_template = "partials/osint/search-panel.html"
    result_template = "partials/results_table.html"
    per_page_default = 20
    per_page_max = 100

    def _field_options(self, model_cls: type[models.Model]) -> list[dict[str, str]]:
        """List the model fields offered in the field selector, sorted by label."""
        options = []
        for model_field in model_cls._meta.get_fields():
            # Skip reverse/accessor relations (e.g. ManyToManyRel) that are not
            # directly searchable as user-facing fields in this selector.
            if model_field.auto_created and not model_field.concrete:
                continue
            if model_field.name == "user":
                continue
            label = getattr(
                model_field,
                "verbose_name",
                str(model_field.name).replace("_", " "),
            )
            options.append(
                {
                    "value": model_field.name,
                    "label": str(label).title(),
                }
            )
        options.sort(key=lambda item: item["label"])
        return options

    def _field_q(
        self,
        model_cls: type[models.Model],
        field_name: str,
        query: str,
    ) -> tuple[Q | None, bool]:
        """Build the filter for searching ``query`` in one model field.

        Returns:
            ``(condition, needs_distinct)``. ``condition`` is ``None`` when
            the field does not exist, has an unsupported type, or ``query``
            cannot be parsed for that type. ``needs_distinct`` is ``True``
            only for many-to-many fields.
        """
        try:
            model_field = model_cls._meta.get_field(field_name)
        except FieldDoesNotExist:
            return None, False

        if isinstance(
            model_field, (models.CharField, models.TextField, models.UUIDField)
        ):
            return Q(**{f"{field_name}__icontains": query}), False

        if isinstance(model_field, models.BooleanField):
            parsed = _parse_bool(query)
            if parsed is None:
                return None, False
            return Q(**{field_name: parsed}), False

        if isinstance(model_field, models.IntegerField):
            try:
                return Q(**{field_name: int(query)}), False
            except ValueError:
                return None, False

        if isinstance(model_field, (models.FloatField, models.DecimalField)):
            try:
                number = Decimal(query)
            except InvalidOperation:
                return None, False
            # Decimal() also parses "NaN" and "Infinity"; those are not
            # meaningful search values, so treat them as unparseable.
            if not number.is_finite():
                return None, False
            return Q(**{field_name: number}), False

        # DateTimeField is a subclass of DateField, so it has to be tested
        # first or its branch can never be reached.
        if isinstance(model_field, models.DateTimeField):
            # A date-only query matches the whole day. It is tried first
            # because datetime.fromisoformat() also accepts date-only strings.
            try:
                parsed_date = date.fromisoformat(query)
            except ValueError:
                pass
            else:
                return Q(**{f"{field_name}__date": parsed_date}), False
            try:
                parsed_dt = datetime.fromisoformat(query)
            except ValueError:
                return None, False
            return Q(**{field_name: parsed_dt}), False

        if isinstance(model_field, models.DateField):
            try:
                parsed_date = date.fromisoformat(query)
            except ValueError:
                return None, False
            return Q(**{field_name: parsed_date}), False

        is_many = isinstance(model_field, models.ManyToManyField)
        if is_many or isinstance(model_field, models.ForeignKey):
            # Search the related model's preferred text field, else its id.
            target = _preferred_related_text_field(model_field.related_model) or "id"
            return Q(**{f"{field_name}__{target}__icontains": query}), is_many

        return None, False

    def _search_queryset(
        self,
        queryset: models.QuerySet,
        model_cls: type[models.Model],
        query: str,
        field_name: str,
        field_options: list[dict[str, str]],
    ) -> models.QuerySet:
        """Filter ``queryset`` by ``query`` in one field or across all options.

        An empty query returns the queryset unchanged. If no usable condition
        can be built for the query, an empty queryset is returned.
        """
        if not query:
            return queryset

        if field_name != "__all__":
            field_q, use_distinct = self._field_q(model_cls, field_name, query)
            if field_q is None:
                return queryset.none()
            queryset = queryset.filter(field_q)
            return queryset.distinct() if use_distinct else queryset

        condition = Q()
        use_distinct = False
        for option in field_options:
            field_q, field_distinct = self._field_q(
                model_cls,
                option["value"],
                query,
            )
            if field_q is None:
                continue
            condition |= field_q
            use_distinct = use_distinct or field_distinct
        if not condition.children:
            return queryset.none()
        queryset = queryset.filter(condition)
        return queryset.distinct() if use_distinct else queryset

    def _per_page(self, raw_value: str | None) -> int:
        """Parse the page size, falling back to the default and capping at the maximum."""
        if not raw_value:
            return self.per_page_default
        try:
            value = int(raw_value)
        except ValueError:
            return self.per_page_default
        if value < 1:
            return self.per_page_default
        return min(value, self.per_page_max)

    def _scope_key(self, raw_scope: str | None) -> str:
        """Return ``raw_scope`` if it is a known scope key, else ``"people"``."""
        return raw_scope if raw_scope in OSINT_SCOPES else "people"

    def _query_state(self, request) -> dict[str, Any]:
        """Return the request's GET parameters with empty values dropped."""
        return {k: v for k, v in request.GET.items() if v not in {None, ""}}

    def _active_sort(self, scope: OsintScopeConfig) -> tuple[str, str]:
        """Return the validated ``(sort_field, direction)`` for this request.

        An unknown direction falls back to ``"asc"``; a sort field that is not
        one of the scope's column sort fields falls back to the scope default.
        """
        direction = self.request.GET.get("dir", "asc").lower()
        if direction not in {"asc", "desc"}:
            direction = "asc"
        allowed = {col.sort_field for col in scope.columns if col.sort_field}
        sort_field = self.request.GET.get("sort")
        if sort_field not in allowed:
            sort_field = scope.default_sort
        return sort_field, direction

    def _build_column_context(
        self,
        scope: OsintScopeConfig,
        list_url: str,
        query_state: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Build the header descriptors for the scope's columns.

        Sortable columns additionally carry their sort state and a URL that
        toggles the direction and resets the page to 1.
        """
        sort_field, direction = self._active_sort(scope)
        columns = []
        for column in scope.columns:
            entry = {
                "key": column.key,
                "field_name": _column_field_name(column),
                "label": column.label,
                "sortable": bool(column.sort_field),
                "kind": column.kind,
            }
            if column.sort_field:
                is_sorted = sort_field == column.sort_field
                # Clicking the column that is sorted ascending flips to descending.
                next_direction = "desc" if is_sorted and direction == "asc" else "asc"
                sort_query = _merge_query(
                    query_state,
                    sort=column.sort_field,
                    dir=next_direction,
                    page=1,
                )
                entry["is_sorted"] = is_sorted
                entry["is_desc"] = is_sorted and direction == "desc"
                entry["sort_url"] = _url_with_query(list_url, sort_query)
            columns.append(entry)
        return columns

    def _build_rows(
        self,
        scope: OsintScopeConfig,
        object_list: list[Any],
        request_type: str,
    ) -> list[dict[str, Any]]:
        """Build one row dict per object; search rows carry no actions.

        ``request_type`` is accepted for signature parity with the list view
        and is not used here.
        """
        return [
            {
                "id": str(item.pk),
                "cells": [
                    {"kind": column.kind, "value": column.accessor(item)}
                    for column in scope.columns
                ],
                "actions": [],
            }
            for item in object_list
        ]

    def _build_pagination(
        self,
        page_obj: Any,
        list_url: str,
        query_state: dict[str, Any],
    ) -> dict[str, Any]:
        """Describe the pagination state and page links for the template.

        Returns ``{"enabled": False}`` when there is no page object.
        """
        if page_obj is None:
            return {"enabled": False}

        paginator = page_obj.paginator

        def page_url(number: Any) -> str:
            return _url_with_query(list_url, _merge_query(query_state, page=number))

        pagination = {
            "enabled": paginator.num_pages > 1,
            "count": paginator.count,
            "current": page_obj.number,
            "total": paginator.num_pages,
            "has_previous": page_obj.has_previous(),
            "has_next": page_obj.has_next(),
            "previous_url": None,
            "next_url": None,
            "pages": [],
        }
        if page_obj.has_previous():
            pagination["previous_url"] = page_url(page_obj.previous_page_number())
        if page_obj.has_next():
            pagination["next_url"] = page_url(page_obj.next_page_number())

        # Compare against the paginator's own marker rather than a literal so
        # the check keeps working if the marker is customised or translated.
        ellipsis = getattr(paginator, "ELLIPSIS", "…")
        for entry in paginator.get_elided_page_range(page_obj.number):
            if entry == ellipsis:
                pagination["pages"].append({"ellipsis": True})
                continue
            pagination["pages"].append(
                {
                    "ellipsis": False,
                    "number": entry,
                    "current": entry == page_obj.number,
                    "url": page_url(entry),
                }
            )
        return pagination

    def get(self, request, type):
        """Run the search and render the results table, the widget or the page.

        Responds with 400 when ``type`` is not one of ``allowed_types``. An
        htmx request targeting the results container or table receives only
        the results partial.
        """
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified.")

        scope = OSINT_SCOPES[self._scope_key(request.GET.get("scope"))]
        field_options = self._field_options(scope.model)
        query = request.GET.get("q", "").strip()
        field_name = request.GET.get("field", "__all__")
        if field_name != "__all__":
            allowed_fields = {option["value"] for option in field_options}
            if field_name not in allowed_fields:
                # An unknown field silently widens the search to all fields.
                field_name = "__all__"

        queryset = scope.model.objects.filter(user=request.user)
        if scope.select_related:
            queryset = queryset.select_related(*scope.select_related)
        if scope.prefetch_related:
            queryset = queryset.prefetch_related(*scope.prefetch_related)

        queryset = self._search_queryset(
            queryset,
            scope.model,
            query,
            field_name,
            field_options,
        )

        # Reuse the same validation that drives the column headers so the
        # applied ordering and the rendered sort state cannot drift apart.
        sort_field, direction = self._active_sort(scope)
        if sort_field:
            order_by = sort_field if direction == "asc" else f"-{sort_field}"
            queryset = queryset.order_by(order_by)

        per_page = self._per_page(request.GET.get("per_page"))
        paginator = Paginator(queryset, per_page)
        page_obj = paginator.get_page(request.GET.get("page"))
        object_list = list(page_obj.object_list)

        list_url = reverse("osint_search", kwargs={"type": type})
        query_state = self._query_state(request)
        current_url = _url_with_query(list_url, query_state)

        context = {
            "osint_scope": scope.key,
            "osint_title": f"Search {scope.title}",
            "osint_table_id": "osint-search-table",
            "osint_event_name": "",
            "osint_refresh_url": current_url,
            "osint_columns": self._build_column_context(scope, list_url, query_state),
            "osint_rows": self._build_rows(scope, object_list, type),
            "osint_pagination": self._build_pagination(
                page_obj, list_url, query_state
            ),
            "osint_show_search": False,
            "osint_show_actions": False,
            "osint_result_count": paginator.count,
            "osint_search_url": list_url,
            "scope_options": [
                {"value": key, "label": conf.title}
                for key, conf in OSINT_SCOPES.items()
            ],
            "field_options": field_options,
            "selected_scope": scope.key,
            "selected_field": field_name,
            "search_query": query,
            "selected_per_page": per_page,
            "search_page_url": reverse("osint_search", kwargs={"type": "page"}),
            "search_widget_url": reverse("osint_search", kwargs={"type": "widget"}),
        }

        hx_target = request.headers.get("HX-Target")
        if request.htmx and hx_target in {"osint-search-results", "osint-search-table"}:
            response = render(request, self.result_template, context)
            if type == "page":
                response["HX-Replace-Url"] = current_url
            return response

        if type == "widget":
            widget_context = {
                "title": "Search",
                "unique": "osint-search-widget",
                "window_content": self.panel_template,
                "widget_options": 'gs-w="8" gs-h="14" gs-x="0" gs-y="0" gs-min-w="5"',
                "widget_icon": _safe_icon_class(
                    request.GET.get("widget_icon"),
                    "fa-solid fa-magnifying-glass",
                ),
                **context,
            }
            return render(request, self.widget_template, widget_context)

        return render(request, self.page_template, context)
|
|
|
|
|
|
class OSINTWorkspace(LoginRequiredMixin, View):
    """Render the OSINT workspace page."""

    template_name = "pages/osint-workspace.html"

    def get(self, request):
        """Render the workspace template with the URL of the tabs widget."""
        tabs_widget_url = reverse("osint_workspace_tabs_widget")
        return render(
            request,
            self.template_name,
            {"tabs_widget_url": tabs_widget_url},
        )
|
|
|
|
|
|
class OSINTWorkspaceTabsWidget(LoginRequiredMixin, View):
    """Render the widget holding one tab per OSINT list."""

    def get(self, request):
        """Build the tab descriptors and render the generic widget template."""
        # (key, label, icon). The key doubles as the URL name of the list view.
        tab_specs = (
            ("people", "People", "fa-solid fa-user-group"),
            ("groups", "Groups", "fa-solid fa-users"),
            ("personas", "Personas", "fa-solid fa-masks-theater"),
            ("manipulations", "Manipulations", "fa-solid fa-sliders"),
        )
        tabs = [
            {
                "key": key,
                "label": label,
                "icon": icon,
                # Each tab loads its list as a widget and passes its icon along.
                "widget_url": _url_with_query(
                    reverse(key, kwargs={"type": "widget"}),
                    {"widget_icon": icon},
                ),
            }
            for key, label, icon in tab_specs
        ]
        return render(
            request,
            "mixins/wm/widget.html",
            {
                "title": "OSINT Workspace",
                "unique": "osint-workspace-tabs",
                "window_content": "partials/osint-workspace-tabs-widget.html",
                "widget_options": 'gs-w="12" gs-h="4" gs-x="0" gs-y="0" gs-min-w="6"',
                "tabs": tabs,
            },
        )
|