# Manticore-backed ledger for conversation/behavioral events.
#
# Contains the HTTP/SQL backend class plus module-level query helpers that
# build SQL strings, send them to Manticore's ``/sql`` endpoint and reshape
# the returned rows into plain dicts.
from __future__ import annotations

import hashlib
import json
import time
from urllib.parse import urlparse, urlunparse
from typing import Any

import requests
from django.conf import settings

from core.models import ConversationEvent
from core.util import logs
from core.events.behavior import parse_payload

log = logs.get_logger("event-manticore")


class ManticoreEventLedgerBackend:
    """Thin SQL-over-HTTP client for the event and metric tables.

    Connection parameters are read from Django settings
    (``MANTICORE_HTTP_URL``, ``MANTICORE_EVENT_TABLE``,
    ``MANTICORE_METRIC_TABLE``, ``MANTICORE_HTTP_TIMEOUT``) with local
    defaults when a setting is missing or empty.
    """

    # Class-level on purpose: a new backend instance is created per call by
    # ``get_event_ledger_backend``, so the "table already ensured" timestamps
    # must outlive individual instances. Keyed by "<base_url>|<table>".
    _table_ready_cache: dict[str, float] = {}
    # How long (seconds) a successful CREATE TABLE IF NOT EXISTS is trusted.
    _table_ready_ttl_seconds = 30.0

    def __init__(self) -> None:
        self.base_url = str(
            getattr(settings, "MANTICORE_HTTP_URL", "http://localhost:9308")
        ).rstrip("/")
        self.table = (
            str(getattr(settings, "MANTICORE_EVENT_TABLE", "gia_events")).strip()
            or "gia_events"
        )
        self.metrics_table = (
            str(getattr(settings, "MANTICORE_METRIC_TABLE", "gia_metrics")).strip()
            or "gia_metrics"
        )
        self.timeout_seconds = int(getattr(settings, "MANTICORE_HTTP_TIMEOUT", 5) or 5)
        self._table_cache_key = f"{self.base_url}|{self.table}"
        self._metrics_cache_key = f"{self.base_url}|{self.metrics_table}"

    def _candidate_base_urls(self) -> list[str]:
        """Return the base URLs to try, in order, without duplicates.

        The configured URL always comes first. When it points at
        ``localhost`` / ``127.0.0.1`` a second candidate targeting
        ``host.containers.internal`` on the same port (9308 if none is
        given) is appended.
        """
        parsed = urlparse(self.base_url)
        hostname = str(parsed.hostname or "").strip().lower()
        candidates = [self.base_url]
        if hostname in {"localhost", "127.0.0.1"}:
            replacement = parsed._replace(
                netloc=f"host.containers.internal:{parsed.port or 9308}"
            )
            candidates.append(urlunparse(replacement))
        cleaned = (str(value or "").strip() for value in candidates)
        # dict.fromkeys de-duplicates while preserving first-seen order.
        return list(dict.fromkeys(url for url in cleaned if url))

    def _sql(self, query: str) -> dict[str, Any]:
        """POST ``query`` to ``<base_url>/sql`` in raw mode and return the result.

        Each candidate base URL is tried in turn. The first one that answers
        successfully becomes the instance's ``base_url`` (and the cache keys
        are rebuilt to match). A list response is reduced to its first
        element; an empty response yields ``{}``.

        Raises:
            Exception: the error from the last candidate when every
                candidate failed.
        """
        last_exc: Exception | None = None
        for base_url in self._candidate_base_urls():
            # Broad catch is deliberate: any failure (connection, HTTP status,
            # JSON decoding, unexpected payload shape) moves on to the next
            # candidate, and the last failure is re-raised below.
            try:
                response = requests.post(
                    f"{base_url}/sql",
                    data={"mode": "raw", "query": query},
                    timeout=self.timeout_seconds,
                )
                response.raise_for_status()
                payload = response.json()
                if base_url != self.base_url:
                    # Remember the working fallback for subsequent calls.
                    self.base_url = base_url.rstrip("/")
                    self._table_cache_key = f"{self.base_url}|{self.table}"
                    self._metrics_cache_key = f"{self.base_url}|{self.metrics_table}"
                if isinstance(payload, list):
                    return payload[0] if payload else {}
                return dict(payload or {})
            except Exception as exc:
                last_exc = exc
        if last_exc is not None:
            raise last_exc
        return {}

    def _recently_ensured(self, cache_key: str) -> bool:
        """Return True when ``cache_key`` was marked ready within the TTL."""
        last_ready = float(self._table_ready_cache.get(cache_key, 0.0) or 0.0)
        return (time.time() - last_ready) <= float(self._table_ready_ttl_seconds)

    def ensure_table(self) -> None:
        """Create the events table if needed, at most once per TTL window."""
        if self._recently_ensured(self._table_cache_key):
            return
        self._sql(
            (
                f"CREATE TABLE IF NOT EXISTS {self.table} ("
                "id BIGINT,"
                "user_id BIGINT,"
                "person_id STRING,"
                "session_id STRING,"
                "transport STRING,"
                "kind STRING,"
                "direction STRING,"
                "ts BIGINT,"
                "ts_ref BIGINT,"
                "actor STRING,"
                "duration_ms BIGINT,"
                "abandoned INTEGER,"
                "revision INTEGER,"
                "payload JSON"
                ") engine='columnar' min_infix_len='2'"
            )
        )
        # Re-read the key: _sql may have switched base_url and rebuilt it.
        self._table_ready_cache[self._table_cache_key] = time.time()

    def ensure_metrics_table(self) -> None:
        """Create the metrics table if needed, at most once per TTL window."""
        if self._recently_ensured(self._metrics_cache_key):
            return
        self._sql(
            (
                f"CREATE TABLE IF NOT EXISTS {self.metrics_table} ("
                "id BIGINT,"
                "user_id BIGINT,"
                "person_id STRING,"
                "window_days INTEGER,"
                "metric STRING,"
                "value_ms BIGINT,"
                "baseline_ms BIGINT,"
                "z_score FLOAT,"
                "sample_n INTEGER,"
                "computed_at BIGINT"
                ") engine='columnar'"
            )
        )
        # Re-read the key: _sql may have switched base_url and rebuilt it.
        self._table_ready_cache[self._metrics_cache_key] = time.time()

    def _escape(self, value: Any) -> str:
        """Return ``value`` as text safe to place inside a single-quoted SQL literal.

        Falsy values become the empty string. Backslashes are doubled first
        so the backslash added in front of each single quote is not itself
        re-escaped.
        """
        text = str(value or "")
        return text.replace("\\", "\\\\").replace("'", "\\'")

    @staticmethod
    def _coerce_int(value: Any, default: int = 0) -> int:
        """Return ``int(value or 0)``, or ``default`` when conversion fails."""
        try:
            return int(value or 0)
        except (TypeError, ValueError, OverflowError):
            return default

    def _event_id(self, *, logical_key: str) -> int:
        """Derive a deterministic non-zero document id from ``logical_key``.

        Uses an 8-byte BLAKE2b digest read as an unsigned big-endian integer;
        a digest of zero is bumped to 1.
        """
        digest = hashlib.blake2b(
            str(logical_key or "").encode("utf-8"),
            digest_size=8,
        ).digest()
        value = int.from_bytes(digest, byteorder="big", signed=False)
        return max(1, int(value))

    def _event_kind(self, event_type: str) -> str:
        """Map a legacy event type to the stored ``kind`` value.

        The input is stripped and lower-cased; unknown types pass through
        in that normalized form.
        """
        normalized = str(event_type or "").strip().lower()
        return {
            "message_created": "message_sent",
            "delivery_receipt": "message_delivered",
            "read_receipt": "message_read",
            "typing_started": "composing_started",
            "typing_stopped": "composing_stopped",
            "composing_abandoned": "composing_abandoned",
            "presence_available": "presence_available",
            "presence_unavailable": "presence_unavailable",
        }.get(normalized, normalized)

    def _rows_from_sql_payload(self, payload: dict[str, Any]) -> list[dict]:
        """Extract result rows from a ``/sql`` response as a list of dict copies.

        Rows are read from ``data`` (or ``hits`` when ``data`` is empty); a
        single dict is treated as one row and non-dict entries are dropped.
        """
        data = payload.get("data") or payload.get("hits") or []
        if isinstance(data, dict):
            data = [data]
        return [dict(row) for row in data or [] if isinstance(row, dict)]

    def _build_values(
        self,
        *,
        user_id: int,
        person_id: str,
        session_id: str,
        event_type: str,
        direction: str,
        ts: int,
        actor_identifier: str,
        origin_transport: str,
        origin_message_id: str,
        origin_chat_id: str,
        payload: dict | None,
        raw_payload: dict | None,
        trace_id: str,
    ) -> str:
        """Render one event as a parenthesised SQL ``VALUES`` tuple.

        The stored JSON payload is ``payload`` enriched with ``raw_payload``,
        ``trace_id``, ``origin_message_id`` and ``origin_chat_id`` (each only
        when non-empty) plus ``legacy_event_type``. The document id is a hash
        of the identifying fields, so re-sending the same logical event
        produces the same id.

        Returns:
            The tuple text in the column order used by ``upsert_event``.
        """
        data = dict(payload or {})
        if raw_payload:
            data["raw_payload"] = dict(raw_payload)
        if trace_id:
            data["trace_id"] = str(trace_id)
        if origin_message_id:
            data["origin_message_id"] = str(origin_message_id)
        if origin_chat_id:
            data["origin_chat_id"] = str(origin_chat_id)
        legacy_event_type = str(event_type or "").strip().lower()
        data["legacy_event_type"] = legacy_event_type

        # Numeric columns are best-effort: malformed payload values become 0.
        ts_ref = self._coerce_int(data.get("message_ts") or data.get("source_ts"))
        duration_ms = self._coerce_int(data.get("duration_ms"))
        revision = self._coerce_int(data.get("revision"))
        # Payload values must be JSON-serializable (json.dumps below), so
        # plain truthiness is sufficient here.
        abandoned = 1 if data.get("abandoned") else 0

        # Normalize once so the id hash and the stored column always agree
        # (a missing ts is stored as 0 instead of raising).
        ts_value = int(ts or 0)

        logical_key = "|".join(
            [
                str(user_id),
                str(session_id),
                legacy_event_type,
                str(direction or "").strip().lower(),
                str(origin_transport or "").strip().lower(),
                str(origin_message_id or "").strip(),
                str(origin_chat_id or "").strip(),
                str(actor_identifier or "").strip(),
                str(ts_value),
                str(trace_id or "").strip(),
            ]
        )
        doc_id = self._event_id(logical_key=logical_key)
        payload_json = json.dumps(data, separators=(",", ":"), sort_keys=True)
        return (
            f"({doc_id},{int(user_id)},'{self._escape(person_id)}',"
            f"'{self._escape(session_id)}','{self._escape(origin_transport)}',"
            f"'{self._escape(self._event_kind(event_type))}','{self._escape(direction)}',"
            f"{ts_value},{ts_ref},'{self._escape(actor_identifier)}',{duration_ms},"
            f"{abandoned},{revision},'{self._escape(payload_json)}')"
        )

    def upsert_event(
        self,
        *,
        user_id: int,
        person_id: str,
        session_id: str,
        event_type: str,
        direction: str,
        ts: int,
        actor_identifier: str = "",
        origin_transport: str = "",
        origin_message_id: str = "",
        origin_chat_id: str = "",
        payload: dict | None = None,
        raw_payload: dict | None = None,
        trace_id: str = "",
    ) -> None:
        """Insert or replace a single event row, ensuring the table exists first."""
        self.ensure_table()
        values = self._build_values(
            user_id=user_id,
            person_id=person_id,
            session_id=session_id,
            event_type=event_type,
            direction=direction,
            ts=ts,
            actor_identifier=actor_identifier,
            origin_transport=origin_transport,
            origin_message_id=origin_message_id,
            origin_chat_id=origin_chat_id,
            payload=payload,
            raw_payload=raw_payload,
            trace_id=trace_id,
        )
        self._sql(
            f"REPLACE INTO {self.table} "
            "(id,user_id,person_id,session_id,transport,kind,direction,ts,ts_ref,actor,duration_ms,abandoned,revision,payload) "
            f"VALUES {values}"
        )

    def query_rows(self, query: str) -> list[dict]:
        """Run ``query`` and return its rows as a list of dicts."""
        return self._rows_from_sql_payload(self._sql(query))

    def list_event_targets(self, *, user_id: int | None = None) -> list[dict]:
        """Return distinct ``(user_id, person_id)`` pairs, optionally for one user."""
        filters = []
        if user_id is not None:
            filters.append(f"user_id={int(user_id)}")
        where_clause = f" WHERE {' AND '.join(filters)}" if filters else ""
        return self.query_rows(
            f"SELECT user_id, person_id FROM {self.table}{where_clause} "
            "GROUP BY user_id, person_id"
        )

    def fetch_events(
        self,
        *,
        user_id: int,
        person_id: str,
        since_ts: int,
    ) -> list[dict]:
        """Return events for one user/person with ``ts >= since_ts``, oldest first."""
        return self.query_rows(
            "SELECT user_id, person_id, session_id, transport, kind, direction, "
            "ts, ts_ref, actor, duration_ms, abandoned, revision, payload "
            f"FROM {self.table} "
            f"WHERE user_id={int(user_id)} "
            f"AND person_id='{self._escape(person_id)}' "
            f"AND ts>={int(since_ts)} "
            "ORDER BY ts ASC"
        )

    def _metric_doc_id(
        self,
        *,
        user_id: int,
        person_id: str,
        window_days: int,
        metric: str,
    ) -> int:
        """Derive a deterministic non-zero id for one metric row.

        The id hashes user, person, window and metric name, so recomputing
        the same metric overwrites the previous row.
        """
        digest = hashlib.blake2b(
            f"{int(user_id)}|{person_id}|{int(window_days)}|{metric}".encode("utf-8"),
            digest_size=8,
        ).digest()
        return max(1, int.from_bytes(digest, byteorder="big", signed=False))

    def upsert_metric(
        self,
        *,
        user_id: int,
        person_id: str,
        window_days: int,
        metric: str,
        value_ms: int,
        baseline_ms: int,
        z_score: float,
        sample_n: int,
        computed_at: int,
    ) -> None:
        """Insert or replace a single metric row, ensuring the table exists first."""
        self.ensure_metrics_table()
        doc_id = self._metric_doc_id(
            user_id=user_id,
            person_id=person_id,
            window_days=window_days,
            metric=metric,
        )
        self._sql(
            f"REPLACE INTO {self.metrics_table} "
            "(id,user_id,person_id,window_days,metric,value_ms,baseline_ms,z_score,sample_n,computed_at) "
            f"VALUES ({doc_id},{int(user_id)},'{self._escape(person_id)}',{int(window_days)},"
            f"'{self._escape(metric)}',{int(value_ms)},{int(baseline_ms)},"
            f"{float(z_score)},{int(sample_n)},{int(computed_at)})"
        )


def get_event_ledger_backend() -> ManticoreEventLedgerBackend:
    """Return a freshly constructed ledger backend."""
    return ManticoreEventLedgerBackend()


def upsert_conversation_event(event: ConversationEvent) -> None:
    """Write ``event`` to the ledger.

    The person id is taken from ``event.session.identifier`` and a missing
    direction is stored as ``"system"``.
    """
    session = event.session
    identifier = session.identifier
    get_event_ledger_backend().upsert_event(
        user_id=int(event.user_id),
        person_id=str(identifier.person_id),
        session_id=str(session.id),
        event_type=str(event.event_type or ""),
        direction=str(event.direction or "system"),
        ts=int(event.ts or 0),
        actor_identifier=str(event.actor_identifier or ""),
        origin_transport=str(event.origin_transport or ""),
        origin_message_id=str(event.origin_message_id or ""),
        origin_chat_id=str(event.origin_chat_id or ""),
        payload=dict(event.payload or {}),
        raw_payload=dict(event.raw_payload or {}),
        trace_id=str(event.trace_id or ""),
    )


def _transport_clause(backend: ManticoreEventLedgerBackend, transport: str) -> str:
    """Return an `` AND transport='...'`` filter, or ``""`` for a blank transport."""
    normalized = str(transport or "").strip().lower()
    if not normalized:
        return ""
    return f" AND transport='{backend._escape(normalized)}'"


def get_behavioral_availability_stats(*, user_id: int) -> list[dict]:
    """Return per-(person, transport) event counts for one user.

    Each row carries the total plus presence/read/typing/message/abandoned
    counts and the latest ``ts``, ordered by total descending.
    """
    backend = get_event_ledger_backend()
    return backend.query_rows(
        "SELECT person_id, transport, "
        "COUNT(*) AS total_events, "
        "SUM(IF(kind IN ('presence_available','presence_unavailable'),1,0)) AS presence_events, "
        "SUM(IF(kind='message_read',1,0)) AS read_events, "
        "SUM(IF(kind IN ('composing_started','composing_stopped'),1,0)) AS typing_events, "
        "SUM(IF(kind='message_sent',1,0)) AS message_events, "
        "SUM(IF(kind='composing_abandoned',1,0)) AS abandoned_events, "
        "MAX(ts) AS last_event_ts "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        "GROUP BY person_id, transport "
        "ORDER BY total_events DESC, person_id ASC, transport ASC"
    )


def get_behavioral_latest_states(
    *,
    user_id: int,
    person_ids: list[str],
    transport: str = "",
) -> list[dict]:
    """Return events for the given persons, newest first within each person.

    Blank ids are ignored; with no usable ids the result is ``[]`` and no
    query is sent. ``transport`` optionally narrows the result.
    """
    backend = get_event_ledger_backend()
    cleaned_ids = [
        text for text in (str(value or "").strip() for value in person_ids or []) if text
    ]
    if not cleaned_ids:
        return []
    id_clause = ",".join(f"'{backend._escape(value)}'" for value in cleaned_ids)
    transport_clause = _transport_clause(backend, transport)
    return backend.query_rows(
        "SELECT person_id, transport, kind, ts "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        f"AND person_id IN ({id_clause})"
        f"{transport_clause} "
        "ORDER BY person_id ASC, ts DESC"
    )


def get_behavioral_events_for_range(
    *,
    user_id: int,
    person_id: str,
    start_ts: int,
    end_ts: int,
    transport: str = "",
) -> list[dict]:
    """Return one person's events with ``start_ts <= ts <= end_ts``, oldest first."""
    backend = get_event_ledger_backend()
    transport_clause = _transport_clause(backend, transport)
    return backend.query_rows(
        "SELECT person_id, session_id, transport, kind, direction, ts, payload "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        f"AND person_id='{backend._escape(str(person_id or '').strip())}' "
        f"AND ts>={int(start_ts)} AND ts<={int(end_ts)}"
        f"{transport_clause} "
        "ORDER BY ts ASC"
    )


def get_recent_event_rows(
    *,
    minutes: int = 120,
    service: str = "",
    user_id: str = "",
    limit: int = 200,
) -> list[dict]:
    """Return the most recent events as legacy-shaped dicts, newest first.

    Args:
        minutes: Look-back window; values below 1 are treated as 1.
        service: Optional transport filter (stripped and lower-cased).
        user_id: Optional user filter; converted with ``int``.
        limit: Maximum rows, clamped to the range 1..500.
    """
    backend = get_event_ledger_backend()
    # The cutoff is computed in epoch milliseconds and compared against ``ts``.
    cutoff_ts = int(time.time() * 1000) - (max(1, int(minutes)) * 60 * 1000)
    where = [f"ts>={cutoff_ts}"]
    if service:
        where.append(f"transport='{backend._escape(str(service).strip().lower())}'")
    if user_id:
        where.append(f"user_id={int(user_id)}")
    rows = backend.query_rows(
        "SELECT user_id, session_id, ts, kind, direction, transport, payload "
        f"FROM {backend.table} "
        f"WHERE {' AND '.join(where)} "
        "ORDER BY ts DESC "
        f"LIMIT {max(1, min(int(limit), 500))}"
    )
    output = []
    for row in rows or []:
        payload = parse_payload(row.get("payload"))
        legacy_event_type = str(payload.get("legacy_event_type") or "").strip().lower()
        output.append(
            {
                "id": "",
                "user_id": int(row.get("user_id") or 0),
                "session_id": str(row.get("session_id") or ""),
                "ts": int(row.get("ts") or 0),
                # Prefer the original event type preserved in the payload.
                "event_type": legacy_event_type or str(row.get("kind") or ""),
                "kind": str(row.get("kind") or ""),
                "direction": str(row.get("direction") or ""),
                "origin_transport": str(row.get("transport") or ""),
                "trace_id": str(payload.get("trace_id") or ""),
            }
        )
    return output


def count_behavioral_events(*, user_id: int) -> int:
    """Return the number of stored events for ``user_id`` (0 when unavailable)."""
    backend = get_event_ledger_backend()
    rows = backend.query_rows(
        "SELECT COUNT(*) AS total_events "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)}"
    )
    if not rows:
        return 0
    try:
        return int((rows[0] or {}).get("total_events") or 0)
    except Exception:
        # Best-effort: an unparsable count is reported as zero.
        return 0


def get_trace_ids(*, user_id: int, limit: int = 120) -> list[str]:
    """Return distinct trace ids from the user's newest events, newest first.

    The trace id lives inside the JSON payload, so rows are over-fetched
    (``limit * 6``, capped at 1000) and de-duplicated in Python. At most
    ``limit`` ids (clamped to 1..500) are returned.
    """
    backend = get_event_ledger_backend()
    max_ids = max(1, min(int(limit), 500))
    rows = backend.query_rows(
        "SELECT payload "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        "ORDER BY ts DESC "
        f"LIMIT {max(1, min(int(limit) * 6, 1000))}"
    )
    seen = set()
    output = []
    for row in rows or []:
        payload = parse_payload(row.get("payload"))
        trace_id = str(payload.get("trace_id") or "").strip()
        if not trace_id or trace_id in seen:
            continue
        seen.add(trace_id)
        output.append(trace_id)
        if len(output) >= max_ids:
            break
    return output


def get_trace_event_rows(*, user_id: int, trace_id: str, limit: int = 500) -> list[dict]:
    """Return the user's events whose payload carries ``trace_id``, oldest first.

    Matching happens in Python on the parsed payload, so the query
    over-fetches (``limit * 8``, capped at 5000) and only the user's oldest
    rows are scanned. At most ``limit`` rows (clamped to 1..500) are returned.
    """
    backend = get_event_ledger_backend()
    max_rows = max(1, min(int(limit), 500))
    rows = backend.query_rows(
        "SELECT user_id, session_id, ts, kind, direction, transport, payload "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        "ORDER BY ts ASC "
        f"LIMIT {max(1, min(int(limit) * 8, 5000))}"
    )
    output = []
    target = str(trace_id or "").strip()
    for row in rows or []:
        payload = parse_payload(row.get("payload"))
        if str(payload.get("trace_id") or "").strip() != target:
            continue
        output.append(
            {
                "id": "",
                "ts": int(row.get("ts") or 0),
                "event_type": str(
                    payload.get("legacy_event_type") or row.get("kind") or ""
                ).strip(),
                "kind": str(row.get("kind") or "").strip(),
                "direction": str(row.get("direction") or "").strip(),
                "session_id": str(row.get("session_id") or "").strip(),
                "origin_transport": str(row.get("transport") or "").strip(),
                "origin_message_id": str(payload.get("origin_message_id") or "").strip(),
                "payload": payload,
                "trace_id": target,
            }
        )
        if len(output) >= max_rows:
            break
    return output


def get_session_event_rows(*, user_id: int, session_id: str, limit: int = 2000) -> list[dict]:
    """Return one session's events as legacy-shaped dicts, oldest first.

    ``limit`` is clamped to the range 1..5000.
    """
    backend = get_event_ledger_backend()
    rows = backend.query_rows(
        "SELECT user_id, session_id, ts, kind, direction, transport, actor, payload "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        f"AND session_id='{backend._escape(str(session_id or '').strip())}' "
        "ORDER BY ts ASC "
        f"LIMIT {max(1, min(int(limit), 5000))}"
    )
    output = []
    for row in rows or []:
        payload = parse_payload(row.get("payload"))
        output.append(
            {
                "ts": int(row.get("ts") or 0),
                "event_type": str(
                    payload.get("legacy_event_type") or row.get("kind") or ""
                ).strip(),
                "kind": str(row.get("kind") or "").strip(),
                "direction": str(row.get("direction") or "").strip(),
                "session_id": str(row.get("session_id") or "").strip(),
                "origin_transport": str(row.get("transport") or "").strip(),
                "actor_identifier": str(row.get("actor") or "").strip(),
                "origin_message_id": str(payload.get("origin_message_id") or "").strip(),
                "payload": payload,
            }
        )
    return output