From cbedcd67f64c7bd60c15bd899785a60b12a21eed Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Wed, 11 Mar 2026 02:19:08 +0000 Subject: [PATCH] Implement Manticore fully and re-theme --- CLAUDE.md | 13 + INSTALL.md | 7 + README.md | 3 + app/local_settings.py | 8 + app/urls.py | 7 +- core/clients/whatsapp.py | 33 +- core/context_processors.py | 10 +- core/events/behavior.py | 213 +++++++ core/events/ledger.py | 95 ++- core/events/manticore.py | 588 ++++++++++++++++++ core/events/projection.py | 81 ++- core/events/shadow.py | 148 +++++ .../commands/backfill_contact_availability.py | 129 ++-- .../management/commands/event_ledger_smoke.py | 62 +- core/management/commands/gia_analysis.py | 96 +++ .../management/commands/manticore_backfill.py | 46 ++ .../commands/prune_behavioral_orm_data.py | 62 ++ .../recalculate_contact_availability.py | 266 +++----- core/messaging/history.py | 27 +- ...onversationevent_behavioral_event_types.py | 33 + ...emove_contactavailabilityevent_and_span.py | 16 + core/models.py | 106 +--- core/modules/router.py | 173 +++++- core/presence/engine.py | 156 +---- core/presence/query.py | 137 +++- core/templates/base.html | 174 +++++- .../pages/availability-settings.html | 89 ++- core/templates/pages/system-settings.html | 4 +- core/tests/test_availability_settings_page.py | 108 ++-- .../test_backfill_contact_availability.py | 25 +- core/tests/test_behavioral_event_platform.py | 161 +++++ core/tests/test_event_ledger_smoke_command.py | 27 + core/tests/test_event_projection_shadow.py | 33 + core/tests/test_gia_analysis.py | 219 +++++++ core/tests/test_presence_engine.py | 129 +--- ...test_presence_query_and_compose_context.py | 243 +++++++- core/tests/test_prune_behavioral_orm_data.py | 80 +++ core/tests/test_settings_integrity.py | 9 + core/tests/test_system_diagnostics_api.py | 79 ++- .../test_whatsapp_reaction_and_recalc.py | 80 +-- core/transports/capabilities.py | 27 + core/views/availability.py | 143 +++-- core/views/compose.py | 62 +- core/views/system.py | 176 +++++- docker/uwsgi.ini | 1 + stack.env.example | 4 + 46 files changed, 3444 insertions(+), 944 deletions(-) create mode 100644 core/events/behavior.py create mode 100644 core/events/manticore.py create mode 100644 core/events/shadow.py create mode 100644 core/management/commands/gia_analysis.py create mode 100644 core/management/commands/manticore_backfill.py create mode 100644 core/management/commands/prune_behavioral_orm_data.py create mode 100644 core/migrations/0047_conversationevent_behavioral_event_types.py create mode 100644 core/migrations/0048_remove_contactavailabilityevent_and_span.py create mode 100644 core/tests/test_behavioral_event_platform.py create mode 100644 core/tests/test_gia_analysis.py create mode 100644 core/tests/test_prune_behavioral_orm_data.py diff --git a/CLAUDE.md b/CLAUDE.md index dae378b..f1f63b3 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -46,3 +46,16 @@ Preferred terms: | "Them" | "contact" or "remote party" | Apply this in: comments, template labels, log messages, and variable names. + +## Runtime: uWSGI Reload File + +The app container uses uWSGI with a single touch-based reload sentinel: + +- Reload file: `/code/.uwsgi-reload` +- Config: [docker/uwsgi.ini](/code/xf/GIA/docker/uwsgi.ini) + +Rules: + +- Never run `python manage.py ...` on the host. Run Django management commands inside Podman, for example with `podman exec`. +- After changing templates or app code that should be picked up by the `gia` uWSGI service, touch `/code/.uwsgi-reload`. +- If the uWSGI config itself changes, touch `/code/.uwsgi-reload` and restart the `gia` container so the new config is loaded. diff --git a/INSTALL.md b/INSTALL.md index 16bd100..f6c9d3f 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -59,6 +59,10 @@ Memory/wiki search helpers: - `MEMORY_SEARCH_BACKEND` (`django` or `manticore`) - `MANTICORE_HTTP_URL` - `MANTICORE_MEMORY_TABLE` +- `MANTICORE_EVENT_TABLE` +- `MANTICORE_METRIC_TABLE` +- `COMPOSING_ABANDONED_WINDOW_SECONDS` +- `CONVERSATION_EVENT_RETENTION_DAYS` - `MANTICORE_HTTP_TIMEOUT` For XMPP media upload, configure one of: @@ -238,6 +242,9 @@ Performance defaults now applied in GIA: - Batched Manticore reindex writes (`REPLACE ... VALUES (...)` in chunks) for lower ingest latency. - Cached table-ensure checks to avoid `CREATE TABLE IF NOT EXISTS` overhead on every query. +- Behavioral event dual-write uses `MANTICORE_EVENT_TABLE` (default `gia_events`) when event ledger flags are enabled. +- Behavioral metrics are written by `python manage.py gia_analysis` into `MANTICORE_METRIC_TABLE` (default `gia_metrics`). +- ORM shadow copies can be pruned with `python manage.py prune_behavioral_orm_data`; defaults are driven by `CONVERSATION_EVENT_RETENTION_DAYS`. - Runtime table maintenance available through MCP (`FLUSH RAMCHUNK`, `OPTIMIZE TABLE`) for steady query responsiveness. ### F) MCP server for task + memory tooling (VS Code) diff --git a/README.md b/README.md index f0c1dcd..472bd68 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,9 @@ GIA is a multi-transport communication workspace that unifies Signal, WhatsApp, - Supports canonical task creation from chat commands, web UI, and MCP tooling. - Bridges messages across transports (including XMPP) with attachment handling. - Tracks delivery/read metadata and typing state events. +- Can dual-write canonical behavioral events to Manticore for time-series analysis. +- Includes `gia_analysis` for rolling behavioral metric aggregation into Manticore. +- Includes `prune_behavioral_orm_data` to keep Django event shadow tables bounded once Manticore is primary. - Provides AI workspace analytics, mitigation plans, and insight visualizations. - Exposes fine-grained capability policy controls for gateway commands, task intake, and command execution. - Separates XMPP encryption controls into plaintext rejection, component-chat encryption, and relayed-contact encryption. diff --git a/app/local_settings.py b/app/local_settings.py index aec6103..795a14c 100644 --- a/app/local_settings.py +++ b/app/local_settings.py @@ -108,6 +108,14 @@ EVENT_PRIMARY_WRITE_PATH = getenv("EVENT_PRIMARY_WRITE_PATH", "false").lower() i MEMORY_SEARCH_BACKEND = getenv("MEMORY_SEARCH_BACKEND", "django") MANTICORE_HTTP_URL = getenv("MANTICORE_HTTP_URL", "http://localhost:9308") MANTICORE_MEMORY_TABLE = getenv("MANTICORE_MEMORY_TABLE", "gia_memory_items") +MANTICORE_EVENT_TABLE = getenv("MANTICORE_EVENT_TABLE", "gia_events") +MANTICORE_METRIC_TABLE = getenv("MANTICORE_METRIC_TABLE", "gia_metrics") +COMPOSING_ABANDONED_WINDOW_SECONDS = int( + getenv("COMPOSING_ABANDONED_WINDOW_SECONDS", "300") +) +CONVERSATION_EVENT_RETENTION_DAYS = int( + getenv("CONVERSATION_EVENT_RETENTION_DAYS", "90") or 90 +) MANTICORE_HTTP_TIMEOUT = int(getenv("MANTICORE_HTTP_TIMEOUT", "5") or 5) # Attachment security defaults for transport adapters. diff --git a/app/urls.py b/app/urls.py index f770299..1225d8c 100644 --- a/app/urls.py +++ b/app/urls.py @@ -445,8 +445,13 @@ urlpatterns = [ name="codex_approval", ), path( - "settings/availability/", + "settings/behavioral/", availability.AvailabilitySettingsPage.as_view(), + name="behavioral_signals_settings", + ), + path( + "settings/availability/", + RedirectView.as_view(pattern_name="behavioral_signals_settings", permanent=False), name="availability_settings", ), # AIs diff --git a/core/clients/whatsapp.py b/core/clients/whatsapp.py index e77d0c1..d0fb7b4 100644 --- a/core/clients/whatsapp.py +++ b/core/clients/whatsapp.py @@ -1579,6 +1579,33 @@ class WhatsAppClient(ClientBase): out.add(f"{mapped}@s.whatsapp.net") return out + def _message_identifier_candidates(self, *, sender, chat, is_from_me): + """ + Resolve the logical contact for a WhatsApp message event. + + Direct outbound messages must bind to the chat peer, not the sender, + otherwise the user's own account identifier can fan out the same message + into unrelated XMPP contact threads. + """ + sender_value = self._jid_to_identifier(sender) + chat_value = self._jid_to_identifier(chat) + candidate_values = [] + + if chat_value.endswith("@g.us"): + candidate_values.append(chat) + elif is_from_me: + if chat_value: + candidate_values.append(chat) + elif sender_value: + candidate_values.append(sender) + else: + if sender_value: + candidate_values.append(sender) + elif chat_value: + candidate_values.append(chat) + + return self._normalize_identifier_candidates(*candidate_values) + async def _sync_contacts_from_client(self): if self._client is None: return @@ -2666,7 +2693,11 @@ class WhatsAppClient(ClientBase): sender_jid=str(sender or ""), ) - identifier_values = self._normalize_identifier_candidates(sender, chat) + identifier_values = self._message_identifier_candidates( + sender=sender, + chat=chat, + is_from_me=is_from_me, + ) if not identifier_values: return diff --git a/core/context_processors.py b/core/context_processors.py index 0d43344..6b3e83a 100644 --- a/core/context_processors.py +++ b/core/context_processors.py @@ -30,7 +30,7 @@ def settings_hierarchy_nav(request): business_plans_href = reverse("business_plan_inbox") tasks_href = reverse("tasks_settings") translation_href = reverse("translation_settings") - availability_href = reverse("availability_settings") + behavioral_href = reverse("behavioral_signals_settings") categories = { "general": { @@ -99,6 +99,7 @@ def settings_hierarchy_nav(request): "translation_settings", "translation_preview", "availability_settings", + "behavioral_signals_settings", "codex_settings", "codex_approval", }, @@ -116,7 +117,12 @@ def settings_hierarchy_nav(request): translation_href, lambda: url_name in {"translation_settings", "translation_preview"}, ), - ("Availability", availability_href, lambda: path == availability_href), + ( + "Behavioral Signals", + behavioral_href, + lambda: url_name + in {"availability_settings", "behavioral_signals_settings"}, + ), ], }, } diff --git a/core/events/behavior.py b/core/events/behavior.py new file mode 100644 index 0000000..eb5d801 --- /dev/null +++ b/core/events/behavior.py @@ -0,0 +1,213 @@ +from __future__ import annotations + +import json +import statistics +from dataclasses import dataclass +from typing import Any + + +def safe_int(value: Any, default: int = 0) -> int: + try: + return int(value) + except Exception: + return int(default) + + +def parse_payload(value: Any) -> dict: + if isinstance(value, dict): + return dict(value) + if isinstance(value, str): + text = value.strip() + if not text: + return {} + try: + loaded = json.loads(text) + except Exception: + return {} + if isinstance(loaded, dict): + return dict(loaded) + return {} + + +def median_ms(values: list[int]) -> int: + clean = [int(v) for v in values if safe_int(v, 0) > 0] + if not clean: + return 0 + return int(statistics.median(clean)) + + +def z_score(value: int, baseline_samples: list[int]) -> float: + clean = [int(v) for v in baseline_samples if safe_int(v, 0) > 0] + if len(clean) < 2: + return 0.0 + baseline = statistics.median(clean) + stdev = statistics.pstdev(clean) + if stdev <= 0: + return 0.0 + return float((float(value) - float(baseline)) / float(stdev)) + + +@dataclass +class CompositionState: + started_ts: int + last_started_ts: int + stopped_ts: int = 0 + revision: int = 1 + + +class ComposingTracker: + def __init__(self, window_ms: int = 300000): + self.window_ms = max(1000, int(window_ms or 300000)) + self._state: dict[str, CompositionState] = {} + + def observe_started(self, session_id: str, ts: int) -> CompositionState: + key = str(session_id or "").strip() + if not key: + raise ValueError("session_id is required") + safe_ts_value = max(0, safe_int(ts, 0)) + state = self._state.get(key) + if state is None: + state = CompositionState( + started_ts=safe_ts_value, + last_started_ts=safe_ts_value, + revision=1, + ) + self._state[key] = state + return state + if state.stopped_ts > 0: + state.revision += 1 + state.last_started_ts = safe_ts_value + state.stopped_ts = 0 + return state + + def observe_stopped(self, session_id: str, ts: int) -> dict | None: + key = str(session_id or "").strip() + state = self._state.get(key) + if state is None: + return None + safe_ts_value = max(0, safe_int(ts, 0)) + duration_ms = max(0, safe_ts_value - int(state.started_ts or 0)) + if duration_ms >= self.window_ms: + self._state.pop(key, None) + return { + "started_ts": int(state.started_ts or 0), + "stopped_ts": safe_ts_value, + "duration_ms": duration_ms, + "revision": int(state.revision or 1), + "abandoned": True, + } + state.stopped_ts = safe_ts_value + return None + + def observe_message(self, session_id: str) -> CompositionState | None: + key = str(session_id or "").strip() + if not key: + return None + return self._state.pop(key, None) + + +def extract_metric_samples(rows: list[dict]) -> dict[str, list[int]]: + delivered_by_message: dict[str, int] = {} + read_by_message: dict[str, int] = {} + delay_c_samples: list[int] = [] + delay_f_samples: list[int] = [] + revision_samples: list[int] = [] + abandoned_started = 0 + abandoned_total = 0 + composition_by_session: dict[str, dict[str, int]] = {} + presence_by_session: dict[str, int] = {} + + for row in sorted( + list(rows or []), + key=lambda item: ( + safe_int(item.get("ts"), 0), + str(item.get("kind") or ""), + str(item.get("session_id") or ""), + ), + ): + kind = str(row.get("kind") or "").strip().lower() + session_id = str(row.get("session_id") or "").strip() + ts = safe_int(row.get("ts"), 0) + payload = parse_payload(row.get("payload")) + message_id = str( + payload.get("message_id") + or payload.get("origin_message_id") + or row.get("origin_message_id") + or "" + ).strip() + + if kind == "message_delivered" and message_id: + delivered_by_message[message_id] = ts + continue + if kind == "message_read" and message_id: + read_by_message[message_id] = ts + continue + if kind == "presence_available" and session_id: + presence_by_session[session_id] = ts + continue + if kind == "composing_started" and session_id: + abandoned_started += 1 + state = composition_by_session.get(session_id) + if state is None: + state = {"started_ts": ts, "revision": 1} + composition_by_session[session_id] = state + else: + state["revision"] = int(state.get("revision", 1)) + 1 + if presence_by_session.get(session_id): + delta = ts - int(presence_by_session.get(session_id) or 0) + if delta >= 0: + delay_f_samples.append(delta) + continue + if kind == "composing_abandoned": + abandoned_total += 1 + if session_id: + composition_by_session.pop(session_id, None) + continue + if kind == "message_sent" and session_id: + state = composition_by_session.pop(session_id, None) + if state is None: + continue + delta = ts - int(state.get("started_ts") or 0) + if delta >= 0: + delay_c_samples.append(delta) + revision_samples.append(max(1, int(state.get("revision") or 1))) + + delay_b_samples = [] + for message_id, delivered_ts in delivered_by_message.items(): + read_ts = safe_int(read_by_message.get(message_id), 0) + if read_ts > 0 and read_ts >= delivered_ts: + delay_b_samples.append(read_ts - delivered_ts) + + abandoned_rate_samples = [] + if abandoned_started > 0: + abandoned_rate_samples.append( + int(round((float(abandoned_total) / float(abandoned_started)) * 1000)) + ) + + return { + "delay_b": delay_b_samples, + "delay_c": delay_c_samples, + "delay_f": delay_f_samples, + "revision": revision_samples, + "abandoned_rate": abandoned_rate_samples, + } + + +def summarize_metrics(window_rows: list[dict], baseline_rows: list[dict]) -> dict[str, dict]: + window_samples = extract_metric_samples(window_rows) + baseline_samples = extract_metric_samples(baseline_rows) + metrics: dict[str, dict] = {} + for metric in ("delay_b", "delay_c", "delay_f", "revision", "abandoned_rate"): + samples = list(window_samples.get(metric) or []) + if not samples: + continue + baseline = list(baseline_samples.get(metric) or []) + value = median_ms(samples) + baseline_value = median_ms(baseline) + metrics[metric] = { + "value_ms": int(value), + "baseline_ms": int(baseline_value), + "z_score": float(round(z_score(value, baseline), 6)), + "sample_n": len(samples), + } + return metrics diff --git a/core/events/ledger.py b/core/events/ledger.py index 41957f5..7b193dd 100644 --- a/core/events/ledger.py +++ b/core/events/ledger.py @@ -5,12 +5,19 @@ import time from asgiref.sync import sync_to_async from django.conf import settings +from core.events.manticore import get_event_ledger_backend from core.models import ConversationEvent from core.observability.tracing import ensure_trace_id +from core.util import logs + +log = logs.get_logger("event-ledger") def event_ledger_enabled() -> bool: - return bool(getattr(settings, "EVENT_LEDGER_DUAL_WRITE", False)) + return bool( + getattr(settings, "EVENT_LEDGER_DUAL_WRITE", False) + or getattr(settings, "EVENT_PRIMARY_WRITE_PATH", False) + ) def event_ledger_status() -> dict: @@ -72,38 +79,78 @@ def append_event_sync( normalized_direction = _normalize_direction(direction) normalized_trace = ensure_trace_id(trace_id, payload or {}) + safe_ts = _safe_ts(ts) transport = str(origin_transport or "").strip().lower() message_id = str(origin_message_id or "").strip() - dedup_row = None - if transport and message_id: - dedup_row = ( - ConversationEvent.objects.filter( + actor_identifier = str(actor_identifier or "").strip() + origin_chat_id = str(origin_chat_id or "").strip() + payload = dict(payload or {}) + raw_payload = dict(raw_payload or {}) + + dual_write = bool(getattr(settings, "EVENT_LEDGER_DUAL_WRITE", False)) + primary_write = bool(getattr(settings, "EVENT_PRIMARY_WRITE_PATH", False)) + write_django = dual_write and not primary_write + + row = None + if write_django: + dedup_row = None + if transport and message_id: + dedup_row = ( + ConversationEvent.objects.filter( + user=user, + session=session, + event_type=normalized_type, + origin_transport=transport, + origin_message_id=message_id, + ) + .order_by("-created_at") + .first() + ) + if dedup_row is not None: + row = dedup_row + else: + row = ConversationEvent.objects.create( user=user, session=session, + ts=safe_ts, event_type=normalized_type, + direction=normalized_direction, + actor_identifier=actor_identifier, origin_transport=transport, origin_message_id=message_id, + origin_chat_id=origin_chat_id, + payload=payload, + raw_payload=raw_payload, + trace_id=normalized_trace, ) - .order_by("-created_at") - .first() - ) - if dedup_row is not None: - return dedup_row - return ConversationEvent.objects.create( - user=user, - session=session, - ts=_safe_ts(ts), - event_type=normalized_type, - direction=normalized_direction, - actor_identifier=str(actor_identifier or "").strip(), - origin_transport=transport, - origin_message_id=message_id, - origin_chat_id=str(origin_chat_id or "").strip(), - payload=dict(payload or {}), - raw_payload=dict(raw_payload or {}), - trace_id=normalized_trace, - ) + try: + get_event_ledger_backend().upsert_event( + user_id=int(user.id), + person_id=str(session.identifier.person_id), + session_id=str(session.id), + event_type=normalized_type, + direction=normalized_direction, + ts=safe_ts, + actor_identifier=actor_identifier, + origin_transport=transport, + origin_message_id=message_id, + origin_chat_id=origin_chat_id, + payload=payload, + raw_payload=raw_payload, + trace_id=normalized_trace, + ) + except Exception as exc: + if primary_write: + raise + log.warning( + "Event ledger manticore dual-write failed session=%s event_type=%s err=%s", + getattr(session, "id", "-"), + normalized_type, + exc, + ) + + return row async def append_event(**kwargs): diff --git a/core/events/manticore.py b/core/events/manticore.py new file mode 100644 index 0000000..aa4a122 --- /dev/null +++ b/core/events/manticore.py @@ -0,0 +1,588 @@ +from __future__ import annotations + +import hashlib +import json +import time +from urllib.parse import urlparse, urlunparse +from typing import Any + +import requests +from django.conf import settings + +from core.models import ConversationEvent +from core.util import logs +from core.events.behavior import parse_payload + +log = logs.get_logger("event-manticore") + + +class ManticoreEventLedgerBackend: + _table_ready_cache: dict[str, float] = {} + _table_ready_ttl_seconds = 30.0 + + def __init__(self): + self.base_url = str( + getattr(settings, "MANTICORE_HTTP_URL", "http://localhost:9308") + ).rstrip("/") + self.table = ( + str(getattr(settings, "MANTICORE_EVENT_TABLE", "gia_events")).strip() + or "gia_events" + ) + self.metrics_table = ( + str(getattr(settings, "MANTICORE_METRIC_TABLE", "gia_metrics")).strip() + or "gia_metrics" + ) + self.timeout_seconds = int(getattr(settings, "MANTICORE_HTTP_TIMEOUT", 5) or 5) + self._table_cache_key = f"{self.base_url}|{self.table}" + self._metrics_cache_key = f"{self.base_url}|{self.metrics_table}" + + def _candidate_base_urls(self) -> list[str]: + parsed = urlparse(self.base_url) + hostname = str(parsed.hostname or "").strip().lower() + candidates = [self.base_url] + if hostname in {"localhost", "127.0.0.1"}: + replacement = parsed._replace(netloc=f"host.containers.internal:{parsed.port or 9308}") + candidates.append(urlunparse(replacement)) + output = [] + seen = set() + for value in candidates: + key = str(value or "").strip() + if not key or key in seen: + continue + seen.add(key) + output.append(key) + return output + + def _sql(self, query: str) -> dict[str, Any]: + last_exc = None + for base_url in self._candidate_base_urls(): + try: + response = requests.post( + f"{base_url}/sql", + data={"mode": "raw", "query": query}, + timeout=self.timeout_seconds, + ) + response.raise_for_status() + payload = response.json() + if base_url != self.base_url: + self.base_url = base_url.rstrip("/") + self._table_cache_key = f"{self.base_url}|{self.table}" + self._metrics_cache_key = f"{self.base_url}|{self.metrics_table}" + if isinstance(payload, list): + return payload[0] if payload else {} + return dict(payload or {}) + except Exception as exc: + last_exc = exc + if last_exc is not None: + raise last_exc + return {} + + def ensure_table(self) -> None: + last_ready = float( + self._table_ready_cache.get(self._table_cache_key, 0.0) or 0.0 + ) + if (time.time() - last_ready) <= float(self._table_ready_ttl_seconds): + return + self._sql( + ( + f"CREATE TABLE IF NOT EXISTS {self.table} (" + "id BIGINT," + "user_id BIGINT," + "person_id STRING," + "session_id STRING," + "transport STRING," + "kind STRING," + "direction STRING," + "ts BIGINT," + "ts_ref BIGINT," + "actor STRING," + "duration_ms BIGINT," + "abandoned INTEGER," + "revision INTEGER," + "payload JSON" + ") engine='columnar' min_infix_len='2'" + ) + ) + self._table_ready_cache[self._table_cache_key] = time.time() + + def ensure_metrics_table(self) -> None: + last_ready = float( + self._table_ready_cache.get(self._metrics_cache_key, 0.0) or 0.0 + ) + if (time.time() - last_ready) <= float(self._table_ready_ttl_seconds): + return + self._sql( + ( + f"CREATE TABLE IF NOT EXISTS {self.metrics_table} (" + "id BIGINT," + "user_id BIGINT," + "person_id STRING," + "window_days INTEGER," + "metric STRING," + "value_ms BIGINT," + "baseline_ms BIGINT," + "z_score FLOAT," + "sample_n INTEGER," + "computed_at BIGINT" + ") engine='columnar'" + ) + ) + self._table_ready_cache[self._metrics_cache_key] = time.time() + + def _escape(self, value: Any) -> str: + text = str(value or "") + return text.replace("\\", "\\\\").replace("'", "\\'") + + def _event_id(self, *, logical_key: str) -> int: + digest = hashlib.blake2b( + str(logical_key or "").encode("utf-8"), + digest_size=8, + ).digest() + value = int.from_bytes(digest, byteorder="big", signed=False) + return max(1, int(value)) + + def _event_kind(self, event_type: str) -> str: + normalized = str(event_type or "").strip().lower() + return { + "message_created": "message_sent", + "delivery_receipt": "message_delivered", + "read_receipt": "message_read", + "typing_started": "composing_started", + "typing_stopped": "composing_stopped", + "composing_abandoned": "composing_abandoned", + "presence_available": "presence_available", + "presence_unavailable": "presence_unavailable", + }.get(normalized, normalized) + + def _rows_from_sql_payload(self, payload: dict[str, Any]) -> list[dict]: + data = payload.get("data") or payload.get("hits") or [] + if isinstance(data, dict): + data = [data] + rows = [] + for row in list(data or []): + if isinstance(row, dict): + rows.append(dict(row)) + return rows + + def _build_values( + self, + *, + user_id: int, + person_id: str, + session_id: str, + event_type: str, + direction: str, + ts: int, + actor_identifier: str, + origin_transport: str, + origin_message_id: str, + origin_chat_id: str, + payload: dict | None, + raw_payload: dict | None, + trace_id: str, + ) -> str: + data = dict(payload or {}) + if raw_payload: + data["raw_payload"] = dict(raw_payload) + if trace_id: + data["trace_id"] = str(trace_id) + if origin_message_id: + data["origin_message_id"] = str(origin_message_id) + if origin_chat_id: + data["origin_chat_id"] = str(origin_chat_id) + data["legacy_event_type"] = str(event_type or "").strip().lower() + + ts_ref = 0 + try: + ts_ref = int(data.get("message_ts") or data.get("source_ts") or 0) + except Exception: + ts_ref = 0 + try: + duration_ms = int(data.get("duration_ms") or 0) + except Exception: + duration_ms = 0 + try: + abandoned = 1 if bool(data.get("abandoned")) else 0 + except Exception: + abandoned = 0 + try: + revision = int(data.get("revision") or 0) + except Exception: + revision = 0 + + logical_key = "|".join( + [ + str(user_id), + str(session_id), + str(event_type or "").strip().lower(), + str(direction or "").strip().lower(), + str(origin_transport or "").strip().lower(), + str(origin_message_id or "").strip(), + str(origin_chat_id or "").strip(), + str(actor_identifier or "").strip(), + str(int(ts or 0)), + str(trace_id or "").strip(), + ] + ) + doc_id = self._event_id(logical_key=logical_key) + payload_json = json.dumps(data, separators=(",", ":"), sort_keys=True) + return ( + f"({doc_id},{int(user_id)},'{self._escape(person_id)}'," + f"'{self._escape(session_id)}','{self._escape(origin_transport)}'," + f"'{self._escape(self._event_kind(event_type))}','{self._escape(direction)}'," + f"{int(ts)},{ts_ref},'{self._escape(actor_identifier)}',{duration_ms}," + f"{abandoned},{revision},'{self._escape(payload_json)}')" + ) + + def upsert_event( + self, + *, + user_id: int, + person_id: str, + session_id: str, + event_type: str, + direction: str, + ts: int, + actor_identifier: str = "", + origin_transport: str = "", + origin_message_id: str = "", + origin_chat_id: str = "", + payload: dict | None = None, + raw_payload: dict | None = None, + trace_id: str = "", + ) -> None: + self.ensure_table() + values = self._build_values( + user_id=user_id, + person_id=person_id, + session_id=session_id, + event_type=event_type, + direction=direction, + ts=ts, + actor_identifier=actor_identifier, + origin_transport=origin_transport, + origin_message_id=origin_message_id, + origin_chat_id=origin_chat_id, + payload=payload, + raw_payload=raw_payload, + trace_id=trace_id, + ) + self._sql( + f"REPLACE INTO {self.table} " + "(id,user_id,person_id,session_id,transport,kind,direction,ts,ts_ref,actor,duration_ms,abandoned,revision,payload) " + f"VALUES {values}" + ) + + def query_rows(self, query: str) -> list[dict]: + return self._rows_from_sql_payload(self._sql(query)) + + def list_event_targets(self, *, user_id: int | None = None) -> list[dict]: + filters = [] + if user_id is not None: + filters.append(f"user_id={int(user_id)}") + where_clause = f" WHERE {' AND '.join(filters)}" if filters else "" + return self.query_rows( + f"SELECT user_id, person_id FROM {self.table}{where_clause} " + "GROUP BY user_id, person_id" + ) + + def fetch_events( + self, + *, + user_id: int, + person_id: str, + since_ts: int, + ) -> list[dict]: + return self.query_rows( + f"SELECT user_id, person_id, session_id, transport, kind, direction, ts, ts_ref, actor, duration_ms, abandoned, revision, payload " + f"FROM {self.table} " + f"WHERE user_id={int(user_id)} " + f"AND person_id='{self._escape(person_id)}' " + f"AND ts>={int(since_ts)} " + "ORDER BY ts ASC" + ) + + def _metric_doc_id( + self, + *, + user_id: int, + person_id: str, + window_days: int, + metric: str, + ) -> int: + digest = hashlib.blake2b( + f"{int(user_id)}|{person_id}|{int(window_days)}|{metric}".encode("utf-8"), + digest_size=8, + ).digest() + return max(1, int.from_bytes(digest, byteorder="big", signed=False)) + + def upsert_metric( + self, + *, + user_id: int, + person_id: str, + window_days: int, + metric: str, + value_ms: int, + baseline_ms: int, + z_score: float, + sample_n: int, + computed_at: int, + ) -> None: + self.ensure_metrics_table() + doc_id = self._metric_doc_id( + user_id=user_id, + person_id=person_id, + window_days=window_days, + metric=metric, + ) + self._sql( + f"REPLACE INTO {self.metrics_table} " + "(id,user_id,person_id,window_days,metric,value_ms,baseline_ms,z_score,sample_n,computed_at) " + f"VALUES ({doc_id},{int(user_id)},'{self._escape(person_id)}',{int(window_days)}," + f"'{self._escape(metric)}',{int(value_ms)},{int(baseline_ms)}," + f"{float(z_score)},{int(sample_n)},{int(computed_at)})" + ) + + +def get_event_ledger_backend() -> ManticoreEventLedgerBackend: + return ManticoreEventLedgerBackend() + + +def upsert_conversation_event(event: ConversationEvent) -> None: + session = event.session + identifier = session.identifier + get_event_ledger_backend().upsert_event( + user_id=int(event.user_id), + person_id=str(identifier.person_id), + session_id=str(session.id), + event_type=str(event.event_type or ""), + direction=str(event.direction or "system"), + ts=int(event.ts or 0), + actor_identifier=str(event.actor_identifier or ""), + origin_transport=str(event.origin_transport or ""), + origin_message_id=str(event.origin_message_id or ""), + origin_chat_id=str(event.origin_chat_id or ""), + payload=dict(event.payload or {}), + raw_payload=dict(event.raw_payload or {}), + trace_id=str(event.trace_id or ""), + ) + + +def get_behavioral_availability_stats(*, user_id: int) -> list[dict]: + backend = get_event_ledger_backend() + return backend.query_rows( + f"SELECT person_id, transport, " + "COUNT(*) AS total_events, " + "SUM(IF(kind IN ('presence_available','presence_unavailable'),1,0)) AS presence_events, " + "SUM(IF(kind='message_read',1,0)) AS read_events, " + "SUM(IF(kind IN ('composing_started','composing_stopped'),1,0)) AS typing_events, " + "SUM(IF(kind='message_sent',1,0)) AS message_events, " + "SUM(IF(kind='composing_abandoned',1,0)) AS abandoned_events, " + "MAX(ts) AS last_event_ts " + f"FROM {backend.table} " + f"WHERE user_id={int(user_id)} " + "GROUP BY person_id, transport " + "ORDER BY total_events DESC, person_id ASC, transport ASC" + ) + + +def get_behavioral_latest_states( + *, + user_id: int, + person_ids: list[str], + transport: str = "", +) -> list[dict]: + backend = get_event_ledger_backend() + cleaned_ids = [ + str(value or "").strip() + for value in list(person_ids or []) + if str(value or "").strip() + ] + if not cleaned_ids: + return [] + id_clause = ",".join(f"'{backend._escape(value)}'" for value in cleaned_ids) + transport_clause = "" + if str(transport or "").strip(): + transport_clause = ( + f" AND transport='{backend._escape(str(transport or '').strip().lower())}'" + ) + return backend.query_rows( + f"SELECT person_id, transport, kind, ts " + f"FROM {backend.table} " + f"WHERE user_id={int(user_id)} " + f"AND person_id IN ({id_clause})" + f"{transport_clause} " + "ORDER BY person_id ASC, ts DESC" + ) + + +def get_behavioral_events_for_range( + *, + user_id: int, + person_id: str, + start_ts: int, + end_ts: int, + transport: str = "", +) -> list[dict]: + backend = get_event_ledger_backend() + transport_clause = "" + if str(transport or "").strip(): + transport_clause = ( + f" AND transport='{backend._escape(str(transport or '').strip().lower())}'" + ) + return backend.query_rows( + f"SELECT person_id, session_id, transport, kind, direction, ts, payload " + f"FROM {backend.table} " + f"WHERE user_id={int(user_id)} " + f"AND person_id='{backend._escape(str(person_id or '').strip())}' " + f"AND ts>={int(start_ts)} AND ts<={int(end_ts)}" + f"{transport_clause} " + "ORDER BY ts ASC" + ) + + +def get_recent_event_rows( + *, + minutes: int = 120, + service: str = "", + user_id: str = "", + limit: int = 200, +) -> list[dict]: + backend = get_event_ledger_backend() + cutoff_ts = int(time.time() * 1000) - (max(1, int(minutes)) * 60 * 1000) + where = [f"ts>={cutoff_ts}"] + if service: + where.append(f"transport='{backend._escape(str(service).strip().lower())}'") + if user_id: + where.append(f"user_id={int(user_id)}") + rows = backend.query_rows( + f"SELECT user_id, session_id, ts, kind, direction, transport, payload " + f"FROM {backend.table} " + f"WHERE {' AND '.join(where)} " + f"ORDER BY ts DESC " + f"LIMIT {max(1, min(int(limit), 500))}" + ) + output = [] + for row in list(rows or []): + payload = parse_payload(row.get("payload")) + legacy_event_type = str(payload.get("legacy_event_type") or "").strip().lower() + output.append( + { + "id": "", + "user_id": int(row.get("user_id") or 0), + "session_id": str(row.get("session_id") or ""), + "ts": int(row.get("ts") or 0), + "event_type": legacy_event_type or str(row.get("kind") or ""), + "kind": str(row.get("kind") or ""), + "direction": str(row.get("direction") or ""), + "origin_transport": str(row.get("transport") or ""), + "trace_id": str(payload.get("trace_id") or ""), + } + ) + return output + + +def count_behavioral_events(*, user_id: int) -> int: + backend = get_event_ledger_backend() + rows = backend.query_rows( + f"SELECT COUNT(*) AS total_events " + f"FROM {backend.table} " + f"WHERE user_id={int(user_id)}" + ) + if not rows: + return 0 + try: + return int((rows[0] or {}).get("total_events") or 0) + except Exception: + return 0 + + +def get_trace_ids(*, user_id: int, limit: int = 120) -> list[str]: + backend = get_event_ledger_backend() + rows = backend.query_rows( + f"SELECT payload " + f"FROM {backend.table} " + f"WHERE user_id={int(user_id)} " + "ORDER BY ts DESC " + f"LIMIT {max(1, min(int(limit) * 6, 1000))}" + ) + seen = set() + output = [] + for row in list(rows or []): + payload = parse_payload(row.get("payload")) + trace_id = str(payload.get("trace_id") or "").strip() + if not trace_id or trace_id in seen: + continue + seen.add(trace_id) + output.append(trace_id) + if len(output) >= max(1, min(int(limit), 500)): + break + return output + + +def get_trace_event_rows(*, user_id: int, trace_id: str, limit: int = 500) -> list[dict]: + backend = get_event_ledger_backend() + rows = backend.query_rows( + f"SELECT user_id, session_id, ts, kind, direction, transport, payload " + f"FROM {backend.table} " + f"WHERE user_id={int(user_id)} " + "ORDER BY ts ASC " + f"LIMIT {max(1, min(int(limit) * 8, 5000))}" + ) + output = [] + target = str(trace_id or "").strip() + for row in list(rows or []): + payload = parse_payload(row.get("payload")) + if str(payload.get("trace_id") or "").strip() != target: + continue + output.append( + { + "id": "", + "ts": int(row.get("ts") or 0), + "event_type": str( + payload.get("legacy_event_type") or row.get("kind") or "" + ).strip(), + "kind": str(row.get("kind") or "").strip(), + "direction": str(row.get("direction") or "").strip(), + "session_id": str(row.get("session_id") or "").strip(), + "origin_transport": str(row.get("transport") or "").strip(), + "origin_message_id": str(payload.get("origin_message_id") or "").strip(), + "payload": payload, + "trace_id": target, + } + ) + if len(output) >= max(1, min(int(limit), 500)): + break + return output + + +def get_session_event_rows(*, user_id: int, session_id: str, limit: int = 2000) -> list[dict]: + backend = get_event_ledger_backend() + rows = backend.query_rows( + f"SELECT user_id, session_id, ts, kind, direction, transport, actor, payload " + f"FROM {backend.table} " + f"WHERE user_id={int(user_id)} " + f"AND session_id='{backend._escape(str(session_id or '').strip())}' " + "ORDER BY ts ASC " + f"LIMIT {max(1, min(int(limit), 5000))}" + ) + output = [] + for row in list(rows or []): + payload = parse_payload(row.get("payload")) + output.append( + { + "ts": int(row.get("ts") or 0), + "event_type": str( + payload.get("legacy_event_type") or row.get("kind") or "" + ).strip(), + "kind": str(row.get("kind") or "").strip(), + "direction": str(row.get("direction") or "").strip(), + "session_id": str(row.get("session_id") or "").strip(), + "origin_transport": str(row.get("transport") or "").strip(), + "actor_identifier": str(row.get("actor") or "").strip(), + "origin_message_id": str(payload.get("origin_message_id") or "").strip(), + "payload": payload, + } + ) + return output diff --git a/core/events/projection.py b/core/events/projection.py index 8426790..516308e 100644 --- a/core/events/projection.py +++ b/core/events/projection.py @@ -2,6 +2,7 @@ from __future__ import annotations from dataclasses import dataclass +from core.events.manticore import get_session_event_rows from core.models import ChatSession, ConversationEvent, Message @@ -59,27 +60,56 @@ def _normalize_reactions(rows: list[dict] | None) -> list[dict]: ) -def project_session_from_events(session: ChatSession) -> list[dict]: - rows = list( - ConversationEvent.objects.filter( - user=session.user, - session=session, - ).order_by("ts", "created_at") +def _event_rows_for_session(session: ChatSession): + try: + rows = get_session_event_rows( + user_id=int(session.user_id), + session_id=str(session.id), + limit=2000, + ) + except Exception: + rows = [] + if rows: + return rows, "manticore" + return ( + list( + ConversationEvent.objects.filter( + user=session.user, + session=session, + ).order_by("ts", "created_at") + ), + "django", ) + +def project_session_from_events(session: ChatSession) -> list[dict]: + rows, _source = _event_rows_for_session(session) + projected: dict[str, _ProjectedMessage] = {} order: list[str] = [] for event in rows: - payload = dict(event.payload or {}) - event_type = str(event.event_type or "").strip().lower() + is_dict = isinstance(event, dict) + payload = dict( + (event.get("payload") if is_dict else getattr(event, "payload", {})) or {} + ) + event_type = str( + (event.get("event_type") if is_dict else getattr(event, "event_type", "")) + or "" + ).strip().lower() message_id = str( payload.get("message_id") or payload.get("target_message_id") or "" ).strip() if event_type == "message_created": message_id = str( - payload.get("message_id") or event.origin_message_id or "" + payload.get("message_id") + or ( + event.get("origin_message_id") + if is_dict + else getattr(event, "origin_message_id", "") + ) + or "" ).strip() if not message_id: continue @@ -88,10 +118,14 @@ def project_session_from_events(session: ChatSession) -> list[dict]: state = _ProjectedMessage(message_id=message_id) projected[message_id] = state order.append(message_id) - state.ts = _safe_int(payload.get("message_ts"), _safe_int(event.ts)) + state.ts = _safe_int( + payload.get("message_ts"), + _safe_int(event.get("ts") if is_dict else getattr(event, "ts", 0)), + ) state.text = str(payload.get("text") or state.text or "") delivered_default = _safe_int( - payload.get("delivered_ts"), _safe_int(event.ts) + payload.get("delivered_ts"), + _safe_int(event.get("ts") if is_dict else getattr(event, "ts", 0)), ) if state.delivered_ts is None: state.delivered_ts = delivered_default or None @@ -102,7 +136,10 @@ def project_session_from_events(session: ChatSession) -> list[dict]: state = projected[message_id] if event_type == "read_receipt": - read_ts = _safe_int(payload.get("read_ts"), _safe_int(event.ts)) + read_ts = _safe_int( + payload.get("read_ts"), + _safe_int(event.get("ts") if is_dict else getattr(event, "ts", 0)), + ) if read_ts > 0: if state.read_ts is None: state.read_ts = read_ts @@ -114,11 +151,27 @@ def project_session_from_events(session: ChatSession) -> list[dict]: if event_type in {"reaction_added", "reaction_removed"}: source_service = ( - str(payload.get("source_service") or event.origin_transport or "") + str( + payload.get("source_service") + or ( + event.get("origin_transport") + if is_dict + else getattr(event, "origin_transport", "") + ) + or "" + ) .strip() .lower() ) - actor = str(payload.get("actor") or event.actor_identifier or "").strip() + actor = str( + payload.get("actor") + or ( + event.get("actor_identifier") + if is_dict + else getattr(event, "actor_identifier", "") + ) + or "" + ).strip() emoji = str(payload.get("emoji") or "").strip() if not source_service and not actor and not emoji: continue diff --git a/core/events/shadow.py b/core/events/shadow.py new file mode 100644 index 0000000..cd94931 --- /dev/null +++ b/core/events/shadow.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +from django.db.models import Count, Max, Q + +from core.models import ConversationEvent, Person, User + + +def _kind_from_event_type(event_type: str) -> str: + normalized = str(event_type or "").strip().lower() + return { + "message_created": "message_sent", + "delivery_receipt": "message_delivered", + "read_receipt": "message_read", + "typing_started": "composing_started", + "typing_stopped": "composing_stopped", + "composing_abandoned": "composing_abandoned", + "presence_available": "presence_available", + "presence_unavailable": "presence_unavailable", + }.get(normalized, normalized) + + +def get_shadow_behavioral_availability_stats(*, user: User) -> list[dict]: + person_map = { + str(row["id"]): str(row["name"] or "") + for row in Person.objects.filter(user=user).values("id", "name") + } + rows = ( + ConversationEvent.objects.filter( + user=user, + session__identifier__person__isnull=False, + ) + .values("session__identifier__person_id", "origin_transport") + .annotate( + total_events=Count("id"), + presence_events=Count( + "id", + filter=Q(event_type__in=["presence_available", "presence_unavailable"]), + ), + read_events=Count("id", filter=Q(event_type="read_receipt")), + typing_events=Count( + "id", + filter=Q( + event_type__in=["typing_started", "typing_stopped"] + ), + ), + message_events=Count("id", filter=Q(event_type="message_created")), + abandoned_events=Count("id", filter=Q(event_type="composing_abandoned")), + last_event_ts=Max("ts"), + ) + .order_by("-total_events", "session__identifier__person_id", "origin_transport") + ) + output = [] + for row in rows: + person_id = str(row.get("session__identifier__person_id") or "").strip() + output.append( + { + "person_id": person_id, + "person_name": person_map.get(person_id, person_id or "-"), + "service": str(row.get("origin_transport") or "").strip().lower(), + "total_events": int(row.get("total_events") or 0), + "presence_events": int(row.get("presence_events") or 0), + "read_events": int(row.get("read_events") or 0), + "typing_events": int(row.get("typing_events") or 0), + "message_events": int(row.get("message_events") or 0), + "abandoned_events": int(row.get("abandoned_events") or 0), + "last_event_ts": int(row.get("last_event_ts") or 0), + } + ) + return output + + +def get_shadow_behavioral_latest_states( + *, user: User, person_ids: list[str], transport: str = "" +) -> list[dict]: + queryset = ConversationEvent.objects.filter( + user=user, + session__identifier__person_id__in=[str(value) for value in person_ids], + event_type__in=[ + "message_created", + "delivery_receipt", + "read_receipt", + "typing_started", + "typing_stopped", + "composing_abandoned", + "presence_available", + "presence_unavailable", + ], + ).select_related("session__identifier") + if transport: + queryset = queryset.filter(origin_transport=str(transport).strip().lower()) + rows = [] + seen = set() + for row in queryset.order_by( + "session__identifier__person_id", "-ts", "-created_at" + )[:500]: + person_id = str(getattr(row.session.identifier, "person_id", "") or "").strip() + if not person_id or person_id in seen: + continue + seen.add(person_id) + rows.append( + { + "person_id": person_id, + "transport": str(row.origin_transport or "").strip().lower(), + "kind": _kind_from_event_type(row.event_type), + "ts": int(row.ts or 0), + } + ) + return rows + + +def get_shadow_behavioral_events_for_range( + *, + user: User, + person_id: str, + start_ts: int, + end_ts: int, + transport: str = "", +) -> list[dict]: + queryset = ConversationEvent.objects.filter( + user=user, + session__identifier__person_id=str(person_id or "").strip(), + ts__gte=int(start_ts), + ts__lte=int(end_ts), + event_type__in=[ + "message_created", + "delivery_receipt", + "read_receipt", + "typing_started", + "typing_stopped", + "composing_abandoned", + "presence_available", + "presence_unavailable", + ], + ).order_by("ts", "created_at") + if transport: + queryset = queryset.filter(origin_transport=str(transport).strip().lower()) + return [ + { + "person_id": str(person_id or "").strip(), + "session_id": str(row.session_id or ""), + "transport": str(row.origin_transport or "").strip().lower(), + "kind": _kind_from_event_type(row.event_type), + "direction": str(row.direction or "").strip().lower(), + "ts": int(row.ts or 0), + "payload": dict(row.payload or {}), + } + for row in queryset[:1000] + ] diff --git a/core/management/commands/backfill_contact_availability.py b/core/management/commands/backfill_contact_availability.py index 29b72e2..c965df3 100644 --- a/core/management/commands/backfill_contact_availability.py +++ b/core/management/commands/backfill_contact_availability.py @@ -4,13 +4,16 @@ from typing import Iterable from django.core.management.base import BaseCommand +from core.events.ledger import append_event_sync from core.models import Message -from core.presence import AvailabilitySignal, record_inferred_signal from core.presence.inference import now_ms class Command(BaseCommand): - help = "Backfill inferred contact availability events from historical message/read-receipt activity." + help = ( + "Backfill behavioral event ledger rows from historical message and " + "read-receipt activity." + ) def add_arguments(self, parser): parser.add_argument("--days", type=int, default=30) @@ -39,17 +42,18 @@ class Command(BaseCommand): user_filter = str(options.get("user_id") or "").strip() dry_run = bool(options.get("dry_run")) - created = 0 + indexed = 0 scanned = 0 for msg in self._iter_messages( days=days, limit=limit, service=service_filter, user_id=user_filter ): scanned += 1 - identifier = getattr(getattr(msg, "session", None), "identifier", None) + session = getattr(msg, "session", None) + identifier = getattr(session, "identifier", None) person = getattr(identifier, "person", None) user = getattr(msg, "user", None) - if not identifier or not person or not user: + if not session or not identifier or not person or not user: continue service = ( @@ -60,76 +64,65 @@ class Command(BaseCommand): if not service: continue - base_ts = int(getattr(msg, "ts", 0) or 0) - message_author = ( - str(getattr(msg, "custom_author", "") or "").strip().upper() - ) - outgoing = message_author in {"USER", "BOT"} + author = str(getattr(msg, "custom_author", "") or "").strip().upper() + outgoing = author in {"USER", "BOT"} + message_id = str( + getattr(msg, "source_message_id", "") or f"django-message-{msg.id}" + ).strip() - candidates = [] - if base_ts > 0: - candidates.append( - { - "source_kind": "message_out" if outgoing else "message_in", - "availability_state": "available", - "confidence": 0.65 if outgoing else 0.75, - "ts": base_ts, - "payload": { - "origin": "backfill_contact_availability", - "message_id": str(msg.id), - "inferred_from": "message_activity", - }, - } + if not dry_run: + append_event_sync( + user=user, + session=session, + ts=int(getattr(msg, "ts", 0) or 0), + event_type="message_created", + direction="out" if outgoing else "in", + actor_identifier=str( + getattr(msg, "sender_uuid", "") or identifier.identifier or "" + ), + origin_transport=service, + origin_message_id=message_id, + origin_chat_id=str(getattr(msg, "source_chat_id", "") or ""), + payload={ + "origin": "backfill_contact_availability", + "message_id": str(msg.id), + "text": str(getattr(msg, "text", "") or ""), + "outgoing": outgoing, + }, ) + indexed += 1 read_ts = int(getattr(msg, "read_ts", 0) or 0) - if read_ts > 0: - candidates.append( - { - "source_kind": "read_receipt", - "availability_state": "available", - "confidence": 0.95, - "ts": read_ts, - "payload": { - "origin": "backfill_contact_availability", - "message_id": str(msg.id), - "inferred_from": "read_receipt", - "read_by": str( - getattr(msg, "read_by_identifier", "") or "" - ), - }, - } - ) - - for row in candidates: - exists = user.contact_availability_events.filter( - person=person, - person_identifier=identifier, - service=service, - source_kind=row["source_kind"], - ts=int(row["ts"]), - ).exists() - if exists: - continue - created += 1 - if dry_run: - continue - record_inferred_signal( - AvailabilitySignal( - user=user, - person=person, - person_identifier=identifier, - service=service, - source_kind=row["source_kind"], - availability_state=row["availability_state"], - confidence=float(row["confidence"]), - ts=int(row["ts"]), - payload=dict(row["payload"]), - ) + if read_ts <= 0: + continue + if not dry_run: + append_event_sync( + user=user, + session=session, + ts=read_ts, + event_type="read_receipt", + direction="system", + actor_identifier=str( + getattr(msg, "read_by_identifier", "") or identifier.identifier + ), + origin_transport=service, + origin_message_id=message_id, + origin_chat_id=str(getattr(msg, "source_chat_id", "") or ""), + payload={ + "origin": "backfill_contact_availability", + "message_id": str(msg.id), + "message_ts": int(getattr(msg, "ts", 0) or 0), + "read_by": str( + getattr(msg, "read_by_identifier", "") or "" + ).strip(), + }, ) + indexed += 1 self.stdout.write( self.style.SUCCESS( - f"backfill_contact_availability complete scanned={scanned} created={created} dry_run={dry_run} days={days} limit={limit}" + "backfill_contact_availability complete " + f"scanned={scanned} indexed={indexed} dry_run={dry_run} " + f"days={days} limit={limit}" ) ) diff --git a/core/management/commands/event_ledger_smoke.py b/core/management/commands/event_ledger_smoke.py index 4ea1d48..8b892f7 100644 --- a/core/management/commands/event_ledger_smoke.py +++ b/core/management/commands/event_ledger_smoke.py @@ -5,12 +5,46 @@ import time from django.core.management.base import BaseCommand, CommandError +from core.events.manticore import get_recent_event_rows from core.models import ConversationEvent class Command(BaseCommand): help = "Quick non-mutating sanity check for recent canonical event writes." + def _recent_rows(self, *, minutes: int, service: str, user_id: str, limit: int): + cutoff_ts = int(time.time() * 1000) - (minutes * 60 * 1000) + queryset = ConversationEvent.objects.filter(ts__gte=cutoff_ts).order_by("-ts") + if service: + queryset = queryset.filter(origin_transport=service) + if user_id: + queryset = queryset.filter(user_id=user_id) + + rows = list( + queryset.values( + "id", + "user_id", + "session_id", + "ts", + "event_type", + "direction", + "origin_transport", + "trace_id", + )[:limit] + ) + if rows: + return rows, "django" + try: + manticore_rows = get_recent_event_rows( + minutes=minutes, + service=service, + user_id=user_id, + limit=limit, + ) + except Exception: + manticore_rows = [] + return manticore_rows, "manticore" if manticore_rows else "django" + def add_arguments(self, parser): parser.add_argument("--minutes", type=int, default=120) parser.add_argument("--service", default="") @@ -34,24 +68,11 @@ class Command(BaseCommand): if item.strip() ] - cutoff_ts = int(time.time() * 1000) - (minutes * 60 * 1000) - queryset = ConversationEvent.objects.filter(ts__gte=cutoff_ts).order_by("-ts") - if service: - queryset = queryset.filter(origin_transport=service) - if user_id: - queryset = queryset.filter(user_id=user_id) - - rows = list( - queryset.values( - "id", - "user_id", - "session_id", - "ts", - "event_type", - "direction", - "origin_transport", - "trace_id", - )[:limit] + rows, data_source = self._recent_rows( + minutes=minutes, + service=service, + user_id=user_id, + limit=limit, ) event_type_counts = {} for row in rows: @@ -67,6 +88,7 @@ class Command(BaseCommand): "minutes": minutes, "service": service, "user_id": user_id, + "data_source": data_source, "count": len(rows), "event_type_counts": event_type_counts, "required_types": required_types, @@ -79,7 +101,7 @@ class Command(BaseCommand): return self.stdout.write( - f"event-ledger-smoke minutes={minutes} service={service or '-'} user={user_id or '-'} count={len(rows)}" + f"event-ledger-smoke minutes={minutes} service={service or '-'} user={user_id or '-'} source={data_source} count={len(rows)}" ) self.stdout.write(f"event_type_counts={event_type_counts}") if required_types: @@ -88,7 +110,7 @@ class Command(BaseCommand): ) if fail_if_empty and len(rows) == 0: - raise CommandError("No recent ConversationEvent rows found.") + raise CommandError("No recent canonical event rows found.") if missing_required_types: raise CommandError( "Missing required event types: " + ", ".join(missing_required_types) diff --git a/core/management/commands/gia_analysis.py b/core/management/commands/gia_analysis.py new file mode 100644 index 0000000..4b4f971 --- /dev/null +++ b/core/management/commands/gia_analysis.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import time + +from django.core.management.base import BaseCommand + +from core.events.behavior import summarize_metrics +from core.events.manticore import get_event_ledger_backend +from core.util import logs + +log = logs.get_logger("gia_analysis") + + +class Command(BaseCommand): + help = "Compute behavioral metrics from Manticore event rows into gia_metrics." + + def add_arguments(self, parser): + parser.add_argument("--once", action="store_true", default=False) + parser.add_argument("--user-id", type=int) + parser.add_argument("--person-id") + parser.add_argument("--sleep-seconds", type=float, default=60.0) + parser.add_argument("--window-days", nargs="*", type=int, default=[1, 7, 30, 90]) + + def _run_cycle( + self, + *, + user_id: int | None = None, + person_id: str = "", + window_days: list[int] | None = None, + ) -> int: + backend = get_event_ledger_backend() + now_ms = int(time.time() * 1000) + baseline_since = now_ms - (90 * 86400000) + windows = sorted({max(1, int(value)) for value in list(window_days or [1, 7, 30, 90])}) + + targets = backend.list_event_targets(user_id=user_id) + if person_id: + targets = [ + row + for row in targets + if str(row.get("person_id") or "").strip() == str(person_id).strip() + ] + + written = 0 + for target in targets: + target_user_id = int(target.get("user_id") or 0) + target_person_id = str(target.get("person_id") or "").strip() + if target_user_id <= 0 or not target_person_id: + continue + baseline_rows = backend.fetch_events( + user_id=target_user_id, + person_id=target_person_id, + since_ts=baseline_since, + ) + if not baseline_rows: + continue + for window in windows: + since_ts = now_ms - (int(window) * 86400000) + window_rows = [ + row + for row in baseline_rows + if int(row.get("ts") or 0) >= since_ts + ] + metrics = summarize_metrics(window_rows, baseline_rows) + for metric, values in metrics.items(): + backend.upsert_metric( + user_id=target_user_id, + person_id=target_person_id, + window_days=int(window), + metric=metric, + value_ms=int(values.get("value_ms") or 0), + baseline_ms=int(values.get("baseline_ms") or 0), + z_score=float(values.get("z_score") or 0.0), + sample_n=int(values.get("sample_n") or 0), + computed_at=now_ms, + ) + written += 1 + return written + + def handle(self, *args, **options): + once = bool(options.get("once")) + sleep_seconds = max(1.0, float(options.get("sleep_seconds") or 60.0)) + user_id = options.get("user_id") + person_id = str(options.get("person_id") or "").strip() + window_days = list(options.get("window_days") or [1, 7, 30, 90]) + + while True: + written = self._run_cycle( + user_id=user_id, + person_id=person_id, + window_days=window_days, + ) + self.stdout.write(f"gia-analysis wrote={written}") + if once: + return + time.sleep(sleep_seconds) diff --git a/core/management/commands/manticore_backfill.py b/core/management/commands/manticore_backfill.py new file mode 100644 index 0000000..f37f81c --- /dev/null +++ b/core/management/commands/manticore_backfill.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from django.core.management.base import BaseCommand, CommandError + +from core.events.manticore import upsert_conversation_event +from core.models import ConversationEvent + + +class Command(BaseCommand): + help = "Backfill behavioral events into Manticore from ConversationEvent rows." + + def add_arguments(self, parser): + parser.add_argument( + "--from-conversation-events", + action="store_true", + help="Replay ConversationEvent rows into the Manticore event table.", + ) + parser.add_argument("--user-id", type=int, default=None) + parser.add_argument("--limit", type=int, default=5000) + + def handle(self, *args, **options): + if not bool(options.get("from_conversation_events")): + raise CommandError("Pass --from-conversation-events to run this backfill.") + + queryset = ( + ConversationEvent.objects.select_related("session__identifier") + .order_by("ts", "created_at") + ) + user_id = options.get("user_id") + if user_id is not None: + queryset = queryset.filter(user_id=int(user_id)) + + scanned = 0 + indexed = 0 + limit = max(1, int(options.get("limit") or 5000)) + for event in queryset[:limit]: + scanned += 1 + upsert_conversation_event(event) + indexed += 1 + + self.stdout.write( + self.style.SUCCESS( + "manticore-backfill scanned=%s indexed=%s user=%s" + % (scanned, indexed, user_id if user_id is not None else "-") + ) + ) diff --git a/core/management/commands/prune_behavioral_orm_data.py b/core/management/commands/prune_behavioral_orm_data.py new file mode 100644 index 0000000..4f61354 --- /dev/null +++ b/core/management/commands/prune_behavioral_orm_data.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import time + +from django.conf import settings +from django.core.management.base import BaseCommand + +from core.models import ConversationEvent + + +class Command(BaseCommand): + help = ( + "Prune high-growth behavioral ORM shadow tables after data has been " + "persisted to Manticore." + ) + + def add_arguments(self, parser): + parser.add_argument("--user-id", default="") + parser.add_argument("--dry-run", action="store_true", default=False) + parser.add_argument("--conversation-days", type=int) + parser.add_argument( + "--tables", + default="conversation_events", + help="Comma separated subset of: conversation_events", + ) + + def _cutoff_ms(self, days: int) -> int: + return int(time.time() * 1000) - (max(1, int(days)) * 24 * 60 * 60 * 1000) + + def handle(self, *args, **options): + user_id = str(options.get("user_id") or "").strip() + dry_run = bool(options.get("dry_run")) + conversation_days = int( + options.get("conversation_days") + or getattr(settings, "CONVERSATION_EVENT_RETENTION_DAYS", 90) + or 90 + ) + selected_tables = { + str(item or "").strip().lower() + for item in str(options.get("tables") or "").split(",") + if str(item or "").strip() + } + + deleted = { + "conversation_events": 0, + } + + if "conversation_events" in selected_tables: + qs = ConversationEvent.objects.filter( + ts__lt=self._cutoff_ms(conversation_days) + ) + if user_id: + qs = qs.filter(user_id=user_id) + deleted["conversation_events"] = int(qs.count() if dry_run else qs.delete()[0]) + + self.stdout.write( + "prune-behavioral-orm-data " + f"dry_run={dry_run} " + f"user_id={user_id or '-'} " + f"conversation_days={conversation_days} " + f"deleted={deleted}" + ) diff --git a/core/management/commands/recalculate_contact_availability.py b/core/management/commands/recalculate_contact_availability.py index 1c2e667..2e7b1de 100644 --- a/core/management/commands/recalculate_contact_availability.py +++ b/core/management/commands/recalculate_contact_availability.py @@ -2,22 +2,15 @@ from __future__ import annotations from django.core.management.base import BaseCommand -from core.models import ContactAvailabilityEvent, ContactAvailabilitySpan, Message -from core.presence import AvailabilitySignal, record_native_signal +from core.events.ledger import append_event_sync +from core.models import Message from core.presence.inference import now_ms -_SOURCE_ORDER = { - "message_in": 10, - "message_out": 20, - "read_receipt": 30, - "native_presence": 40, -} - class Command(BaseCommand): help = ( - "Recalculate contact availability events/spans from persisted message, " - "read-receipt, and reaction history (deterministic rebuild)." + "Replay behavioral event ledger rows from persisted message, receipt, " + "and reaction history." ) def add_arguments(self, parser): @@ -39,70 +32,93 @@ class Command(BaseCommand): qs = qs.filter(user_id=str(user_id).strip()) return qs.order_by("ts")[: max(1, int(limit))] - def _build_event_rows(self, messages): - rows = [] + def handle(self, *args, **options): + days = max(1, int(options.get("days") or 90)) + limit = max(1, int(options.get("limit") or 20000)) + service_filter = str(options.get("service") or "").strip().lower() + user_filter = str(options.get("user_id") or "").strip() + dry_run = bool(options.get("dry_run")) + + messages = list( + self._iter_messages( + days=days, + limit=limit, + service=service_filter, + user_id=user_filter, + ) + ) + indexed = 0 + for msg in messages: - identifier = getattr(getattr(msg, "session", None), "identifier", None) + session = getattr(msg, "session", None) + identifier = getattr(session, "identifier", None) person = getattr(identifier, "person", None) user = getattr(msg, "user", None) - if not identifier or not person or not user: + if not session or not identifier or not person or not user: continue service = ( - str( - getattr(msg, "source_service", "") - or getattr(identifier, "service", "") - ) + str(getattr(msg, "source_service", "") or identifier.service or "") .strip() .lower() ) if not service: continue - ts = int(getattr(msg, "ts", 0) or 0) - if ts > 0: - author = str(getattr(msg, "custom_author", "") or "").strip().upper() - outgoing = author in {"USER", "BOT"} - rows.append( - { - "user": user, - "person": person, - "person_identifier": identifier, - "service": service, - "source_kind": "message_out" if outgoing else "message_in", - "availability_state": "available", - "confidence": 0.65 if outgoing else 0.75, - "ts": ts, - "payload": { - "origin": "recalculate_contact_availability", - "message_id": str(msg.id), - "inferred_from": "message_activity", - }, - } + author = str(getattr(msg, "custom_author", "") or "").strip().upper() + outgoing = author in {"USER", "BOT"} + message_id = str( + getattr(msg, "source_message_id", "") or f"django-message-{msg.id}" + ).strip() + + if not dry_run: + append_event_sync( + user=user, + session=session, + ts=int(getattr(msg, "ts", 0) or 0), + event_type="message_created", + direction="out" if outgoing else "in", + actor_identifier=str( + getattr(msg, "sender_uuid", "") or identifier.identifier or "" + ), + origin_transport=service, + origin_message_id=message_id, + origin_chat_id=str(getattr(msg, "source_chat_id", "") or ""), + payload={ + "origin": "recalculate_contact_availability", + "message_id": str(msg.id), + "text": str(getattr(msg, "text", "") or ""), + "outgoing": outgoing, + }, ) + indexed += 1 read_ts = int(getattr(msg, "read_ts", 0) or 0) if read_ts > 0: - rows.append( - { - "user": user, - "person": person, - "person_identifier": identifier, - "service": service, - "source_kind": "read_receipt", - "availability_state": "available", - "confidence": 0.95, - "ts": read_ts, - "payload": { + if not dry_run: + append_event_sync( + user=user, + session=session, + ts=read_ts, + event_type="read_receipt", + direction="system", + actor_identifier=str( + getattr(msg, "read_by_identifier", "") + or identifier.identifier + ), + origin_transport=service, + origin_message_id=message_id, + origin_chat_id=str(getattr(msg, "source_chat_id", "") or ""), + payload={ "origin": "recalculate_contact_availability", "message_id": str(msg.id), - "inferred_from": "read_receipt", + "message_ts": int(getattr(msg, "ts", 0) or 0), "read_by": str( getattr(msg, "read_by_identifier", "") or "" - ), + ).strip(), }, - } - ) + ) + indexed += 1 reactions = list( (getattr(msg, "receipt_payload", {}) or {}).get("reactions") or [] @@ -114,138 +130,32 @@ class Command(BaseCommand): reaction_ts = int(item.get("updated_at") or 0) if reaction_ts <= 0: continue - rows.append( - { - "user": user, - "person": person, - "person_identifier": identifier, - "service": service, - "source_kind": "native_presence", - "availability_state": "available", - "confidence": 0.9, - "ts": reaction_ts, - "payload": { + if not dry_run: + append_event_sync( + user=user, + session=session, + ts=reaction_ts, + event_type="presence_available", + direction="system", + actor_identifier=str(item.get("actor") or ""), + origin_transport=service, + origin_message_id=message_id, + origin_chat_id=str(getattr(msg, "source_chat_id", "") or ""), + payload={ "origin": "recalculate_contact_availability", "message_id": str(msg.id), "inferred_from": "reaction", "emoji": str(item.get("emoji") or ""), - "actor": str(item.get("actor") or ""), - "source_service": str( - item.get("source_service") or service - ), + "source_service": str(item.get("source_service") or service), }, - } - ) - - rows.sort( - key=lambda row: ( - str(getattr(row["user"], "id", "")), - str(getattr(row["person"], "id", "")), - str(row.get("service") or ""), - int(row.get("ts") or 0), - _SOURCE_ORDER.get(str(row.get("source_kind") or ""), 999), - str((row.get("payload") or {}).get("message_id") or ""), - ) - ) - return rows - - def handle(self, *args, **options): - days = max(1, int(options.get("days") or 90)) - limit = max(1, int(options.get("limit") or 20000)) - service_filter = str(options.get("service") or "").strip().lower() - user_filter = str(options.get("user_id") or "").strip() - dry_run = bool(options.get("dry_run")) - reset = not bool(options.get("no_reset")) - cutoff_ts = now_ms() - (days * 24 * 60 * 60 * 1000) - - messages = list( - self._iter_messages( - days=days, - limit=limit, - service=service_filter, - user_id=user_filter, - ) - ) - rows = self._build_event_rows(messages) - - keys_to_reset = set() - for row in rows: - keys_to_reset.add( - ( - str(getattr(row["user"], "id", "")), - str(getattr(row["person"], "id", "")), - str(row.get("service") or ""), - ) - ) - - deleted_events = 0 - deleted_spans = 0 - if reset and keys_to_reset and not dry_run: - for user_id, person_id, service in keys_to_reset: - deleted_events += ContactAvailabilityEvent.objects.filter( - user_id=user_id, - person_id=person_id, - service=service, - ts__gte=cutoff_ts, - ).delete()[0] - deleted_spans += ContactAvailabilitySpan.objects.filter( - user_id=user_id, - person_id=person_id, - service=service, - end_ts__gte=cutoff_ts, - ).delete()[0] - - created = 0 - dedup_seen = set() - for row in rows: - dedup_key = ( - str(getattr(row["user"], "id", "")), - str(getattr(row["person"], "id", "")), - str(getattr(row["person_identifier"], "id", "")), - str(row.get("service") or ""), - str(row.get("source_kind") or ""), - int(row.get("ts") or 0), - str((row.get("payload") or {}).get("message_id") or ""), - str((row.get("payload") or {}).get("inferred_from") or ""), - ) - if dedup_key in dedup_seen: - continue - dedup_seen.add(dedup_key) - - if not reset: - exists = ContactAvailabilityEvent.objects.filter( - user=row["user"], - person=row["person"], - person_identifier=row["person_identifier"], - service=row["service"], - source_kind=row["source_kind"], - ts=row["ts"], - ).exists() - if exists: - continue - - created += 1 - if dry_run: - continue - record_native_signal( - AvailabilitySignal( - user=row["user"], - person=row["person"], - person_identifier=row["person_identifier"], - service=row["service"], - source_kind=row["source_kind"], - availability_state=row["availability_state"], - confidence=float(row["confidence"]), - ts=int(row["ts"]), - payload=dict(row["payload"]), - ) - ) + ) + indexed += 1 self.stdout.write( self.style.SUCCESS( "recalculate_contact_availability complete " - f"messages_scanned={len(messages)} candidates={len(rows)} " - f"created={created} deleted_events={deleted_events} deleted_spans={deleted_spans} " - f"reset={reset} dry_run={dry_run} days={days} limit={limit}" + f"messages_scanned={len(messages)} indexed={indexed} " + f"dry_run={dry_run} no_reset={bool(options.get('no_reset'))} " + f"days={days} limit={limit}" ) ) diff --git a/core/messaging/history.py b/core/messaging/history.py index fd5ad55..8fa496c 100644 --- a/core/messaging/history.py +++ b/core/messaging/history.py @@ -293,6 +293,7 @@ async def apply_read_receipts( read_by_identifier="", payload=None, trace_id="", + receipt_event_type="read_receipt", ): """ Persist delivery/read metadata for one identifier's messages. @@ -310,6 +311,9 @@ async def apply_read_receipts( read_at = int(read_ts) if read_ts else None except Exception: read_at = None + normalized_event_type = str(receipt_event_type or "read_receipt").strip().lower() + if normalized_event_type not in {"read_receipt", "delivery_receipt"}: + normalized_event_type = "read_receipt" rows = await sync_to_async(list)( Message.objects.filter( @@ -324,13 +328,25 @@ async def apply_read_receipts( if message.delivered_ts is None: message.delivered_ts = read_at or message.ts dirty.append("delivered_ts") - if read_at and (message.read_ts is None or read_at > message.read_ts): + if ( + normalized_event_type == "read_receipt" + and read_at + and (message.read_ts is None or read_at > message.read_ts) + ): message.read_ts = read_at dirty.append("read_ts") - if source_service and message.read_source_service != source_service: + if ( + normalized_event_type == "read_receipt" + and source_service + and message.read_source_service != source_service + ): message.read_source_service = source_service dirty.append("read_source_service") - if read_by_identifier and message.read_by_identifier != read_by_identifier: + if ( + normalized_event_type == "read_receipt" + and read_by_identifier + and message.read_by_identifier != read_by_identifier + ): message.read_by_identifier = read_by_identifier dirty.append("read_by_identifier") if payload: @@ -346,7 +362,7 @@ async def apply_read_receipts( user=user, session=message.session, ts=int(read_at or message.ts or 0), - event_type="read_receipt", + event_type=normalized_event_type, direction="system", actor_identifier=str(read_by_identifier or ""), origin_transport=str(source_service or ""), @@ -356,6 +372,7 @@ async def apply_read_receipts( "message_id": str(message.id), "message_ts": int(message.ts or 0), "read_ts": int(read_at or 0), + "receipt_event_type": normalized_event_type, "read_by_identifier": str(read_by_identifier or ""), "timestamps": [int(v) for v in ts_values], }, @@ -364,7 +381,7 @@ async def apply_read_receipts( ) except Exception as exc: log.warning( - "Event ledger append failed for read receipt message=%s: %s", + "Event ledger append failed for receipt message=%s: %s", message.id, exc, ) diff --git a/core/migrations/0047_conversationevent_behavioral_event_types.py b/core/migrations/0047_conversationevent_behavioral_event_types.py new file mode 100644 index 0000000..f737ceb --- /dev/null +++ b/core/migrations/0047_conversationevent_behavioral_event_types.py @@ -0,0 +1,33 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("core", "0046_externalchatlink_provider_default_mock"), + ] + + operations = [ + migrations.AlterField( + model_name="conversationevent", + name="event_type", + field=models.CharField( + choices=[ + ("message_created", "Message Created"), + ("message_edited", "Message Edited"), + ("message_deleted", "Message Deleted"), + ("reaction_added", "Reaction Added"), + ("reaction_removed", "Reaction Removed"), + ("read_receipt", "Read Receipt"), + ("typing_started", "Typing Started"), + ("typing_stopped", "Typing Stopped"), + ("composing_abandoned", "Composing Abandoned"), + ("presence_available", "Presence Available"), + ("presence_unavailable", "Presence Unavailable"), + ("participant_added", "Participant Added"), + ("participant_removed", "Participant Removed"), + ("delivery_receipt", "Delivery Receipt"), + ], + max_length=64, + ), + ), + ] diff --git a/core/migrations/0048_remove_contactavailabilityevent_and_span.py b/core/migrations/0048_remove_contactavailabilityevent_and_span.py new file mode 100644 index 0000000..d5234b7 --- /dev/null +++ b/core/migrations/0048_remove_contactavailabilityevent_and_span.py @@ -0,0 +1,16 @@ +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("core", "0047_conversationevent_behavioral_event_types"), + ] + + operations = [ + migrations.DeleteModel( + name="ContactAvailabilitySpan", + ), + migrations.DeleteModel( + name="ContactAvailabilityEvent", + ), + ] diff --git a/core/models.py b/core/models.py index f4d1190..5881d77 100644 --- a/core/models.py +++ b/core/models.py @@ -82,6 +82,7 @@ class User(AbstractUser): customer_id = models.UUIDField(default=uuid.uuid4, null=True, blank=True) billing_provider_id = models.CharField(max_length=255, null=True, blank=True) email = models.EmailField(unique=True) + allow_contacts_to_create_tasks = models.BooleanField(default=True) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -397,6 +398,9 @@ class ConversationEvent(models.Model): ("read_receipt", "Read Receipt"), ("typing_started", "Typing Started"), ("typing_stopped", "Typing Stopped"), + ("composing_abandoned", "Composing Abandoned"), + ("presence_available", "Presence Available"), + ("presence_unavailable", "Presence Unavailable"), ("participant_added", "Participant Added"), ("participant_removed", "Participant Removed"), ("delivery_receipt", "Delivery Receipt"), @@ -2759,108 +2763,6 @@ class ContactAvailabilitySettings(models.Model): updated_at = models.DateTimeField(auto_now=True) -class ContactAvailabilityEvent(models.Model): - SOURCE_KIND_CHOICES = ( - ("native_presence", "Native Presence"), - ("read_receipt", "Read Receipt"), - ("typing_start", "Typing Start"), - ("typing_stop", "Typing Stop"), - ("message_in", "Message In"), - ("message_out", "Message Out"), - ("inferred_timeout", "Inferred Timeout"), - ) - STATE_CHOICES = ( - ("available", "Available"), - ("unavailable", "Unavailable"), - ("unknown", "Unknown"), - ("fading", "Fading"), - ) - - user = models.ForeignKey( - User, - on_delete=models.CASCADE, - related_name="contact_availability_events", - ) - person = models.ForeignKey( - Person, - on_delete=models.CASCADE, - related_name="availability_events", - ) - person_identifier = models.ForeignKey( - PersonIdentifier, - on_delete=models.SET_NULL, - null=True, - blank=True, - related_name="availability_events", - ) - service = models.CharField(max_length=255, choices=CHANNEL_SERVICE_CHOICES) - source_kind = models.CharField(max_length=32, choices=SOURCE_KIND_CHOICES) - availability_state = models.CharField(max_length=32, choices=STATE_CHOICES) - confidence = models.FloatField(default=0.0) - ts = models.BigIntegerField(db_index=True) - payload = models.JSONField(default=dict, blank=True) - created_at = models.DateTimeField(auto_now_add=True) - - class Meta: - indexes = [ - models.Index(fields=["user", "person", "ts"]), - models.Index(fields=["user", "service", "ts"]), - models.Index(fields=["user", "availability_state", "ts"]), - ] - - -class ContactAvailabilitySpan(models.Model): - STATE_CHOICES = ContactAvailabilityEvent.STATE_CHOICES - - user = models.ForeignKey( - User, - on_delete=models.CASCADE, - related_name="contact_availability_spans", - ) - person = models.ForeignKey( - Person, - on_delete=models.CASCADE, - related_name="availability_spans", - ) - person_identifier = models.ForeignKey( - PersonIdentifier, - on_delete=models.SET_NULL, - null=True, - blank=True, - related_name="availability_spans", - ) - service = models.CharField(max_length=255, choices=CHANNEL_SERVICE_CHOICES) - state = models.CharField(max_length=32, choices=STATE_CHOICES) - start_ts = models.BigIntegerField(db_index=True) - end_ts = models.BigIntegerField(db_index=True) - confidence_start = models.FloatField(default=0.0) - confidence_end = models.FloatField(default=0.0) - opening_event = models.ForeignKey( - ContactAvailabilityEvent, - on_delete=models.SET_NULL, - null=True, - blank=True, - related_name="opening_spans", - ) - closing_event = models.ForeignKey( - ContactAvailabilityEvent, - on_delete=models.SET_NULL, - null=True, - blank=True, - related_name="closing_spans", - ) - payload = models.JSONField(default=dict, blank=True) - created_at = models.DateTimeField(auto_now_add=True) - updated_at = models.DateTimeField(auto_now=True) - - class Meta: - indexes = [ - models.Index(fields=["user", "person", "start_ts"]), - models.Index(fields=["user", "person", "end_ts"]), - models.Index(fields=["user", "service", "start_ts"]), - ] - - class ExternalChatLink(models.Model): user = models.ForeignKey( User, diff --git a/core/modules/router.py b/core/modules/router.py index ea7988a..cd1bfa0 100644 --- a/core/modules/router.py +++ b/core/modules/router.py @@ -1,5 +1,6 @@ import asyncio import re +import time from asgiref.sync import sync_to_async from django.conf import settings @@ -12,7 +13,8 @@ from core.clients.whatsapp import WhatsAppClient from core.clients.xmpp import XMPPClient from core.commands.base import CommandContext from core.commands.engine import process_inbound_message -from core.events import event_ledger_status +from core.events import append_event, event_ledger_enabled, event_ledger_status +from core.events.behavior import ComposingTracker from core.messaging import history from core.models import PersonIdentifier from core.observability.tracing import ensure_trace_id @@ -32,7 +34,13 @@ class UnifiedRouter(object): self.typing_auto_stop_seconds = int( getattr(settings, "XMPP_TYPING_AUTO_STOP_SECONDS", 3) ) + self.composing_abandoned_window_seconds = int( + getattr(settings, "COMPOSING_ABANDONED_WINDOW_SECONDS", 300) + ) self._typing_stop_tasks = {} + self._composing_tracker = ComposingTracker( + window_ms=self.composing_abandoned_window_seconds * 1000 + ) self.log = logs.get_logger("router") self.log.info("Initialised Unified Router Interface.") @@ -85,6 +93,55 @@ class UnifiedRouter(object): self._typing_stop_tasks[key] = self.loop.create_task(_timer()) + def _behavior_direction(self, protocol: str) -> str: + return "out" if str(protocol or "").strip().lower() == "xmpp" else "in" + + def _event_ts_from_kwargs(self, kwargs: dict) -> int | None: + payload = dict(kwargs.get("payload") or {}) + for candidate in ( + kwargs.get("ts"), + payload.get("ts"), + payload.get("timestamp"), + payload.get("messageTimestamp"), + payload.get("message_ts"), + ): + try: + parsed = int(candidate) + except Exception: + continue + if parsed > 0: + return parsed + return int(time.time() * 1000) + + async def _append_identifier_event( + self, + *, + identifier_row, + event_type: str, + protocol: str, + direction: str, + ts: int | None = None, + payload: dict | None = None, + raw_payload: dict | None = None, + actor_identifier: str = "", + ): + if not event_ledger_enabled(): + return None + session = await history.get_chat_session(identifier_row.user, identifier_row) + await append_event( + user=identifier_row.user, + session=session, + ts=ts, + event_type=event_type, + direction=direction, + actor_identifier=str(actor_identifier or identifier_row.identifier or ""), + origin_transport=str(protocol or "").strip().lower(), + origin_chat_id=str(identifier_row.identifier or ""), + payload=dict(payload or {}), + raw_payload=dict(raw_payload or {}), + ) + return session + def _start(self): self.log.info("Starting unified router clients") self.xmpp.start() @@ -117,6 +174,9 @@ class UnifiedRouter(object): message_text = str(kwargs.get("text") or "").strip() if local_message is None: return + self._composing_tracker.observe_message( + str(getattr(local_message, "session_id", "") or "") + ) identifiers = await self._resolve_identifier_objects(protocol, identifier) if identifiers: outgoing = str( @@ -239,6 +299,10 @@ class UnifiedRouter(object): timestamps = kwargs.get("message_timestamps") or [] read_ts = kwargs.get("read_ts") payload = kwargs.get("payload") or {} + payload_type = str((payload or {}).get("type") or "").strip().lower() + receipt_event_type = ( + "delivery_receipt" if payload_type == "delivered" else "read_receipt" + ) trace_id = ( ensure_trace_id(payload=payload) if bool(getattr(settings, "TRACE_PROPAGATION_ENABLED", True)) @@ -257,6 +321,7 @@ class UnifiedRouter(object): read_by_identifier=read_by or row.identifier, payload=payload, trace_id=trace_id, + receipt_event_type=receipt_event_type, ) record_native_signal( AvailabilitySignal( @@ -264,12 +329,13 @@ class UnifiedRouter(object): person=row.person, person_identifier=row, service=str(protocol or "").strip().lower(), - source_kind="read_receipt", + source_kind=receipt_event_type, availability_state="available", confidence=0.95, ts=int(read_ts or 0), payload={ "origin": "router.message_read", + "receipt_event_type": receipt_event_type, "message_timestamps": [ int(v) for v in list(timestamps or []) if str(v).isdigit() ], @@ -309,11 +375,41 @@ class UnifiedRouter(object): payload=payload, ) ) + state_event = None + if state == "available": + state_event = "presence_available" + elif state == "unavailable": + state_event = "presence_unavailable" + if state_event: + try: + await self._append_identifier_event( + identifier_row=row, + event_type=state_event, + protocol=protocol, + direction="system", + ts=(ts or None), + payload={ + "state": state, + "confidence": confidence, + **payload, + }, + raw_payload=payload, + actor_identifier=str(row.identifier or ""), + ) + except Exception as exc: + self.log.warning( + "Failed to append presence event for %s: %s", + row.identifier, + exc, + ) await self._refresh_workspace_metrics_for_identifiers(identifiers) async def started_typing(self, protocol, *args, **kwargs): self.log.info(f"Started typing ({protocol}) {args} {kwargs}") identifier = kwargs.get("identifier") + payload = dict(kwargs.get("payload") or {}) + event_ts = self._event_ts_from_kwargs(kwargs) + direction = self._behavior_direction(protocol) identifiers = await self._resolve_identifier_objects(protocol, identifier) for src in identifiers: record_native_signal( @@ -329,6 +425,30 @@ class UnifiedRouter(object): payload={"origin": "router.started_typing"}, ) ) + try: + session = await history.get_chat_session(src.user, src) + state = self._composing_tracker.observe_started( + str(session.id), + int(event_ts or 0), + ) + await append_event( + user=src.user, + session=session, + ts=event_ts, + event_type="typing_started", + direction=direction, + actor_identifier=str(src.identifier or ""), + origin_transport=str(protocol or "").strip().lower(), + origin_chat_id=str(src.identifier or ""), + payload=dict(payload or {}, revision=int(state.revision or 1)), + raw_payload=dict(payload or {}), + ) + except Exception as exc: + self.log.warning( + "Failed to append typing-start event for %s: %s", + src.identifier, + exc, + ) if protocol != "xmpp": set_person_typing_state( user_id=src.user_id, @@ -362,6 +482,9 @@ class UnifiedRouter(object): async def stopped_typing(self, protocol, *args, **kwargs): self.log.info(f"Stopped typing ({protocol}) {args} {kwargs}") identifier = kwargs.get("identifier") + payload = dict(kwargs.get("payload") or {}) + event_ts = self._event_ts_from_kwargs(kwargs) + direction = self._behavior_direction(protocol) identifiers = await self._resolve_identifier_objects(protocol, identifier) for src in identifiers: record_native_signal( @@ -377,6 +500,52 @@ class UnifiedRouter(object): payload={"origin": "router.stopped_typing"}, ) ) + try: + session = await history.get_chat_session(src.user, src) + await append_event( + user=src.user, + session=session, + ts=event_ts, + event_type="typing_stopped", + direction=direction, + actor_identifier=str(src.identifier or ""), + origin_transport=str(protocol or "").strip().lower(), + origin_chat_id=str(src.identifier or ""), + payload=dict(payload or {}), + raw_payload=dict(payload or {}), + ) + if session is not None: + abandoned = self._composing_tracker.observe_stopped( + str(session.id), + int(event_ts or 0), + ) + if abandoned is not None: + await append_event( + user=src.user, + session=session, + ts=int(abandoned.get("stopped_ts") or event_ts or 0), + event_type="composing_abandoned", + direction=direction, + actor_identifier=str(src.identifier or ""), + origin_transport=str(protocol or "").strip().lower(), + origin_chat_id=str(src.identifier or ""), + payload={ + **dict(payload or {}), + "abandoned": True, + "duration_ms": int( + abandoned.get("duration_ms") or 0 + ), + "revision": int(abandoned.get("revision") or 1), + "started_ts": int(abandoned.get("started_ts") or 0), + }, + raw_payload=dict(payload or {}), + ) + except Exception as exc: + self.log.warning( + "Failed to append typing-stop event for %s: %s", + src.identifier, + exc, + ) if protocol != "xmpp": set_person_typing_state( user_id=src.user_id, diff --git a/core/presence/engine.py b/core/presence/engine.py index d32ea97..2199fa8 100644 --- a/core/presence/engine.py +++ b/core/presence/engine.py @@ -2,25 +2,7 @@ from __future__ import annotations from dataclasses import dataclass -from django.db import transaction - -from core.models import ( - ContactAvailabilityEvent, - ContactAvailabilitySettings, - ContactAvailabilitySpan, - Person, - PersonIdentifier, - User, -) - -from .inference import fade_confidence, now_ms, should_fade - -POSITIVE_SOURCE_KINDS = { - "native_presence", - "read_receipt", - "typing_start", - "message_in", -} +from core.models import ContactAvailabilitySettings, Person, PersonIdentifier, User @dataclass(slots=True) @@ -41,95 +23,24 @@ def get_settings(user: User) -> ContactAvailabilitySettings: return settings_row -def _normalize_ts(value: int | None) -> int: - try: - ts = int(value or 0) - except Exception: - ts = 0 - return ts if ts > 0 else now_ms() +def record_native_signal(signal: AvailabilitySignal) -> AvailabilitySignal | None: + """ + Compatibility adapter for existing router call sites. - -def _upsert_spans_for_event(event: ContactAvailabilityEvent) -> None: - prev = ( - ContactAvailabilitySpan.objects.filter( - user=event.user, - person=event.person, - service=event.service, - ) - .order_by("-end_ts", "-id") - .first() - ) - - if prev and prev.state == event.availability_state: - prev.end_ts = max(int(prev.end_ts or 0), int(event.ts or 0)) - prev.confidence_end = float(event.confidence or 0.0) - prev.closing_event = event - prev.payload = dict(prev.payload or {}) - prev.payload.update({"extended_by": str(event.source_kind or "")}) - prev.save( - update_fields=[ - "end_ts", - "confidence_end", - "closing_event", - "payload", - "updated_at", - ] - ) - return - - ContactAvailabilitySpan.objects.create( - user=event.user, - person=event.person, - person_identifier=event.person_identifier, - service=event.service, - state=event.availability_state, - start_ts=int(event.ts or 0), - end_ts=int(event.ts or 0), - confidence_start=float(event.confidence or 0.0), - confidence_end=float(event.confidence or 0.0), - opening_event=event, - closing_event=event, - payload=dict(event.payload or {}), - ) - - -@transaction.atomic -def record_native_signal(signal: AvailabilitySignal) -> ContactAvailabilityEvent | None: + Availability state is now derived from behavioral events in Manticore, so this + function no longer persists a separate ORM availability row. + """ settings_row = get_settings(signal.user) if not settings_row.enabled: return None - - event = ContactAvailabilityEvent.objects.create( - user=signal.user, - person=signal.person, - person_identifier=signal.person_identifier, - service=str(signal.service or "").strip().lower() or "signal", - source_kind=str(signal.source_kind or "").strip() or "native_presence", - availability_state=str(signal.availability_state or "unknown").strip() - or "unknown", - confidence=float(signal.confidence or 0.0), - ts=_normalize_ts(signal.ts), - payload=dict(signal.payload or {}), - ) - _upsert_spans_for_event(event) - _prune_old_data(signal.user, settings_row.retention_days) - return event + return signal -def record_inferred_signal( - signal: AvailabilitySignal, -) -> ContactAvailabilityEvent | None: +def record_inferred_signal(signal: AvailabilitySignal) -> AvailabilitySignal | None: settings_row = get_settings(signal.user) if not settings_row.enabled or not settings_row.inference_enabled: return None - return record_native_signal(signal) - - -def _prune_old_data(user: User, retention_days: int) -> None: - days = max(1, int(retention_days or 90)) - cutoff = now_ms() - (days * 24 * 60 * 60 * 1000) - ContactAvailabilityEvent.objects.filter(user=user, ts__lt=cutoff).delete() - ContactAvailabilitySpan.objects.filter(user=user, end_ts__lt=cutoff).delete() + return signal def ensure_fading_state( @@ -139,48 +50,5 @@ def ensure_fading_state( person_identifier: PersonIdentifier | None, service: str, at_ts: int | None = None, -) -> ContactAvailabilityEvent | None: - settings_row = get_settings(user) - if not settings_row.enabled or not settings_row.inference_enabled: - return None - - current_ts = _normalize_ts(at_ts) - latest = ( - ContactAvailabilityEvent.objects.filter( - user=user, - person=person, - service=str(service or "").strip().lower(), - ) - .order_by("-ts", "-id") - .first() - ) - if latest is None: - return None - if latest.availability_state in {"fading", "unavailable"}: - return None - if latest.source_kind not in POSITIVE_SOURCE_KINDS: - return None - if not should_fade( - int(latest.ts or 0), current_ts, settings_row.fade_threshold_seconds - ): - return None - - elapsed = max(0, current_ts - int(latest.ts or 0)) - payload = { - "inferred_from": latest.source_kind, - "last_signal_ts": int(latest.ts or 0), - "elapsed_ms": elapsed, - } - return record_inferred_signal( - AvailabilitySignal( - user=user, - person=person, - person_identifier=person_identifier, - service=service, - source_kind="inferred_timeout", - availability_state="fading", - confidence=fade_confidence(elapsed, settings_row.fade_threshold_seconds), - ts=current_ts, - payload=payload, - ) - ) +) -> None: + return None diff --git a/core/presence/query.py b/core/presence/query.py index 06d1217..b78a5c8 100644 --- a/core/presence/query.py +++ b/core/presence/query.py @@ -1,11 +1,33 @@ from __future__ import annotations -from django.db.models import Q +from core.events.behavior import parse_payload +from core.events.manticore import ( + get_behavioral_events_for_range, + get_behavioral_latest_states, +) +from core.events.shadow import ( + get_shadow_behavioral_events_for_range, + get_shadow_behavioral_latest_states, +) +from core.models import Person, User -from core.models import ContactAvailabilityEvent, ContactAvailabilitySpan, Person, User -from .engine import ensure_fading_state -from .inference import now_ms +def _behavioral_state_from_kind(kind: str) -> tuple[str, float]: + normalized = str(kind or "").strip().lower() + if normalized == "presence_unavailable": + return ("unavailable", 0.95) + if normalized == "composing_abandoned": + return ("fading", 0.8) + if normalized in { + "presence_available", + "message_read", + "message_delivered", + "composing_started", + "composing_stopped", + "message_sent", + }: + return ("available", 0.75) + return ("unknown", 0.5) def spans_for_range( @@ -17,43 +39,98 @@ def spans_for_range( service: str = "", limit: int = 200, ): - qs = ContactAvailabilitySpan.objects.filter( - user=user, - person=person, - ).filter(Q(start_ts__lte=end_ts) & Q(end_ts__gte=start_ts)) - if service: - qs = qs.filter(service=str(service).strip().lower()) + service_filter = str(service or "").strip().lower() + try: + rows = get_behavioral_events_for_range( + user_id=int(user.id), + person_id=str(person.id), + start_ts=int(start_ts), + end_ts=int(end_ts), + transport=service_filter, + ) + except Exception: + rows = [] + if not rows: + rows = get_shadow_behavioral_events_for_range( + user=user, + person_id=str(person.id), + start_ts=int(start_ts), + end_ts=int(end_ts), + transport=service_filter, + ) - ensure_fading_state( - user=user, - person=person, - person_identifier=None, - service=(str(service or "").strip().lower() or "signal"), - at_ts=now_ms(), - ) + spans = [] + current = None + for row in list(rows or []): + transport = str(row.get("transport") or "").strip().lower() + if service_filter and transport != service_filter: + continue + ts = int(row.get("ts") or 0) + state, confidence = _behavioral_state_from_kind(str(row.get("kind") or "")) + if current is None or str(current.get("state")) != state: + if current is not None: + spans.append(current) + current = { + "id": 0, + "service": transport or service_filter, + "state": state, + "start_ts": ts, + "end_ts": ts, + "confidence_start": float(confidence), + "confidence_end": float(confidence), + "payload": { + "source": "manticore_behavioral", + "kind": str(row.get("kind") or "").strip().lower(), + "raw_payload": parse_payload(row.get("payload")), + }, + } + continue + current["end_ts"] = max(int(current.get("end_ts") or 0), ts) + current["confidence_end"] = float(confidence) + payload = dict(current.get("payload") or {}) + payload["kind"] = str(row.get("kind") or "").strip().lower() + current["payload"] = payload - return list(qs.order_by("-end_ts")[: max(1, min(int(limit or 200), 500))]) + if current is not None: + spans.append(current) + return list(reversed(spans[: max(1, min(int(limit or 200), 500))])) def latest_state_for_people(*, user: User, person_ids: list, service: str = "") -> dict: out = {} if not person_ids: return out - qs = ContactAvailabilityEvent.objects.filter(user=user, person_id__in=person_ids) - if service: - qs = qs.filter(service=str(service).strip().lower()) - rows = list(qs.order_by("person_id", "-ts", "-id")) + service_filter = str(service or "").strip().lower() + try: + rows = get_behavioral_latest_states( + user_id=int(user.id), + person_ids=[str(value) for value in person_ids], + transport=service_filter, + ) + except Exception: + rows = [] + if not rows: + rows = get_shadow_behavioral_latest_states( + user=user, + person_ids=[str(value) for value in person_ids], + transport=service_filter, + ) + seen = set() - for row in rows: - person_key = str(row.person_id) - if person_key in seen: + for row in list(rows or []): + person_key = str(row.get("person_id") or "").strip() + transport = str(row.get("transport") or "").strip().lower() + if service_filter and transport != service_filter: + continue + if not person_key or person_key in seen: continue seen.add(person_key) + state, confidence = _behavioral_state_from_kind(str(row.get("kind") or "")) out[person_key] = { - "state": str(row.availability_state or "unknown"), - "confidence": float(row.confidence or 0.0), - "service": str(row.service or ""), - "ts": int(row.ts or 0), - "source_kind": str(row.source_kind or ""), + "state": state, + "confidence": float(confidence), + "service": transport or service_filter, + "ts": int(row.get("ts") or 0), + "source_kind": f"behavioral:{str(row.get('kind') or '').strip().lower()}", } return out diff --git a/core/templates/base.html b/core/templates/base.html index 6b2dfc2..8fb5c9a 100644 --- a/core/templates/base.html +++ b/core/templates/base.html @@ -6,10 +6,50 @@ + + {% block browser_title %}{% firstof page_browser_title page_title as explicit_title %}{% if explicit_title %}{{ explicit_title }} · GIA{% else %}{% with route_value=request.resolver_match.url_name|default:request.path_info|humanize_route %}{% if route_value %}{{ route_value }} · GIA{% else %}GIA{% endif %}{% endwith %}{% endif %}{% endblock %} - + + @@ -29,6 +69,66 @@