# File: GIA/core/modules/router.py
# Repository viewer metadata: 386 lines, 15 KiB, Python.

import asyncio
import re
from asgiref.sync import sync_to_async
from django.conf import settings
from core.clients import transport
from core.clients.instagram import InstagramClient
from core.clients.signal import SignalClient
from core.clients.whatsapp import WhatsAppClient
from core.clients.xmpp import XMPPClient
from core.assist.engine import process_inbound_assist
from core.commands.base import CommandContext
from core.commands.engine import process_inbound_message
from core.messaging import history
from core.models import PersonIdentifier
from core.presence import AvailabilitySignal, record_native_signal
from core.realtime.typing_state import set_person_typing_state
from core.translation.engine import process_inbound_translation
from core.util import logs
class UnifiedRouter:
    """
    Unified Router.

    Creates one client per messaging service (XMPP, Signal, WhatsApp and
    Instagram), handing each a reference to this router, and provides the
    service-agnostic event handlers (messages, read receipts, presence and
    typing notifications). The handlers are presumably invoked by those
    clients -- the call sites are not part of this module.
    """

    # Availability states accepted by presence_changed(); anything else is
    # recorded as "unknown".
    _PRESENCE_STATES = frozenset({"available", "unavailable", "unknown", "fading"})

    def __init__(self, loop):
        """
        Build the router and its per-service clients.

        :param loop: asyncio event loop passed to every client; also used to
            create the typing auto-stop timer tasks and driven by run().
        """
        self.loop = loop
        # Seconds before a typing indicator relayed from XMPP is stopped
        # automatically (settings.XMPP_TYPING_AUTO_STOP_SECONDS, default 3).
        self.typing_auto_stop_seconds = int(
            getattr(settings, "XMPP_TYPING_AUTO_STOP_SECONDS", 3)
        )
        # Pending auto-stop timer tasks, keyed by _typing_task_key(target).
        self._typing_stop_tasks = {}
        self.log = logs.get_logger("router")
        self.log.info("Initialised Unified Router Interface.")
        self.xmpp = XMPPClient(self, loop, "xmpp")
        self.signal = SignalClient(self, loop, "signal")
        self.whatsapp = WhatsAppClient(self, loop, "whatsapp")
        self.instagram = InstagramClient(self, loop, "instagram")

    @staticmethod
    def _service_name(protocol):
        """Return ``protocol`` stripped and lower-cased ("" for falsy input)."""
        return str(protocol or "").strip().lower()

    def _record_signal(
        self, row, protocol, *, source_kind, state, confidence, ts, payload
    ):
        """Record one AvailabilitySignal for the identifier ``row``."""
        record_native_signal(
            AvailabilitySignal(
                user=row.user,
                person=row.person,
                person_identifier=row,
                service=self._service_name(protocol),
                source_kind=source_kind,
                availability_state=state,
                confidence=confidence,
                ts=ts,
                payload=payload,
            )
        )

    async def _relay_targets(self, src, protocol):
        """Return the identifiers of ``src``'s person on services other than ``protocol``."""
        return await sync_to_async(list)(
            PersonIdentifier.objects.filter(
                user=src.user,
                person=src.person,
            ).exclude(service=protocol)
        )

    def _typing_task_key(self, target):
        """Return the hashable key under which ``target``'s auto-stop timer is tracked."""
        return (
            int(target.user_id),
            int(target.person_id),
            str(target.service),
            str(target.identifier),
        )

    def _cancel_typing_timer(self, key):
        """Stop tracking the timer stored under ``key`` and cancel it if still running."""
        existing = self._typing_stop_tasks.pop(key, None)
        if existing and not existing.done():
            existing.cancel()

    def _schedule_typing_auto_stop(self, target):
        """
        (Re)start the timer that sends stop-typing to ``target``.

        Any timer already pending for the same target is cancelled first. The
        delay is ``typing_auto_stop_seconds``, with a floor of one second.
        """
        key = self._typing_task_key(target)
        self._cancel_typing_timer(key)
        delay = max(1, int(self.typing_auto_stop_seconds))
        timer_task = None  # bound below, before the coroutine can finish

        async def _timer():
            try:
                await asyncio.sleep(delay)
                await transport.stop_typing(target.service, target.identifier)
            except asyncio.CancelledError:
                return
            except Exception as exc:
                self.log.warning(
                    "Typing auto-stop failed for %s/%s: %s",
                    target.service,
                    target.identifier,
                    exc,
                )
            finally:
                # A cancelled timer finishes *after* its replacement has been
                # stored under the same key. Only drop the entry when it is
                # still ours, otherwise the replacement becomes untracked and
                # can no longer be cancelled.
                if self._typing_stop_tasks.get(key) is timer_task:
                    self._typing_stop_tasks.pop(key, None)

        timer_task = self.loop.create_task(_timer())
        self._typing_stop_tasks[key] = timer_task

    def _start(self):
        """Start every service client."""
        self.log.info("Starting unified router clients")
        self.xmpp.start()
        self.signal.start()
        self.whatsapp.start()
        self.instagram.start()

    def run(self):
        """Start the clients and run the event loop until interrupted; the loop is always closed."""
        try:
            self.log.debug("Router run loop initializing")
            self._start()
            self.loop.run_forever()
        except (KeyboardInterrupt, SystemExit):
            self.log.info("Process terminating")
        finally:
            self.loop.close()

    async def message_received(self, protocol, *args, **kwargs):
        """
        Handle a message event from ``protocol``.

        Reads ``identifier``, ``local_message``, ``text`` and ``payload`` from
        ``kwargs``. Nothing happens when ``local_message`` is missing.
        Otherwise an availability signal is recorded for every matching
        identifier, the command, translation and assist engines are run (each
        failure is logged and does not stop the others) and workspace metrics
        are refreshed.
        """
        self.log.info("Message received (%s) %s %s", protocol, args, kwargs)
        identifier = kwargs.get("identifier")
        local_message = kwargs.get("local_message")
        message_text = str(kwargs.get("text") or "").strip()
        if local_message is None:
            return

        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        if identifiers:
            author = str(getattr(local_message, "custom_author", "") or "")
            # Messages authored as USER or BOT are treated as outgoing.
            outgoing = author.strip().upper() in {"USER", "BOT"}
            source_kind = "message_out" if outgoing else "message_in"
            confidence = 0.65 if outgoing else 0.75
            for row in identifiers:
                self._record_signal(
                    row,
                    protocol,
                    source_kind=source_kind,
                    state="available",
                    confidence=confidence,
                    ts=int(getattr(local_message, "ts", 0) or 0),
                    payload={
                        "origin": "router.message_received",
                        "message_id": str(getattr(local_message, "id", "") or ""),
                        "outgoing": outgoing,
                    },
                )

        channel_identifier = ""
        if isinstance(identifier, PersonIdentifier):
            channel_identifier = str(identifier.identifier or "").strip()
        elif identifier is not None:
            channel_identifier = str(identifier or "").strip()

        if channel_identifier:
            try:
                await process_inbound_message(
                    CommandContext(
                        service=self._service_name(protocol),
                        channel_identifier=channel_identifier,
                        message_id=str(local_message.id),
                        user_id=int(local_message.user_id),
                        message_text=message_text,
                        payload=dict(kwargs.get("payload") or {}),
                    )
                )
            except Exception as exc:
                self.log.warning("Command engine processing failed: %s", exc)

        # NOTE(review): indentation was lost in the reviewed copy. Translation
        # and assist only consume local_message, so they are assumed not to be
        # gated on channel_identifier -- confirm against the repository.
        try:
            await process_inbound_translation(local_message)
        except Exception as exc:
            self.log.warning("Translation engine processing failed: %s", exc)
        try:
            await process_inbound_assist(local_message)
        except Exception as exc:
            self.log.warning("Assist/task processing failed: %s", exc)

        await self._refresh_workspace_metrics_for_identifiers(identifiers)

    async def _refresh_workspace_metrics_for_identifiers(self, identifiers):
        """
        Call the workspace conversation helper once per distinct person.

        Rows without a ``person`` or ``user`` are skipped. A failure for one
        person is logged and does not affect the others.
        """
        if not identifiers:
            return
        seen = set()
        for row in identifiers:
            person = getattr(row, "person", None)
            user = getattr(row, "user", None)
            if person is None or user is None:
                continue
            person_key = str(getattr(person, "id", "") or "")
            if not person_key or person_key in seen:
                continue
            seen.add(person_key)
            try:
                # Imported at call time, as in the original; presumably to
                # avoid an import cycle with the views package -- confirm.
                from core.views.workspace import _conversation_for_person

                await sync_to_async(_conversation_for_person)(user, person)
            except Exception as exc:
                self.log.warning(
                    "Workspace metrics refresh failed for person=%s: %s",
                    person_key,
                    exc,
                )

    async def _resolve_identifier_objects(self, protocol, identifier):
        """
        Resolve ``identifier`` to a list of PersonIdentifier rows.

        A PersonIdentifier instance is returned as a one-element list. Any
        other value is converted to a string and looked up for ``protocol``
        under several spellings: the raw value, the part before ``@``, and for
        Signal the digits-only and ``+digits`` forms, or for WhatsApp the
        ``@s.whatsapp.net`` and ``@g.us`` forms. Empty input yields ``[]``.
        """
        if isinstance(identifier, PersonIdentifier):
            return [identifier]
        value = str(identifier or "").strip()
        if not value:
            return []

        variants = [value]
        bare = value.split("@", 1)[0].strip()
        if bare and bare not in variants:
            variants.append(bare)

        if protocol == "signal":
            digits = re.sub(r"[^0-9]", "", value)
            if digits:
                for candidate in (digits, f"+{digits}"):
                    if candidate not in variants:
                        variants.append(candidate)
        elif protocol == "whatsapp" and bare:
            for candidate in (f"{bare}@s.whatsapp.net", f"{bare}@g.us"):
                if candidate not in variants:
                    variants.append(candidate)

        # Every handler reads row.user / row.person inside a coroutine, so load
        # them with the rows instead of through lazy synchronous queries.
        # Assumes both are foreign keys (the code reads user_id / person_id).
        matches = PersonIdentifier.objects.filter(
            identifier__in=variants,
            service=protocol,
        ).select_related("user", "person")
        return await sync_to_async(list)(matches)

    async def message_read(self, protocol, *args, **kwargs):
        """
        Handle a read-receipt event from ``protocol``.

        Reads ``identifier``, ``message_timestamps``, ``read_ts``, ``payload``
        and ``read_by`` from ``kwargs``. For every matching identifier the
        receipts are applied to the message history and a ``read_receipt``
        availability signal is recorded; workspace metrics are refreshed last.
        """
        self.log.info("Message read (%s) %s %s", protocol, args, kwargs)
        identifier = kwargs.get("identifier")
        timestamps = kwargs.get("message_timestamps") or []
        read_ts = kwargs.get("read_ts")
        payload = kwargs.get("payload") or {}
        read_by = kwargs.get("read_by") or ""

        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        for row in identifiers:
            await history.apply_read_receipts(
                user=row.user,
                identifier=row,
                message_timestamps=timestamps,
                read_ts=read_ts,
                source_service=protocol,
                read_by_identifier=read_by or row.identifier,
                payload=payload,
            )
            self._record_signal(
                row,
                protocol,
                source_kind="read_receipt",
                state="available",
                confidence=0.95,
                ts=int(read_ts or 0),
                payload={
                    "origin": "router.message_read",
                    # Only timestamps whose string form is all digits are kept.
                    "message_timestamps": [
                        int(v) for v in list(timestamps or []) if str(v).isdigit()
                    ],
                    "read_by": str(read_by or row.identifier),
                },
            )
        await self._refresh_workspace_metrics_for_identifiers(identifiers)

    async def presence_changed(self, protocol, *args, **kwargs):
        """
        Handle a presence event from ``protocol``.

        Reads ``identifier``, ``state``, ``confidence``, ``ts`` and ``payload``
        from ``kwargs``. Unrecognised states become ``"unknown"``; an
        unparsable confidence falls back to 0.6 and an unparsable timestamp to
        0. A ``native_presence`` signal is recorded for every matching
        identifier and workspace metrics are refreshed.
        """
        identifier = kwargs.get("identifier")
        state = str(kwargs.get("state") or "unknown").strip().lower()
        if state not in self._PRESENCE_STATES:
            state = "unknown"
        try:
            confidence = float(kwargs.get("confidence") or 0.6)
        except Exception:
            confidence = 0.6
        try:
            ts = int(kwargs.get("ts") or 0)
        except Exception:
            ts = 0
        payload = dict(kwargs.get("payload") or {})

        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        for row in identifiers:
            self._record_signal(
                row,
                protocol,
                source_kind="native_presence",
                state=state,
                confidence=confidence,
                ts=ts,
                payload=payload,
            )
        await self._refresh_workspace_metrics_for_identifiers(identifiers)

    async def started_typing(self, protocol, *args, **kwargs):
        """
        Handle a typing-start event from ``protocol``.

        For every matching identifier a ``typing_start`` signal is recorded.
        Events that did not come from XMPP update the shared typing state and
        are relayed to XMPP. The indicator is then relayed to the person's
        identifiers on every other non-XMPP service; when the event came from
        XMPP an auto-stop timer is armed for each target that was notified.
        Relay failures are logged and do not stop the remaining relays.
        """
        self.log.info("Started typing (%s) %s %s", protocol, args, kwargs)
        identifier = kwargs.get("identifier")
        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        for src in identifiers:
            self._record_signal(
                src,
                protocol,
                source_kind="typing_start",
                state="available",
                confidence=0.9,
                ts=0,
                payload={"origin": "router.started_typing"},
            )
            if protocol != "xmpp":
                set_person_typing_state(
                    user_id=src.user_id,
                    person_id=src.person_id,
                    started=True,
                    source_service=protocol,
                    display_name=src.person.name,
                )
                try:
                    await self.xmpp.start_typing_for_person(src.user, src)
                except Exception as exc:
                    self.log.warning(
                        "Failed to relay typing-start to XMPP for %s: %s",
                        src.identifier,
                        exc,
                    )

            for target in await self._relay_targets(src, protocol):
                if target.service == "xmpp":
                    continue
                try:
                    await transport.start_typing(target.service, target.identifier)
                except Exception as exc:
                    # Best-effort, like the XMPP relay above: one failing
                    # transport must not block the remaining targets.
                    self.log.warning(
                        "Failed to relay typing-start to %s/%s: %s",
                        target.service,
                        target.identifier,
                        exc,
                    )
                    continue
                if protocol == "xmpp":
                    self._schedule_typing_auto_stop(target)

    async def stopped_typing(self, protocol, *args, **kwargs):
        """
        Handle a typing-stop event from ``protocol``.

        Mirrors started_typing(): records a ``typing_stop`` signal with state
        ``"fading"``, clears the shared typing state and relays to XMPP for
        non-XMPP events, then cancels any pending auto-stop timer and sends
        stop-typing to the person's identifiers on every other non-XMPP
        service. Relay failures are logged and do not stop the remaining
        relays.
        """
        self.log.info("Stopped typing (%s) %s %s", protocol, args, kwargs)
        identifier = kwargs.get("identifier")
        identifiers = await self._resolve_identifier_objects(protocol, identifier)
        for src in identifiers:
            self._record_signal(
                src,
                protocol,
                source_kind="typing_stop",
                state="fading",
                confidence=0.5,
                ts=0,
                payload={"origin": "router.stopped_typing"},
            )
            if protocol != "xmpp":
                set_person_typing_state(
                    user_id=src.user_id,
                    person_id=src.person_id,
                    started=False,
                    source_service=protocol,
                    display_name=src.person.name,
                )
                try:
                    await self.xmpp.stop_typing_for_person(src.user, src)
                except Exception as exc:
                    self.log.warning(
                        "Failed to relay typing-stop to XMPP for %s: %s",
                        src.identifier,
                        exc,
                    )

            for target in await self._relay_targets(src, protocol):
                if target.service == "xmpp":
                    continue
                self._cancel_typing_timer(self._typing_task_key(target))
                try:
                    await transport.stop_typing(target.service, target.identifier)
                except Exception as exc:
                    # Best-effort, like the XMPP relay above.
                    self.log.warning(
                        "Failed to relay typing-stop to %s/%s: %s",
                        target.service,
                        target.identifier,
                        exc,
                    )

    async def reacted(self, protocol, *args, **kwargs):
        """Log a reaction event; no further handling is implemented."""
        self.log.info("Reacted (%s) %s %s", protocol, args, kwargs)

    async def replied(self, protocol, *args, **kwargs):
        """Log a reply event; no further handling is implemented."""
        self.log.info("Replied (%s) %s %s", protocol, args, kwargs)