Files
GIA/core/tests/test_backfill_contact_availability.py

62 lines
2.0 KiB
Python

from __future__ import annotations
from unittest.mock import patch
from django.core.management import call_command
from django.test import TestCase
from core.models import ChatSession, Message, Person, PersonIdentifier, User
from core.presence.inference import now_ms
class BackfillContactAvailabilityCommandTests(TestCase):
    """Tests for the ``backfill_contact_availability`` management command."""

    def setUp(self):
        """Create the fixture chain: user -> person -> Signal identifier -> session."""
        self.user = owner = User.objects.create_user(
            "backfill-user", "backfill@example.com", "x"
        )
        self.person = contact = Person.objects.create(
            user=owner, name="Backfill Person"
        )
        self.identifier = signal_identifier = PersonIdentifier.objects.create(
            user=owner,
            person=contact,
            service="signal",
            identifier="+15551234567",
        )
        self.session = ChatSession.objects.create(
            user=owner, identifier=signal_identifier
        )

    @patch("core.management.commands.backfill_contact_availability.append_event_sync")
    def test_backfill_replays_message_and_read_receipt_events(self, mocked_append):
        """Run the command over two stored messages and check the emitted event types.

        ``append_event_sync`` is patched inside the command module, so the
        test only inspects the ``event_type`` keyword of each recorded call.
        """
        started_at = now_ms()

        # Fields shared by both stored messages.
        common_fields = {
            "user": self.user,
            "session": self.session,
            "source_service": "signal",
            "source_chat_id": "+15551234567",
        }

        # First message: authored by "OTHER", no read information.
        Message.objects.create(
            ts=started_at,
            text="hello",
            custom_author="OTHER",
            **common_fields,
        )
        # Second message: authored by "USER", one second later, and carrying
        # read-receipt fields (read_ts / read_by_identifier).
        Message.objects.create(
            ts=started_at + 1000,
            text="hey",
            custom_author="USER",
            read_ts=started_at + 2000,
            read_by_identifier="+15551234567",
            **common_fields,
        )

        # NOTE(review): the very large --days value is presumably there so the
        # freshly created rows always fall inside the backfill window — confirm
        # against the command's argument handling.
        command_args = ("--days", "36500", "--limit", "100")
        call_command("backfill_contact_availability", *command_args)

        recorded_types = [
            invocation.kwargs["event_type"]
            for invocation in mocked_append.call_args_list
        ]
        expected_types = ["message_created", "message_created", "read_receipt"]
        self.assertEqual(expected_types, recorded_types)