From 658ab10647521cbfe9de11dd0b174b5f727bb755 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Mon, 16 Feb 2026 19:19:32 +0000 Subject: [PATCH] Fully implement WhatsApp, Signal and XMPP multiplexing --- core/clients/signal.py | 124 +++++++++++- core/clients/transport.py | 28 +++ core/clients/whatsapp.py | 215 ++++++++++++++++++--- core/realtime/compose_ws.py | 130 ++++++++----- core/templates/partials/compose-panel.html | 78 +++++++- core/views/compose.py | 142 +++++++++++--- docker-compose.yml | 49 +++++ requirements.txt | 1 + stack.env.example | 3 +- 9 files changed, 659 insertions(+), 111 deletions(-) diff --git a/core/clients/signal.py b/core/clients/signal.py index 0bfcdf5..22730e7 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -1,5 +1,6 @@ import asyncio import json +import time from urllib.parse import quote_plus, urlparse import aiohttp @@ -472,6 +473,14 @@ class HandleMessage(Command): outgoing=is_from_bot, ) stored_messages.add(message_key) + # Notify unified router to ensure service context is preserved + await self.ur.message_received( + self.service, + identifier=identifier, + text=message_text, + ts=ts, + payload=msg, + ) # TODO: Permission checks manips = await sync_to_async(list)(Manipulation.objects.filter(enabled=True)) @@ -595,6 +604,7 @@ class HandleMessage(Command): class SignalClient(ClientBase): def __init__(self, ur, *args, **kwargs): super().__init__(ur, *args, **kwargs) + self._stopping = False signal_number = str(getattr(settings, "SIGNAL_NUMBER", "")).strip() self.client = NewSignalBot( ur, @@ -606,9 +616,121 @@ class SignalClient(ClientBase): ) self.client.register(HandleMessage(self.ur, self.service)) + self._command_task = None + + async def _drain_runtime_commands(self): + """Process queued runtime commands (e.g., web UI sends via composite router).""" + from core.clients import transport + # Process a small burst each loop to keep sends responsive. + for _ in range(5): + command = transport.pop_runtime_command(self.service) + if not command: + return + await self._execute_runtime_command(command) + + async def _execute_runtime_command(self, command): + """Execute a single runtime command like send_message_raw.""" + from core.clients import transport + command_id = str((command or {}).get("id") or "").strip() + action = str((command or {}).get("action") or "").strip() + payload = dict((command or {}).get("payload") or {}) + if not command_id: + return + + if action == "send_message_raw": + recipient = str(payload.get("recipient") or "").strip() + text = payload.get("text") + attachments = payload.get("attachments") or [] + try: + result = await signalapi.send_message_raw( + recipient_uuid=recipient, + text=text, + attachments=attachments, + ) + if result is False or result is None: + raise RuntimeError("signal_send_failed") + transport.set_runtime_command_result( + self.service, + command_id, + { + "ok": True, + "timestamp": int(result) + if isinstance(result, int) + else int(time.time() * 1000), + }, + ) + except Exception as exc: + self.log.error(f"send_message_raw failed: {exc}", exc_info=True) + transport.set_runtime_command_result( + self.service, + command_id, + { + "ok": False, + "error": str(exc), + }, + ) + return + + if action == "notify_xmpp_sent": + person_identifier_id = str(payload.get("person_identifier_id") or "").strip() + text = str(payload.get("text") or "") + if not person_identifier_id: + transport.set_runtime_command_result( + self.service, + command_id, + {"ok": False, "error": "missing_person_identifier_id"}, + ) + return + try: + identifier = await sync_to_async( + lambda: PersonIdentifier.objects.filter(id=person_identifier_id).select_related("user", "person").first() + )() + if identifier is None: + transport.set_runtime_command_result( + self.service, + command_id, + {"ok": False, "error": "person_identifier_not_found"}, + ) + return + await self.ur.xmpp.client.send_from_external( + identifier.user, + identifier, + text, + True, + attachments=[], + ) + transport.set_runtime_command_result( + self.service, + command_id, + {"ok": True, "timestamp": int(time.time() * 1000)}, + ) + except Exception as exc: + transport.set_runtime_command_result( + self.service, + command_id, + {"ok": False, "error": str(exc)}, + ) + return + + transport.set_runtime_command_result( + self.service, + command_id, + {"ok": False, "error": f"unsupported_action:{action or '-'}"}, + ) + + async def _command_loop(self): + """Background task to periodically drain queued commands.""" + while not self._stopping: + try: + await self._drain_runtime_commands() + except Exception as exc: + self.log.warning(f"Command loop error: {exc}") + await asyncio.sleep(1) def start(self): self.log.info("Signal client starting...") self.client._event_loop = self.loop - + # Start background command processing loop + if not self._command_task or self._command_task.done(): + self._command_task = self.loop.create_task(self._command_loop()) self.client.start() diff --git a/core/clients/transport.py b/core/clients/transport.py index cc0a1f4..b0c5913 100644 --- a/core/clients/transport.py +++ b/core/clients/transport.py @@ -44,6 +44,10 @@ def _runtime_command_cancel_key(service: str, command_id: str) -> str: return f"gia:service:command-cancel:{_service_key(service)}:{command_id}" +def _runtime_command_meta_key(service: str, command_id: str) -> str: + return f"gia:service:command-meta:{_service_key(service)}:{command_id}" + + def _gateway_base(service: str) -> str: key = f"{service.upper()}_HTTP_URL" default = f"http://{service}:8080" @@ -110,6 +114,14 @@ def enqueue_runtime_command( if len(queued) > 200: queued = queued[-200:] cache.set(key, queued, timeout=_RUNTIME_COMMANDS_TTL) + cache.set( + _runtime_command_meta_key(service_key, command_id), + { + "created_at": int(command.get("created_at") or int(time.time())), + "action": str(command.get("action") or ""), + }, + timeout=_RUNTIME_COMMANDS_TTL, + ) return command_id @@ -132,6 +144,7 @@ def set_runtime_command_result( payload = dict(result or {}) payload.setdefault("completed_at", int(time.time())) cache.set(result_key, payload, timeout=_RUNTIME_COMMAND_RESULT_TTL) + cache.delete(_runtime_command_meta_key(service_key, command_id)) def cancel_runtime_command(service: str, command_id: str): @@ -142,9 +155,24 @@ def cancel_runtime_command(service: str, command_id: str): payload = {"ok": False, "error": "cancelled", "completed_at": int(time.time())} cache.set(result_key, payload, timeout=_RUNTIME_COMMAND_RESULT_TTL) cache.set(cancel_key, True, timeout=60) + cache.delete(_runtime_command_meta_key(service_key, command_id)) return True +def runtime_command_age_seconds(service: str, command_id: str) -> float | None: + service_key = _service_key(service) + meta = cache.get(_runtime_command_meta_key(service_key, command_id)) + if not isinstance(meta, dict): + return None + try: + created_at = int(meta.get("created_at") or 0) + except Exception: + created_at = 0 + if created_at <= 0: + return None + return max(0.0, time.time() - created_at) + + def cancel_runtime_commands_for_recipient(service: str, recipient: str) -> list[str]: """Cancel any queued runtime commands for the given recipient and return their ids.""" service_key = _service_key(service) diff --git a/core/clients/whatsapp.py b/core/clients/whatsapp.py index a78ad02..78ecb3b 100644 --- a/core/clients/whatsapp.py +++ b/core/clients/whatsapp.py @@ -1,4 +1,5 @@ import asyncio +import inspect import logging import os import re @@ -42,6 +43,7 @@ class WhatsAppClient(ClientBase): self._qr_handler_registered = False self._qr_handler_supported = False self._event_hook_callable = False + self._last_send_error = "" self.enabled = bool( str(getattr(settings, "WHATSAPP_ENABLED", "false")).lower() @@ -658,6 +660,17 @@ class WhatsAppClient(ClientBase): return await value return value + async def _call_client_method(self, method, *args, timeout: float | None = None): + if method is None: + return None + if inspect.iscoroutinefunction(method): + coro = method(*args) + else: + coro = asyncio.to_thread(method, *args) + if timeout and timeout > 0: + return await asyncio.wait_for(coro, timeout=timeout) + return await coro + async def _drain_runtime_commands(self): # Process a small burst each loop to keep sends responsive but avoid starvation. for _ in range(5): @@ -672,18 +685,27 @@ class WhatsAppClient(ClientBase): payload = dict((command or {}).get("payload") or {}) if not command_id: return + self.log.info( + "whatsapp runtime command start: id=%s action=%s", + command_id, + action, + ) if action == "send_message_raw": recipient = str(payload.get("recipient") or "").strip() text = payload.get("text") attachments = payload.get("attachments") or [] + send_timeout_s = 18.0 try: # Include command_id so send_message_raw can observe cancel requests - result = await self.send_message_raw( - recipient=recipient, - text=text, - attachments=attachments, - command_id=command_id, + result = await asyncio.wait_for( + self.send_message_raw( + recipient=recipient, + text=text, + attachments=attachments, + command_id=command_id, + ), + timeout=send_timeout_s, ) if result is not False and result is not None: transport.set_runtime_command_result( @@ -696,15 +718,45 @@ class WhatsAppClient(ClientBase): else int(time.time() * 1000), }, ) + self.log.info( + "whatsapp runtime command ok: id=%s action=%s", + command_id, + action, + ) return transport.set_runtime_command_result( self.service, command_id, { "ok": False, - "error": "runtime_send_failed", + "error": str( + getattr(self, "_last_send_error", "") + or "runtime_send_failed" + ), }, ) + self.log.warning( + "whatsapp runtime command failed: id=%s action=%s error=%s", + command_id, + action, + str(getattr(self, "_last_send_error", "") or "runtime_send_failed"), + ) + return + except asyncio.TimeoutError: + transport.set_runtime_command_result( + self.service, + command_id, + { + "ok": False, + "error": f"runtime_send_timeout:{int(send_timeout_s)}s", + }, + ) + self.log.warning( + "whatsapp runtime command timeout: id=%s action=%s timeout=%ss", + command_id, + action, + int(send_timeout_s), + ) return except Exception as exc: transport.set_runtime_command_result( @@ -715,6 +767,12 @@ class WhatsAppClient(ClientBase): "error": str(exc), }, ) + self.log.warning( + "whatsapp runtime command exception: id=%s action=%s error=%s", + command_id, + action, + exc, + ) return if action == "force_history_sync": @@ -741,6 +799,48 @@ class WhatsAppClient(ClientBase): ) return + if action == "notify_xmpp_sent": + person_identifier_id = str(payload.get("person_identifier_id") or "").strip() + text = str(payload.get("text") or "") + if not person_identifier_id: + transport.set_runtime_command_result( + self.service, + command_id, + {"ok": False, "error": "missing_person_identifier_id"}, + ) + return + try: + identifier = await sync_to_async( + lambda: PersonIdentifier.objects.filter(id=person_identifier_id).select_related("user", "person").first() + )() + if identifier is None: + transport.set_runtime_command_result( + self.service, + command_id, + {"ok": False, "error": "person_identifier_not_found"}, + ) + return + await self.ur.xmpp.client.send_from_external( + identifier.user, + identifier, + text, + True, + attachments=[], + ) + transport.set_runtime_command_result( + self.service, + command_id, + {"ok": True, "timestamp": int(time.time() * 1000)}, + ) + return + except Exception as exc: + transport.set_runtime_command_result( + self.service, + command_id, + {"ok": False, "error": str(exc)}, + ) + return + transport.set_runtime_command_result( self.service, command_id, @@ -2338,27 +2438,43 @@ class WhatsAppClient(ClientBase): async def send_message_raw( self, recipient, text=None, attachments=None, command_id: str | None = None ): + self._last_send_error = "" if not self._client: - return False - if self._build_jid is None: + self._last_send_error = "client_missing" return False jid_str = self._to_jid(recipient) if not jid_str: + self._last_send_error = "recipient_invalid" return False - # Convert string JID to actual JID object that neonize expects + # Prefer direct JID string for sends to avoid Neonize usync/device-list + # lookups that can stall on some runtime sessions. + jid = jid_str + jid_obj = None try: - jid = self._build_jid(jid_str) - # Verify it's a proper JID object with SerializeToString method - if not hasattr(jid, "SerializeToString"): - self.log.error( - "whatsapp build_jid returned non-JID object: type=%s repr=%s", - type(jid).__name__, - repr(jid)[:100], - ) - return False + if self._build_jid is not None: + maybe_jid = None + if "@" in jid_str: + local_part, server_part = jid_str.split("@", 1) + try: + maybe_jid = self._build_jid(local_part, server_part) + except TypeError: + maybe_jid = self._build_jid(jid_str) + else: + maybe_jid = self._build_jid(jid_str) + if hasattr(maybe_jid, "SerializeToString"): + jid_obj = maybe_jid + else: + self.log.warning( + "whatsapp build_jid returned non-JID object, falling back to string: type=%s repr=%s", + type(maybe_jid).__name__, + repr(maybe_jid)[:100], + ) except Exception as exc: - self.log.warning("whatsapp failed to build JID from %s: %s", jid_str, exc) - return False + self.log.warning( + "whatsapp failed to build JID from %s, falling back to string: %s", + jid_str, + exc, + ) if not self._connected and hasattr(self._client, "connect"): try: await self._maybe_await(self._client.connect()) @@ -2370,6 +2486,7 @@ class WhatsAppClient(ClientBase): last_error="", ) except Exception as exc: + self._last_send_error = f"reconnect_failed:{exc}" self._publish_state( connected=False, last_event="send_reconnect_failed", @@ -2388,24 +2505,25 @@ class WhatsAppClient(ClientBase): ).lower() data = payload.get("content") or b"" filename = payload.get("filename") or "attachment.bin" + attachment_target = jid_obj if jid_obj is not None else jid try: if mime.startswith("image/") and hasattr(self._client, "send_image"): response = await self._maybe_await( - self._client.send_image(jid, data, caption="") + self._client.send_image(attachment_target, data, caption="") ) elif mime.startswith("video/") and hasattr(self._client, "send_video"): response = await self._maybe_await( - self._client.send_video(jid, data, caption="") + self._client.send_video(attachment_target, data, caption="") ) elif mime.startswith("audio/") and hasattr(self._client, "send_audio"): response = await self._maybe_await( - self._client.send_audio(jid, data) + self._client.send_audio(attachment_target, data) ) elif hasattr(self._client, "send_document"): response = await self._maybe_await( self._client.send_document( - jid, + attachment_target, data, filename=filename, mimetype=mime, @@ -2435,22 +2553,27 @@ class WhatsAppClient(ClientBase): except Exception: cancel_key = None - for attempt in range(5): # Increased from 3 to 5 attempts + for attempt in range(2): # Check for a cancellation marker set by transport.cancel_runtime_command try: if cancel_key and cache.get(cancel_key): self.log.info("whatsapp send cancelled via cancel marker") + self._last_send_error = "cancelled" return False except Exception: pass try: + send_target = jid_obj if jid_obj is not None else jid # Log what we're about to send for debugging if getattr(settings, "WHATSAPP_DEBUG", False): self.log.debug( - f"send_message attempt {attempt+1}: jid={jid} text_type={type(text).__name__} text_len={len(text)}" + f"send_message attempt {attempt+1}: target_type={type(send_target).__name__} text_type={type(text).__name__} text_len={len(text)}" ) - response = await self._maybe_await( - self._client.send_message(jid, text) + response = await self._call_client_method( + getattr(self._client, "send_message", None), + send_target, + text, + timeout=9.0, ) sent_any = True last_error = None @@ -2462,9 +2585,31 @@ class WhatsAppClient(ClientBase): ) last_error = exc - error_text = str(last_error or "").lower() - is_transient = "usync query" in error_text or "timed out" in error_text - if is_transient and attempt < 4: # Updated to match new attempt range + error_text = ( + f"{type(last_error).__name__}:{repr(last_error)}" + if last_error is not None + else "" + ).lower() + is_transient = ( + "usync" in error_text + or "timed out" in error_text + or "timeout" in error_text + or "device list" in error_text + or "serializetostring" in error_text + or not error_text.strip() + ) + if is_transient and attempt < 1: + # If runtime rejected string target, try to build protobuf JID for retry. + if jid_obj is None and self._build_jid is not None and "@" in jid_str: + local_part, server_part = jid_str.split("@", 1) + try: + maybe_retry_jid = self._build_jid(local_part, server_part) + except TypeError: + maybe_retry_jid = self._build_jid(jid_str) + except Exception: + maybe_retry_jid = None + if hasattr(maybe_retry_jid, "SerializeToString"): + jid_obj = maybe_retry_jid if hasattr(self._client, "connect"): try: await self._maybe_await(self._client.connect()) @@ -2482,7 +2627,7 @@ class WhatsAppClient(ClientBase): ) # Sleep but wake earlier if cancelled: poll small intervals # Increase backoff time for device list queries - total_sleep = 1.5 * (attempt + 1) + total_sleep = 0.8 * (attempt + 1) slept = 0.0 while slept < total_sleep: try: @@ -2490,6 +2635,7 @@ class WhatsAppClient(ClientBase): self.log.info( "whatsapp send cancelled during retry backoff" ) + self._last_send_error = "cancelled" return False except Exception: pass @@ -2499,6 +2645,9 @@ class WhatsAppClient(ClientBase): break if last_error is not None and not sent_any: self.log.warning("whatsapp text send failed: %s", last_error) + self._last_send_error = ( + f"text_send_failed:{type(last_error).__name__}:{repr(last_error)}" + ) return False sent_ts = max( sent_ts, @@ -2506,7 +2655,9 @@ class WhatsAppClient(ClientBase): ) if not sent_any: + self._last_send_error = "no_payload_sent" return False + self._last_send_error = "" return sent_ts or int(time.time() * 1000) async def start_typing(self, identifier): diff --git a/core/realtime/compose_ws.py b/core/realtime/compose_ws.py index 9126056..9d47fec 100644 --- a/core/realtime/compose_ws.py +++ b/core/realtime/compose_ws.py @@ -6,9 +6,13 @@ from urllib.parse import parse_qs from asgiref.sync import sync_to_async from django.core import signing -from core.models import ChatSession, Message, PersonIdentifier, WorkspaceConversation +from core.models import Message, Person, PersonIdentifier, WorkspaceConversation from core.realtime.typing_state import get_person_typing_state -from core.views.compose import COMPOSE_WS_TOKEN_SALT, _serialize_messages_with_artifacts +from core.views.compose import ( + COMPOSE_WS_TOKEN_SALT, + ComposeHistorySync, + _serialize_messages_with_artifacts, +) def _safe_int(value, default=0): @@ -19,77 +23,108 @@ def _safe_int(value, default=0): def _load_since(user_id, service, identifier, person_id, after_ts, limit): + person = None person_identifier = None - if person_id: + resolved_person_id = _safe_int(person_id) + + if resolved_person_id > 0: + person = Person.objects.filter(id=resolved_person_id, user_id=user_id).first() + + if person is not None: person_identifier = ( PersonIdentifier.objects.filter( user_id=user_id, - person_id=person_id, + person_id=person.id, service=service, ).first() or PersonIdentifier.objects.filter( user_id=user_id, - person_id=person_id, + person_id=person.id, ).first() ) - if person_identifier is None and identifier: + elif identifier: person_identifier = PersonIdentifier.objects.filter( user_id=user_id, service=service, identifier=identifier, ).first() - if person_identifier is None: - return {"messages": [], "last_ts": after_ts, "person_id": 0} - session = ChatSession.objects.filter( - user_id=user_id, - identifier=person_identifier, - ).first() - if session is None: + session_ids = ComposeHistorySync._session_ids_for_scope( + user=user_id, + person=person, + service=service, + person_identifier=person_identifier, + explicit_identifier=identifier, + ) + + if not session_ids: return { "messages": [], - "last_ts": after_ts, - "person_id": int(person_identifier.person_id), + "last_ts": int(after_ts or 0), + "person_id": int(person.id) if person is not None else 0, } - qs = Message.objects.filter(user_id=user_id, session=session).order_by("ts") + base_queryset = Message.objects.filter( + user_id=user_id, + session_id__in=session_ids, + ) + qs = base_queryset.order_by("ts") seed_previous = None if after_ts > 0: seed_previous = ( - Message.objects.filter( - user_id=user_id, - session=session, - ts__lte=after_ts, - ) - .order_by("-ts") - .first() + base_queryset.filter(ts__lte=after_ts).order_by("-ts").first() ) - qs = qs.filter(ts__gt=after_ts) + # Use a small rolling window to capture late/out-of-order timestamps. + # Frontend dedupes by message id, so repeated rows are ignored. + window_start = max(0, int(after_ts) - 5 * 60 * 1000) + qs = qs.filter(ts__gte=window_start) - rows = list(qs[: max(10, min(limit, 200))]) + rows_desc = list( + qs.select_related( + "session", + "session__identifier", + "session__identifier__person", + ) + .order_by("-ts")[: max(10, min(limit, 200))] + ) + rows_desc.reverse() + rows = rows_desc newest = ( - Message.objects.filter(user_id=user_id, session=session) + Message.objects.filter( + user_id=user_id, + session_id__in=session_ids, + ) .order_by("-ts") .values_list("ts", flat=True) .first() ) - conversation = ( - WorkspaceConversation.objects.filter( - user_id=user_id, - participants__id=person_identifier.person_id, - ) - .order_by("-last_event_ts", "-created_at") - .first() + effective_person_id = ( + int(person.id) + if person is not None + else (int(person_identifier.person_id) if person_identifier is not None else 0) ) - counterpart_identifiers = { - str(value or "").strip() - for value in PersonIdentifier.objects.filter( - user_id=user_id, - person_id=person_identifier.person_id, - ).values_list("identifier", flat=True) - if str(value or "").strip() - } + + conversation = None + counterpart_identifiers = set() + if effective_person_id > 0: + conversation = ( + WorkspaceConversation.objects.filter( + user_id=user_id, + participants__id=effective_person_id, + ) + .order_by("-last_event_ts", "-created_at") + .first() + ) + counterpart_identifiers = { + str(value or "").strip() + for value in PersonIdentifier.objects.filter( + user_id=user_id, + person_id=effective_person_id, + ).values_list("identifier", flat=True) + if str(value or "").strip() + } + return { "messages": _serialize_messages_with_artifacts( rows, @@ -98,7 +133,7 @@ def _load_since(user_id, service, identifier, person_id, after_ts, limit): seed_previous=seed_previous, ), "last_ts": int(newest or after_ts or 0), - "person_id": int(person_identifier.person_id), + "person_id": int(effective_person_id), } @@ -133,6 +168,7 @@ async def compose_ws_application(scope, receive, send): last_ts = 0 limit = 100 last_typing_key = "" + sent_message_ids = set() while True: event = None @@ -159,7 +195,15 @@ async def compose_ws_application(scope, receive, send): after_ts=last_ts, limit=limit, ) - messages = payload.get("messages") or [] + raw_messages = payload.get("messages") or [] + messages = [] + for msg in raw_messages: + message_id = str((msg or {}).get("id") or "").strip() + if message_id and message_id in sent_message_ids: + continue + if message_id: + sent_message_ids.add(message_id) + messages.append(msg) latest = _safe_int(payload.get("last_ts"), last_ts) if resolved_person_id <= 0: resolved_person_id = _safe_int(payload.get("person_id"), 0) diff --git a/core/templates/partials/compose-panel.html b/core/templates/partials/compose-panel.html index 6b37a73..ef56c70 100644 --- a/core/templates/partials/compose-panel.html +++ b/core/templates/partials/compose-panel.html @@ -242,7 +242,7 @@ data-engage-preview-url="{{ compose_engage_preview_url }}" data-engage-send-url="{{ compose_engage_send_url }}"> {% for msg in serialized_messages %} -
+
{% if msg.gap_fragments %} {% with gap=msg.gap_fragments.0 %}

= 0; index -= 1) { + const existing = rows[index]; + const existingTs = toInt(existing.dataset ? existing.dataset.ts : 0); + if (existingTs <= newTs) { + if (existing.nextSibling) { + thread.insertBefore(row, existing.nextSibling); + } else { + thread.appendChild(row); + } + return; + } + } + thread.insertBefore(row, rows[0]); + }; + const appendBubble = function (msg) { - console.log("[appendBubble]", {id: msg.id, ts: msg.ts, author: msg.author, source_label: msg.source_label, source_service: msg.source_service, outgoing: msg.outgoing}); + const messageId = String(msg && msg.id ? msg.id : "").trim(); + if (messageId && panelState.seenMessageIds.has(messageId)) { + return; + } const row = document.createElement("div"); const outgoing = !!msg.outgoing; row.className = "compose-row " + (outgoing ? "is-out" : "is-in"); row.dataset.ts = String(msg.ts || 0); row.dataset.minute = minuteBucketFromTs(msg.ts || 0); + if (messageId) { + row.dataset.messageId = messageId; + panelState.seenMessageIds.add(messageId); + } appendLatencyChip(row, msg); const bubble = document.createElement("article"); @@ -1771,7 +1806,6 @@ // Add source badge for client-side rendered messages if (msg.source_label) { - console.log("[appendBubble] rendering source badge:", msg.source_label); const badgeWrap = document.createElement("div"); badgeWrap.className = "compose-source-badge-wrap"; const badge = document.createElement("span"); @@ -1868,7 +1902,8 @@ if (emptyWrap) { emptyWrap.remove(); } - thread.appendChild(row); + row.appendChild(bubble); + insertRowByTs(row); wireImageFallbacks(row); updateGlanceFromMessage(msg); }; @@ -2033,18 +2068,15 @@ } params.set("limit", thread.dataset.limit || "60"); params.set("after_ts", String(lastTs)); - console.log("[poll] fetching messages: service=" + params.get("service") + " after_ts=" + lastTs); const response = await fetch(thread.dataset.pollUrl + "?" + params.toString(), { method: "GET", credentials: "same-origin", headers: { Accept: "application/json" } }); if (!response.ok) { - console.log("[poll] response not ok:", response.status); return; } const payload = await response.json(); - console.log("[poll] received payload with " + (payload.messages ? payload.messages.length : 0) + " messages"); appendMessages(payload.messages || [], forceScroll); if (payload.typing) { applyTyping(payload.typing); @@ -2283,6 +2315,7 @@ thread.innerHTML = '

Loading messages...

'; lastTs = 0; thread.dataset.lastTs = "0"; + panelState.seenMessageIds = new Set(); glanceState = { gap: null, metrics: [] }; renderGlanceItems([]); poll(true); @@ -3083,10 +3116,15 @@ // HTMX will dispatch a `composeSendCommandId` event with detail {command_id: "..."}. panelState.pendingCommandId = null; panelState.pendingCommandPoll = null; + panelState.pendingCommandAttempts = 0; + panelState.pendingCommandStartedAt = 0; + panelState.pendingCommandInFlight = false; const startPendingCommandPolling = function (commandId) { if (!commandId) return; panelState.pendingCommandId = commandId; + panelState.pendingCommandAttempts = 0; + panelState.pendingCommandStartedAt = Date.now(); // Show persistent cancel UI showPersistentCancelButton(commandId); // Poll for result every 1500ms @@ -3094,12 +3132,29 @@ clearInterval(panelState.pendingCommandPoll); } panelState.pendingCommandPoll = setInterval(async function () { + if (panelState.pendingCommandInFlight) { + return; + } + panelState.pendingCommandAttempts += 1; + const elapsedMs = Date.now() - (panelState.pendingCommandStartedAt || Date.now()); + if (panelState.pendingCommandAttempts > 14 || elapsedMs > 45000) { + stopPendingCommandPolling(); + hidePersistentCancelButton(); + setStatus('Send timed out waiting for runtime result. Please retry.', 'warning'); + return; + } try { + panelState.pendingCommandInFlight = true; const url = new URL('{% url "compose_command_result" %}', window.location.origin); url.searchParams.set('service', thread.dataset.service || ''); url.searchParams.set('command_id', commandId); - const resp = await fetch(url.toString(), { credentials: 'same-origin' }); + url.searchParams.set('format', 'json'); + const resp = await fetch(url.toString(), { + credentials: 'same-origin', + headers: { 'HX-Request': 'true' }, + }); if (!resp.ok) return; + if (resp.status === 204) return; const payload = await resp.json(); if (payload && payload.pending === false) { // Stop polling @@ -3123,8 +3178,10 @@ } } catch (e) { // ignore transient network errors + } finally { + panelState.pendingCommandInFlight = false; } - }, 1500); + }, 3500); }; const stopPendingCommandPolling = function () { @@ -3133,6 +3190,9 @@ panelState.pendingCommandPoll = null; } panelState.pendingCommandId = null; + panelState.pendingCommandAttempts = 0; + panelState.pendingCommandStartedAt = 0; + panelState.pendingCommandInFlight = false; }; const persistentCancelContainerId = panelId + '-persistent-cancel'; diff --git a/core/views/compose.py b/core/views/compose.py index d6e6349..55c9d35 100644 --- a/core/views/compose.py +++ b/core/views/compose.py @@ -354,29 +354,29 @@ def _serialize_message(msg: Message) -> dict: source_service = svc except Exception: pass + sender_uuid_value = str(getattr(msg, "sender_uuid", "") or "").strip() + if sender_uuid_value.lower() == "xmpp": + source_service = "xmpp" - from core.util import logs as util_logs - - logger = util_logs.get_logger("compose") - logger.info( - f"[serialize_message] id={msg.id} author={author} is_outgoing={is_outgoing} source_service={source_service}" - ) - - # For outgoing messages sent from web UI, label as "Web Chat". - # For incoming messages, use the session's service name (Xmpp, Signal, Whatsapp, etc). - # But if source_service is still "web" and message is incoming, it may be a data issue— - # don't label it as "Web Chat" since that's misleading. + # Outgoing messages created by the web compose UI should be labeled Web Chat. + # Outgoing messages originating from platform runtimes (Signal sync, etc.) + # should keep their service label. + service_labels = { + "xmpp": "XMPP", + "whatsapp": "WhatsApp", + "signal": "Signal", + "instagram": "Instagram", + "web": "Web Chat", + } if is_outgoing: - source_label = "Web Chat" + source_label = ( + "Web Chat" + if not sender_uuid_value + else service_labels.get( + source_service, source_service.title() if source_service else "Unknown" + ) + ) else: - # Incoming message: use service-specific labels - service_labels = { - "xmpp": "XMPP", - "whatsapp": "WhatsApp", - "signal": "Signal", - "instagram": "Instagram", - "web": "External", # Fallback if service not identified - } source_label = service_labels.get( source_service, source_service.title() if source_service else "Unknown" ) @@ -2364,14 +2364,19 @@ class ComposeThread(LoginRequiredMixin, View): seed_previous = ( base_queryset.filter(ts__lte=after_ts).order_by("-ts").first() ) - queryset = queryset.filter(ts__gt=after_ts) - messages = list( + # Use a small rolling window to capture late/out-of-order timestamps. + # Client-side dedupe by message id prevents duplicate rendering. + window_start = max(0, int(after_ts) - 5 * 60 * 1000) + queryset = queryset.filter(ts__gte=window_start) + rows_desc = list( queryset.select_related( "session", "session__identifier", "session__identifier__person", - ).order_by("ts")[:limit] + ).order_by("-ts")[:limit] ) + rows_desc.reverse() + messages = rows_desc newest = ( Message.objects.filter( user=request.user, @@ -2431,9 +2436,17 @@ class ComposeHistorySync(LoginRequiredMixin, View): local = raw.split("@", 1)[0].strip() if local: values.add(local) + elif service == "signal": + # Signal identifiers can be UUID or phone number + digits = re.sub(r"[^0-9]", "", raw) + if digits and not raw.count("-") >= 4: + # Likely a phone number; add variants + values.add(digits) + values.add(f"+{digits}") + # If it looks like a UUID (has hyphens), keep only the original format + # Signal UUIDs are strict and don't have variants return [value for value in values if value] - @classmethod @classmethod def _session_ids_for_scope( cls, @@ -2709,9 +2722,23 @@ class ComposeCommandResult(LoginRequiredMixin, View): """ def get(self, request): + timeout_s = 30.0 + force_json = str(request.GET.get("format") or "").strip().lower() == "json" + is_hx_request = ( + str(request.headers.get("HX-Request") or "").strip().lower() == "true" + ) and not force_json service = _default_service(request.GET.get("service")) command_id = str(request.GET.get("command_id") or "").strip() if not command_id: + if is_hx_request: + return render( + request, + "partials/compose-send-status.html", + { + "notice_message": "Missing command id.", + "notice_level": "warning", + }, + ) return JsonResponse( {"ok": False, "error": "missing_command_id"}, status=400 ) @@ -2720,7 +2747,35 @@ class ComposeCommandResult(LoginRequiredMixin, View): service, command_id, timeout=0.1 ) if result is None: - return JsonResponse({"pending": True}) + age_s = transport.runtime_command_age_seconds(service, command_id) + if age_s is not None and age_s >= timeout_s: + timeout_result = { + "ok": False, + "error": f"runtime_command_timeout:{int(timeout_s)}s", + } + if is_hx_request: + return render( + request, + "partials/compose-send-status.html", + { + "notice_message": str(timeout_result.get("error") or "Send failed."), + "notice_level": "danger", + }, + ) + return JsonResponse({"pending": False, "result": timeout_result}) + return HttpResponse(status=204) + if is_hx_request: + ok = bool(result.get("ok")) if isinstance(result, dict) else False + message = "" if ok else str((result or {}).get("error") or "Send failed.") + level = "success" if ok else "danger" + return render( + request, + "partials/compose-send-status.html", + { + "notice_message": message, + "notice_level": level, + }, + ) return JsonResponse({"pending": False, "result": result}) @@ -3279,6 +3334,30 @@ class ComposeSend(LoginRequiredMixin, View): ts = None command_id = None if runtime_client is None: + if base["service"] == "whatsapp": + runtime_state = transport.get_runtime_state("whatsapp") + last_seen = int(runtime_state.get("runtime_seen_at") or 0) + is_connected = bool(runtime_state.get("connected")) + pair_status = str(runtime_state.get("pair_status") or "").strip().lower() + now_s = int(time.time()) + # Runtime may process sends even when `connected` lags false briefly; + # heartbeat freshness is the reliable signal for queue availability. + heartbeat_age = now_s - last_seen if last_seen > 0 else 10**9 + runtime_healthy = bool(is_connected) or pair_status == "connected" + if (not runtime_healthy) and (last_seen <= 0 or heartbeat_age > 20): + logger.warning( + f"{log_prefix} runtime heartbeat stale (connected={is_connected}, pair_status={pair_status}, last_seen={last_seen}, age={heartbeat_age}); refusing queued send" + ) + return self._response( + request, + ok=False, + message=( + "WhatsApp runtime is not connected right now. " + "Please wait for reconnect, then retry send." + ), + level="warning", + panel_id=panel_id, + ) logger.info(f"{log_prefix} enqueuing runtime command (out-of-process)") command_id = transport.enqueue_runtime_command( base["service"], @@ -3336,6 +3415,19 @@ class ComposeSend(LoginRequiredMixin, View): logger.info( f"{log_prefix} created message id={msg.id} ts={msg_ts} delivered_ts={delivered_ts} custom_author=USER" ) + # Notify XMPP clients from runtime so cross-platform sends appear there too. + if base["service"] in {"signal", "whatsapp"}: + try: + transport.enqueue_runtime_command( + base["service"], + "notify_xmpp_sent", + { + "person_identifier_id": str(base["person_identifier"].id), + "text": text, + }, + ) + except Exception as exc: + logger.warning(f"{log_prefix} failed to enqueue xmpp notify: {exc}") # If we enqueued, inform the client the message is queued and include command id. if runtime_client is None: diff --git a/docker-compose.yml b/docker-compose.yml index 8de9666..00fe09d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,6 +36,7 @@ services: WHATSAPP_DB_DIR: "${WHATSAPP_DB_DIR}" WHATSAPP_ENABLED: "${WHATSAPP_ENABLED}" INSTAGRAM_ENABLED: "${INSTAGRAM_ENABLED}" + COMPOSE_WS_ENABLED: "${COMPOSE_WS_ENABLED}" XMPP_ADDRESS: "${XMPP_ADDRESS}" XMPP_JID: "${XMPP_JID}" XMPP_PORT: "${XMPP_PORT}" @@ -54,6 +55,54 @@ services: # memory: 0.25G #network_mode: host + asgi: + image: xf/gia:prod + container_name: asgi_gia + build: + context: . + args: + OPERATION: ${OPERATION} + command: sh -c 'rm -f /var/run/asgi-gia.sock && . /venv/bin/activate && python -m pip install --disable-pip-version-check -q uvicorn && python -m uvicorn app.asgi:application --uds /var/run/asgi-gia.sock --workers 1' + volumes: + - ${REPO_DIR}:/code + - ${APP_DATABASE_FILE}:/conf/db.sqlite3 + - gia_whatsapp_data:${WHATSAPP_DB_DIR} + - type: bind + source: /code/vrun + target: /var/run + environment: + APP_PORT: "${APP_PORT}" + REPO_DIR: "${REPO_DIR}" + APP_LOCAL_SETTINGS: "${APP_LOCAL_SETTINGS}" + APP_DATABASE_FILE: "${APP_DATABASE_FILE}" + DOMAIN: "${DOMAIN}" + URL: "${URL}" + ALLOWED_HOSTS: "${ALLOWED_HOSTS}" + NOTIFY_TOPIC: "${NOTIFY_TOPIC}" + CSRF_TRUSTED_ORIGINS: "${CSRF_TRUSTED_ORIGINS}" + DEBUG: "${DEBUG}" + SECRET_KEY: "${SECRET_KEY}" + STATIC_ROOT: "${STATIC_ROOT}" + REGISTRATION_OPEN: "${REGISTRATION_OPEN}" + OPERATION: "${OPERATION}" + SIGNAL_NUMBER: "${SIGNAL_NUMBER}" + SIGNAL_HTTP_URL: "${SIGNAL_HTTP_URL:-http://signal:8080}" + WHATSAPP_DB_DIR: "${WHATSAPP_DB_DIR}" + WHATSAPP_ENABLED: "${WHATSAPP_ENABLED}" + INSTAGRAM_ENABLED: "${INSTAGRAM_ENABLED}" + COMPOSE_WS_ENABLED: "${COMPOSE_WS_ENABLED}" + XMPP_ADDRESS: "${XMPP_ADDRESS}" + XMPP_JID: "${XMPP_JID}" + XMPP_PORT: "${XMPP_PORT}" + XMPP_SECRET: "${XMPP_SECRET}" + depends_on: + redis: + condition: service_healthy + migration: + condition: service_started + collectstatic: + condition: service_started + # giadb: # image: manticoresearch/manticore:dev # container_name: giadb diff --git a/requirements.txt b/requirements.txt index 15a0220..60f0468 100644 --- a/requirements.txt +++ b/requirements.txt @@ -38,3 +38,4 @@ aiomysql slixmpp neonize watchdog +uvicorn diff --git a/stack.env.example b/stack.env.example index 1c0cbc4..b91d1ad 100644 --- a/stack.env.example +++ b/stack.env.example @@ -12,4 +12,5 @@ SECRET_KEY= STATIC_ROOT=/code/static REGISTRATION_OPEN=0 OPERATION=uwsgi -BILLING_ENABLED=0 \ No newline at end of file +BILLING_ENABLED=0 +COMPOSE_WS_ENABLED=true \ No newline at end of file