220 lines
7.9 KiB
Python
220 lines
7.9 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
from io import StringIO
|
|
from unittest.mock import patch
|
|
|
|
from django.core.management import call_command
|
|
from django.test import SimpleTestCase, TestCase, override_settings
|
|
|
|
from core.events.behavior import ComposingTracker, summarize_metrics
|
|
from core.events.manticore import ManticoreEventLedgerBackend
|
|
from core.models import ChatSession, ConversationEvent, Person, PersonIdentifier, User
|
|
|
|
|
|
class ComposingTrackerTests(SimpleTestCase):
    """Exercise the start/stop bookkeeping of ``ComposingTracker``."""

    def test_tracker_emits_abandoned_after_window(self):
        # Start at t=1000 ms and stop at t=301500 ms with a 300000 ms window:
        # the elapsed 300500 ms is longer than the window.
        session_key = "session-1"
        tracker = ComposingTracker(window_ms=300000)

        tracker.observe_started(session_key, 1000)
        abandoned = tracker.observe_stopped(session_key, 301500)

        # The stop must yield a record carrying the elapsed time and the
        # abandoned flag.
        self.assertIsNotNone(abandoned)
        self.assertEqual(300500, abandoned["duration_ms"])
        self.assertTrue(abandoned["abandoned"])
|
|
|
|
|
|
class BehavioralMetricSummaryTests(SimpleTestCase):
    """Check the values ``summarize_metrics`` reports for a fixed event list."""

    def test_summarize_metrics_computes_core_intervals(self):
        # Session "s1": a delivered/read pair for message m1, a presence
        # event, two composing starts and finally a sent message.
        first_session = [
            {
                "session_id": "s1",
                "kind": "message_delivered",
                "ts": 1000,
                "payload": {"message_id": "m1"},
            },
            {
                "session_id": "s1",
                "kind": "message_read",
                "ts": 1600,
                "payload": {"message_id": "m1"},
            },
            {"session_id": "s1", "kind": "presence_available", "ts": 2000},
            {"session_id": "s1", "kind": "composing_started", "ts": 2100},
            {"session_id": "s1", "kind": "composing_started", "ts": 2200},
            {"session_id": "s1", "kind": "message_sent", "ts": 2600},
        ]
        # Session "s2": composing starts and is later marked abandoned.
        second_session = [
            {"session_id": "s2", "kind": "composing_started", "ts": 4000},
            {"session_id": "s2", "kind": "composing_abandoned", "ts": 710000},
        ]
        rows = [*first_session, *second_session]

        # The very same list is supplied for both positional arguments.
        metrics = summarize_metrics(rows, rows)

        # Checked in this order; each entry is (metric key, expected value_ms).
        expected_values = (
            ("delay_b", 600),
            ("delay_c", 500),
            ("delay_f", 150),
            ("revision", 2),
            ("abandoned_rate", 333),
        )
        for metric_name, expected in expected_values:
            self.assertEqual(expected, metrics[metric_name]["value_ms"])
|
|
|
|
|
|
@override_settings(
    EVENT_LEDGER_DUAL_WRITE=True,
    MANTICORE_HTTP_URL="http://manticore.test:9308",
    MANTICORE_EVENT_TABLE="gia_events_test",
    MANTICORE_METRIC_TABLE="gia_metrics_test",
)
class GiaAnalysisCommandTests(TestCase):
    """Tests for the ``gia_analysis`` command and the Manticore ledger backend.

    All tests run with the Manticore-related settings overridden to test
    values, and with ``requests.post`` in ``core.events.manticore`` patched.
    """

    def setUp(self):
        # Empty the backend's class-level table-ready cache before each test
        # (presumably so state from a previous test does not leak in).
        ManticoreEventLedgerBackend._table_ready_cache.clear()

        user = User.objects.create_user(
            username="analysis-user",
            email="analysis@example.com",
            password="x",
        )
        self.user = user

        person = Person.objects.create(user=user, name="Analysis Person")
        self.person = person

        identifier = PersonIdentifier.objects.create(
            user=user,
            person=person,
            service="whatsapp",
            identifier="15550009999",
        )
        self.identifier = identifier

        self.session = ChatSession.objects.create(
            user=user,
            identifier=identifier,
        )

    @patch("core.events.manticore.requests.post")
    def test_metrics_table_upsert_and_analysis_command(self, mocked_post):
        now_ms = int(time.time() * 1000)

        class Response:
            # Minimal stand-in for an HTTP response: never raises, empty body.
            def raise_for_status(self):
                return None

            def json(self):
                return {}

        mocked_post.return_value = Response()

        # One entry per event, oldest first:
        # (age in ms, stored event_type, ledger kind, direction, payload).
        event_specs = (
            (5000, "presence_available", "presence_available", "system", {}),
            (4500, "typing_started", "composing_started", "in", {}),
            (3900, "message_created", "message_sent", "in", {"message_id": "m1"}),
            (
                3600,
                "delivery_receipt",
                "message_delivered",
                "system",
                {"message_id": "m1"},
            ),
            (3000, "read_receipt", "message_read", "system", {"message_id": "m1"}),
        )

        # Persist the events through the ORM.
        for age_ms, event_type, _kind, direction, payload in event_specs:
            ConversationEvent.objects.create(
                user=self.user,
                session=self.session,
                ts=now_ms - age_ms,
                event_type=event_type,
                direction=direction,
                origin_transport="whatsapp",
                payload=dict(payload),
            )

        user_id = self.user.id
        person_id = str(self.person.id)
        session_id = str(self.session.id)

        # Rows the mocked ledger backend hands back from ``fetch_events``.
        ledger_rows = [
            {
                "user_id": user_id,
                "person_id": person_id,
                "session_id": session_id,
                "kind": kind,
                "direction": direction,
                "ts": now_ms - age_ms,
                "payload": dict(payload),
            }
            for age_ms, _event_type, kind, direction, payload in event_specs
        ]

        out = StringIO()
        backend_factory_path = (
            "core.management.commands.gia_analysis.get_event_ledger_backend"
        )
        with patch(backend_factory_path) as mocked_backend:
            backend = mocked_backend.return_value
            backend.list_event_targets.return_value = [
                {"user_id": user_id, "person_id": person_id}
            ]
            backend.fetch_events.return_value = ledger_rows

            # Run the command while the backend factory is patched.
            call_command(
                "gia_analysis",
                "--once",
                "--user-id",
                str(self.user.id),
                stdout=out,
            )

        self.assertGreaterEqual(backend.upsert_metric.call_count, 3)
        self.assertIn("gia-analysis wrote=", out.getvalue())

    @patch("core.events.manticore.requests.post")
    def test_list_event_targets_uses_group_by_query(self, mocked_post):
        class Response:
            # Minimal stand-in for an HTTP response with an empty data list.
            def raise_for_status(self):
                return None

            def json(self):
                return {"data": []}

        mocked_post.return_value = Response()

        ledger = ManticoreEventLedgerBackend()
        ledger.list_event_targets(user_id=1)

        # Inspect the query carried by the most recent POST.
        sent_query = mocked_post.call_args.kwargs["data"]["query"]
        self.assertIn("GROUP BY user_id, person_id", sent_query)
|