Files
GIA/core/management/commands/event_ledger_smoke.py

118 lines
4.2 KiB
Python

from __future__ import annotations
import json
import time
from django.core.management.base import BaseCommand, CommandError
from core.events.manticore import get_recent_event_rows
from core.models import ConversationEvent
class Command(BaseCommand):
help = "Quick non-mutating sanity check for recent canonical event writes."
def _recent_rows(self, *, minutes: int, service: str, user_id: str, limit: int):
cutoff_ts = int(time.time() * 1000) - (minutes * 60 * 1000)
queryset = ConversationEvent.objects.filter(ts__gte=cutoff_ts).order_by("-ts")
if service:
queryset = queryset.filter(origin_transport=service)
if user_id:
queryset = queryset.filter(user_id=user_id)
rows = list(
queryset.values(
"id",
"user_id",
"session_id",
"ts",
"event_type",
"direction",
"origin_transport",
"trace_id",
)[:limit]
)
if rows:
return rows, "django"
try:
manticore_rows = get_recent_event_rows(
minutes=minutes,
service=service,
user_id=user_id,
limit=limit,
)
except Exception:
manticore_rows = []
return manticore_rows, "manticore" if manticore_rows else "django"
def add_arguments(self, parser):
parser.add_argument("--minutes", type=int, default=120)
parser.add_argument("--service", default="")
parser.add_argument("--user-id", default="")
parser.add_argument("--limit", type=int, default=200)
parser.add_argument("--require-types", default="")
parser.add_argument("--fail-if-empty", action="store_true", default=False)
parser.add_argument("--json", action="store_true", default=False)
def handle(self, *args, **options):
minutes = max(1, int(options.get("minutes") or 120))
service = str(options.get("service") or "").strip().lower()
user_id = str(options.get("user_id") or "").strip()
limit = max(1, int(options.get("limit") or 200))
require_types_raw = str(options.get("require_types") or "").strip()
fail_if_empty = bool(options.get("fail_if_empty"))
as_json = bool(options.get("json"))
required_types = [
item.strip().lower()
for item in require_types_raw.split(",")
if item.strip()
]
rows, data_source = self._recent_rows(
minutes=minutes,
service=service,
user_id=user_id,
limit=limit,
)
event_type_counts = {}
for row in rows:
key = str(row.get("event_type") or "")
event_type_counts[key] = int(event_type_counts.get(key) or 0) + 1
missing_required_types = [
event_type
for event_type in required_types
if int(event_type_counts.get(event_type) or 0) <= 0
]
payload = {
"minutes": minutes,
"service": service,
"user_id": user_id,
"data_source": data_source,
"count": len(rows),
"event_type_counts": event_type_counts,
"required_types": required_types,
"missing_required_types": missing_required_types,
"sample": rows[:25],
}
if as_json:
self.stdout.write(json.dumps(payload, indent=2, sort_keys=True))
return
self.stdout.write(
f"event-ledger-smoke minutes={minutes} service={service or '-'} user={user_id or '-'} source={data_source} count={len(rows)}"
)
self.stdout.write(f"event_type_counts={event_type_counts}")
if required_types:
self.stdout.write(
f"required_types={required_types} missing_required_types={missing_required_types}"
)
if fail_if_empty and len(rows) == 0:
raise CommandError("No recent canonical event rows found.")
if missing_required_types:
raise CommandError(
"Missing required event types: " + ", ".join(missing_required_types)
)