Files
GIA/core/events/ledger.py

158 lines
4.7 KiB
Python

from __future__ import annotations
import time
from asgiref.sync import sync_to_async
from django.conf import settings
from core.events.manticore import get_event_ledger_backend
from core.models import ConversationEvent
from core.observability.tracing import ensure_trace_id
from core.util import logs
log = logs.get_logger("event-ledger")
def event_ledger_enabled() -> bool:
return bool(
getattr(settings, "EVENT_LEDGER_DUAL_WRITE", False)
or getattr(settings, "EVENT_PRIMARY_WRITE_PATH", False)
)
def event_ledger_status() -> dict:
return {
"event_ledger_dual_write": bool(
getattr(settings, "EVENT_LEDGER_DUAL_WRITE", False)
),
"event_primary_write_path": bool(
getattr(settings, "EVENT_PRIMARY_WRITE_PATH", False)
),
}
def _normalize_direction(value: str) -> str:
direction = str(value or "system").strip().lower()
if direction not in {"in", "out", "system"}:
return "system"
return direction
def _safe_ts(value: int | None) -> int:
if value is None:
return int(time.time() * 1000)
try:
parsed = int(value)
except Exception:
return int(time.time() * 1000)
if parsed <= 0:
return int(time.time() * 1000)
return parsed
def append_event_sync(
*,
user,
session,
event_type: str,
direction: str,
actor_identifier: str = "",
origin_transport: str = "",
origin_message_id: str = "",
origin_chat_id: str = "",
payload: dict | None = None,
raw_payload: dict | None = None,
trace_id: str = "",
ts: int | None = None,
):
if not event_ledger_enabled():
return None
normalized_type = str(event_type or "").strip().lower()
if not normalized_type:
raise ValueError("event_type is required")
candidates = {str(choice[0]) for choice in ConversationEvent.EVENT_TYPE_CHOICES}
if normalized_type not in candidates:
raise ValueError(f"unsupported event_type: {normalized_type}")
normalized_direction = _normalize_direction(direction)
normalized_trace = ensure_trace_id(trace_id, payload or {})
safe_ts = _safe_ts(ts)
transport = str(origin_transport or "").strip().lower()
message_id = str(origin_message_id or "").strip()
actor_identifier = str(actor_identifier or "").strip()
origin_chat_id = str(origin_chat_id or "").strip()
payload = dict(payload or {})
raw_payload = dict(raw_payload or {})
dual_write = bool(getattr(settings, "EVENT_LEDGER_DUAL_WRITE", False))
primary_write = bool(getattr(settings, "EVENT_PRIMARY_WRITE_PATH", False))
write_django = dual_write and not primary_write
row = None
if write_django:
dedup_row = None
if transport and message_id:
dedup_row = (
ConversationEvent.objects.filter(
user=user,
session=session,
event_type=normalized_type,
origin_transport=transport,
origin_message_id=message_id,
)
.order_by("-created_at")
.first()
)
if dedup_row is not None:
row = dedup_row
else:
row = ConversationEvent.objects.create(
user=user,
session=session,
ts=safe_ts,
event_type=normalized_type,
direction=normalized_direction,
actor_identifier=actor_identifier,
origin_transport=transport,
origin_message_id=message_id,
origin_chat_id=origin_chat_id,
payload=payload,
raw_payload=raw_payload,
trace_id=normalized_trace,
)
try:
get_event_ledger_backend().upsert_event(
user_id=int(user.id),
person_id=str(session.identifier.person_id),
session_id=str(session.id),
event_type=normalized_type,
direction=normalized_direction,
ts=safe_ts,
actor_identifier=actor_identifier,
origin_transport=transport,
origin_message_id=message_id,
origin_chat_id=origin_chat_id,
payload=payload,
raw_payload=raw_payload,
trace_id=normalized_trace,
)
except Exception as exc:
if primary_write:
raise
log.warning(
"Event ledger manticore dual-write failed session=%s event_type=%s err=%s",
getattr(session, "id", "-"),
normalized_type,
exc,
)
return row
async def append_event(**kwargs):
return await sync_to_async(append_event_sync, thread_sensitive=True)(**kwargs)