import asyncio import re import time from asgiref.sync import sync_to_async from django.conf import settings from core.assist.engine import process_inbound_assist from core.clients import transport from core.clients.instagram import InstagramClient from core.clients.signal import SignalClient from core.clients.whatsapp import WhatsAppClient from core.clients.xmpp import XMPPClient from core.commands.base import CommandContext from core.commands.engine import process_inbound_message from core.events import append_event, event_ledger_enabled, event_ledger_status from core.events.behavior import ComposingTracker from core.messaging import history from core.models import PersonIdentifier from core.observability.tracing import ensure_trace_id from core.presence import AvailabilitySignal, record_native_signal from core.realtime.typing_state import set_person_typing_state from core.translation.engine import process_inbound_translation from core.util import logs class UnifiedRouter(object): """ Unified Router. Contains generic functions for handling XMPP and Signal events. """ def __init__(self, loop): self.loop = loop self.typing_auto_stop_seconds = int( getattr(settings, "XMPP_TYPING_AUTO_STOP_SECONDS", 3) ) self.composing_abandoned_window_seconds = int( getattr(settings, "COMPOSING_ABANDONED_WINDOW_SECONDS", 300) ) self._typing_stop_tasks = {} self._composing_tracker = ComposingTracker( window_ms=self.composing_abandoned_window_seconds * 1000 ) self.log = logs.get_logger("router") self.log.info("Initialised Unified Router Interface.") self.log.info( "runtime-flags event_ledger_dual_write=%s event_primary_write_path=%s trace_propagation=%s capability_enforcement=%s", bool(event_ledger_status().get("event_ledger_dual_write")), bool(event_ledger_status().get("event_primary_write_path")), bool(getattr(settings, "TRACE_PROPAGATION_ENABLED", True)), bool(getattr(settings, "CAPABILITY_ENFORCEMENT_ENABLED", True)), ) self.xmpp = XMPPClient(self, loop, "xmpp") self.signal = SignalClient(self, loop, "signal") self.whatsapp = WhatsAppClient(self, loop, "whatsapp") self.instagram = InstagramClient(self, loop, "instagram") def _typing_task_key(self, target): return ( int(target.user_id), int(target.person_id), str(target.service), str(target.identifier), ) def _cancel_typing_timer(self, key): existing = self._typing_stop_tasks.pop(key, None) if existing and not existing.done(): existing.cancel() def _schedule_typing_auto_stop(self, target): key = self._typing_task_key(target) self._cancel_typing_timer(key) delay = max(1, int(self.typing_auto_stop_seconds)) async def _timer(): try: await asyncio.sleep(delay) await transport.stop_typing(target.service, target.identifier) except asyncio.CancelledError: return except Exception as exc: self.log.warning( "Typing auto-stop failed for %s/%s: %s", target.service, target.identifier, exc, ) finally: self._typing_stop_tasks.pop(key, None) self._typing_stop_tasks[key] = self.loop.create_task(_timer()) def _behavior_direction(self, protocol: str) -> str: return "out" if str(protocol or "").strip().lower() == "xmpp" else "in" def _event_ts_from_kwargs(self, kwargs: dict) -> int | None: payload = dict(kwargs.get("payload") or {}) for candidate in ( kwargs.get("ts"), payload.get("ts"), payload.get("timestamp"), payload.get("messageTimestamp"), payload.get("message_ts"), ): try: parsed = int(candidate) except Exception: continue if parsed > 0: return parsed return int(time.time() * 1000) async def _append_identifier_event( self, *, identifier_row, event_type: str, protocol: str, direction: str, ts: int | None = None, payload: dict | None = None, raw_payload: dict | None = None, actor_identifier: str = "", ): if not event_ledger_enabled(): return None session = await history.get_chat_session(identifier_row.user, identifier_row) await append_event( user=identifier_row.user, session=session, ts=ts, event_type=event_type, direction=direction, actor_identifier=str(actor_identifier or identifier_row.identifier or ""), origin_transport=str(protocol or "").strip().lower(), origin_chat_id=str(identifier_row.identifier or ""), payload=dict(payload or {}), raw_payload=dict(raw_payload or {}), ) return session def _start(self): self.log.info("Starting unified router clients") self.xmpp.start() self.signal.start() self.whatsapp.start() self.instagram.start() def run(self): try: # self.xmpp.client.client.process() # self.xmpp.start() self.log.debug("Router run loop initializing") self._start() self.loop.run_forever() except (KeyboardInterrupt, SystemExit): self.log.info("Process terminating") finally: self.loop.close() async def message_received(self, protocol, *args, **kwargs): self.log.info(f"Message received ({protocol}) {args} {kwargs}") identifier = kwargs.get("identifier") local_message = kwargs.get("local_message") payload = dict(kwargs.get("payload") or {}) trace_id = ( ensure_trace_id(payload=payload) if bool(getattr(settings, "TRACE_PROPAGATION_ENABLED", True)) else "" ) message_text = str(kwargs.get("text") or "").strip() if local_message is None: return self._composing_tracker.observe_message( str(getattr(local_message, "session_id", "") or "") ) identifiers = await self._resolve_identifier_objects(protocol, identifier) if identifiers: outgoing = str( getattr(local_message, "custom_author", "") or "" ).strip().upper() in { "USER", "BOT", } source_kind = "message_out" if outgoing else "message_in" confidence = 0.65 if outgoing else 0.75 for row in identifiers: record_native_signal( AvailabilitySignal( user=row.user, person=row.person, person_identifier=row, service=str(protocol or "").strip().lower(), source_kind=source_kind, availability_state="available", confidence=confidence, ts=int(getattr(local_message, "ts", 0) or 0), payload={ "origin": "router.message_received", "message_id": str(getattr(local_message, "id", "") or ""), "outgoing": outgoing, }, ) ) channel_identifier = "" if isinstance(identifier, PersonIdentifier): channel_identifier = str(identifier.identifier or "").strip() elif identifier is not None: channel_identifier = str(identifier or "").strip() if channel_identifier: try: await process_inbound_message( CommandContext( service=str(protocol or "").strip().lower(), channel_identifier=channel_identifier, message_id=str(local_message.id), user_id=int(local_message.user_id), message_text=message_text, payload=( dict(kwargs.get("payload") or {}) | ({"trace_id": trace_id} if trace_id else {}) ), ) ) except Exception as exc: self.log.warning("Command engine processing failed: %s", exc) try: await process_inbound_translation(local_message) except Exception as exc: self.log.warning("Translation engine processing failed: %s", exc) try: await process_inbound_assist(local_message) except Exception as exc: self.log.warning("Assist/task processing failed: %s", exc) await self._refresh_workspace_metrics_for_identifiers(identifiers) async def _refresh_workspace_metrics_for_identifiers(self, identifiers): if not identifiers: return seen = set() for row in identifiers: person = getattr(row, "person", None) user = getattr(row, "user", None) if person is None or user is None: continue person_key = str(getattr(person, "id", "") or "") if not person_key or person_key in seen: continue seen.add(person_key) try: from core.views.workspace import _conversation_for_person await sync_to_async(_conversation_for_person)(user, person) except Exception as exc: self.log.warning( "Workspace metrics refresh failed for person=%s: %s", person_key, exc, ) async def _resolve_identifier_objects(self, protocol, identifier): if isinstance(identifier, PersonIdentifier): return [identifier] value = str(identifier or "").strip() if not value: return [] variants = [value] bare = value.split("@", 1)[0].strip() if bare and bare not in variants: variants.append(bare) if protocol == "signal": digits = re.sub(r"[^0-9]", "", value) if digits and digits not in variants: variants.append(digits) if digits: plus = f"+{digits}" if plus not in variants: variants.append(plus) elif protocol == "whatsapp" and bare: direct = f"{bare}@s.whatsapp.net" group = f"{bare}@g.us" if direct not in variants: variants.append(direct) if group not in variants: variants.append(group) return await sync_to_async(list)( PersonIdentifier.objects.filter( identifier__in=variants, service=protocol, ) ) async def message_read(self, protocol, *args, **kwargs): self.log.info(f"Message read ({protocol}) {args} {kwargs}") identifier = kwargs.get("identifier") timestamps = kwargs.get("message_timestamps") or [] read_ts = kwargs.get("read_ts") payload = kwargs.get("payload") or {} payload_type = str((payload or {}).get("type") or "").strip().lower() receipt_event_type = ( "delivery_receipt" if payload_type == "delivered" else "read_receipt" ) trace_id = ( ensure_trace_id(payload=payload) if bool(getattr(settings, "TRACE_PROPAGATION_ENABLED", True)) else "" ) read_by = kwargs.get("read_by") or "" identifiers = await self._resolve_identifier_objects(protocol, identifier) for row in identifiers: await history.apply_read_receipts( user=row.user, identifier=row, message_timestamps=timestamps, read_ts=read_ts, source_service=protocol, read_by_identifier=read_by or row.identifier, payload=payload, trace_id=trace_id, receipt_event_type=receipt_event_type, ) record_native_signal( AvailabilitySignal( user=row.user, person=row.person, person_identifier=row, service=str(protocol or "").strip().lower(), source_kind=receipt_event_type, availability_state="available", confidence=0.95, ts=int(read_ts or 0), payload={ "origin": "router.message_read", "receipt_event_type": receipt_event_type, "message_timestamps": [ int(v) for v in list(timestamps or []) if str(v).isdigit() ], "read_by": str(read_by or row.identifier), }, ) ) await self._refresh_workspace_metrics_for_identifiers(identifiers) async def presence_changed(self, protocol, *args, **kwargs): identifier = kwargs.get("identifier") state = str(kwargs.get("state") or "unknown").strip().lower() if state not in {"available", "unavailable", "unknown", "fading"}: state = "unknown" try: confidence = float(kwargs.get("confidence") or 0.6) except Exception: confidence = 0.6 try: ts = int(kwargs.get("ts") or 0) except Exception: ts = 0 payload = dict(kwargs.get("payload") or {}) identifiers = await self._resolve_identifier_objects(protocol, identifier) for row in identifiers: record_native_signal( AvailabilitySignal( user=row.user, person=row.person, person_identifier=row, service=str(protocol or "").strip().lower(), source_kind="native_presence", availability_state=state, confidence=confidence, ts=ts, payload=payload, ) ) state_event = None if state == "available": state_event = "presence_available" elif state == "unavailable": state_event = "presence_unavailable" if state_event: try: await self._append_identifier_event( identifier_row=row, event_type=state_event, protocol=protocol, direction="system", ts=(ts or None), payload={ "state": state, "confidence": confidence, **payload, }, raw_payload=payload, actor_identifier=str(row.identifier or ""), ) except Exception as exc: self.log.warning( "Failed to append presence event for %s: %s", row.identifier, exc, ) await self._refresh_workspace_metrics_for_identifiers(identifiers) async def started_typing(self, protocol, *args, **kwargs): self.log.info(f"Started typing ({protocol}) {args} {kwargs}") identifier = kwargs.get("identifier") payload = dict(kwargs.get("payload") or {}) event_ts = self._event_ts_from_kwargs(kwargs) direction = self._behavior_direction(protocol) identifiers = await self._resolve_identifier_objects(protocol, identifier) for src in identifiers: record_native_signal( AvailabilitySignal( user=src.user, person=src.person, person_identifier=src, service=str(protocol or "").strip().lower(), source_kind="typing_start", availability_state="available", confidence=0.9, ts=0, payload={"origin": "router.started_typing"}, ) ) try: session = await history.get_chat_session(src.user, src) state = self._composing_tracker.observe_started( str(session.id), int(event_ts or 0), ) await append_event( user=src.user, session=session, ts=event_ts, event_type="typing_started", direction=direction, actor_identifier=str(src.identifier or ""), origin_transport=str(protocol or "").strip().lower(), origin_chat_id=str(src.identifier or ""), payload=dict(payload or {}, revision=int(state.revision or 1)), raw_payload=dict(payload or {}), ) except Exception as exc: self.log.warning( "Failed to append typing-start event for %s: %s", src.identifier, exc, ) if protocol != "xmpp": set_person_typing_state( user_id=src.user_id, person_id=src.person_id, started=True, source_service=protocol, display_name=src.person.name, ) try: await self.xmpp.start_typing_for_person(src.user, src) except Exception as exc: self.log.warning( "Failed to relay typing-start to XMPP for %s: %s", src.identifier, exc, ) targets = await sync_to_async(list)( PersonIdentifier.objects.filter( user=src.user, person=src.person, ).exclude(service=protocol) ) for target in targets: if target.service == "xmpp": continue await transport.start_typing(target.service, target.identifier) if protocol == "xmpp": self._schedule_typing_auto_stop(target) async def stopped_typing(self, protocol, *args, **kwargs): self.log.info(f"Stopped typing ({protocol}) {args} {kwargs}") identifier = kwargs.get("identifier") payload = dict(kwargs.get("payload") or {}) event_ts = self._event_ts_from_kwargs(kwargs) direction = self._behavior_direction(protocol) identifiers = await self._resolve_identifier_objects(protocol, identifier) for src in identifiers: record_native_signal( AvailabilitySignal( user=src.user, person=src.person, person_identifier=src, service=str(protocol or "").strip().lower(), source_kind="typing_stop", availability_state="fading", confidence=0.5, ts=0, payload={"origin": "router.stopped_typing"}, ) ) try: session = await history.get_chat_session(src.user, src) await append_event( user=src.user, session=session, ts=event_ts, event_type="typing_stopped", direction=direction, actor_identifier=str(src.identifier or ""), origin_transport=str(protocol or "").strip().lower(), origin_chat_id=str(src.identifier or ""), payload=dict(payload or {}), raw_payload=dict(payload or {}), ) if session is not None: abandoned = self._composing_tracker.observe_stopped( str(session.id), int(event_ts or 0), ) if abandoned is not None: await append_event( user=src.user, session=session, ts=int(abandoned.get("stopped_ts") or event_ts or 0), event_type="composing_abandoned", direction=direction, actor_identifier=str(src.identifier or ""), origin_transport=str(protocol or "").strip().lower(), origin_chat_id=str(src.identifier or ""), payload={ **dict(payload or {}), "abandoned": True, "duration_ms": int( abandoned.get("duration_ms") or 0 ), "revision": int(abandoned.get("revision") or 1), "started_ts": int(abandoned.get("started_ts") or 0), }, raw_payload=dict(payload or {}), ) except Exception as exc: self.log.warning( "Failed to append typing-stop event for %s: %s", src.identifier, exc, ) if protocol != "xmpp": set_person_typing_state( user_id=src.user_id, person_id=src.person_id, started=False, source_service=protocol, display_name=src.person.name, ) try: await self.xmpp.stop_typing_for_person(src.user, src) except Exception as exc: self.log.warning( "Failed to relay typing-stop to XMPP for %s: %s", src.identifier, exc, ) targets = await sync_to_async(list)( PersonIdentifier.objects.filter( user=src.user, person=src.person, ).exclude(service=protocol) ) for target in targets: if target.service == "xmpp": continue self._cancel_typing_timer(self._typing_task_key(target)) await transport.stop_typing(target.service, target.identifier) async def reacted(self, protocol, *args, **kwargs): self.log.info(f"Reacted ({protocol}) {args} {kwargs}") async def replied(self, protocol, *args, **kwargs): self.log.info(f"Replied ({protocol}) {args} {kwargs}")