# Tests for the composing tracker, the behavioural metric summary, and the
# ``gia_analysis`` management command / Manticore ledger backend.
from __future__ import annotations

import time
from io import StringIO
from unittest.mock import patch

from django.core.management import call_command
from django.test import SimpleTestCase, TestCase, override_settings

from core.events.behavior import ComposingTracker, summarize_metrics
from core.events.manticore import ManticoreEventLedgerBackend
from core.models import (
    ChatSession,
    ConversationEvent,
    Person,
    PersonIdentifier,
    User,
)


def _stub_http_response(body_factory):
    """Return an object usable as the result of a patched ``requests.post``.

    The stub exposes ``raise_for_status()`` (always returns ``None``) and
    ``json()``, which returns whatever ``body_factory()`` produces. The
    factory is invoked on every ``json()`` call, so each caller receives a
    fresh object.
    """

    class Response:
        def raise_for_status(self):
            return None

        def json(self):
            return body_factory()

    return Response()


class ComposingTrackerTests(SimpleTestCase):
    """Checks on ``ComposingTracker`` start/stop observation."""

    def test_tracker_emits_abandoned_after_window(self):
        """A stop observed after the window has elapsed yields an abandoned event."""
        composing = ComposingTracker(window_ms=300000)
        composing.observe_started("session-1", 1000)

        result = composing.observe_stopped("session-1", 301500)

        self.assertIsNotNone(result)
        # 301500 - 1000: the reported duration spans start to stop.
        self.assertEqual(300500, result["duration_ms"])
        self.assertTrue(result["abandoned"])


class BehavioralMetricSummaryTests(SimpleTestCase):
    """Checks on ``summarize_metrics`` over a hand-built event list."""

    def test_summarize_metrics_computes_core_intervals(self):
        """The fixture rows produce the expected ``value_ms`` for each metric."""

        def row(session_id, kind, ts, message_id=None):
            # Rows without a message id deliberately carry no "payload" key.
            record = {"session_id": session_id, "kind": kind, "ts": ts}
            if message_id is not None:
                record["payload"] = {"message_id": message_id}
            return record

        rows = [
            row("s1", "message_delivered", 1000, "m1"),
            row("s1", "message_read", 1600, "m1"),
            row("s1", "presence_available", 2000),
            row("s1", "composing_started", 2100),
            row("s1", "composing_started", 2200),
            row("s1", "message_sent", 2600),
            row("s2", "composing_started", 4000),
            row("s2", "composing_abandoned", 710000),
        ]

        # The same list is supplied for both positional arguments.
        metrics = summarize_metrics(rows, rows)

        expected_value_ms = {
            "delay_b": 600,
            "delay_c": 500,
            "delay_f": 150,
            "revision": 2,
            "abandoned_rate": 333,
        }
        for metric_name, expected in expected_value_ms.items():
            self.assertEqual(expected, metrics[metric_name]["value_ms"])


@override_settings(
    EVENT_LEDGER_DUAL_WRITE=True,
    MANTICORE_HTTP_URL="http://manticore.test:9308",
    MANTICORE_EVENT_TABLE="gia_events_test",
    MANTICORE_METRIC_TABLE="gia_metrics_test",
)
class GiaAnalysisCommandTests(TestCase):
    """Database-backed tests for ``gia_analysis`` and the Manticore backend."""

    # One entry per event, oldest first:
    # (age relative to "now" in ms, ConversationEvent.event_type,
    #  ledger "kind", direction, message id or None for an empty payload)
    _TIMELINE = (
        (5000, "presence_available", "presence_available", "system", None),
        (4500, "typing_started", "composing_started", "in", None),
        (3900, "message_created", "message_sent", "in", "m1"),
        (3600, "delivery_receipt", "message_delivered", "system", "m1"),
        (3000, "read_receipt", "message_read", "system", "m1"),
    )

    @staticmethod
    def _payload_for(message_id):
        """Return a new payload dict: empty, or carrying ``message_id``."""
        if message_id is None:
            return {}
        return {"message_id": message_id}

    def setUp(self):
        """Reset the backend's table cache and create user/person/session rows."""
        ManticoreEventLedgerBackend._table_ready_cache.clear()
        self.user = User.objects.create_user(
            username="analysis-user",
            email="analysis@example.com",
            password="x",
        )
        self.person = Person.objects.create(
            user=self.user,
            name="Analysis Person",
        )
        self.identifier = PersonIdentifier.objects.create(
            user=self.user,
            person=self.person,
            service="whatsapp",
            identifier="15550009999",
        )
        self.session = ChatSession.objects.create(
            user=self.user,
            identifier=self.identifier,
        )

    @patch("core.events.manticore.requests.post")
    def test_metrics_table_upsert_and_analysis_command(self, mocked_post):
        """Running ``gia_analysis --once`` upserts metrics and reports to stdout.

        Five ``ConversationEvent`` rows are stored, the ledger backend used by
        the command is replaced with a mock returning the matching ledger
        rows, and the command is expected to call ``upsert_metric`` at least
        three times and print a ``gia-analysis wrote=`` line.
        """
        now_ms = int(time.time() * 1000)
        # NOTE(review): presumably keeps event creation from reaching a real
        # Manticore endpoint while dual-write is enabled — confirm.
        mocked_post.return_value = _stub_http_response(dict)

        for age_ms, event_type, _kind, direction, message_id in self._TIMELINE:
            ConversationEvent.objects.create(
                user=self.user,
                session=self.session,
                ts=now_ms - age_ms,
                event_type=event_type,
                direction=direction,
                origin_transport="whatsapp",
                payload=self._payload_for(message_id),
            )

        out = StringIO()
        with patch(
            "core.management.commands.gia_analysis.get_event_ledger_backend"
        ) as mocked_backend:
            backend = mocked_backend.return_value
            backend.list_event_targets.return_value = [
                {"user_id": self.user.id, "person_id": str(self.person.id)}
            ]
            backend.fetch_events.return_value = [
                {
                    "user_id": self.user.id,
                    "person_id": str(self.person.id),
                    "session_id": str(self.session.id),
                    "kind": kind,
                    "direction": direction,
                    "ts": now_ms - age_ms,
                    "payload": self._payload_for(message_id),
                }
                for age_ms, _event_type, kind, direction, message_id in self._TIMELINE
            ]
            call_command(
                "gia_analysis",
                "--once",
                "--user-id",
                str(self.user.id),
                stdout=out,
            )

        self.assertGreaterEqual(backend.upsert_metric.call_count, 3)
        self.assertIn("gia-analysis wrote=", out.getvalue())

    @patch("core.events.manticore.requests.post")
    def test_list_event_targets_uses_group_by_query(self, mocked_post):
        """``list_event_targets`` posts a query grouped by user and person."""
        mocked_post.return_value = _stub_http_response(lambda: {"data": []})

        ManticoreEventLedgerBackend().list_event_targets(user_id=1)

        # Inspect the most recent POST made through the patched function.
        sent_query = mocked_post.call_args.kwargs["data"]["query"]
        self.assertIn("GROUP BY user_id, person_id", sent_query)