158 lines
4.7 KiB
Python
158 lines
4.7 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
|
|
from asgiref.sync import sync_to_async
|
|
from django.conf import settings
|
|
|
|
from core.events.manticore import get_event_ledger_backend
|
|
from core.models import ConversationEvent
|
|
from core.observability.tracing import ensure_trace_id
|
|
from core.util import logs
|
|
|
|
log = logs.get_logger("event-ledger")
|
|
|
|
|
|
def event_ledger_enabled() -> bool:
    """Report whether any event-ledger write mode is switched on.

    Returns:
        True when the ``EVENT_LEDGER_DUAL_WRITE`` setting or the
        ``EVENT_PRIMARY_WRITE_PATH`` setting is truthy. A setting that is
        not defined counts as off.
    """
    if getattr(settings, "EVENT_LEDGER_DUAL_WRITE", False):
        return True
    return bool(getattr(settings, "EVENT_PRIMARY_WRITE_PATH", False))
|
|
|
|
|
|
def event_ledger_status() -> dict:
    """Return the current event-ledger feature flags as plain booleans.

    Returns:
        A dict with the keys ``"event_ledger_dual_write"`` and
        ``"event_primary_write_path"``, each holding the truthiness of the
        matching upper-case setting (``False`` when the setting is absent).
    """
    flag_sources = (
        ("event_ledger_dual_write", "EVENT_LEDGER_DUAL_WRITE"),
        ("event_primary_write_path", "EVENT_PRIMARY_WRITE_PATH"),
    )
    return {
        key: bool(getattr(settings, setting_name, False))
        for key, setting_name in flag_sources
    }
|
|
|
|
|
|
def _normalize_direction(value: str) -> str:
    """Coerce *value* to one of ``"in"``, ``"out"`` or ``"system"``.

    The input is stringified, stripped of surrounding whitespace and
    lower-cased. Falsy input and anything that does not match a known
    direction both collapse to ``"system"``.
    """
    cleaned = str(value or "system").strip().lower()
    return cleaned if cleaned in ("in", "out", "system") else "system"
|
|
|
|
|
|
def _safe_ts(value: int | None) -> int:
    """Return *value* as a positive integer timestamp, or "now" as a fallback.

    Args:
        value: Candidate timestamp; anything ``int()`` accepts is allowed.

    Returns:
        ``int(value)`` when the conversion succeeds and the result is
        greater than zero; otherwise the current wall-clock time in
        milliseconds (``time.time() * 1000`` truncated to an int).
    """
    parsed = None
    if value is not None:
        try:
            parsed = int(value)
        except Exception:
            # Any conversion failure is treated the same as a missing value.
            parsed = None
    if parsed is not None and parsed > 0:
        return parsed
    return int(time.time() * 1000)
|
|
|
|
|
|
def append_event_sync(
    *,
    user,
    session,
    event_type: str,
    direction: str,
    actor_identifier: str = "",
    origin_transport: str = "",
    origin_message_id: str = "",
    origin_chat_id: str = "",
    payload: dict | None = None,
    raw_payload: dict | None = None,
    trace_id: str = "",
    ts: int | None = None,
):
    """Record one conversation event in the event ledger.

    Does nothing and returns ``None`` when ``event_ledger_enabled()`` is
    false. Otherwise the inputs are normalized (strings stripped, event
    type and transport lower-cased, direction passed through
    ``_normalize_direction``, timestamp through ``_safe_ts``, trace id
    through ``ensure_trace_id``) and then written as follows:

    * A ``ConversationEvent`` row is written only when
      ``EVENT_LEDGER_DUAL_WRITE`` is on and ``EVENT_PRIMARY_WRITE_PATH`` is
      off. If both a transport and an origin message id are present, the
      most recently created row matching user, session, event type,
      transport and message id is reused instead of creating a new one.
    * The event is always passed to
      ``get_event_ledger_backend().upsert_event``.

    Args:
        user: Owner of the event; ``user.id`` is sent to the backend.
        session: Conversation session; ``session.id`` and
            ``session.identifier.person_id`` are sent to the backend.
        event_type: Must match (case-insensitively, after stripping) one of
            ``ConversationEvent.EVENT_TYPE_CHOICES``.
        direction: Free-form direction, normalized to in/out/system.
        actor_identifier: Optional actor identifier, stripped.
        origin_transport: Optional transport name, stripped and lower-cased.
        origin_message_id: Optional upstream message id, stripped.
        origin_chat_id: Optional upstream chat id, stripped.
        payload: Optional event payload; shallow-copied before use.
        raw_payload: Optional raw payload; shallow-copied before use.
        trace_id: Optional trace id handed to ``ensure_trace_id``.
        ts: Optional timestamp; see ``_safe_ts`` for the fallback.

    Returns:
        The reused or newly created ``ConversationEvent`` row, or ``None``
        when no Django row is written.

    Raises:
        ValueError: If ``event_type`` is empty or not a supported choice.
        Exception: Whatever the backend upsert raises, but only when
            ``EVENT_PRIMARY_WRITE_PATH`` is on; otherwise the failure is
            logged as a warning and swallowed.
    """
    if not event_ledger_enabled():
        return None

    kind = str(event_type or "").strip().lower()
    if not kind:
        raise ValueError("event_type is required")
    supported_kinds = {
        str(choice[0]) for choice in ConversationEvent.EVENT_TYPE_CHOICES
    }
    if kind not in supported_kinds:
        raise ValueError(f"unsupported event_type: {kind}")

    # Fields handed unchanged to both the Django row and the backend upsert.
    # ensure_trace_id sees the caller's payload (before it is copied).
    shared_fields = {
        "event_type": kind,
        "direction": _normalize_direction(direction),
        "trace_id": ensure_trace_id(trace_id, payload or {}),
        "ts": _safe_ts(ts),
        "origin_transport": str(origin_transport or "").strip().lower(),
        "origin_message_id": str(origin_message_id or "").strip(),
        "actor_identifier": str(actor_identifier or "").strip(),
        "origin_chat_id": str(origin_chat_id or "").strip(),
        "payload": dict(payload or {}),
        "raw_payload": dict(raw_payload or {}),
    }
    source_transport = shared_fields["origin_transport"]
    source_message_id = shared_fields["origin_message_id"]

    dual_mode = bool(getattr(settings, "EVENT_LEDGER_DUAL_WRITE", False))
    primary_mode = bool(getattr(settings, "EVENT_PRIMARY_WRITE_PATH", False))

    stored_row = None
    if dual_mode and not primary_mode:
        # Deduplication is only possible when the origin is identifiable.
        if source_transport and source_message_id:
            matches = ConversationEvent.objects.filter(
                user=user,
                session=session,
                event_type=kind,
                origin_transport=source_transport,
                origin_message_id=source_message_id,
            )
            stored_row = matches.order_by("-created_at").first()
        if stored_row is None:
            stored_row = ConversationEvent.objects.create(
                user=user,
                session=session,
                **shared_fields,
            )

    try:
        get_event_ledger_backend().upsert_event(
            user_id=int(user.id),
            person_id=str(session.identifier.person_id),
            session_id=str(session.id),
            **shared_fields,
        )
    except Exception as exc:
        # The backend is authoritative only on the primary write path;
        # in dual-write mode a backend failure is logged and swallowed.
        if primary_mode:
            raise
        log.warning(
            "Event ledger manticore dual-write failed session=%s event_type=%s err=%s",
            getattr(session, "id", "-"),
            kind,
            exc,
        )

    return stored_row
|
|
|
|
|
|
async def append_event(**kwargs):
    """Await ``append_event_sync`` from asynchronous code.

    The synchronous function is wrapped with ``sync_to_async`` using
    ``thread_sensitive=True``. All keyword arguments are forwarded
    unchanged and the wrapped function's result is returned.
    """
    run_append = sync_to_async(append_event_sync, thread_sensitive=True)
    return await run_append(**kwargs)
|