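"""Manticore-backed event ledger.

Writes conversation events and derived behavioral metrics to Manticore Search
over its HTTP SQL endpoint, and exposes read helpers for behavioral stats,
traces, and session timelines.
"""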
from __future__ import annotations

import hashlib
import json
import time
from typing import Any
from urllib.parse import urlparse, urlunparse

import requests
from django.conf import settings

from core.events.behavior import parse_payload
from core.models import ConversationEvent
from core.util import logs

log = logs.get_logger("event-manticore")


class ManticoreEventLedgerBackend:
    """Event ledger backed by Manticore Search, accessed over its HTTP SQL API."""

    # Tracks when each "base_url|table" pair was last confirmed to exist, so
    # CREATE TABLE IF NOT EXISTS is not re-issued on every write.
    _table_ready_cache: dict[str, float] = {}
    _table_ready_ttl_seconds = 30.0

    def __init__(self):
        self.base_url = str(
            getattr(settings, "MANTICORE_HTTP_URL", "http://localhost:9308")
        ).rstrip("/")
        self.table = (
            str(getattr(settings, "MANTICORE_EVENT_TABLE", "gia_events")).strip()
            or "gia_events"
        )
        self.metrics_table = (
            str(getattr(settings, "MANTICORE_METRIC_TABLE", "gia_metrics")).strip()
            or "gia_metrics"
        )
        self.timeout_seconds = int(getattr(settings, "MANTICORE_HTTP_TIMEOUT", 5) or 5)
        self._table_cache_key = f"{self.base_url}|{self.table}"
        self._metrics_cache_key = f"{self.base_url}|{self.metrics_table}"

    def _candidate_base_urls(self) -> list[str]:
        """Return base URLs to try, deduplicated and in priority order.

        When the configured host is localhost, also try
        host.containers.internal so the daemon stays reachable from inside
        a container.
        """
        parsed = urlparse(self.base_url)
        hostname = str(parsed.hostname or "").strip().lower()
        candidates = [self.base_url]
        if hostname in {"localhost", "127.0.0.1"}:
            replacement = parsed._replace(
                netloc=f"host.containers.internal:{parsed.port or 9308}"
            )
            candidates.append(urlunparse(replacement))
        output = []
        seen = set()
        for value in candidates:
            key = str(value or "").strip()
            if not key or key in seen:
                continue
            seen.add(key)
            output.append(key)
        return output

    def _sql(self, query: str) -> dict[str, Any]:
        """Execute a raw SQL statement against the first reachable candidate URL."""
        last_exc = None
        for base_url in self._candidate_base_urls():
            try:
                response = requests.post(
                    f"{base_url}/sql",
                    data={"mode": "raw", "query": query},
                    timeout=self.timeout_seconds,
                )
                response.raise_for_status()
                payload = response.json()
                # Pin future requests (and cache keys) to the URL that worked.
                if base_url != self.base_url:
                    self.base_url = base_url.rstrip("/")
                    self._table_cache_key = f"{self.base_url}|{self.table}"
                    self._metrics_cache_key = f"{self.base_url}|{self.metrics_table}"
                # Raw mode responds with a list of result sets; unwrap the first.
                if isinstance(payload, list):
                    return payload[0] if payload else {}
                return dict(payload or {})
            except Exception as exc:
                last_exc = exc
        if last_exc is not None:
            raise last_exc
        return {}

    def ensure_table(self) -> None:
        """Create the event table if it does not exist, at most once per TTL window."""
        last_ready = float(
            self._table_ready_cache.get(self._table_cache_key, 0.0) or 0.0
        )
        if (time.time() - last_ready) <= float(self._table_ready_ttl_seconds):
            return
        self._sql(
            f"CREATE TABLE IF NOT EXISTS {self.table} ("
            "id BIGINT,"
            "user_id BIGINT,"
            "person_id STRING,"
            "session_id STRING,"
            "transport STRING,"
            "kind STRING,"
            "direction STRING,"
            "ts BIGINT,"
            "ts_ref BIGINT,"
            "actor STRING,"
            "duration_ms BIGINT,"
            "abandoned INTEGER,"
            "revision INTEGER,"
            "payload JSON"
            ") engine='columnar' min_infix_len='2'"
        )
        self._table_ready_cache[self._table_cache_key] = time.time()

    def ensure_metrics_table(self) -> None:
        """Create the metrics table if it does not exist, at most once per TTL window."""
        last_ready = float(
            self._table_ready_cache.get(self._metrics_cache_key, 0.0) or 0.0
        )
        if (time.time() - last_ready) <= float(self._table_ready_ttl_seconds):
            return
        self._sql(
            f"CREATE TABLE IF NOT EXISTS {self.metrics_table} ("
            "id BIGINT,"
            "user_id BIGINT,"
            "person_id STRING,"
            "window_days INTEGER,"
            "metric STRING,"
            "value_ms BIGINT,"
            "baseline_ms BIGINT,"
            "z_score FLOAT,"
            "sample_n INTEGER,"
            "computed_at BIGINT"
            ") engine='columnar'"
        )
        self._table_ready_cache[self._metrics_cache_key] = time.time()

    def _escape(self, value: Any) -> str:
        """Escape backslashes and single quotes for use in SQL string literals."""
        text = str(value or "")
        return text.replace("\\", "\\\\").replace("'", "\\'")

    def _event_id(self, *, logical_key: str) -> int:
        """Hash the logical key into a stable, non-zero 64-bit document id.

        Writing with REPLACE against this id makes ingestion idempotent:
        re-processing the same logical event overwrites the same document.
        """
        digest = hashlib.blake2b(
            str(logical_key or "").encode("utf-8"),
            digest_size=8,
        ).digest()
        value = int.from_bytes(digest, byteorder="big", signed=False)
        return max(1, int(value))

    def _event_kind(self, event_type: str) -> str:
        """Map legacy event type names onto the ledger's canonical kinds."""
        normalized = str(event_type or "").strip().lower()
        return {
            "message_created": "message_sent",
            "delivery_receipt": "message_delivered",
            "read_receipt": "message_read",
            "typing_started": "composing_started",
            "typing_stopped": "composing_stopped",
            "composing_abandoned": "composing_abandoned",
            "presence_available": "presence_available",
            "presence_unavailable": "presence_unavailable",
        }.get(normalized, normalized)

    def _rows_from_sql_payload(self, payload: dict[str, Any]) -> list[dict]:
        """Normalize a raw-mode SQL response into a list of row dicts."""
        data = payload.get("data") or payload.get("hits") or []
        if isinstance(data, dict):
            data = [data]
        rows = []
        for row in list(data or []):
            if isinstance(row, dict):
                rows.append(dict(row))
        return rows

    def _build_values(
        self,
        *,
        user_id: int,
        person_id: str,
        session_id: str,
        event_type: str,
        direction: str,
        ts: int,
        actor_identifier: str,
        origin_transport: str,
        origin_message_id: str,
        origin_chat_id: str,
        payload: dict | None,
        raw_payload: dict | None,
        trace_id: str,
    ) -> str:
        """Render a single VALUES tuple for a REPLACE into the event table."""
        # Fold origin metadata and the raw payload into the JSON payload column.
        data = dict(payload or {})
        if raw_payload:
            data["raw_payload"] = dict(raw_payload)
        if trace_id:
            data["trace_id"] = str(trace_id)
        if origin_message_id:
            data["origin_message_id"] = str(origin_message_id)
        if origin_chat_id:
            data["origin_chat_id"] = str(origin_chat_id)
        data["legacy_event_type"] = str(event_type or "").strip().lower()

        # Pull typed columns out of the payload, defaulting on bad input.
        try:
            ts_ref = int(data.get("message_ts") or data.get("source_ts") or 0)
        except Exception:
            ts_ref = 0
        try:
            duration_ms = int(data.get("duration_ms") or 0)
        except Exception:
            duration_ms = 0
        try:
            abandoned = 1 if bool(data.get("abandoned")) else 0
        except Exception:
            abandoned = 0
        try:
            revision = int(data.get("revision") or 0)
        except Exception:
            revision = 0

        # The logical key determines the document id, so identical events
        # replace each other instead of accumulating duplicates.
        logical_key = "|".join(
            [
                str(user_id),
                str(session_id),
                str(event_type or "").strip().lower(),
                str(direction or "").strip().lower(),
                str(origin_transport or "").strip().lower(),
                str(origin_message_id or "").strip(),
                str(origin_chat_id or "").strip(),
                str(actor_identifier or "").strip(),
                str(int(ts or 0)),
                str(trace_id or "").strip(),
            ]
        )
        doc_id = self._event_id(logical_key=logical_key)
        payload_json = json.dumps(data, separators=(",", ":"), sort_keys=True)
        return (
            f"({doc_id},{int(user_id)},'{self._escape(person_id)}',"
            f"'{self._escape(session_id)}','{self._escape(origin_transport)}',"
            f"'{self._escape(self._event_kind(event_type))}','{self._escape(direction)}',"
            f"{int(ts)},{ts_ref},'{self._escape(actor_identifier)}',{duration_ms},"
            f"{abandoned},{revision},'{self._escape(payload_json)}')"
        )

    def upsert_event(
        self,
        *,
        user_id: int,
        person_id: str,
        session_id: str,
        event_type: str,
        direction: str,
        ts: int,
        actor_identifier: str = "",
        origin_transport: str = "",
        origin_message_id: str = "",
        origin_chat_id: str = "",
        payload: dict | None = None,
        raw_payload: dict | None = None,
        trace_id: str = "",
    ) -> None:
        """Idempotently write one event row (REPLACE keyed on the hashed id)."""
        self.ensure_table()
        values = self._build_values(
            user_id=user_id,
            person_id=person_id,
            session_id=session_id,
            event_type=event_type,
            direction=direction,
            ts=ts,
            actor_identifier=actor_identifier,
            origin_transport=origin_transport,
            origin_message_id=origin_message_id,
            origin_chat_id=origin_chat_id,
            payload=payload,
            raw_payload=raw_payload,
            trace_id=trace_id,
        )
        self._sql(
            f"REPLACE INTO {self.table} "
            "(id,user_id,person_id,session_id,transport,kind,direction,ts,ts_ref,actor,duration_ms,abandoned,revision,payload) "
            f"VALUES {values}"
        )

    def query_rows(self, query: str) -> list[dict]:
        """Run a SELECT and return its rows as dicts."""
        return self._rows_from_sql_payload(self._sql(query))

    def list_event_targets(self, *, user_id: int | None = None) -> list[dict]:
        """Return distinct (user_id, person_id) pairs present in the ledger."""
        filters = []
        if user_id is not None:
            filters.append(f"user_id={int(user_id)}")
        where_clause = f" WHERE {' AND '.join(filters)}" if filters else ""
        return self.query_rows(
            f"SELECT user_id, person_id FROM {self.table}{where_clause} "
            "GROUP BY user_id, person_id"
        )

    def fetch_events(
        self,
        *,
        user_id: int,
        person_id: str,
        since_ts: int,
    ) -> list[dict]:
        """Return all events for a person since a timestamp, oldest first."""
        return self.query_rows(
            "SELECT user_id, person_id, session_id, transport, kind, direction, "
            "ts, ts_ref, actor, duration_ms, abandoned, revision, payload "
            f"FROM {self.table} "
            f"WHERE user_id={int(user_id)} "
            f"AND person_id='{self._escape(person_id)}' "
            f"AND ts>={int(since_ts)} "
            "ORDER BY ts ASC"
        )

    def _metric_doc_id(
        self,
        *,
        user_id: int,
        person_id: str,
        window_days: int,
        metric: str,
    ) -> int:
        """Derive a stable document id per (user, person, window, metric)."""
        digest = hashlib.blake2b(
            f"{int(user_id)}|{person_id}|{int(window_days)}|{metric}".encode("utf-8"),
            digest_size=8,
        ).digest()
        return max(1, int.from_bytes(digest, byteorder="big", signed=False))

    def upsert_metric(
        self,
        *,
        user_id: int,
        person_id: str,
        window_days: int,
        metric: str,
        value_ms: int,
        baseline_ms: int,
        z_score: float,
        sample_n: int,
        computed_at: int,
    ) -> None:
        """Idempotently write one computed metric row."""
        self.ensure_metrics_table()
        doc_id = self._metric_doc_id(
            user_id=user_id,
            person_id=person_id,
            window_days=window_days,
            metric=metric,
        )
        self._sql(
            f"REPLACE INTO {self.metrics_table} "
            "(id,user_id,person_id,window_days,metric,value_ms,baseline_ms,z_score,sample_n,computed_at) "
            f"VALUES ({doc_id},{int(user_id)},'{self._escape(person_id)}',{int(window_days)},"
            f"'{self._escape(metric)}',{int(value_ms)},{int(baseline_ms)},"
            f"{float(z_score)},{int(sample_n)},{int(computed_at)})"
        )


def get_event_ledger_backend() -> ManticoreEventLedgerBackend:
    """Return a fresh backend; table-readiness state is shared at class level."""
    return ManticoreEventLedgerBackend()


def upsert_conversation_event(event: ConversationEvent) -> None:
    """Project a ConversationEvent ORM row into the Manticore event ledger."""
    session = event.session
    identifier = session.identifier
    get_event_ledger_backend().upsert_event(
        user_id=int(event.user_id),
        person_id=str(identifier.person_id),
        session_id=str(session.id),
        event_type=str(event.event_type or ""),
        direction=str(event.direction or "system"),
        ts=int(event.ts or 0),
        actor_identifier=str(event.actor_identifier or ""),
        origin_transport=str(event.origin_transport or ""),
        origin_message_id=str(event.origin_message_id or ""),
        origin_chat_id=str(event.origin_chat_id or ""),
        payload=dict(event.payload or {}),
        raw_payload=dict(event.raw_payload or {}),
        trace_id=str(event.trace_id or ""),
    )


def get_behavioral_availability_stats(*, user_id: int) -> list[dict]:
    """Aggregate per-(person, transport) event counts for one user."""
    backend = get_event_ledger_backend()
    return backend.query_rows(
        "SELECT person_id, transport, "
        "COUNT(*) AS total_events, "
        "SUM(IF(kind IN ('presence_available','presence_unavailable'),1,0)) AS presence_events, "
        "SUM(IF(kind='message_read',1,0)) AS read_events, "
        "SUM(IF(kind IN ('composing_started','composing_stopped'),1,0)) AS typing_events, "
        "SUM(IF(kind='message_sent',1,0)) AS message_events, "
        "SUM(IF(kind='composing_abandoned',1,0)) AS abandoned_events, "
        "MAX(ts) AS last_event_ts "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        "GROUP BY person_id, transport "
        "ORDER BY total_events DESC, person_id ASC, transport ASC"
    )


def get_behavioral_latest_states(
    *,
    user_id: int,
    person_ids: list[str],
    transport: str = "",
) -> list[dict]:
    """Return (person_id, transport, kind, ts) rows ordered by person, newest first."""
    backend = get_event_ledger_backend()
    cleaned_ids = [
        str(value or "").strip()
        for value in list(person_ids or [])
        if str(value or "").strip()
    ]
    if not cleaned_ids:
        return []
    id_clause = ",".join(f"'{backend._escape(value)}'" for value in cleaned_ids)
    transport_clause = ""
    if str(transport or "").strip():
        transport_clause = (
            f" AND transport='{backend._escape(str(transport or '').strip().lower())}'"
        )
    return backend.query_rows(
        "SELECT person_id, transport, kind, ts "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        f"AND person_id IN ({id_clause})"
        f"{transport_clause} "
        "ORDER BY person_id ASC, ts DESC"
    )


def get_behavioral_events_for_range(
    *,
    user_id: int,
    person_id: str,
    start_ts: int,
    end_ts: int,
    transport: str = "",
) -> list[dict]:
    """Return events for one person within [start_ts, end_ts], oldest first."""
    backend = get_event_ledger_backend()
    transport_clause = ""
    if str(transport or "").strip():
        transport_clause = (
            f" AND transport='{backend._escape(str(transport or '').strip().lower())}'"
        )
    return backend.query_rows(
        "SELECT person_id, session_id, transport, kind, direction, ts, payload "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        f"AND person_id='{backend._escape(str(person_id or '').strip())}' "
        f"AND ts>={int(start_ts)} AND ts<={int(end_ts)}"
        f"{transport_clause} "
        "ORDER BY ts ASC"
    )


def get_recent_event_rows(
    *,
    minutes: int = 120,
    service: str = "",
    user_id: str = "",
    limit: int = 200,
) -> list[dict]:
    """Return recent events (ts is in milliseconds), newest first, capped at 500."""
    backend = get_event_ledger_backend()
    cutoff_ts = int(time.time() * 1000) - (max(1, int(minutes)) * 60 * 1000)
    where = [f"ts>={cutoff_ts}"]
    if service:
        where.append(f"transport='{backend._escape(str(service).strip().lower())}'")
    if user_id:
        where.append(f"user_id={int(user_id)}")
    rows = backend.query_rows(
        "SELECT user_id, session_id, ts, kind, direction, transport, payload "
        f"FROM {backend.table} "
        f"WHERE {' AND '.join(where)} "
        "ORDER BY ts DESC "
        f"LIMIT {max(1, min(int(limit), 500))}"
    )
    output = []
    for row in list(rows or []):
        payload = parse_payload(row.get("payload"))
        legacy_event_type = str(payload.get("legacy_event_type") or "").strip().lower()
        output.append(
            {
                "id": "",
                "user_id": int(row.get("user_id") or 0),
                "session_id": str(row.get("session_id") or ""),
                "ts": int(row.get("ts") or 0),
                "event_type": legacy_event_type or str(row.get("kind") or ""),
                "kind": str(row.get("kind") or ""),
                "direction": str(row.get("direction") or ""),
                "origin_transport": str(row.get("transport") or ""),
                "trace_id": str(payload.get("trace_id") or ""),
            }
        )
    return output


def count_behavioral_events(*, user_id: int) -> int:
    """Return the total number of ledger events for one user."""
    backend = get_event_ledger_backend()
    rows = backend.query_rows(
        "SELECT COUNT(*) AS total_events "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)}"
    )
    if not rows:
        return 0
    try:
        return int((rows[0] or {}).get("total_events") or 0)
    except Exception:
        return 0


def get_trace_ids(*, user_id: int, limit: int = 120) -> list[str]:
    """Return recent distinct trace ids, newest first.

    The trace id lives inside the JSON payload, so the query over-selects
    recent rows and deduplicates client-side.
    """
    backend = get_event_ledger_backend()
    rows = backend.query_rows(
        "SELECT payload "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        "ORDER BY ts DESC "
        f"LIMIT {max(1, min(int(limit) * 6, 1000))}"
    )
    seen = set()
    output = []
    for row in list(rows or []):
        payload = parse_payload(row.get("payload"))
        trace_id = str(payload.get("trace_id") or "").strip()
        if not trace_id or trace_id in seen:
            continue
        seen.add(trace_id)
        output.append(trace_id)
        if len(output) >= max(1, min(int(limit), 500)):
            break
    return output


def get_trace_event_rows(*, user_id: int, trace_id: str, limit: int = 500) -> list[dict]:
    """Return the user's events carrying the given trace id, oldest first.

    As with get_trace_ids, the trace id is stored in the JSON payload, so rows
    are over-selected and filtered client-side.
    """
    backend = get_event_ledger_backend()
    rows = backend.query_rows(
        "SELECT user_id, session_id, ts, kind, direction, transport, payload "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        "ORDER BY ts ASC "
        f"LIMIT {max(1, min(int(limit) * 8, 5000))}"
    )
    output = []
    target = str(trace_id or "").strip()
    for row in list(rows or []):
        payload = parse_payload(row.get("payload"))
        if str(payload.get("trace_id") or "").strip() != target:
            continue
        output.append(
            {
                "id": "",
                "ts": int(row.get("ts") or 0),
                "event_type": str(
                    payload.get("legacy_event_type") or row.get("kind") or ""
                ).strip(),
                "kind": str(row.get("kind") or "").strip(),
                "direction": str(row.get("direction") or "").strip(),
                "session_id": str(row.get("session_id") or "").strip(),
                "origin_transport": str(row.get("transport") or "").strip(),
                "origin_message_id": str(payload.get("origin_message_id") or "").strip(),
                "payload": payload,
                "trace_id": target,
            }
        )
        if len(output) >= max(1, min(int(limit), 500)):
            break
    return output


def get_session_event_rows(*, user_id: int, session_id: str, limit: int = 2000) -> list[dict]:
    """Return all events for one session, oldest first."""
    backend = get_event_ledger_backend()
    rows = backend.query_rows(
        "SELECT user_id, session_id, ts, kind, direction, transport, actor, payload "
        f"FROM {backend.table} "
        f"WHERE user_id={int(user_id)} "
        f"AND session_id='{backend._escape(str(session_id or '').strip())}' "
        "ORDER BY ts ASC "
        f"LIMIT {max(1, min(int(limit), 5000))}"
    )
    output = []
    for row in list(rows or []):
        payload = parse_payload(row.get("payload"))
        output.append(
            {
                "ts": int(row.get("ts") or 0),
                "event_type": str(
                    payload.get("legacy_event_type") or row.get("kind") or ""
                ).strip(),
                "kind": str(row.get("kind") or "").strip(),
                "direction": str(row.get("direction") or "").strip(),
                "session_id": str(row.get("session_id") or "").strip(),
                "origin_transport": str(row.get("transport") or "").strip(),
                "actor_identifier": str(row.get("actor") or "").strip(),
                "origin_message_id": str(payload.get("origin_message_id") or "").strip(),
                "payload": payload,
            }
        )
    return output
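

# Usage sketch (illustrative only, not part of the module's public surface).
# It assumes Django settings point MANTICORE_HTTP_URL at a reachable daemon;
# the identifiers below ("alice@example.net", "sess-1", "trace-abc") are
# placeholders. Timestamps are in milliseconds throughout:
#
#   backend = get_event_ledger_backend()
#   backend.upsert_event(
#       user_id=1,
#       person_id="alice@example.net",
#       session_id="sess-1",
#       event_type="message_created",
#       direction="inbound",
#       ts=int(time.time() * 1000),
#       origin_transport="xmpp",
#       trace_id="trace-abc",
#   )
#   rows = backend.fetch_events(
#       user_id=1, person_id="alice@example.net", since_ts=0
#   )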