Files
GIA/core/tests/test_event_ledger.py

63 lines
2.0 KiB
Python

from django.test import TestCase, override_settings
from core.events.ledger import append_event_sync
from core.models import ChatSession, ConversationEvent, Person, PersonIdentifier, User
@override_settings(EVENT_LEDGER_DUAL_WRITE=True)
class EventLedgerTests(TestCase):
    """Tests for ``append_event_sync`` writing ``ConversationEvent`` rows.

    Every test runs with the ``EVENT_LEDGER_DUAL_WRITE`` setting overridden
    to ``True``.
    """

    def setUp(self):
        """Create the user, person, identifier and chat session fixtures."""
        owner = User.objects.create_user(
            username="ledger-user",
            email="ledger@example.com",
            password="x",
        )
        self.user = owner

        contact = Person.objects.create(user=owner, name="Ledger Person")
        self.person = contact

        signal_identifier = PersonIdentifier.objects.create(
            user=owner,
            person=contact,
            service="signal",
            identifier="+15555550123",
        )
        self.identifier = signal_identifier

        self.session = ChatSession.objects.create(
            user=owner,
            identifier=signal_identifier,
        )

    def _append_incoming_message(self, *, ts, origin_message_id, text):
        """Append an inbound Signal ``message_created`` event for the fixture session.

        Only the timestamp, origin message id and payload text vary between
        the tests; everything else is fixed here. Returns whatever
        ``append_event_sync`` returns.
        """
        return append_event_sync(
            user=self.user,
            session=self.session,
            ts=ts,
            event_type="message_created",
            direction="in",
            origin_transport="signal",
            origin_message_id=origin_message_id,
            payload={"text": text},
        )

    def test_append_event_creates_row(self):
        """A single append returns a non-None result and stores one row."""
        created = self._append_incoming_message(
            ts=1234,
            origin_message_id="abc",
            text="hello",
        )

        self.assertIsNotNone(created)
        stored_rows = ConversationEvent.objects.count()
        self.assertEqual(1, stored_rows)

    def test_append_event_is_idempotent_for_same_origin_and_type(self):
        """Two appends sharing transport, origin id and event type leave one row."""
        # Same origin_message_id and event type on both calls; only the
        # timestamp and payload text differ.
        for timestamp, body in ((1234, "hello"), (1235, "hello again")):
            self._append_incoming_message(
                ts=timestamp,
                origin_message_id="dup-1",
                text=body,
            )

        stored_rows = ConversationEvent.objects.count()
        self.assertEqual(1, stored_rows)