import asyncio
import re

from asgiref.sync import sync_to_async
from django.conf import settings

from core.clients import transport
from core.events import event_ledger_status
from core.clients.instagram import InstagramClient
from core.clients.signal import SignalClient
from core.clients.whatsapp import WhatsAppClient
from core.clients.xmpp import XMPPClient
from core.assist.engine import process_inbound_assist
from core.commands.base import CommandContext
from core.commands.engine import process_inbound_message
from core.messaging import history
from core.models import PersonIdentifier
from core.presence import AvailabilitySignal, record_native_signal
from core.realtime.typing_state import set_person_typing_state
from core.translation.engine import process_inbound_translation
from core.util import logs
from core.observability.tracing import ensure_trace_id


class UnifiedRouter(object):
    """
    Unified Router.

    Owns one client per service (XMPP, Signal, WhatsApp, Instagram) and
    contains the generic handlers those clients call for inbound events:
    messages, read receipts, presence changes and typing notifications.
    """

    def __init__(self, loop):
        """
        Build the router and its per-service clients.

        Args:
            loop: asyncio event loop; used to create typing auto-stop tasks
                and driven by ``run()``.
        """
        self.loop = loop
        self.typing_auto_stop_seconds = int(
            getattr(settings, "XMPP_TYPING_AUTO_STOP_SECONDS", 3)
        )
        # Maps a typing task key (see _typing_task_key) to the pending
        # auto-stop asyncio task for that target.
        self._typing_stop_tasks = {}

        self.log = logs.get_logger("router")
        self.log.info("Initialised Unified Router Interface.")

        # Read the ledger status once; both flags come from the same mapping.
        ledger_status = event_ledger_status()
        self.log.info(
            "runtime-flags event_ledger_dual_write=%s event_primary_write_path=%s trace_propagation=%s capability_enforcement=%s",
            bool(ledger_status.get("event_ledger_dual_write")),
            bool(ledger_status.get("event_primary_write_path")),
            bool(getattr(settings, "TRACE_PROPAGATION_ENABLED", True)),
            bool(getattr(settings, "CAPABILITY_ENFORCEMENT_ENABLED", True)),
        )

        self.xmpp = XMPPClient(self, loop, "xmpp")
        self.signal = SignalClient(self, loop, "signal")
        self.whatsapp = WhatsAppClient(self, loop, "whatsapp")
        self.instagram = InstagramClient(self, loop, "instagram")

    def _typing_task_key(self, target):
        """Return the (user_id, person_id, service, identifier) key for *target*."""
        return (
            int(target.user_id),
            int(target.person_id),
            str(target.service),
            str(target.identifier),
        )

    def _cancel_typing_timer(self, key):
        """Forget the auto-stop task registered under *key* and cancel it if still pending."""
        existing = self._typing_stop_tasks.pop(key, None)
        if existing and not existing.done():
            existing.cancel()

    def _schedule_typing_auto_stop(self, target):
        """
        Schedule a task that sends stop-typing to *target* after a delay.

        Any timer already registered for the same target is cancelled first.
        The delay is ``typing_auto_stop_seconds``, with a floor of one second.
        """
        key = self._typing_task_key(target)
        self._cancel_typing_timer(key)
        delay = max(1, int(self.typing_auto_stop_seconds))

        async def _timer():
            try:
                await asyncio.sleep(delay)
                await transport.stop_typing(target.service, target.identifier)
            except asyncio.CancelledError:
                return
            except Exception as exc:
                self.log.warning(
                    "Typing auto-stop failed for %s/%s: %s",
                    target.service,
                    target.identifier,
                    exc,
                )
            finally:
                # A cancelled timer unwinds on a later loop iteration, by which
                # time a replacement may already be registered under the same
                # key. Only drop the entry if it is still this task, otherwise
                # the replacement would become impossible to cancel.
                if self._typing_stop_tasks.get(key) is asyncio.current_task():
                    self._typing_stop_tasks.pop(key, None)

        self._typing_stop_tasks[key] = self.loop.create_task(_timer())

    def _start(self):
        """Start every service client."""
        self.log.info("Starting unified router clients")
        self.xmpp.start()
        self.signal.start()
        self.whatsapp.start()
        self.instagram.start()

    def run(self):
        """
        Start the clients and run the event loop until interrupted.

        KeyboardInterrupt and SystemExit are logged and swallowed; the loop
        is always closed on the way out.
        """
        try:
            self.log.debug("Router run loop initializing")
            self._start()
            self.loop.run_forever()
        except (KeyboardInterrupt, SystemExit):
            self.log.info("Process terminating")
        finally:
            self.loop.close()

    async def message_received(self, protocol, *args, **kwargs):
        """
        Handle an inbound or outbound message event from a service client.

        Reads ``identifier``, ``local_message``, ``payload`` and ``text`` from
        *kwargs*. Returns early when ``local_message`` is None. Otherwise it
        records an availability signal per resolved identifier, runs the
        command, translation and assist engines (each failure is logged, not
        raised), and refreshes workspace metrics.
        """
        self.log.info(f"Message received ({protocol}) {args} {kwargs}")
        identifier = kwargs.get("identifier")
        local_message = kwargs.get("local_message")
        payload = dict(kwargs.get("payload") or {})
        trace_id = (
            ensure_trace_id(payload=payload)
            if bool(getattr(settings, "TRACE_PROPAGATION_ENABLED", True))
            else ""
        )
        message_text = str(kwargs.get("text") or "").strip()
        if local_message is None:
            return

        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        if identifiers:
            # Messages authored as USER or BOT are treated as outgoing.
            outgoing = str(
                getattr(local_message, "custom_author", "") or ""
            ).strip().upper() in {
                "USER",
                "BOT",
            }
            source_kind = "message_out" if outgoing else "message_in"
            confidence = 0.65 if outgoing else 0.75
            for row in identifiers:
                record_native_signal(
                    AvailabilitySignal(
                        user=row.user,
                        person=row.person,
                        person_identifier=row,
                        service=str(protocol or "").strip().lower(),
                        source_kind=source_kind,
                        availability_state="available",
                        confidence=confidence,
                        ts=int(getattr(local_message, "ts", 0) or 0),
                        payload={
                            "origin": "router.message_received",
                            "message_id": str(getattr(local_message, "id", "") or ""),
                            "outgoing": outgoing,
                        },
                    )
                )

        channel_identifier = ""
        if isinstance(identifier, PersonIdentifier):
            channel_identifier = str(identifier.identifier or "").strip()
        elif identifier is not None:
            channel_identifier = str(identifier or "").strip()

        if channel_identifier:
            try:
                await process_inbound_message(
                    CommandContext(
                        service=str(protocol or "").strip().lower(),
                        channel_identifier=channel_identifier,
                        message_id=str(local_message.id),
                        user_id=int(local_message.user_id),
                        message_text=message_text,
                        payload=(
                            dict(kwargs.get("payload") or {})
                            | ({"trace_id": trace_id} if trace_id else {})
                        ),
                    )
                )
            except Exception as exc:
                self.log.warning("Command engine processing failed: %s", exc)

        # NOTE(review): this file's indentation was reconstructed from a
        # whitespace-collapsed source. Translation and assist are placed at
        # function level (they only need local_message) - confirm they were
        # not originally nested under `if channel_identifier:`.
        try:
            await process_inbound_translation(local_message)
        except Exception as exc:
            self.log.warning("Translation engine processing failed: %s", exc)

        try:
            await process_inbound_assist(local_message)
        except Exception as exc:
            self.log.warning("Assist/task processing failed: %s", exc)

        await self._refresh_workspace_metrics_for_identifiers(identifiers)

    async def _refresh_workspace_metrics_for_identifiers(self, identifiers):
        """
        Call ``_conversation_for_person`` once per distinct person in *identifiers*.

        Rows without a user or person are skipped. Failures, including a
        failing import of the workspace view module, are logged per person
        and do not stop the loop.
        """
        if not identifiers:
            return
        seen = set()
        for row in identifiers:
            person = getattr(row, "person", None)
            user = getattr(row, "user", None)
            if person is None or user is None:
                continue
            person_key = str(getattr(person, "id", "") or "")
            if not person_key or person_key in seen:
                continue
            seen.add(person_key)
            try:
                # Imported here rather than at module level; presumably to
                # avoid an import cycle - verify before moving it.
                from core.views.workspace import _conversation_for_person

                await sync_to_async(_conversation_for_person)(user, person)
            except Exception as exc:
                self.log.warning(
                    "Workspace metrics refresh failed for person=%s: %s",
                    person_key,
                    exc,
                )

    async def _resolve_identifier_objects(self, protocol, identifier):
        """
        Resolve *identifier* to a list of PersonIdentifier rows for *protocol*.

        A PersonIdentifier instance is returned as a one-element list. Any
        other value is stringified and expanded into lookup variants:
        the raw value, the part before ``@``, and for Signal the digits-only
        and ``+digits`` forms, or for WhatsApp the ``@s.whatsapp.net`` and
        ``@g.us`` forms. An empty value yields an empty list.
        """
        if isinstance(identifier, PersonIdentifier):
            return [identifier]
        value = str(identifier or "").strip()
        if not value:
            return []

        variants = [value]
        bare = value.split("@", 1)[0].strip()
        if bare and bare not in variants:
            variants.append(bare)

        if protocol == "signal":
            digits = re.sub(r"[^0-9]", "", value)
            if digits and digits not in variants:
                variants.append(digits)
            if digits:
                plus = f"+{digits}"
                if plus not in variants:
                    variants.append(plus)
        elif protocol == "whatsapp" and bare:
            direct = f"{bare}@s.whatsapp.net"
            group = f"{bare}@g.us"
            if direct not in variants:
                variants.append(direct)
            if group not in variants:
                variants.append(group)

        return await sync_to_async(list)(
            PersonIdentifier.objects.filter(
                identifier__in=variants,
                service=protocol,
            )
        )

    async def message_read(self, protocol, *args, **kwargs):
        """
        Handle a read-receipt event.

        Reads ``identifier``, ``message_timestamps``, ``read_ts``, ``payload``
        and ``read_by`` from *kwargs*. For each resolved identifier it applies
        the read receipts to history and records a ``read_receipt``
        availability signal, then refreshes workspace metrics.
        """
        self.log.info(f"Message read ({protocol}) {args} {kwargs}")
        identifier = kwargs.get("identifier")
        timestamps = kwargs.get("message_timestamps") or []
        read_ts = kwargs.get("read_ts")
        payload = kwargs.get("payload") or {}
        trace_id = (
            ensure_trace_id(payload=payload)
            if bool(getattr(settings, "TRACE_PROPAGATION_ENABLED", True))
            else ""
        )
        read_by = kwargs.get("read_by") or ""

        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        for row in identifiers:
            await history.apply_read_receipts(
                user=row.user,
                identifier=row,
                message_timestamps=timestamps,
                read_ts=read_ts,
                source_service=protocol,
                read_by_identifier=read_by or row.identifier,
                payload=payload,
                trace_id=trace_id,
            )
            record_native_signal(
                AvailabilitySignal(
                    user=row.user,
                    person=row.person,
                    person_identifier=row,
                    service=str(protocol or "").strip().lower(),
                    source_kind="read_receipt",
                    availability_state="available",
                    confidence=0.95,
                    ts=int(read_ts or 0),
                    payload={
                        "origin": "router.message_read",
                        # Only values whose string form is all digits are kept.
                        "message_timestamps": [
                            int(v) for v in list(timestamps or []) if str(v).isdigit()
                        ],
                        "read_by": str(read_by or row.identifier),
                    },
                )
            )
        await self._refresh_workspace_metrics_for_identifiers(identifiers)

    async def presence_changed(self, protocol, *args, **kwargs):
        """
        Handle a native presence change.

        Reads ``identifier``, ``state``, ``confidence``, ``ts`` and ``payload``
        from *kwargs*. Unrecognised states become ``"unknown"``; an
        unparseable confidence falls back to 0.6 and an unparseable ts to 0.
        Records a ``native_presence`` signal per resolved identifier and
        refreshes workspace metrics.
        """
        identifier = kwargs.get("identifier")
        state = str(kwargs.get("state") or "unknown").strip().lower()
        if state not in {"available", "unavailable", "unknown", "fading"}:
            state = "unknown"
        try:
            confidence = float(kwargs.get("confidence") or 0.6)
        except Exception:
            confidence = 0.6
        try:
            ts = int(kwargs.get("ts") or 0)
        except Exception:
            ts = 0
        payload = dict(kwargs.get("payload") or {})

        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        for row in identifiers:
            record_native_signal(
                AvailabilitySignal(
                    user=row.user,
                    person=row.person,
                    person_identifier=row,
                    service=str(protocol or "").strip().lower(),
                    source_kind="native_presence",
                    availability_state=state,
                    confidence=confidence,
                    ts=ts,
                    payload=payload,
                )
            )
        await self._refresh_workspace_metrics_for_identifiers(identifiers)

    async def started_typing(self, protocol, *args, **kwargs):
        """
        Handle a typing-start event and relay it to the person's other services.

        For each resolved identifier: records a ``typing_start`` signal; when
        the source is not XMPP, updates the person typing state and relays to
        XMPP (relay failures are logged); then sends start-typing to every
        non-XMPP identifier of the same person on another service. When the
        source is XMPP, an auto-stop timer is scheduled for each target.
        """
        self.log.info(f"Started typing ({protocol}) {args} {kwargs}")
        identifier = kwargs.get("identifier")
        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        for src in identifiers:
            record_native_signal(
                AvailabilitySignal(
                    user=src.user,
                    person=src.person,
                    person_identifier=src,
                    service=str(protocol or "").strip().lower(),
                    source_kind="typing_start",
                    availability_state="available",
                    confidence=0.9,
                    ts=0,
                    payload={"origin": "router.started_typing"},
                )
            )
            if protocol != "xmpp":
                set_person_typing_state(
                    user_id=src.user_id,
                    person_id=src.person_id,
                    started=True,
                    source_service=protocol,
                    display_name=src.person.name,
                )
                try:
                    await self.xmpp.start_typing_for_person(src.user, src)
                except Exception as exc:
                    self.log.warning(
                        "Failed to relay typing-start to XMPP for %s: %s",
                        src.identifier,
                        exc,
                    )

            targets = await sync_to_async(list)(
                PersonIdentifier.objects.filter(
                    user=src.user,
                    person=src.person,
                ).exclude(service=protocol)
            )
            for target in targets:
                # XMPP targets are skipped here; the XMPP relay is handled by
                # the start_typing_for_person call above.
                if target.service == "xmpp":
                    continue
                await transport.start_typing(target.service, target.identifier)
                if protocol == "xmpp":
                    self._schedule_typing_auto_stop(target)

    async def stopped_typing(self, protocol, *args, **kwargs):
        """
        Handle a typing-stop event and relay it to the person's other services.

        For each resolved identifier: records a ``typing_stop`` signal; when
        the source is not XMPP, updates the person typing state and relays to
        XMPP (relay failures are logged); then cancels any pending auto-stop
        timer and sends stop-typing to every non-XMPP identifier of the same
        person on another service.
        """
        self.log.info(f"Stopped typing ({protocol}) {args} {kwargs}")
        identifier = kwargs.get("identifier")
        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        for src in identifiers:
            record_native_signal(
                AvailabilitySignal(
                    user=src.user,
                    person=src.person,
                    person_identifier=src,
                    service=str(protocol or "").strip().lower(),
                    source_kind="typing_stop",
                    availability_state="fading",
                    confidence=0.5,
                    ts=0,
                    payload={"origin": "router.stopped_typing"},
                )
            )
            if protocol != "xmpp":
                set_person_typing_state(
                    user_id=src.user_id,
                    person_id=src.person_id,
                    started=False,
                    source_service=protocol,
                    display_name=src.person.name,
                )
                try:
                    await self.xmpp.stop_typing_for_person(src.user, src)
                except Exception as exc:
                    self.log.warning(
                        "Failed to relay typing-stop to XMPP for %s: %s",
                        src.identifier,
                        exc,
                    )

            targets = await sync_to_async(list)(
                PersonIdentifier.objects.filter(
                    user=src.user,
                    person=src.person,
                ).exclude(service=protocol)
            )
            for target in targets:
                if target.service == "xmpp":
                    continue
                # An explicit stop supersedes any pending auto-stop timer.
                self._cancel_typing_timer(self._typing_task_key(target))
                await transport.stop_typing(target.service, target.identifier)

    async def reacted(self, protocol, *args, **kwargs):
        """Log a reaction event; no further handling is done here."""
        self.log.info(f"Reacted ({protocol}) {args} {kwargs}")

    async def replied(self, protocol, *args, **kwargs):
        """Log a reply event; no further handling is done here."""
        self.log.info(f"Replied ({protocol}) {args} {kwargs}")