from __future__ import annotations

import json
import time

from django.core.management.base import BaseCommand, CommandError

from core.events.projection import shadow_compare_session
from core.models import ChatSession, Message

# Fallback values used when the corresponding option is missing or unusable.
_DEFAULT_RECENT_MINUTES = 120
_DEFAULT_LIMIT_SESSIONS = 50
_DEFAULT_DETAIL_LIMIT = 25

# Keys summed across sessions. Tuple order is the insertion order of the
# aggregate dicts and therefore the order shown in the plain-text report.
_COUNTER_KEYS = (
    "missing_in_projection",
    "missing_in_db",
    "text_mismatch",
    "ts_mismatch",
    "delivered_ts_mismatch",
    "read_ts_mismatch",
    "reactions_mismatch",
)
_CAUSE_KEYS = (
    "missing_event_write",
    "ambiguous_reaction_target",
    "payload_normalization_gap",
)


class Command(BaseCommand):
    """Run ``shadow_compare_session`` over a filtered set of chat sessions.

    Each selected session is compared individually; the per-session results
    are summed into an aggregate and reported either as JSON or as plain
    text lines on stdout.
    """

    help = (
        "Run event->message shadow projection comparison and emit mismatch counters "
        "per chat session."
    )

    def add_arguments(self, parser):
        """Register the filtering, limiting and output options."""
        parser.add_argument(
            "--user-id",
            default="",
            help="Only compare sessions whose user_id equals this value.",
        )
        parser.add_argument(
            "--session-id",
            default="",
            help="Only compare the session with this id.",
        )
        parser.add_argument(
            "--service",
            default="",
            help="Only compare sessions whose identifier service equals this "
            "value (lower-cased before filtering).",
        )
        parser.add_argument(
            "--recent-only",
            action="store_true",
            default=False,
            help="Only compare sessions with recent messages; uses a "
            "120 minute window unless --recent-minutes is positive.",
        )
        parser.add_argument(
            "--recent-minutes",
            type=int,
            default=0,
            help="Only compare sessions having a message within this many "
            "minutes (0 disables the filter).",
        )
        parser.add_argument(
            "--limit-sessions",
            type=int,
            default=50,
            help="Maximum number of sessions to compare (at least 1).",
        )
        parser.add_argument(
            "--detail-limit",
            type=int,
            default=25,
            help="Value passed to the comparison as detail_limit "
            "(0 or greater).",
        )
        parser.add_argument(
            "--fail-on-mismatch",
            action="store_true",
            default=False,
            help="Exit with an error when the total mismatch count is above zero.",
        )
        parser.add_argument(
            "--json",
            action="store_true",
            default=False,
            help="Emit the full payload as JSON instead of plain-text lines.",
        )

    def handle(self, *args, **options):
        """Compare the selected sessions and write the report to stdout.

        Raises:
            CommandError: when no session matches the filters, or when
                ``--fail-on-mismatch`` is set and any mismatch was counted.
        """
        user_id = str(options.get("user_id") or "").strip()
        session_id = str(options.get("session_id") or "").strip()
        service = str(options.get("service") or "").strip().lower()
        recent_only = bool(options.get("recent_only"))
        recent_minutes = max(0, int(options.get("recent_minutes") or 0))
        if recent_only and recent_minutes <= 0:
            recent_minutes = _DEFAULT_RECENT_MINUTES
        # A missing or zero value falls back to the default; the result is
        # always at least 1.
        limit_sessions = max(
            1, int(options.get("limit_sessions") or _DEFAULT_LIMIT_SESSIONS)
        )
        # 0 is a legitimate detail limit (the clamp below allows it), so only
        # a missing value falls back to the default. Using ``or`` here would
        # silently turn an explicit 0 into the default.
        raw_detail_limit = options.get("detail_limit")
        if raw_detail_limit is None or raw_detail_limit == "":
            detail_limit = _DEFAULT_DETAIL_LIMIT
        else:
            detail_limit = max(0, int(raw_detail_limit))
        as_json = bool(options.get("json"))
        fail_on_mismatch = bool(options.get("fail_on_mismatch"))

        sessions = self._select_sessions(
            user_id=user_id,
            session_id=session_id,
            service=service,
            recent_minutes=recent_minutes,
            limit_sessions=limit_sessions,
        )
        if not sessions:
            raise CommandError("No chat sessions matched.")

        aggregate = self._new_aggregate()
        results = []
        for session in sessions:
            compared = shadow_compare_session(session, detail_limit=detail_limit)
            self._accumulate(aggregate, compared)
            results.append(compared)

        payload = {
            "filters": {
                "user_id": user_id,
                "session_id": session_id,
                "service": service,
                "recent_only": recent_only,
                "recent_minutes": recent_minutes,
                "limit_sessions": limit_sessions,
                "detail_limit": detail_limit,
            },
            "aggregate": aggregate,
            "sessions": results,
        }

        if as_json:
            self.stdout.write(json.dumps(payload, indent=2, sort_keys=True))
        else:
            self._write_text_report(aggregate, results)

        if fail_on_mismatch and int(aggregate["mismatch_total"] or 0) > 0:
            raise CommandError(
                f"Shadow projection mismatch detected: {aggregate['mismatch_total']}"
            )

    def _select_sessions(
        self, *, user_id, session_id, service, recent_minutes, limit_sessions
    ):
        """Return at most ``limit_sessions`` sessions matching the filters."""
        sessions = ChatSession.objects.all().order_by("-last_interaction", "id")
        if user_id:
            sessions = sessions.filter(user_id=user_id)
        if session_id:
            sessions = sessions.filter(id=session_id)
        if service:
            sessions = sessions.filter(identifier__service=service)
        if recent_minutes > 0:
            # The cutoff is computed in epoch milliseconds and compared with
            # Message.ts -- assumes ts is stored in that unit; TODO confirm.
            cutoff_ts = int(time.time() * 1000) - (recent_minutes * 60 * 1000)
            recent_session_ids = (
                Message.objects.filter(ts__gte=cutoff_ts)
                .values_list("session_id", flat=True)
                .distinct()
            )
            sessions = sessions.filter(id__in=recent_session_ids)
        return list(sessions.select_related("user", "identifier")[:limit_sessions])

    @staticmethod
    def _new_aggregate():
        """Return a zeroed aggregate with every counter key present."""
        return {
            "sessions_scanned": 0,
            "db_message_count": 0,
            "projected_message_count": 0,
            "mismatch_total": 0,
            "counters": dict.fromkeys(_COUNTER_KEYS, 0),
            "cause_counts": dict.fromkeys(_CAUSE_KEYS, 0),
        }

    @staticmethod
    def _accumulate(aggregate, compared):
        """Add one session's comparison result into ``aggregate`` in place.

        Missing or falsy values in ``compared`` count as zero, and only the
        keys already present in the aggregate are summed.
        """
        aggregate["sessions_scanned"] += 1
        for key in ("db_message_count", "projected_message_count", "mismatch_total"):
            aggregate[key] += int(compared.get(key) or 0)
        for group in ("counters", "cause_counts"):
            session_values = compared.get(group) or {}
            totals = aggregate[group]
            for key in totals:
                totals[key] += int(session_values.get(key) or 0)

    def _write_text_report(self, aggregate, results):
        """Write the aggregate summary followed by one line per session."""
        self.stdout.write(
            "shadow compare: "
            f"sessions={aggregate['sessions_scanned']} "
            f"db={aggregate['db_message_count']} "
            f"projected={aggregate['projected_message_count']} "
            f"mismatches={aggregate['mismatch_total']}"
        )
        self.stdout.write(f"counters={aggregate['counters']}")
        self.stdout.write(f"cause_counts={aggregate['cause_counts']}")
        for row in results:
            self.stdout.write(
                f"session={row.get('session_id')} mismatch_total={row.get('mismatch_total')} "
                f"db={row.get('db_message_count')} projected={row.get('projected_message_count')}"
            )