import asyncio

from asgiref.sync import sync_to_async
from django.conf import settings

from core.clients import transport
from core.clients.instagram import InstagramClient
from core.clients.signal import SignalClient
from core.clients.whatsapp import WhatsAppClient
from core.clients.xmpp import XMPPClient
from core.assist.engine import process_inbound_assist
from core.commands.base import CommandContext
from core.commands.engine import process_inbound_message
from core.messaging import history
from core.models import PersonIdentifier
from core.realtime.typing_state import set_person_typing_state
from core.translation.engine import process_inbound_translation
from core.util import logs


class UnifiedRouter:
    """
    Unified Router.

    Owns one client per service (XMPP, Signal, WhatsApp, Instagram) and
    contains the generic handlers for inbound messages, read receipts and
    typing notifications.
    """

    def __init__(self, loop):
        """
        Build the router and its service clients.

        :param loop: asyncio event loop used to run the clients and to
            schedule the typing auto-stop timers.
        """
        self.loop = loop
        # Seconds after which a typing indicator relayed from XMPP is
        # stopped automatically; read from settings, defaulting to 3.
        self.typing_auto_stop_seconds = int(
            getattr(settings, "XMPP_TYPING_AUTO_STOP_SECONDS", 3)
        )
        # Pending auto-stop timer tasks, keyed by _typing_task_key().
        self._typing_stop_tasks = {}

        self.log = logs.get_logger("router")
        self.log.info("Initialised Unified Router Interface.")

        self.xmpp = XMPPClient(self, loop, "xmpp")
        self.signal = SignalClient(self, loop, "signal")
        self.whatsapp = WhatsAppClient(self, loop, "whatsapp")
        self.instagram = InstagramClient(self, loop, "instagram")

    def _typing_task_key(self, target):
        """
        Return the hashable key identifying the auto-stop timer of *target*.

        :param target: object exposing ``user_id``, ``person_id``,
            ``service`` and ``identifier``.
        :return: ``(user_id, person_id, service, identifier)`` tuple.
        """
        return (
            int(target.user_id),
            int(target.person_id),
            str(target.service),
            str(target.identifier),
        )

    def _cancel_typing_timer(self, key):
        """Forget the timer registered under *key* and cancel it if pending."""
        existing = self._typing_stop_tasks.pop(key, None)
        if existing and not existing.done():
            existing.cancel()

    def _schedule_typing_auto_stop(self, target):
        """
        (Re)arm the timer that stops the typing indicator for *target*.

        Any timer already registered for the same target is cancelled
        first. The delay is ``typing_auto_stop_seconds``, clamped to a
        minimum of one second.
        """
        key = self._typing_task_key(target)
        self._cancel_typing_timer(key)
        delay = max(1, int(self.typing_auto_stop_seconds))

        async def _timer():
            try:
                await asyncio.sleep(delay)
                await transport.stop_typing(target.service, target.identifier)
            except asyncio.CancelledError:
                return
            except Exception as exc:
                self.log.warning(
                    "Typing auto-stop failed for %s/%s: %s",
                    target.service,
                    target.identifier,
                    exc,
                )
            finally:
                # A cancelled timer only unwinds after its replacement has
                # been registered under the same key, so drop the entry
                # only while it still refers to this very task. Popping
                # unconditionally would orphan the replacement timer.
                if self._typing_stop_tasks.get(key) is asyncio.current_task():
                    self._typing_stop_tasks.pop(key, None)

        self._typing_stop_tasks[key] = self.loop.create_task(_timer())

    def _start(self):
        """Start every service client."""
        self.log.info("Starting unified router clients")
        self.xmpp.start()
        self.signal.start()
        self.whatsapp.start()
        self.instagram.start()

    def run(self):
        """
        Start the clients and run the event loop until it is stopped.

        ``KeyboardInterrupt`` and ``SystemExit`` are logged and swallowed;
        the loop is closed on the way out in every case.
        """
        try:
            self.log.debug("Router run loop initializing")
            self._start()
            self.loop.run_forever()
        except (KeyboardInterrupt, SystemExit):
            self.log.info("Process terminating")
        finally:
            self.loop.close()

    async def message_received(self, protocol, *args, **kwargs):
        """
        Run the inbound pipelines for a received message.

        Reads ``identifier``, ``local_message``, ``text`` and ``payload``
        from *kwargs*. Nothing happens without a ``local_message``. The
        command engine runs only when a channel identifier can be derived;
        the translation and assist engines always run. A failure in one
        engine is logged and does not prevent the others from running.

        :param protocol: name of the service the message arrived on.
        """
        self.log.info(f"Message received ({protocol}) {args} {kwargs}")
        identifier = kwargs.get("identifier")
        local_message = kwargs.get("local_message")
        message_text = str(kwargs.get("text") or "").strip()
        if local_message is None:
            return

        channel_identifier = ""
        if isinstance(identifier, PersonIdentifier):
            channel_identifier = str(identifier.identifier or "").strip()
        elif identifier is not None:
            channel_identifier = str(identifier or "").strip()

        if channel_identifier:
            try:
                await process_inbound_message(
                    CommandContext(
                        service=str(protocol or "").strip().lower(),
                        channel_identifier=channel_identifier,
                        message_id=str(local_message.id),
                        user_id=int(local_message.user_id),
                        message_text=message_text,
                        payload=dict(kwargs.get("payload") or {}),
                    )
                )
            except Exception as exc:
                self.log.warning("Command engine processing failed: %s", exc)

        try:
            await process_inbound_translation(local_message)
        except Exception as exc:
            self.log.warning("Translation engine processing failed: %s", exc)

        try:
            await process_inbound_assist(local_message)
        except Exception as exc:
            self.log.warning("Assist/task processing failed: %s", exc)

    async def _resolve_identifier_objects(self, protocol, identifier):
        """
        Resolve *identifier* to a list of ``PersonIdentifier`` rows.

        :param protocol: service name used to filter the lookup.
        :param identifier: a ``PersonIdentifier`` (returned as a one-item
            list) or a raw value matched against the ``identifier`` field.
        :return: list of matching rows; empty when the value is blank.
        """
        if isinstance(identifier, PersonIdentifier):
            return [identifier]
        value = str(identifier or "").strip()
        if not value:
            return []
        # The async handlers read ``row.user`` and ``row.person`` directly
        # on the event loop; load both relations here, inside the worker
        # thread, so those accesses never trigger a synchronous ORM query.
        return await sync_to_async(list)(
            PersonIdentifier.objects.filter(
                identifier=value,
                service=protocol,
            ).select_related("user", "person")
        )

    async def message_read(self, protocol, *args, **kwargs):
        """
        Apply read receipts for every identifier row matching the sender.

        Reads ``identifier``, ``message_timestamps``, ``read_ts``,
        ``payload`` and ``read_by`` from *kwargs*; when ``read_by`` is
        empty the row's own identifier is used instead.

        :param protocol: name of the service the receipt arrived on.
        """
        self.log.info(f"Message read ({protocol}) {args} {kwargs}")
        identifier = kwargs.get("identifier")
        timestamps = kwargs.get("message_timestamps") or []
        read_ts = kwargs.get("read_ts")
        payload = kwargs.get("payload") or {}
        read_by = kwargs.get("read_by") or ""

        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        for row in identifiers:
            await history.apply_read_receipts(
                user=row.user,
                identifier=row,
                message_timestamps=timestamps,
                read_ts=read_ts,
                source_service=protocol,
                read_by_identifier=read_by or row.identifier,
                payload=payload,
            )

    async def started_typing(self, protocol, *args, **kwargs):
        """
        Relay a typing-start notification to the person's other services.

        For non-XMPP sources the typing state is recorded and relayed to
        XMPP (relay failures are logged). The notification is then sent to
        every other non-XMPP identifier of the same user and person; when
        the source is XMPP an auto-stop timer is armed for each target.

        :param protocol: name of the service the notification arrived on.
        """
        self.log.info(f"Started typing ({protocol}) {args} {kwargs}")
        identifier = kwargs.get("identifier")
        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        for src in identifiers:
            if protocol != "xmpp":
                set_person_typing_state(
                    user_id=src.user_id,
                    person_id=src.person_id,
                    started=True,
                    source_service=protocol,
                    display_name=src.person.name,
                )
                try:
                    await self.xmpp.start_typing_for_person(src.user, src)
                except Exception as exc:
                    self.log.warning(
                        "Failed to relay typing-start to XMPP for %s: %s",
                        src.identifier,
                        exc,
                    )

            targets = await sync_to_async(list)(
                PersonIdentifier.objects.filter(
                    user=src.user,
                    person=src.person,
                ).exclude(service=protocol)
            )
            for target in targets:
                if target.service == "xmpp":
                    continue
                await transport.start_typing(target.service, target.identifier)
                if protocol == "xmpp":
                    self._schedule_typing_auto_stop(target)

    async def stopped_typing(self, protocol, *args, **kwargs):
        """
        Relay a typing-stop notification to the person's other services.

        For non-XMPP sources the typing state is cleared and the stop is
        relayed to XMPP (relay failures are logged). For every other
        non-XMPP identifier of the same user and person, any pending
        auto-stop timer is cancelled and the stop is sent.

        :param protocol: name of the service the notification arrived on.
        """
        self.log.info(f"Stopped typing ({protocol}) {args} {kwargs}")
        identifier = kwargs.get("identifier")
        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        for src in identifiers:
            if protocol != "xmpp":
                set_person_typing_state(
                    user_id=src.user_id,
                    person_id=src.person_id,
                    started=False,
                    source_service=protocol,
                    display_name=src.person.name,
                )
                try:
                    await self.xmpp.stop_typing_for_person(src.user, src)
                except Exception as exc:
                    self.log.warning(
                        "Failed to relay typing-stop to XMPP for %s: %s",
                        src.identifier,
                        exc,
                    )

            targets = await sync_to_async(list)(
                PersonIdentifier.objects.filter(
                    user=src.user,
                    person=src.person,
                ).exclude(service=protocol)
            )
            for target in targets:
                if target.service == "xmpp":
                    continue
                self._cancel_typing_timer(self._typing_task_key(target))
                await transport.stop_typing(target.service, target.identifier)

    async def reacted(self, protocol, *args, **kwargs):
        """Log a reaction event; no further handling is performed."""
        self.log.info(f"Reacted ({protocol}) {args} {kwargs}")

    async def replied(self, protocol, *args, **kwargs):
        """Log a reply event; no further handling is performed."""
        self.log.info(f"Replied ({protocol}) {args} {kwargs}")