diff --git a/app/settings.py b/app/settings.py index 3350ba8..a01d9b2 100644 --- a/app/settings.py +++ b/app/settings.py @@ -209,7 +209,7 @@ DEBUG_TOOLBAR_PANELS = [ "debug_toolbar.panels.logging.LoggingPanel", "debug_toolbar.panels.redirects.RedirectsPanel", "debug_toolbar.panels.profiling.ProfilingPanel", - "cachalot.panels.CachalotPanel", + # "cachalot.panels.CachalotPanel", # Disabled due to compatibility issues with debug toolbar ] from app.local_settings import * # noqa diff --git a/app/urls.py b/app/urls.py index d4abeaf..f2b733e 100644 --- a/app/urls.py +++ b/app/urls.py @@ -160,6 +160,16 @@ urlpatterns = [ compose.ComposeSend.as_view(), name="compose_send", ), + path( + "compose/cancel-send/", + compose.ComposeCancelSend.as_view(), + name="compose_cancel_send", + ), + path( + "compose/command-result/", + compose.ComposeCommandResult.as_view(), + name="compose_command_result", + ), path( "compose/drafts/", compose.ComposeDrafts.as_view(), diff --git a/app/wsgi.py b/app/wsgi.py index 4725324..f2c7150 100644 --- a/app/wsgi.py +++ b/app/wsgi.py @@ -14,4 +14,3 @@ from django.core.wsgi import get_wsgi_application os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings") application = get_wsgi_application() - diff --git a/core/clients/gateway.py b/core/clients/gateway.py index cccc3c0..18b88ea 100644 --- a/core/clients/gateway.py +++ b/core/clients/gateway.py @@ -7,8 +7,8 @@ from django.conf import settings from core.clients import ClientBase, transport from core.messaging import history -from core.modules.mixed_protocol import normalize_gateway_event from core.models import PersonIdentifier +from core.modules.mixed_protocol import normalize_gateway_event class GatewayClient(ClientBase): @@ -48,7 +48,9 @@ class GatewayClient(ClientBase): self.log.info("%s gateway disabled by settings", self.service) return if self._task is None: - self.log.info("%s gateway client starting (%s)", self.service, self.base_url) + self.log.info( + "%s gateway client starting (%s)", self.service, self.base_url + ) self._task = self.loop.create_task(self._poll_loop()) async def start_typing(self, identifier): diff --git a/core/clients/serviceapi.py b/core/clients/serviceapi.py index 85ee413..9485556 100644 --- a/core/clients/serviceapi.py +++ b/core/clients/serviceapi.py @@ -5,4 +5,3 @@ Prefer importing from `core.clients.transport`. """ from core.clients.transport import * # noqa: F401,F403 - diff --git a/core/clients/signal.py b/core/clients/signal.py index ec8dea4..0bfcdf5 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -456,7 +456,9 @@ class HandleMessage(Command): if session_key in session_cache: chat_session = session_cache[session_key] else: - chat_session = await history.get_chat_session(identifier.user, identifier) + chat_session = await history.get_chat_session( + identifier.user, identifier + ) session_cache[session_key] = chat_session sender_key = source_uuid or source_number or identifier_candidates[0] message_key = (chat_session.id, ts, sender_key) diff --git a/core/clients/transport.py b/core/clients/transport.py index e99b353..cc0a1f4 100644 --- a/core/clients/transport.py +++ b/core/clients/transport.py @@ -3,8 +3,8 @@ import base64 import io import secrets import time -from urllib.parse import quote_plus from typing import Any +from urllib.parse import quote_plus import aiohttp import orjson @@ -40,6 +40,10 @@ def _runtime_command_result_key(service: str, command_id: str) -> str: return f"gia:service:command-result:{_service_key(service)}:{command_id}" +def _runtime_command_cancel_key(service: str, command_id: str) -> str: + return f"gia:service:command-cancel:{_service_key(service)}:{command_id}" + + def _gateway_base(service: str) -> str: key = f"{service.upper()}_HTTP_URL" default = f"http://{service}:8080" @@ -88,7 +92,9 @@ def update_runtime_state(service: str, **updates): return state -def enqueue_runtime_command(service: str, action: str, payload: dict | None = None) -> str: +def enqueue_runtime_command( + service: str, action: str, payload: dict | None = None +) -> str: service_key = _service_key(service) command_id = secrets.token_hex(12) command = { @@ -118,7 +124,9 @@ def pop_runtime_command(service: str) -> dict[str, Any] | None: return command -def set_runtime_command_result(service: str, command_id: str, result: dict | None = None): +def set_runtime_command_result( + service: str, command_id: str, result: dict | None = None +): service_key = _service_key(service) result_key = _runtime_command_result_key(service_key, command_id) payload = dict(result or {}) @@ -126,7 +134,36 @@ def set_runtime_command_result(service: str, command_id: str, result: dict | Non cache.set(result_key, payload, timeout=_RUNTIME_COMMAND_RESULT_TTL) -async def wait_runtime_command_result(service: str, command_id: str, timeout: float = 20.0): +def cancel_runtime_command(service: str, command_id: str): + """Mark a runtime command as cancelled and set a result so waiters are released.""" + service_key = _service_key(service) + result_key = _runtime_command_result_key(service_key, command_id) + cancel_key = _runtime_command_cancel_key(service_key, command_id) + payload = {"ok": False, "error": "cancelled", "completed_at": int(time.time())} + cache.set(result_key, payload, timeout=_RUNTIME_COMMAND_RESULT_TTL) + cache.set(cancel_key, True, timeout=60) + return True + + +def cancel_runtime_commands_for_recipient(service: str, recipient: str) -> list[str]: + """Cancel any queued runtime commands for the given recipient and return their ids.""" + service_key = _service_key(service) + key = _runtime_commands_key(service_key) + queued = list(cache.get(key) or []) + cancelled = [] + for cmd in list(queued): + payload = dict(cmd.get("payload") or {}) + if str(payload.get("recipient") or "").strip() == str(recipient or "").strip(): + cmd_id = str(cmd.get("id") or "").strip() + if cmd_id: + cancel_runtime_command(service_key, cmd_id) + cancelled.append(cmd_id) + return cancelled + + +async def wait_runtime_command_result( + service: str, command_id: str, timeout: float = 20.0 +): service_key = _service_key(service) result_key = _runtime_command_result_key(service_key, command_id) deadline = time.monotonic() + max(0.1, float(timeout or 0.0)) @@ -149,7 +186,9 @@ def list_accounts(service: str): if service_key == "signal": import requests - base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip("/") + base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip( + "/" + ) try: response = requests.get(f"{base}/v1/accounts", timeout=20) if not response.ok: @@ -199,7 +238,9 @@ def unlink_account(service: str, account: str) -> bool: if service_key == "signal": import requests - base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip("/") + base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip( + "/" + ) target = quote_plus(account_value) for path in (f"/v1/accounts/{target}", f"/v1/account/{target}"): try: @@ -242,7 +283,9 @@ def unlink_account(service: str, account: str) -> bool: connected=bool(accounts), pair_status=("connected" if accounts else ""), pair_qr="", - warning=("" if accounts else "Account unlinked. Add account to link again."), + warning=( + "" if accounts else "Account unlinked. Add account to link again." + ), last_event="account_unlinked", last_error="", ) @@ -619,7 +662,9 @@ def get_link_qr(service: str, device_name: str): if service_key == "signal": import requests - base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip("/") + base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip( + "/" + ) response = requests.get( f"{base}/v1/qrcodelink", params={"device_name": device}, diff --git a/core/clients/whatsapp.py b/core/clients/whatsapp.py index 379a1fc..a78ad02 100644 --- a/core/clients/whatsapp.py +++ b/core/clients/whatsapp.py @@ -1,4 +1,5 @@ import asyncio +import logging import os import re import sqlite3 @@ -8,6 +9,7 @@ from urllib.parse import quote_plus import aiohttp from asgiref.sync import sync_to_async from django.conf import settings +from django.core.cache import cache from core.clients import ClientBase, transport from core.messaging import history, media_bridge @@ -45,12 +47,11 @@ class WhatsAppClient(ClientBase): str(getattr(settings, "WHATSAPP_ENABLED", "false")).lower() in {"1", "true", "yes", "on"} ) - self.client_name = str( - getattr(settings, "WHATSAPP_CLIENT_NAME", "gia_whatsapp") - ).strip() or "gia_whatsapp" - self.database_url = str( - getattr(settings, "WHATSAPP_DATABASE_URL", "") - ).strip() + self.client_name = ( + str(getattr(settings, "WHATSAPP_CLIENT_NAME", "gia_whatsapp")).strip() + or "gia_whatsapp" + ) + self.database_url = str(getattr(settings, "WHATSAPP_DATABASE_URL", "")).strip() safe_name = re.sub(r"[^a-zA-Z0-9_.-]+", "_", self.client_name) or "gia_whatsapp" # Use a persistent default path (under project mount) instead of /tmp so # link state and contact cache survive container restarts. @@ -70,14 +71,24 @@ class WhatsAppClient(ClientBase): self._publish_state( connected=False, warning=( - "WhatsApp runtime is disabled by settings." - if not self.enabled - else "" + "WhatsApp runtime is disabled by settings." if not self.enabled else "" ), accounts=prior_accounts, last_event="init", session_db=self.session_db, ) + # Reduce third-party library logging noise (neonize/grpc/protobuf). + try: + # Common noisy libraries used by Neonize/WhatsApp stacks + logging.getLogger("neonize").setLevel(logging.WARNING) + logging.getLogger("grpc").setLevel(logging.WARNING) + logging.getLogger("google").setLevel(logging.WARNING) + logging.getLogger("protobuf").setLevel(logging.WARNING) + logging.getLogger("whatsmeow").setLevel(logging.WARNING) + logging.getLogger("whatsmeow.Client").setLevel(logging.WARNING) + logging.getLogger("aiohttp.access").setLevel(logging.WARNING) + except Exception: + pass def _publish_state(self, **updates): state = transport.update_runtime_state(self.service, **updates) @@ -96,8 +107,9 @@ class WhatsAppClient(ClientBase): async def _run(self): try: import neonize.aioze.client as wa_client_mod - from neonize.aioze.client import NewAClient from neonize.aioze import events as wa_events + from neonize.aioze.client import NewAClient + try: from neonize.utils.enum import ChatPresence, ChatPresenceMedia except Exception: @@ -432,7 +444,9 @@ class WhatsAppClient(ClientBase): try: return cls(self.session_db) except Exception as exc: - self.log.warning("whatsapp client init failed (%s): %s", self.session_db, exc) + self.log.warning( + "whatsapp client init failed (%s): %s", self.session_db, exc + ) self._publish_state( last_event="client_init_exception", last_error=str(exc), @@ -611,7 +625,9 @@ class WhatsAppClient(ClientBase): if offline_sync_completed_ev is not None: - async def on_offline_sync_completed(client, event: offline_sync_completed_ev): + async def on_offline_sync_completed( + client, event: offline_sync_completed_ev + ): self._publish_state(last_event="offline_sync_completed") await self._sync_contacts_from_client() @@ -633,8 +649,7 @@ class WhatsAppClient(ClientBase): self._publish_state( last_event="pair_waiting_no_qr", warning=( - "Waiting for WhatsApp QR from Neonize. " - "No QR callback received yet." + "Waiting for WhatsApp QR from Neonize. " "No QR callback received yet." ), ) @@ -663,10 +678,12 @@ class WhatsAppClient(ClientBase): text = payload.get("text") attachments = payload.get("attachments") or [] try: + # Include command_id so send_message_raw can observe cancel requests result = await self.send_message_raw( recipient=recipient, text=text, attachments=attachments, + command_id=command_id, ) if result is not False and result is not None: transport.set_runtime_command_result( @@ -778,10 +795,14 @@ class WhatsAppClient(ClientBase): state = transport.get_runtime_state(self.service) return { "started_at": started_at, - "finished_at": int(state.get("history_sync_finished_at") or int(time.time())), + "finished_at": int( + state.get("history_sync_finished_at") or int(time.time()) + ), "duration_ms": int(state.get("history_sync_duration_ms") or 0), "contacts_sync_count": int(state.get("contacts_sync_count") or 0), - "history_imported_messages": int(state.get("history_imported_messages") or 0), + "history_imported_messages": int( + state.get("history_imported_messages") or 0 + ), "sqlite_imported_messages": int(state.get("history_sqlite_imported") or 0), "sqlite_scanned_messages": int(state.get("history_sqlite_scanned") or 0), "sqlite_table": str(state.get("history_sqlite_table") or ""), @@ -838,7 +859,9 @@ class WhatsAppClient(ClientBase): ID=str(anchor.get("msg_id") or ""), Timestamp=int(anchor.get("ts") or int(time.time() * 1000)), ) - request_msg = build_history_sync_request(info, max(10, min(int(count or 120), 500))) + request_msg = build_history_sync_request( + info, max(10, min(int(count or 120), 500)) + ) await self._maybe_await(self._client.send_message(chat_jid, request_msg)) self._publish_state( last_event="history_on_demand_requested", @@ -966,11 +989,17 @@ class WhatsAppClient(ClientBase): selected = name break if not selected: - return {"rows": [], "table": "", "error": "messages_table_not_found"} + return { + "rows": [], + "table": "", + "error": "messages_table_not_found", + } columns = [ str(row[1] or "") - for row in cur.execute(f'PRAGMA table_info("{selected}")').fetchall() + for row in cur.execute( + f'PRAGMA table_info("{selected}")' + ).fetchall() ] if not columns: return {"rows": [], "table": selected, "error": "no_columns"} @@ -1039,7 +1068,11 @@ class WhatsAppClient(ClientBase): "table": selected, "error": "required_message_columns_missing", } - select_cols = [col for col in [text_col, ts_col, from_me_col, sender_col, chat_col] if col] + select_cols = [ + col + for col in [text_col, ts_col, from_me_col, sender_col, chat_col] + if col + ] quoted = ", ".join(f'"{col}"' for col in select_cols) order_expr = f'"{ts_col}" DESC' if ts_col else "ROWID DESC" sql = f'SELECT {quoted} FROM "{selected}" ORDER BY {order_expr} LIMIT 12000' @@ -1057,8 +1090,12 @@ class WhatsAppClient(ClientBase): text = str(row_map.get(text_col) or "").strip() if not text: continue - raw_sender = str(row_map.get(sender_col) or "").strip() if sender_col else "" - raw_chat = str(row_map.get(chat_col) or "").strip() if chat_col else "" + raw_sender = ( + str(row_map.get(sender_col) or "").strip() if sender_col else "" + ) + raw_chat = ( + str(row_map.get(chat_col) or "").strip() if chat_col else "" + ) raw_from_me = row_map.get(from_me_col) if from_me_col else None parsed.append( { @@ -1089,7 +1126,9 @@ class WhatsAppClient(ClientBase): target_candidates = self._normalize_identifier_candidates(identifier) target_local = str(identifier or "").strip().split("@", 1)[0] if target_local: - target_candidates.update(self._normalize_identifier_candidates(target_local)) + target_candidates.update( + self._normalize_identifier_candidates(target_local) + ) identifiers = await sync_to_async(list)( PersonIdentifier.objects.filter(service="whatsapp") @@ -1348,7 +1387,9 @@ class WhatsAppClient(ClientBase): for row in cur.execute( 'SELECT DISTINCT "our_jid" FROM "whatsmeow_contacts"' ).fetchall(): - base = str((row or [None])[0] or "").strip().split("@", 1)[0] + base = ( + str((row or [None])[0] or "").strip().split("@", 1)[0] + ) base = base.split(":", 1)[0] if base: own_ids.add(base.lower()) @@ -1364,9 +1405,18 @@ class WhatsAppClient(ClientBase): ).fetchall() except Exception: rows = [] - for their_jid, first_name, full_name, push_name, business_name in rows: + for ( + their_jid, + first_name, + full_name, + push_name, + business_name, + ) in rows: jid_value = str(their_jid or "").strip() - if "@s.whatsapp.net" not in jid_value and "@lid" not in jid_value: + if ( + "@s.whatsapp.net" not in jid_value + and "@lid" not in jid_value + ): continue identifier = jid_value.split("@", 1)[0].strip().split(":", 1)[0] if "@lid" in jid_value: @@ -1539,7 +1589,9 @@ class WhatsAppClient(ClientBase): me = await self._maybe_await(self._client.get_me()) if me: self._connected = True - self._publish_state(connected=True, warning="", pair_status="connected") + self._publish_state( + connected=True, warning="", pair_status="connected" + ) return True except Exception: pass @@ -1561,22 +1613,17 @@ class WhatsAppClient(ClientBase): return pushname_rows = ( - self._pluck(data, "pushnames") - or self._pluck(data, "Pushnames") - or [] + self._pluck(data, "pushnames") or self._pluck(data, "Pushnames") or [] ) pushname_map = {} for row in pushname_rows: - raw_id = ( - self._jid_to_identifier(self._pluck(row, "ID")) - or self._jid_to_identifier(self._pluck(row, "id")) - ) + raw_id = self._jid_to_identifier( + self._pluck(row, "ID") + ) or self._jid_to_identifier(self._pluck(row, "id")) if not raw_id: continue pushname = str( - self._pluck(row, "pushname") - or self._pluck(row, "Pushname") - or "" + self._pluck(row, "pushname") or self._pluck(row, "Pushname") or "" ).strip() if not pushname: continue @@ -1693,12 +1740,20 @@ class WhatsAppClient(ClientBase): self._pluck(msg_obj, "videoMessage", "caption"), self._pluck(msg_obj, "documentMessage", "caption"), self._pluck(msg_obj, "ephemeralMessage", "message", "conversation"), - self._pluck(msg_obj, "ephemeralMessage", "message", "extendedTextMessage", "text"), + self._pluck( + msg_obj, "ephemeralMessage", "message", "extendedTextMessage", "text" + ), self._pluck(msg_obj, "viewOnceMessage", "message", "conversation"), - self._pluck(msg_obj, "viewOnceMessage", "message", "extendedTextMessage", "text"), + self._pluck( + msg_obj, "viewOnceMessage", "message", "extendedTextMessage", "text" + ), self._pluck(msg_obj, "viewOnceMessageV2", "message", "conversation"), - self._pluck(msg_obj, "viewOnceMessageV2", "message", "extendedTextMessage", "text"), - self._pluck(msg_obj, "viewOnceMessageV2Extension", "message", "conversation"), + self._pluck( + msg_obj, "viewOnceMessageV2", "message", "extendedTextMessage", "text" + ), + self._pluck( + msg_obj, "viewOnceMessageV2Extension", "message", "conversation" + ), self._pluck( msg_obj, "viewOnceMessageV2Extension", @@ -1753,7 +1808,9 @@ class WhatsAppClient(ClientBase): ) return str(sender or fallback_chat_jid or "").strip() - async def _import_history_messages_for_conversation(self, row, chat_jid: str) -> int: + async def _import_history_messages_for_conversation( + self, row, chat_jid: str + ) -> int: imported = 0 msg_rows = self._history_message_rows(row) if not msg_rows: @@ -1761,7 +1818,9 @@ class WhatsAppClient(ClientBase): chat_identifier = str(chat_jid or "").split("@", 1)[0].strip() if not chat_identifier: return imported - candidate_values = self._normalize_identifier_candidates(chat_jid, chat_identifier) + candidate_values = self._normalize_identifier_candidates( + chat_jid, chat_identifier + ) if not candidate_values: return imported identifiers = await sync_to_async(list)( @@ -1968,8 +2027,7 @@ class WhatsAppClient(ClientBase): or self._pluck(event, "info", "messageSource") ) is_from_me = bool( - self._pluck(source, "IsFromMe") - or self._pluck(source, "isFromMe") + self._pluck(source, "IsFromMe") or self._pluck(source, "isFromMe") ) sender = self._jid_to_identifier( @@ -1979,8 +2037,7 @@ class WhatsAppClient(ClientBase): or self._pluck(source, "senderAlt") ) chat = self._jid_to_identifier( - self._pluck(source, "Chat") - or self._pluck(source, "chat") + self._pluck(source, "Chat") or self._pluck(source, "chat") ) raw_ts = ( self._pluck(event, "Info", "Timestamp") @@ -2092,14 +2149,15 @@ class WhatsAppClient(ClientBase): or self._pluck(event, "info", "messageSource") ) sender = self._jid_to_identifier( - self._pluck(source, "Sender") - or self._pluck(source, "sender") + self._pluck(source, "Sender") or self._pluck(source, "sender") ) chat = self._jid_to_identifier( self._pluck(source, "Chat") or self._pluck(source, "chat") ) timestamps = [] - raw_ids = self._pluck(event, "MessageIDs") or self._pluck(event, "message_ids") or [] + raw_ids = ( + self._pluck(event, "MessageIDs") or self._pluck(event, "message_ids") or [] + ) if isinstance(raw_ids, (list, tuple, set)): for item in raw_ids: try: @@ -2141,8 +2199,7 @@ class WhatsAppClient(ClientBase): or {} ) sender = self._jid_to_identifier( - self._pluck(source, "Sender") - or self._pluck(source, "sender") + self._pluck(source, "Sender") or self._pluck(source, "sender") ) chat = self._jid_to_identifier( self._pluck(source, "Chat") or self._pluck(source, "chat") @@ -2161,19 +2218,26 @@ class WhatsAppClient(ClientBase): await self.ur.started_typing( self.service, identifier=candidate, - payload={"presence": state_text, "sender": str(sender), "chat": str(chat)}, + payload={ + "presence": state_text, + "sender": str(sender), + "chat": str(chat), + }, ) else: - await self.ur.stopped_typing( + await self.ur.stopped_typing( self.service, identifier=candidate, - payload={"presence": state_text, "sender": str(sender), "chat": str(chat)}, + payload={ + "presence": state_text, + "sender": str(sender), + "chat": str(chat), + }, ) async def _handle_presence_event(self, event): sender = self._jid_to_identifier( - self._pluck(event, "From", "User") - or self._pluck(event, "from", "user") + self._pluck(event, "From", "User") or self._pluck(event, "from", "user") ) is_unavailable = bool( self._pluck(event, "Unavailable") or self._pluck(event, "unavailable") @@ -2271,7 +2335,9 @@ class WhatsAppClient(ClientBase): } return None - async def send_message_raw(self, recipient, text=None, attachments=None): + async def send_message_raw( + self, recipient, text=None, attachments=None, command_id: str | None = None + ): if not self._client: return False if self._build_jid is None: @@ -2283,8 +2349,12 @@ class WhatsAppClient(ClientBase): try: jid = self._build_jid(jid_str) # Verify it's a proper JID object with SerializeToString method - if not hasattr(jid, 'SerializeToString'): - self.log.error("whatsapp build_jid returned non-JID object: type=%s repr=%s", type(jid).__name__, repr(jid)[:100]) + if not hasattr(jid, "SerializeToString"): + self.log.error( + "whatsapp build_jid returned non-JID object: type=%s repr=%s", + type(jid).__name__, + repr(jid)[:100], + ) return False except Exception as exc: self.log.warning("whatsapp failed to build JID from %s: %s", jid_str, exc) @@ -2313,7 +2383,9 @@ class WhatsAppClient(ClientBase): payload = await self._fetch_attachment_payload(attachment) if not payload: continue - mime = str(payload.get("content_type") or "application/octet-stream").lower() + mime = str( + payload.get("content_type") or "application/octet-stream" + ).lower() data = payload.get("content") or b"" filename = payload.get("filename") or "attachment.bin" @@ -2327,7 +2399,9 @@ class WhatsAppClient(ClientBase): self._client.send_video(jid, data, caption="") ) elif mime.startswith("audio/") and hasattr(self._client, "send_audio"): - response = await self._maybe_await(self._client.send_audio(jid, data)) + response = await self._maybe_await( + self._client.send_audio(jid, data) + ) elif hasattr(self._client, "send_document"): response = await self._maybe_await( self._client.send_document( @@ -2351,21 +2425,46 @@ class WhatsAppClient(ClientBase): if text: response = None last_error = None - for attempt in range(3): + # Prepare cancel key (if caller provided command_id) + cancel_key = None + try: + if command_id: + cancel_key = transport._runtime_command_cancel_key( + self.service, str(command_id) + ) + except Exception: + cancel_key = None + + for attempt in range(5): # Increased from 3 to 5 attempts + # Check for a cancellation marker set by transport.cancel_runtime_command + try: + if cancel_key and cache.get(cancel_key): + self.log.info("whatsapp send cancelled via cancel marker") + return False + except Exception: + pass try: # Log what we're about to send for debugging - self.log.debug(f"send_message attempt {attempt+1}: jid={jid} text_type={type(text).__name__} text_len={len(text)}") - response = await self._maybe_await(self._client.send_message(jid, text)) + if getattr(settings, "WHATSAPP_DEBUG", False): + self.log.debug( + f"send_message attempt {attempt+1}: jid={jid} text_type={type(text).__name__} text_len={len(text)}" + ) + response = await self._maybe_await( + self._client.send_message(jid, text) + ) sent_any = True last_error = None break except Exception as exc: - self.log.debug(f"send_message attempt {attempt+1} failed: {type(exc).__name__}: {exc}") + if getattr(settings, "WHATSAPP_DEBUG", False): + self.log.debug( + f"send_message attempt {attempt+1} failed: {type(exc).__name__}: {exc}" + ) last_error = exc error_text = str(last_error or "").lower() is_transient = "usync query" in error_text or "timed out" in error_text - if is_transient and attempt < 2: + if is_transient and attempt < 4: # Updated to match new attempt range if hasattr(self._client, "connect"): try: await self._maybe_await(self._client.connect()) @@ -2381,7 +2480,21 @@ class WhatsAppClient(ClientBase): last_event="send_retry_reconnect_failed", last_error=str(reconnect_exc), ) - await asyncio.sleep(0.8 * (attempt + 1)) + # Sleep but wake earlier if cancelled: poll small intervals + # Increase backoff time for device list queries + total_sleep = 1.5 * (attempt + 1) + slept = 0.0 + while slept < total_sleep: + try: + if cancel_key and cache.get(cancel_key): + self.log.info( + "whatsapp send cancelled during retry backoff" + ) + return False + except Exception: + pass + await asyncio.sleep(0.2) + slept += 0.2 continue break if last_error is not None and not sent_any: @@ -2420,7 +2533,9 @@ class WhatsAppClient(ClientBase): pass if hasattr(self._client, "set_chat_presence"): try: - await self._maybe_await(self._client.set_chat_presence(jid, "composing")) + await self._maybe_await( + self._client.set_chat_presence(jid, "composing") + ) return True except Exception: pass diff --git a/core/clients/xmpp.py b/core/clients/xmpp.py index f354bf9..26aa089 100644 --- a/core/clients/xmpp.py +++ b/core/clients/xmpp.py @@ -922,7 +922,9 @@ class XMPPComponent(ComponentXMPP): } ) - if (not body or body.strip().lower() in {"[no body]", "(no text)"}) and attachments: + if ( + not body or body.strip().lower() in {"[no body]", "(no text)"} + ) and attachments: attachment_urls = [ str(item.get("url") or "").strip() for item in attachments diff --git a/core/management/commands/backfill_xmpp_attachment_urls.py b/core/management/commands/backfill_xmpp_attachment_urls.py index cd7c6f6..a6380cc 100644 --- a/core/management/commands/backfill_xmpp_attachment_urls.py +++ b/core/management/commands/backfill_xmpp_attachment_urls.py @@ -3,7 +3,6 @@ from django.db.models import Q from core.models import Message, MessageEvent - EMPTY_TEXT_VALUES = { "", "[No Body]", @@ -105,9 +104,7 @@ class Command(BaseCommand): queryset = Message.objects.filter( sender_uuid__iexact="xmpp", - ).filter( - Q(text__isnull=True) | Q(text__exact="") | Q(text__iexact="[No Body]") - ) + ).filter(Q(text__isnull=True) | Q(text__exact="") | Q(text__iexact="[No Body]")) if user_id: queryset = queryset.filter(user_id=user_id) queryset = queryset.order_by("ts", "id") diff --git a/core/messaging/media_bridge.py b/core/messaging/media_bridge.py index 6fa184a..8cc8bbc 100644 --- a/core/messaging/media_bridge.py +++ b/core/messaging/media_bridge.py @@ -4,7 +4,6 @@ import time from django.core.cache import cache - DEFAULT_BLOB_TTL_SECONDS = 60 * 20 diff --git a/core/modules/mixed_protocol.py b/core/modules/mixed_protocol.py index a797d3e..f1d6b14 100644 --- a/core/modules/mixed_protocol.py +++ b/core/modules/mixed_protocol.py @@ -21,7 +21,9 @@ class UnifiedEvent: def normalize_gateway_event(service: str, payload: dict[str, Any]) -> UnifiedEvent: event_type = str(payload.get("type") or "").strip().lower() message_timestamps = [] - raw_timestamps = payload.get("message_timestamps") or payload.get("timestamps") or [] + raw_timestamps = ( + payload.get("message_timestamps") or payload.get("timestamps") or [] + ) if isinstance(raw_timestamps, list): for item in raw_timestamps: try: @@ -44,7 +46,10 @@ def normalize_gateway_event(service: str, payload: dict[str, Any]) -> UnifiedEve service=service, event_type=event_type, identifier=str( - payload.get("identifier") or payload.get("source") or payload.get("from") or "" + payload.get("identifier") + or payload.get("source") + or payload.get("from") + or "" ).strip(), text=str(payload.get("text") or ""), ts=ts, diff --git a/core/realtime/compose_ws.py b/core/realtime/compose_ws.py index 0e1b4ee..9126056 100644 --- a/core/realtime/compose_ws.py +++ b/core/realtime/compose_ws.py @@ -8,10 +8,7 @@ from django.core import signing from core.models import ChatSession, Message, PersonIdentifier, WorkspaceConversation from core.realtime.typing_state import get_person_typing_state -from core.views.compose import ( - COMPOSE_WS_TOKEN_SALT, - _serialize_messages_with_artifacts, -) +from core.views.compose import COMPOSE_WS_TOKEN_SALT, _serialize_messages_with_artifacts def _safe_int(value, default=0): diff --git a/core/realtime/typing_state.py b/core/realtime/typing_state.py index 0590261..1edb6a4 100644 --- a/core/realtime/typing_state.py +++ b/core/realtime/typing_state.py @@ -25,9 +25,7 @@ def set_person_typing_state( "source_service": str(source_service or ""), "display_name": str(display_name or ""), "updated_ts": now_ms, - "expires_ts": ( - now_ms + (TYPING_TTL_SECONDS * 1000) if started else now_ms - ), + "expires_ts": (now_ms + (TYPING_TTL_SECONDS * 1000) if started else now_ms), } cache.set( _person_key(user_id, person_id), diff --git a/core/templates/base.html b/core/templates/base.html index cf91aa8..4f0f5ab 100644 --- a/core/templates/base.html +++ b/core/templates/base.html @@ -77,21 +77,21 @@ headers: { 'HX-Request': 'true' }, }) .then((response) => { - if (!response.ok) { - throw new Error('Failed contacts preview fetch.'); - } - return response.text(); - }) + if (!response.ok) { + throw new Error('Failed contacts preview fetch.'); + } + return response.text(); + }) .then((html) => { - composeDropdown.innerHTML = html; - composePreviewLoaded = true; - }) + composeDropdown.innerHTML = html; + composePreviewLoaded = true; + }) .catch(() => { - composePreviewLoaded = false; - }) + composePreviewLoaded = false; + }) .finally(() => { - composePreviewLoading = false; - }); + composePreviewLoading = false; + }); }); } diff --git a/core/templates/pages/compose-contact-match.html b/core/templates/pages/compose-contact-match.html index e03c65a..b667510 100644 --- a/core/templates/pages/compose-contact-match.html +++ b/core/templates/pages/compose-contact-match.html @@ -161,88 +161,88 @@
- - - - - - - - - - - - - {% for row in candidates %} - - - - - - - + + + + + + + + + + + + {% for row in candidates %} + + + + + + + + - - - {% endfor %} - -
PersonNameServiceIdentifierSuggestStatus
{{ row.linked_person_name|default:"-" }}{{ row.detected_name|default:"-" }} - {{ row.service|title }} - {{ row.identifier }} - {% if not row.linked_person and row.suggestions %} -
- {% for suggestion in row.suggestions %} -
- {% csrf_token %} - - - - -
- {% endfor %} -
- {% else %} - - - {% endif %} -
- {% if row.linked_person %} - linked - {% else %} +
PersonNameServiceIdentifierSuggestStatus
{{ row.linked_person_name|default:"-" }}{{ row.detected_name|default:"-" }} + {{ row.service|title }} + {{ row.identifier }} + {% if not row.linked_person and row.suggestions %} +
+ {% for suggestion in row.suggestions %} +
+ {% csrf_token %} + + + + +
+ {% endfor %} +
+ {% else %} + - + {% endif %} +
+ {% if row.linked_person %} + linked + {% else %} + + {% endif %} + - {% endif %} - - - - - -
+ + + + + + {% endfor %} + +
{% else %} diff --git a/core/templates/partials/ai-workspace-ai-result.html b/core/templates/partials/ai-workspace-ai-result.html index 749af48..63ba80b 100644 --- a/core/templates/partials/ai-workspace-ai-result.html +++ b/core/templates/partials/ai-workspace-ai-result.html @@ -399,8 +399,8 @@ return String(value || "") .split(",") .map(function (item) { - return item.trim(); - }) + return item.trim(); + }) .filter(Boolean); }; diff --git a/core/templates/partials/ai-workspace-person-widget.html b/core/templates/partials/ai-workspace-person-widget.html index 014e7e3..a7e9561 100644 --- a/core/templates/partials/ai-workspace-person-widget.html +++ b/core/templates/partials/ai-workspace-person-widget.html @@ -536,8 +536,8 @@ showOperationPane(operation); const activeTab = tabKey || ( operation === "artifacts" - ? ((window.giaWorkspaceState[personId] || {}).currentMitigationTab || "plan_board") - : operation + ? ((window.giaWorkspaceState[personId] || {}).currentMitigationTab || "plan_board") + : operation ); setTopCapsuleActive(activeTab); const hydrated = hydrateCachedIfAvailable(operation); @@ -573,8 +573,8 @@ const currentState = window.giaWorkspaceState[personId] || {}; const targetTabKey = currentState.pendingTabKey || ( operation === "artifacts" - ? (currentState.currentMitigationTab || "plan_board") - : operation + ? (currentState.currentMitigationTab || "plan_board") + : operation ); if (!forceRefresh && currentState.current === operation && pane.dataset.loaded === "1") { window.giaWorkspaceShowTab(personId, operation, targetTabKey); @@ -663,8 +663,8 @@ const state = window.giaWorkspaceState[personId] || {}; const currentTab = state.currentTab || ( state.current === "artifacts" - ? (state.currentMitigationTab || "plan_board") - : (state.current || "plan_board") + ? (state.currentMitigationTab || "plan_board") + : (state.current || "plan_board") ); window.giaWorkspaceOpenTab(personId, currentTab, true); }; diff --git a/core/templates/partials/compose-panel.html b/core/templates/partials/compose-panel.html index 6e790b3..6b37a73 100644 --- a/core/templates/partials/compose-panel.html +++ b/core/templates/partials/compose-panel.html @@ -254,6 +254,9 @@ {% endwith %} {% endif %}
+
+ {{ msg.source_label }} +
{% if msg.image_urls %} {% for image_url in msg.image_urls %}
@@ -285,14 +288,30 @@

{{ msg.display_ts }}{% if msg.author %} · {{ msg.author }}{% endif %} {% if msg.read_ts %} - + - {{ msg.read_display }} + {{ msg.read_delta_display }} {% elif msg.delivered_ts %} - + - {{ msg.delivered_display }} + {{ msg.delivered_delta_display }} {% endif %}

@@ -438,6 +457,26 @@ padding: 0.52rem 0.62rem; box-shadow: none; } + #{{ panel_id }} .compose-source-badge-wrap { + display: flex; + justify-content: flex-start; + margin-bottom: 0.36rem; + } + #{{ panel_id }} .compose-source-badge { + font-size: 0.84rem; + padding: 0.12rem 0.5rem; + border-radius: 6px; + color: #fff; + font-weight: 800; + letter-spacing: 0.02em; + box-shadow: 0 1px 0 rgba(0,0,0,0.06); + } + #{{ panel_id }} .compose-source-badge.source-web { background: #2f4f7a; } + #{{ panel_id }} .compose-source-badge.source-xmpp { background: #6a88b4; } + #{{ panel_id }} .compose-source-badge.source-whatsapp { background: #25D366; color: #063; } + #{{ panel_id }} .compose-source-badge.source-signal { background: #3b82f6; } + #{{ panel_id }} .compose-source-badge.source-instagram { background: #c13584; } + #{{ panel_id }} .compose-source-badge.source-unknown { background: #6b7280; } #{{ panel_id }} .compose-bubble.is-in { background: rgba(255, 255, 255, 0.96); } @@ -1719,6 +1758,7 @@ }; const appendBubble = function (msg) { + console.log("[appendBubble]", {id: msg.id, ts: msg.ts, author: msg.author, source_label: msg.source_label, source_service: msg.source_service, outgoing: msg.outgoing}); const row = document.createElement("div"); const outgoing = !!msg.outgoing; row.className = "compose-row " + (outgoing ? "is-out" : "is-in"); @@ -1729,12 +1769,24 @@ const bubble = document.createElement("article"); bubble.className = "compose-bubble " + (outgoing ? "is-out" : "is-in"); + // Add source badge for client-side rendered messages + if (msg.source_label) { + console.log("[appendBubble] rendering source badge:", msg.source_label); + const badgeWrap = document.createElement("div"); + badgeWrap.className = "compose-source-badge-wrap"; + const badge = document.createElement("span"); + const svc = String(msg.source_service || "web").toLowerCase(); + badge.className = "compose-source-badge source-" + svc; + badge.textContent = String(msg.source_label || ""); + badgeWrap.appendChild(badge); + bubble.appendChild(badgeWrap); + } const imageCandidatesFromPayload = Array.isArray(msg.image_urls) && msg.image_urls.length - ? msg.image_urls - : (msg.image_url ? [msg.image_url] : []); + ? msg.image_urls + : (msg.image_url ? [msg.image_url] : []); const imageCandidates = imageCandidatesFromPayload.length - ? imageCandidatesFromPayload - : extractUrlCandidates(msg.text || msg.display_text || ""); + ? imageCandidatesFromPayload + : extractUrlCandidates(msg.text || msg.display_text || ""); appendImageCandidates(bubble, imageCandidates); if (!msg.hide_text) { @@ -1759,44 +1811,55 @@ if (msg.author) { metaText += " · " + String(msg.author); } - meta.textContent = metaText; - // Render delivery/read ticks and a small time label when available. - if (msg.read_ts) { - const tickWrap = document.createElement("span"); - tickWrap.className = "compose-ticks"; - tickWrap.title = "Read at " + String(msg.read_display || msg.read_ts || ""); - const icon = document.createElement("span"); - icon.className = "icon is-small"; - const i = document.createElement("i"); - i.className = "fa-solid fa-check-double has-text-info"; - icon.appendChild(i); - const timeSpan = document.createElement("span"); - timeSpan.className = "compose-tick-time"; - timeSpan.textContent = String(msg.read_display || ""); - tickWrap.appendChild(icon); - tickWrap.appendChild(timeSpan); - meta.appendChild(document.createTextNode(" ")); - meta.appendChild(tickWrap); - } else if (msg.delivered_ts) { - const tickWrap = document.createElement("span"); - tickWrap.className = "compose-ticks"; - tickWrap.title = "Delivered at " + String(msg.delivered_display || msg.delivered_ts || ""); - const icon = document.createElement("span"); - icon.className = "icon is-small"; - const i = document.createElement("i"); - i.className = "fa-solid fa-check-double has-text-grey"; - icon.appendChild(i); - const timeSpan = document.createElement("span"); - timeSpan.className = "compose-tick-time"; - timeSpan.textContent = String(msg.delivered_display || ""); - tickWrap.appendChild(icon); - tickWrap.appendChild(timeSpan); - meta.appendChild(document.createTextNode(" ")); - meta.appendChild(tickWrap); - } + meta.textContent = metaText; + // Render delivery/read ticks and a small time label when available. + if (msg.read_ts) { + const tickWrap = document.createElement("span"); + tickWrap.className = "compose-ticks"; + tickWrap.title = "Read at " + String(msg.read_display || msg.read_ts || ""); + const icon = document.createElement("span"); + icon.className = "icon is-small"; + const i = document.createElement("i"); + i.className = "fa-solid fa-check-double has-text-info"; + icon.appendChild(i); + const timeSpan = document.createElement("span"); + timeSpan.className = "compose-tick-time"; + timeSpan.textContent = String(msg.read_display || ""); + tickWrap.appendChild(icon); + tickWrap.appendChild(timeSpan); + meta.appendChild(document.createTextNode(" ")); + meta.appendChild(tickWrap); + } else if (msg.delivered_ts) { + const tickWrap = document.createElement("span"); + tickWrap.className = "compose-ticks"; + tickWrap.title = "Delivered at " + String(msg.delivered_display || msg.delivered_ts || ""); + const icon = document.createElement("span"); + icon.className = "icon is-small"; + const i = document.createElement("i"); + i.className = "fa-solid fa-check-double has-text-grey"; + icon.appendChild(i); + const timeSpan = document.createElement("span"); + timeSpan.className = "compose-tick-time"; + timeSpan.textContent = String(msg.delivered_display || ""); + tickWrap.appendChild(icon); + tickWrap.appendChild(timeSpan); + meta.appendChild(document.createTextNode(" ")); + meta.appendChild(tickWrap); + } bubble.appendChild(meta); - row.appendChild(bubble); + // If message carries receipt metadata, append dataset so the popover can use it. + if (msg.receipt_payload || msg.read_source_service || msg.read_by_identifier) { + // Attach data attributes on the row so event delegation can find them. + try { + row.dataset.receipt = JSON.stringify(msg.receipt_payload || {}); + } catch (e) { + row.dataset.receipt = "{}"; + } + row.dataset.receiptSource = String(msg.read_source_service || ""); + row.dataset.receiptBy = String(msg.read_by_identifier || ""); + row.dataset.receiptId = String(msg.id || ""); + } const empty = thread.querySelector(".compose-empty"); if (empty) { empty.remove(); @@ -1810,6 +1873,87 @@ updateGlanceFromMessage(msg); }; + // Receipt popover (similar to contact info popover) + const receiptPopover = document.createElement("div"); + receiptPopover.id = "compose-receipt-popover"; + receiptPopover.className = "compose-ai-popover is-hidden"; + receiptPopover.setAttribute("aria-hidden", "true"); + receiptPopover.innerHTML = ` +
+

Receipt Details

+
+ + + + + + + +
Message ID-
Source-
Read By-
Delivered-
Read-
Payload
+
+
+ `; + document.body.appendChild(receiptPopover); + + let activeReceiptBtn = null; + function hideReceiptPopover() { + receiptPopover.classList.add("is-hidden"); + receiptPopover.setAttribute("aria-hidden", "true"); + activeReceiptBtn = null; + } + function positionReceiptPopover(btn) { + const rect = btn.getBoundingClientRect(); + const width = Math.min(520, Math.max(280, Math.floor(window.innerWidth * 0.32))); + const left = Math.min(window.innerWidth - width - 16, Math.max(12, rect.left - width + rect.width)); + const top = Math.min(window.innerHeight - 24, rect.bottom + 8); + receiptPopover.style.left = left + "px"; + receiptPopover.style.top = top + "px"; + receiptPopover.style.width = width + "px"; + } + function openReceiptPopoverFromData(data, btn) { + document.getElementById("receipt-msg-id").textContent = data.id || "-"; + document.getElementById("receipt-source").textContent = data.source || "-"; + document.getElementById("receipt-by").textContent = data.by || "-"; + document.getElementById("receipt-delivered").textContent = data.delivered || "-"; + document.getElementById("receipt-read").textContent = data.read || "-"; + try { + const out = typeof data.payload === 'string' ? JSON.parse(data.payload) : data.payload || {}; + document.getElementById("receipt-payload").textContent = JSON.stringify(out, null, 2); + } catch (e) { + document.getElementById("receipt-payload").textContent = String(data.payload || "{}"); + } + positionReceiptPopover(btn); + receiptPopover.classList.remove("is-hidden"); + receiptPopover.setAttribute("aria-hidden", "false"); + } + + // Delegate click on tick triggers inside thread + thread.addEventListener("click", function (ev) { + const btn = ev.target.closest && ev.target.closest('.js-receipt-trigger'); + if (!btn) return; + if (activeReceiptBtn === btn && !receiptPopover.classList.contains('is-hidden')) { + hideReceiptPopover(); + return; + } + activeReceiptBtn = btn; + const payload = btn.dataset && btn.dataset.receipt ? btn.dataset.receipt : (btn.parentNode && btn.parentNode.dataset ? btn.parentNode.dataset.receipt : "{}"); + const source = btn.dataset && btn.dataset.source ? btn.dataset.source : (btn.parentNode && btn.parentNode.dataset ? btn.parentNode.dataset.receiptSource : ""); + const by = btn.dataset && btn.dataset.by ? btn.dataset.by : (btn.parentNode && btn.parentNode.dataset ? btn.parentNode.dataset.receiptBy : ""); + const id = btn.dataset && btn.dataset.id ? btn.dataset.id : (btn.parentNode && btn.parentNode.dataset ? btn.parentNode.dataset.receiptId : ""); + const delivered = btn.title || ""; + const read = btn.title || ""; + openReceiptPopoverFromData({ id: id, payload: payload, source: source, by: by, delivered: delivered, read: read }, btn); + }); + + // Close receipt popover on outside click / escape + document.addEventListener("click", function (ev) { + if (receiptPopover.classList.contains('is-hidden')) return; + if (receiptPopover.contains(ev.target)) return; + if (activeReceiptBtn && activeReceiptBtn.contains(ev.target)) return; + hideReceiptPopover(); + }); + document.addEventListener("keydown", function (ev) { if (ev.key === 'Escape') hideReceiptPopover(); }); + const applyMinuteGrouping = function () { const rows = Array.from(thread.querySelectorAll(".compose-row")); rows.forEach(function (row) { @@ -1889,15 +2033,18 @@ } params.set("limit", thread.dataset.limit || "60"); params.set("after_ts", String(lastTs)); + console.log("[poll] fetching messages: service=" + params.get("service") + " after_ts=" + lastTs); const response = await fetch(thread.dataset.pollUrl + "?" + params.toString(), { method: "GET", credentials: "same-origin", headers: { Accept: "application/json" } }); if (!response.ok) { + console.log("[poll] response not ok:", response.status); return; } const payload = await response.json(); + console.log("[poll] received payload with " + (payload.messages ? payload.messages.length : 0) + " messages"); appendMessages(payload.messages || [], forceScroll); if (payload.typing) { applyTyping(payload.typing); @@ -2522,7 +2669,7 @@ } catch (err) { setCardLoading(card, false); card.querySelector(".compose-ai-content").textContent = - "Failed to load quick insights."; + "Failed to load quick insights."; } }; @@ -2541,8 +2688,8 @@ const customText = card.querySelector(".engage-custom-text"); const selectedSource = ( preferredSource !== undefined - ? preferredSource - : (sourceSelect ? sourceSelect.value : "") + ? preferredSource + : (sourceSelect ? sourceSelect.value : "") ); const customValue = customText ? String(customText.value || "").trim() : ""; const showCustom = selectedSource === "custom"; @@ -2734,8 +2881,8 @@ const selectedPerson = selected.dataset.person || thread.dataset.person || ""; const selectedPageUrl = ( renderMode === "page" - ? selected.dataset.pageUrl - : selected.dataset.widgetUrl + ? selected.dataset.pageUrl + : selected.dataset.widgetUrl ) || ""; switchThreadContext( selectedService, @@ -2764,8 +2911,8 @@ const selectedPerson = selected.dataset.person || ""; let selectedPageUrl = ( renderMode === "page" - ? selected.dataset[servicePageUrlKey] - : selected.dataset[serviceWidgetUrlKey] + ? selected.dataset[servicePageUrlKey] + : selected.dataset[serviceWidgetUrlKey] ) || ""; if (!selectedIdentifier) { selectedService = selected.dataset.service || selectedService; @@ -2774,8 +2921,8 @@ if (!selectedPageUrl) { selectedPageUrl = ( renderMode === "page" - ? selected.dataset.pageUrl - : selected.dataset.widgetUrl + ? selected.dataset.pageUrl + : selected.dataset.widgetUrl ) || ""; } switchThreadContext( @@ -2877,6 +3024,51 @@ textarea.focus(); }); + // Cancel send support: show a cancel button while the form request is pending. + let cancelBtn = null; + const showCancelButton = function () { + if (cancelBtn) return; + cancelBtn = document.createElement('button'); + cancelBtn.type = 'button'; + cancelBtn.className = 'button is-danger is-light is-small compose-cancel-send-btn'; + cancelBtn.textContent = 'Cancel Send'; + cancelBtn.addEventListener('click', function () { + // Post cancel by service+identifier + const payload = new URLSearchParams(); + payload.set('service', thread.dataset.service || ''); + payload.set('identifier', thread.dataset.identifier || ''); + fetch('{% url "compose_cancel_send" %}', { + method: 'POST', + credentials: 'same-origin', + headers: { 'X-CSRFToken': '{{ csrf_token }}', 'Content-Type': 'application/x-www-form-urlencoded' }, + body: payload.toString(), + }).then(function (resp) { + // Hide cancel once requested + hideCancelButton(); + }).catch(function () { + hideCancelButton(); + }); + }); + if (statusBox) { + statusBox.appendChild(cancelBtn); + } + }; + const hideCancelButton = function () { + if (!cancelBtn) return; + try { cancelBtn.remove(); } catch (e) {} + cancelBtn = null; + }; + + // Show cancel on submit; htmx will make the request asynchronously. + form.addEventListener('submit', function (ev) { + // Only show when send confirmation allows + if (sendButton && sendButton.disabled) return; + showCancelButton(); + }); + + // Hide cancel after HTMX request completes + form.addEventListener('htmx:afterRequest', function () { hideCancelButton(); }); + panelState.eventHandler = function (event) { const detail = (event && event.detail) || {}; const sourcePanelId = String(detail.panel_id || ""); @@ -2887,6 +3079,114 @@ }; document.body.addEventListener("composeMessageSent", panelState.eventHandler); + // Persistent queued-command handling: when server returns composeSendCommandId + // HTMX will dispatch a `composeSendCommandId` event with detail {command_id: "..."}. + panelState.pendingCommandId = null; + panelState.pendingCommandPoll = null; + + const startPendingCommandPolling = function (commandId) { + if (!commandId) return; + panelState.pendingCommandId = commandId; + // Show persistent cancel UI + showPersistentCancelButton(commandId); + // Poll for result every 1500ms + if (panelState.pendingCommandPoll) { + clearInterval(panelState.pendingCommandPoll); + } + panelState.pendingCommandPoll = setInterval(async function () { + try { + const url = new URL('{% url "compose_command_result" %}', window.location.origin); + url.searchParams.set('service', thread.dataset.service || ''); + url.searchParams.set('command_id', commandId); + const resp = await fetch(url.toString(), { credentials: 'same-origin' }); + if (!resp.ok) return; + const payload = await resp.json(); + if (payload && payload.pending === false) { + // Stop polling + stopPendingCommandPolling(); + // Hide cancel UI + hidePersistentCancelButton(); + // Surface result to the user + const result = payload.result || {}; + if (result.ok) { + setStatus('', 'success'); + textarea.value = ''; + autosize(); + flashCompose('is-send-success'); + poll(true); + } else { + const msg = String(result.error || 'Send failed.'); + setStatus(msg, 'danger'); + flashCompose('is-send-fail'); + poll(true); + } + } + } catch (e) { + // ignore transient network errors + } + }, 1500); + }; + + const stopPendingCommandPolling = function () { + if (panelState.pendingCommandPoll) { + clearInterval(panelState.pendingCommandPoll); + panelState.pendingCommandPoll = null; + } + panelState.pendingCommandId = null; + }; + + const persistentCancelContainerId = panelId + '-persistent-cancel'; + const showPersistentCancelButton = function (commandId) { + hidePersistentCancelButton(); + const container = document.createElement('div'); + container.id = persistentCancelContainerId; + container.style.marginTop = '0.35rem'; + const btn = document.createElement('button'); + btn.type = 'button'; + btn.className = 'button is-danger is-light is-small compose-persistent-cancel-btn'; + btn.textContent = 'Cancel Queued Send'; + btn.addEventListener('click', function () { + const payload = new URLSearchParams(); + payload.set('service', thread.dataset.service || ''); + payload.set('identifier', thread.dataset.identifier || ''); + payload.set('command_id', String(commandId || '')); + fetch('{% url "compose_cancel_send" %}', { + method: 'POST', + credentials: 'same-origin', + headers: { 'X-CSRFToken': '{{ csrf_token }}', 'Content-Type': 'application/x-www-form-urlencoded' }, + body: payload.toString(), + }).then(function (resp) { + stopPendingCommandPolling(); + hidePersistentCancelButton(); + setStatus('Send cancelled.', 'warning'); + poll(true); + }).catch(function () { + hidePersistentCancelButton(); + }); + }); + container.appendChild(btn); + if (statusBox) { + statusBox.appendChild(container); + } + }; + + const hidePersistentCancelButton = function () { + try { + const el = document.getElementById(persistentCancelContainerId); + if (el) el.remove(); + } catch (e) {} + }; + + document.body.addEventListener('composeSendCommandId', function (ev) { + try { + const detail = (ev && ev.detail) || {}; + const cmd = (detail && detail.command_id) || (detail && detail.composeSendCommandId && detail.composeSendCommandId.command_id) || null; + if (cmd) { + startPendingCommandPolling(String(cmd)); + } + } catch (e) {} + }); + panelState.sendResultHandler = function (event) { const detail = (event && event.detail) || {}; const sourcePanelId = String(detail.panel_id || ""); diff --git a/core/views/compose.py b/core/views/compose.py index cce6c51..d6e6349 100644 --- a/core/views/compose.py +++ b/core/views/compose.py @@ -4,8 +4,9 @@ import hashlib import json import re import time +from datetime import datetime +from datetime import timezone as dt_timezone from difflib import SequenceMatcher -from datetime import datetime, timezone as dt_timezone from urllib.parse import quote_plus, urlencode, urlparse from asgiref.sync import async_to_sync @@ -40,7 +41,11 @@ from core.models import ( WorkspaceConversation, ) from core.realtime.typing_state import get_person_typing_state -from core.views.workspace import INSIGHT_METRICS, _build_engage_payload, _parse_draft_options +from core.views.workspace import ( + INSIGHT_METRICS, + _build_engage_payload, + _parse_draft_options, +) COMPOSE_WS_TOKEN_SALT = "compose-ws" COMPOSE_ENGAGE_TOKEN_SALT = "compose-engage" @@ -129,7 +134,9 @@ def _extract_urls(text_value: str) -> list[str]: def _is_url_only_text(text_value: str) -> bool: - lines = [line.strip() for line in str(text_value or "").splitlines() if line.strip()] + lines = [ + line.strip() for line in str(text_value or "").splitlines() if line.strip() + ] if not lines: return False return all(bool(URL_PATTERN.fullmatch(line)) for line in lines) @@ -150,10 +157,14 @@ def _is_xmpp_share_url(url_value: str) -> bool: return False parsed = urlparse(url_value) host = str(parsed.netloc or "").strip().lower() - configured = str( - getattr(settings, "XMPP_UPLOAD_SERVICE", "") - or getattr(settings, "XMPP_UPLOAD_JID", "") - ).strip().lower() + configured = ( + str( + getattr(settings, "XMPP_UPLOAD_SERVICE", "") + or getattr(settings, "XMPP_UPLOAD_JID", "") + ) + .strip() + .lower() + ) if not configured: return False configured_host = configured @@ -200,15 +211,21 @@ def _extract_attachment_image_urls(blob) -> list[str]: return urls if isinstance(blob, dict): - content_type = str( - blob.get("content_type") - or blob.get("contentType") - or blob.get("mime_type") - or blob.get("mimetype") - or "" - ).strip().lower() + content_type = ( + str( + blob.get("content_type") + or blob.get("contentType") + or blob.get("mime_type") + or blob.get("mimetype") + or "" + ) + .strip() + .lower() + ) filename = str(blob.get("filename") or blob.get("fileName") or "").strip() - image_hint = content_type.startswith("image/") or _looks_like_image_name(filename) + image_hint = content_type.startswith("image/") or _looks_like_image_name( + filename + ) direct_urls = [] for key in ("url", "source_url", "download_url", "proxy_url", "href", "uri"): @@ -264,7 +281,9 @@ def _attachment_image_urls_by_message(messages): ).order_by("ts") for event in linked_events: - legacy_id = str((event.raw_payload_ref or {}).get("legacy_message_id") or "").strip() + legacy_id = str( + (event.raw_payload_ref or {}).get("legacy_message_id") or "" + ).strip() if not legacy_id: continue urls = _uniq_ordered( @@ -296,9 +315,7 @@ def _attachment_image_urls_by_message(messages): continue msg_ts = int(msg.ts or 0) candidates = [ - event - for event in fallback_list - if abs(int(event.ts or 0) - msg_ts) <= 3000 + event for event in fallback_list if abs(int(event.ts or 0) - msg_ts) <= 3000 ] if not candidates: continue @@ -322,8 +339,51 @@ def _serialize_message(msg: Message) -> dict: and _is_url_only_text(text_value) and all(_looks_like_image_url(url) for url in image_urls) ) - display_text = text_value if text_value.strip() else ("(no text)" if not image_url else "") + display_text = ( + text_value if text_value.strip() else ("(no text)" if not image_url else "") + ) author = str(msg.custom_author or "").strip() + is_outgoing = _is_outgoing(msg) + + # Determine source service for display: prefer explicit session identifier service + source_service = "web" + try: + if getattr(msg, "session", None) and getattr(msg.session, "identifier", None): + svc = str(msg.session.identifier.service or "").strip().lower() + if svc: + source_service = svc + except Exception: + pass + + from core.util import logs as util_logs + + logger = util_logs.get_logger("compose") + logger.info( + f"[serialize_message] id={msg.id} author={author} is_outgoing={is_outgoing} source_service={source_service}" + ) + + # For outgoing messages sent from web UI, label as "Web Chat". + # For incoming messages, use the session's service name (Xmpp, Signal, Whatsapp, etc). + # But if source_service is still "web" and message is incoming, it may be a data issue— + # don't label it as "Web Chat" since that's misleading. + if is_outgoing: + source_label = "Web Chat" + else: + # Incoming message: use service-specific labels + service_labels = { + "xmpp": "XMPP", + "whatsapp": "WhatsApp", + "signal": "Signal", + "instagram": "Instagram", + "web": "External", # Fallback if service not identified + } + source_label = service_labels.get( + source_service, source_service.title() if source_service else "Unknown" + ) + + # Ensure source_label is never empty for UI rendering + if not source_label: + source_label = "Unknown" delivered_ts = int(msg.delivered_ts or 0) read_ts = int(msg.read_ts or 0) delivered_display = _format_ts_label(int(delivered_ts)) if delivered_ts else "" @@ -331,6 +391,17 @@ def _serialize_message(msg: Message) -> dict: ts_val = int(msg.ts or 0) delivered_delta = int(delivered_ts - ts_val) if delivered_ts and ts_val else None read_delta = int(read_ts - ts_val) if read_ts and ts_val else None + # Human friendly delta strings + delivered_delta_display = ( + _format_gap_duration(delivered_delta) if delivered_delta is not None else "" + ) + read_delta_display = ( + _format_gap_duration(read_delta) if read_delta is not None else "" + ) + # Receipt payload and metadata + receipt_payload = msg.receipt_payload or {} + read_source_service = str(msg.read_source_service or "").strip() + read_by_identifier = str(msg.read_by_identifier or "").strip() return { "id": str(msg.id), @@ -343,12 +414,19 @@ def _serialize_message(msg: Message) -> dict: "hide_text": hide_text, "author": author, "outgoing": _is_outgoing(msg), + "source_service": source_service, + "source_label": source_label, "delivered_ts": delivered_ts, "read_ts": read_ts, "delivered_display": delivered_display, "read_display": read_display, "delivered_delta": delivered_delta, "read_delta": read_delta, + "delivered_delta_display": delivered_delta_display, + "read_delta_display": read_delta_display, + "receipt_payload": receipt_payload, + "read_source_service": read_source_service, + "read_by_identifier": read_by_identifier, } @@ -510,9 +588,8 @@ def _workspace_conversation_for_person(user, person): def _counterpart_identifiers_for_person(user, person): if person is None: return set() - values = ( - PersonIdentifier.objects.filter(user=user, person=person) - .values_list("identifier", flat=True) + values = PersonIdentifier.objects.filter(user=user, person=person).values_list( + "identifier", flat=True ) return {str(value or "").strip() for value in values if str(value or "").strip()} @@ -598,13 +675,17 @@ def _build_thread_metric_fragments(conversation): def _build_gap_fragment(is_outgoing_reply, lag_ms, snapshot): - metric_slug = "outbound_response_score" if is_outgoing_reply else "inbound_response_score" + metric_slug = ( + "outbound_response_score" if is_outgoing_reply else "inbound_response_score" + ) copy = _metric_copy(metric_slug, "Response Score") score_value = None if snapshot is not None: score_value = getattr( snapshot, - "outbound_response_score" if is_outgoing_reply else "inbound_response_score", + "outbound_response_score" + if is_outgoing_reply + else "inbound_response_score", None, ) if score_value is None: @@ -651,7 +732,9 @@ def _serialize_messages_with_artifacts( item["metric_fragments"] = [] counterpart_identifiers = set(counterpart_identifiers or []) - snapshot = conversation.metric_snapshots.first() if conversation is not None else None + snapshot = ( + conversation.metric_snapshots.first() if conversation is not None else None + ) prev_msg = seed_previous prev_ts = int(prev_msg.ts or 0) if prev_msg is not None else None @@ -663,7 +746,9 @@ def _serialize_messages_with_artifacts( for idx, msg in enumerate(rows): current_ts = int(msg.ts or 0) - current_outgoing = _message_is_outgoing_for_analysis(msg, counterpart_identifiers) + current_outgoing = _message_is_outgoing_for_analysis( + msg, counterpart_identifiers + ) if ( prev_msg is not None and prev_ts is not None @@ -680,7 +765,9 @@ def _serialize_messages_with_artifacts( prev_outgoing = current_outgoing if serialized: - serialized[-1]["metric_fragments"] = _build_thread_metric_fragments(conversation) + serialized[-1]["metric_fragments"] = _build_thread_metric_fragments( + conversation + ) return serialized @@ -770,12 +857,7 @@ def _build_glance_items(serialized_messages, person_id=None): def _owner_name(user) -> str: - return ( - user.first_name - or user.get_full_name().strip() - or user.username - or "Me" - ) + return user.first_name or user.get_full_name().strip() or user.username or "Me" def _compose_ws_token(user_id, service, identifier, person_id): @@ -789,7 +871,9 @@ def _compose_ws_token(user_id, service, identifier, person_id): return signing.dumps(payload, salt=COMPOSE_WS_TOKEN_SALT) -def _compose_ai_cache_key(kind, user_id, service, identifier, person_id, last_ts, limit): +def _compose_ai_cache_key( + kind, user_id, service, identifier, person_id, last_ts, limit +): raw = "|".join( [ str(kind or ""), @@ -825,7 +909,9 @@ def _engage_body_only(value): def _messages_for_ai(user, person_identifier, limit): if person_identifier is None: return [] - session, _ = ChatSession.objects.get_or_create(user=user, identifier=person_identifier) + session, _ = ChatSession.objects.get_or_create( + user=user, identifier=person_identifier + ) rows = list( Message.objects.filter(user=user, session=session) .select_related("session", "session__identifier", "session__identifier__person") @@ -949,7 +1035,9 @@ def _trend_meta(current, previous, higher_is_better=True): improves = is_up if higher_is_better else not is_up return { "direction": "up" if is_up else "down", - "icon": "fa-solid fa-arrow-trend-up" if is_up else "fa-solid fa-arrow-trend-down", + "icon": "fa-solid fa-arrow-trend-up" + if is_up + else "fa-solid fa-arrow-trend-down", "class_name": "has-text-success" if improves else "has-text-danger", "meaning": "Improving signal" if improves else "Risk signal", } @@ -1443,7 +1531,9 @@ def _manual_contact_rows(user): if key in seen: return seen.add(key) - urls = _compose_urls(service_key, identifier_value, person.id if person else None) + urls = _compose_urls( + service_key, identifier_value, person.id if person else None + ) linked_person_name = person.name if person else "" detected = _clean_detected_name(detected_name or account or "") person_name = linked_person_name or detected or identifier_value @@ -1502,7 +1592,9 @@ def _manual_contact_rows(user): person=(linked.person if linked else None), source="signal_chat", account=str(chat.account or ""), - detected_name=_clean_detected_name(chat.source_name or chat.account or ""), + detected_name=_clean_detected_name( + chat.source_name or chat.account or "" + ), ) whatsapp_links = { @@ -1529,7 +1621,9 @@ def _manual_contact_rows(user): continue if _normalize_contact_key(candidate) in wa_account_keys: continue - detected_name = _clean_detected_name(item.get("name") or item.get("chat") or "") + detected_name = _clean_detected_name( + item.get("name") or item.get("chat") or "" + ) if detected_name.lower() == "linked account": continue linked = whatsapp_links.get(candidate) @@ -1572,7 +1666,10 @@ def _recent_manual_contacts( current_person_id = str(current_person.id) if current_person else "" row_by_key = { - (str(row.get("service") or "").strip().lower(), str(row.get("identifier") or "").strip()): row + ( + str(row.get("service") or "").strip().lower(), + str(row.get("identifier") or "").strip(), + ): row for row in all_rows } by_person_service = {} @@ -1716,8 +1813,12 @@ def _recent_manual_contacts( seen_unknown.add(unknown_key) row["service_label"] = _service_label(service_key) for svc in ("signal", "whatsapp", "instagram", "xmpp"): - row[f"{svc}_identifier"] = identifier_value if svc == service_key else "" - row[f"{svc}_compose_url"] = row.get("compose_url") if svc == service_key else "" + row[f"{svc}_identifier"] = ( + identifier_value if svc == service_key else "" + ) + row[f"{svc}_compose_url"] = ( + row.get("compose_url") if svc == service_key else "" + ) row[f"{svc}_compose_widget_url"] = ( row.get("compose_widget_url") if svc == service_key else "" ) @@ -1855,7 +1956,9 @@ def _panel_context( for service_key in sorted(by_service.keys(), key=_service_order): identifier_value = by_service[service_key] - option_urls = _compose_urls(service_key, identifier_value, base["person"].id) + option_urls = _compose_urls( + service_key, identifier_value, base["person"].id + ) platform_options.append( { "service": service_key, @@ -2122,7 +2225,9 @@ class ComposeContactMatch(LoginRequiredMixin, View): row.save(update_fields=["person"]) message = f"Re-linked {identifier} ({service}) to {person.name}." else: - message = f"{identifier} ({service}) is already linked to {person.name}." + message = ( + f"{identifier} ({service}) is already linked to {person.name}." + ) linked_companions = 0 skipped_companions = 0 @@ -2247,7 +2352,8 @@ class ComposeThread(LoginRequiredMixin, View): user=request.user, identifier=base["person_identifier"], ) - session_ids = list({*session_ids, int(session.id)}) + # Don't convert UUIDs to int; keep them as UUIDs for the filter query + session_ids = list({*session_ids, session.id}) if session_ids: base_queryset = Message.objects.filter( user=request.user, @@ -2264,8 +2370,7 @@ class ComposeThread(LoginRequiredMixin, View): "session", "session__identifier", "session__identifier__person", - ) - .order_by("ts")[:limit] + ).order_by("ts")[:limit] ) newest = ( Message.objects.filter( @@ -2328,6 +2433,7 @@ class ComposeHistorySync(LoginRequiredMixin, View): values.add(local) return [value for value in values if value] + @classmethod @classmethod def _session_ids_for_scope( cls, @@ -2370,12 +2476,13 @@ class ComposeHistorySync(LoginRequiredMixin, View): unique_ids.append(row_id) if not unique_ids: return [] - return list( + result = list( ChatSession.objects.filter( user=user, identifier_id__in=unique_ids, ).values_list("id", flat=True) ) + return result @staticmethod def _reconcile_duplicate_messages(user, session_ids): @@ -2417,7 +2524,11 @@ class ComposeHistorySync(LoginRequiredMixin, View): person = get_object_or_404(Person, id=person_id, user=request.user) if not identifier and person is None: return JsonResponse( - {"ok": False, "message": "Missing contact identifier.", "level": "danger"} + { + "ok": False, + "message": "Missing contact identifier.", + "level": "danger", + } ) base = _context_base(request.user, service, identifier, person) @@ -2575,6 +2686,44 @@ class ComposeHistorySync(LoginRequiredMixin, View): ) +class ComposeCancelSend(LoginRequiredMixin, View): + def post(self, request): + service = _default_service(request.POST.get("service")) + identifier = str(request.POST.get("identifier") or "").strip() + command_id = str(request.POST.get("command_id") or "").strip() + if not identifier: + return JsonResponse({"ok": False, "error": "missing_identifier"}) + # If a specific command_id is supplied, cancel that command only. + if command_id: + ok = transport.cancel_runtime_command(service, command_id) + return JsonResponse({"ok": True, "cancelled": [command_id] if ok else []}) + cancelled = transport.cancel_runtime_commands_for_recipient(service, identifier) + return JsonResponse({"ok": True, "cancelled": cancelled}) + + +class ComposeCommandResult(LoginRequiredMixin, View): + """Return the runtime command result for a queued send (if available). + + GET parameters: `service`, `command_id`. + Returns JSON: if pending -> {"pending": True}, else returns the result dict. + """ + + def get(self, request): + service = _default_service(request.GET.get("service")) + command_id = str(request.GET.get("command_id") or "").strip() + if not command_id: + return JsonResponse( + {"ok": False, "error": "missing_command_id"}, status=400 + ) + # Non-blocking check for runtime command result + result = async_to_sync(transport.wait_runtime_command_result)( + service, command_id, timeout=0.1 + ) + if result is None: + return JsonResponse({"pending": True}) + return JsonResponse({"pending": False, "result": result}) + + class ComposeMediaBlob(LoginRequiredMixin, View): """ Serve cached media blobs for authenticated compose image previews. @@ -2773,21 +2922,23 @@ class ComposeQuickInsights(LoginRequiredMixin, View): "thread": "", "last_event": "", "last_ai_run": "", - "workspace_created": "", - "snapshot_count": 0, - "platform_docs": _metric_copy("platform", "Platform"), - "state_docs": _metric_copy("stability_state", "Participant State"), - "thread_docs": _metric_copy("thread", "Thread"), - "snapshot_docs": { - "calculation": ( - "Count of stored workspace metric snapshots for this person." - ), - "psychology": ( - "More points improve trend reliability; sparse points are " - "best treated as directional signals." + "workspace_created": "", + "snapshot_count": 0, + "platform_docs": _metric_copy("platform", "Platform"), + "state_docs": _metric_copy( + "stability_state", "Participant State" ), + "thread_docs": _metric_copy("thread", "Thread"), + "snapshot_docs": { + "calculation": ( + "Count of stored workspace metric snapshots for this person." + ), + "psychology": ( + "More points improve trend reliability; sparse points are " + "best treated as directional signals." + ), + }, }, - }, "rows": [], "docs": [ "Quick Insights needs at least one workspace conversation snapshot.", @@ -2935,7 +3086,9 @@ class ComposeEngagePreview(LoginRequiredMixin, View): ) preview = str(payload.get("preview") or "").strip() outbound = _engage_body_only(payload.get("outbound") or "") - artifact_label = f"{source_kind.title()}: {getattr(source_obj, 'title', '')}" + artifact_label = ( + f"{source_kind.title()}: {getattr(source_obj, 'title', '')}" + ) else: ai_obj = AI.objects.filter(user=request.user).first() if ai_obj is not None: @@ -3062,6 +3215,11 @@ class ComposeSend(LoginRequiredMixin, View): "panel_id": str(panel_id or ""), } } + # Optional: include command id to allow client-side cancellation UI + if hasattr(request, "_compose_command_id") and request._compose_command_id: + trigger_payload["composeSendCommandId"] = { + "command_id": str(request._compose_command_id) + } if ok: trigger_payload["composeMessageSent"] = {"panel_id": str(panel_id or "")} response["HX-Trigger"] = json.dumps(trigger_payload) @@ -3104,12 +3262,48 @@ class ComposeSend(LoginRequiredMixin, View): ) base = _context_base(request.user, service, identifier, person) - ts = async_to_sync(transport.send_message_raw)( - base["service"], - base["identifier"], - text=text, - attachments=[], + from core.util import logs as util_logs + + logger = util_logs.get_logger("compose") + log_prefix = ( + f"[ComposeSend] service={base['service']} identifier={base['identifier']}" ) + logger.info(f"{log_prefix} text_len={len(text)} attempting send") + + # If runtime is out-of-process, enqueue command and return immediately (non-blocking). + # Expose command id for cancellation so the client can cancel or poll later. + runtime_client = transport.get_runtime_client(base["service"]) or None + logger.info( + f"{log_prefix} runtime_client={type(runtime_client).__name__ if runtime_client else 'None (queued)'}" + ) + ts = None + command_id = None + if runtime_client is None: + logger.info(f"{log_prefix} enqueuing runtime command (out-of-process)") + command_id = transport.enqueue_runtime_command( + base["service"], + "send_message_raw", + {"recipient": base["identifier"], "text": text, "attachments": []}, + ) + logger.info( + f"{log_prefix} command_id={command_id} enqueued, returning immediately" + ) + # attach command id to request so _response can include it in HX-Trigger + request._compose_command_id = command_id + # Do NOT wait here — return immediately so the UI doesn't block. + # Record a pending message locally so the thread shows the outgoing message. + ts = int(time.time() * 1000) + else: + # In-process runtime can perform the send synchronously and return a timestamp. + logger.info(f"{log_prefix} calling in-process send_message_raw (blocking)") + ts = async_to_sync(transport.send_message_raw)( + base["service"], + base["identifier"], + text=text, + attachments=[], + ) + logger.info(f"{log_prefix} in-process send returned ts={ts}") + # For queued sends we set `ts` to a local timestamp; for in-process sends ts may be False. if not ts: return self._response( request, @@ -3124,15 +3318,34 @@ class ComposeSend(LoginRequiredMixin, View): user=request.user, identifier=base["person_identifier"], ) - Message.objects.create( + logger.info(f"{log_prefix} session_id={session.id}") + # For in-process sends (Signal, etc), ts is a timestamp or True. + # For queued sends (WhatsApp/UR), ts is a local timestamp. + # Set delivered_ts only if we got a real timestamp OR if it's an in-process sync send. + msg_ts = int(ts) if str(ts).isdigit() else int(time.time() * 1000) + delivered_ts = msg_ts if runtime_client is not None else None + msg = Message.objects.create( user=request.user, session=session, sender_uuid="", text=text, - ts=int(ts) if str(ts).isdigit() else int(time.time() * 1000), - delivered_ts=int(ts) if str(ts).isdigit() else None, + ts=msg_ts, + delivered_ts=delivered_ts, custom_author="USER", ) + logger.info( + f"{log_prefix} created message id={msg.id} ts={msg_ts} delivered_ts={delivered_ts} custom_author=USER" + ) + + # If we enqueued, inform the client the message is queued and include command id. + if runtime_client is None: + return self._response( + request, + ok=True, + message="Message queued for sending.", + level="info", + panel_id=panel_id, + ) return self._response( request, diff --git a/core/views/groups.py b/core/views/groups.py index 364d374..01d5802 100644 --- a/core/views/groups.py +++ b/core/views/groups.py @@ -3,8 +3,8 @@ from mixins.views import ObjectCreate, ObjectDelete, ObjectUpdate from core.forms import GroupForm from core.models import Group -from core.views.osint import OSINTListBase from core.util import logs +from core.views.osint import OSINTListBase log = logs.get_logger(__name__) diff --git a/core/views/manipulations.py b/core/views/manipulations.py index 4f7420e..e116335 100644 --- a/core/views/manipulations.py +++ b/core/views/manipulations.py @@ -3,8 +3,8 @@ from mixins.views import ObjectCreate, ObjectDelete, ObjectUpdate from core.forms import ManipulationForm from core.models import Manipulation -from core.views.osint import OSINTListBase from core.util import logs +from core.views.osint import OSINTListBase log = logs.get_logger(__name__) diff --git a/core/views/osint.py b/core/views/osint.py index bba4692..809e80d 100644 --- a/core/views/osint.py +++ b/core/views/osint.py @@ -17,7 +17,7 @@ from django.urls import reverse from django.views import View from mixins.views import ObjectList -from core.models import Group, Manipulation, Persona, Person +from core.models import Group, Manipulation, Person, Persona def _context_type(request_type: str) -> str: @@ -82,9 +82,7 @@ def _url_with_query(base_url: str, query: dict[str, Any]) -> str: return f"{base_url}?{urlencode(params)}" -def _merge_query( - current_query: dict[str, Any], **updates: Any -) -> dict[str, Any]: +def _merge_query(current_query: dict[str, Any], **updates: Any) -> dict[str, Any]: merged = dict(current_query) for key, value in updates.items(): if value is None or str(value).strip() == "": @@ -695,9 +693,7 @@ class OSINTSearch(LoginRequiredMixin, View): per_page_default = 20 per_page_max = 100 - def _field_options( - self, model_cls: type[models.Model] - ) -> list[dict[str, str]]: + def _field_options(self, model_cls: type[models.Model]) -> list[dict[str, str]]: options = [] for field in model_cls._meta.get_fields(): # Skip reverse/accessor relations (e.g. ManyToManyRel) that are not @@ -768,16 +764,18 @@ class OSINTSearch(LoginRequiredMixin, View): if isinstance(field, models.ForeignKey): related_text_field = _preferred_related_text_field(field.related_model) if related_text_field: - return Q( - **{f"{field_name}__{related_text_field}__icontains": query} - ), False + return ( + Q(**{f"{field_name}__{related_text_field}__icontains": query}), + False, + ) return Q(**{f"{field_name}__id__icontains": query}), False if isinstance(field, models.ManyToManyField): related_text_field = _preferred_related_text_field(field.related_model) if related_text_field: - return Q( - **{f"{field_name}__{related_text_field}__icontains": query} - ), True + return ( + Q(**{f"{field_name}__{related_text_field}__icontains": query}), + True, + ) return Q(**{f"{field_name}__id__icontains": query}), True return None, False diff --git a/core/views/people.py b/core/views/people.py index d7dee91..6937908 100644 --- a/core/views/people.py +++ b/core/views/people.py @@ -3,8 +3,8 @@ from mixins.views import ObjectCreate, ObjectDelete, ObjectUpdate from core.forms import PersonForm from core.models import Person -from core.views.osint import OSINTListBase from core.util import logs +from core.views.osint import OSINTListBase log = logs.get_logger(__name__) diff --git a/core/views/personas.py b/core/views/personas.py index 38b361d..0a27bff 100644 --- a/core/views/personas.py +++ b/core/views/personas.py @@ -3,8 +3,8 @@ from mixins.views import ObjectCreate, ObjectDelete, ObjectUpdate from core.forms import PersonaForm from core.models import Persona -from core.views.osint import OSINTListBase from core.util import logs +from core.views.osint import OSINTListBase log = logs.get_logger(__name__) diff --git a/core/views/signal.py b/core/views/signal.py index 0aaeb74..e90e128 100644 --- a/core/views/signal.py +++ b/core/views/signal.py @@ -1,9 +1,10 @@ +from urllib.parse import urlencode + import orjson import requests from django.conf import settings from django.shortcuts import render from django.urls import reverse -from urllib.parse import urlencode from django.views import View from mixins.views import ObjectList, ObjectRead diff --git a/core/views/system.py b/core/views/system.py index c7d11b4..db0a09d 100644 --- a/core/views/system.py +++ b/core/views/system.py @@ -19,8 +19,8 @@ from core.models import ( PatternMitigationPlan, PatternMitigationRule, Person, - PersonIdentifier, Persona, + PersonIdentifier, QueuedMessage, WorkspaceConversation, WorkspaceMetricSnapshot, @@ -37,7 +37,9 @@ class SystemSettings(SuperUserRequiredMixin, View): "messages": Message.objects.filter(user=user).count(), "queued_messages": QueuedMessage.objects.filter(user=user).count(), "message_events": MessageEvent.objects.filter(user=user).count(), - "workspace_conversations": WorkspaceConversation.objects.filter(user=user).count(), + "workspace_conversations": WorkspaceConversation.objects.filter( + user=user + ).count(), "workspace_snapshots": WorkspaceMetricSnapshot.objects.filter( conversation__user=user ).count(), @@ -57,7 +59,9 @@ class SystemSettings(SuperUserRequiredMixin, View): "mitigation_auto_settings": PatternMitigationAutoSettings.objects.filter( user=user ).count(), - "mitigation_exports": PatternArtifactExport.objects.filter(user=user).count(), + "mitigation_exports": PatternArtifactExport.objects.filter( + user=user + ).count(), "osint_people": Person.objects.filter(user=user).count(), "osint_identifiers": PersonIdentifier.objects.filter(user=user).count(), "osint_groups": Group.objects.filter(user=user).count(), @@ -77,7 +81,9 @@ class SystemSettings(SuperUserRequiredMixin, View): deleted += AIResult.objects.filter(user=user).delete()[0] deleted += AIRequest.objects.filter(user=user).delete()[0] deleted += MemoryItem.objects.filter(user=user).delete()[0] - deleted += WorkspaceMetricSnapshot.objects.filter(conversation__user=user).delete()[0] + deleted += WorkspaceMetricSnapshot.objects.filter( + conversation__user=user + ).delete()[0] deleted += MessageEvent.objects.filter(user=user).delete()[0] deleted += Message.objects.filter(user=user).delete()[0] deleted += QueuedMessage.objects.filter(user=user).delete()[0] diff --git a/core/views/whatsapp.py b/core/views/whatsapp.py index a7ea593..2cb8784 100644 --- a/core/views/whatsapp.py +++ b/core/views/whatsapp.py @@ -1,15 +1,16 @@ +import time +from urllib.parse import urlencode + from django.shortcuts import render from django.urls import reverse -from urllib.parse import urlencode from django.views import View from mixins.views import ObjectList, ObjectRead from core.clients import transport from core.models import ChatSession, Message, PersonIdentifier +from core.util import logs from core.views.compose import _compose_urls, _service_icon_class from core.views.manage.permissions import SuperUserRequiredMixin -from core.util import logs -import time log = logs.get_logger("whatsapp_view") @@ -32,16 +33,13 @@ class WhatsApp(SuperUserRequiredMixin, View): ) def delete(self, request, *args, **kwargs): - account = ( - str(request.GET.get("account") or "").strip() - or next( - ( - str(item or "").strip() - for item in transport.list_accounts("whatsapp") - if str(item or "").strip() - ), - "", - ) + account = str(request.GET.get("account") or "").strip() or next( + ( + str(item or "").strip() + for item in transport.list_accounts("whatsapp") + if str(item or "").strip() + ), + "", ) if account: transport.unlink_account("whatsapp", account) @@ -381,9 +379,7 @@ class WhatsAppAccountAdd(SuperUserRequiredMixin, ObjectRead): def _detail_context(self, kwargs, obj): detail_url_args = { - arg: kwargs[arg] - for arg in self.detail_url_args - if arg in kwargs + arg: kwargs[arg] for arg in self.detail_url_args if arg in kwargs } return { "object": obj, @@ -410,7 +406,9 @@ class WhatsAppAccountAdd(SuperUserRequiredMixin, ObjectRead): sqlite_scanned = int(state.get("history_sqlite_scanned") or 0) on_demand_requested = bool(state.get("history_on_demand_requested")) on_demand_error = str(state.get("history_on_demand_error") or "").strip() or "-" - on_demand_anchor = str(state.get("history_on_demand_anchor") or "").strip() or "-" + on_demand_anchor = ( + str(state.get("history_on_demand_anchor") or "").strip() or "-" + ) history_running = bool(state.get("history_sync_running")) return [ f"connected={bool(state.get('connected'))}", diff --git a/core/views/workspace.py b/core/views/workspace.py index f83eb1f..ae4f604 100644 --- a/core/views/workspace.py +++ b/core/views/workspace.py @@ -147,8 +147,7 @@ INSIGHT_METRICS = { "group": "stability", "history_field": "stability_score", "calculation": ( - "0.35*reciprocity + 0.25*continuity + 0.20*response + " - "0.20*volatility." + "0.35*reciprocity + 0.25*continuity + 0.20*response + " "0.20*volatility." ), "psychology": ( "Higher values suggest consistent mutual engagement patterns; falling " @@ -176,9 +175,7 @@ INSIGHT_METRICS = { "100 * min(1, distinct_sample_days / span_days). Higher means steadier " "day-to-day continuity." ), - "psychology": ( - "Drops can signal communication becoming episodic or reactive." - ), + "psychology": ("Drops can signal communication becoming episodic or reactive."), }, "response_score": { "title": "Response Component", @@ -232,8 +229,7 @@ INSIGHT_METRICS = { "history_field": "stability_sample_days", "calculation": "Count of distinct calendar days represented in the sample.", "psychology": ( - "Coverage across days better captures rhythm, not just intensity " - "bursts." + "Coverage across days better captures rhythm, not just intensity " "bursts." ), }, "stability_computed": { @@ -250,9 +246,7 @@ INSIGHT_METRICS = { "title": "Commit In", "group": "commitment", "history_field": "commitment_inbound_score", - "calculation": ( - "0.60*inbound_response_score + 0.40*inbound_balance_score." - ), + "calculation": ("0.60*inbound_response_score + 0.40*inbound_balance_score."), "psychology": ( "Estimates counterpart follow-through and reciprocity toward the user." ), @@ -261,9 +255,7 @@ INSIGHT_METRICS = { "title": "Commit Out", "group": "commitment", "history_field": "commitment_outbound_score", - "calculation": ( - "0.60*outbound_response_score + 0.40*outbound_balance_score." - ), + "calculation": ("0.60*outbound_response_score + 0.40*outbound_balance_score."), "psychology": ( "Estimates user follow-through and consistency toward the counterpart." ), @@ -931,16 +923,22 @@ def _metric_psychological_read(metric_slug, conversation): if score is None: return "Calibrating: collect more interaction data before interpreting." if score >= 70: - return "Pattern suggests low relational friction and resilient repair cycles." + return ( + "Pattern suggests low relational friction and resilient repair cycles." + ) if score >= 50: return "Pattern suggests moderate strain; monitor for repeated escalation loops." - return "Pattern suggests high friction risk; prioritise safety and repair pacing." + return ( + "Pattern suggests high friction risk; prioritise safety and repair pacing." + ) if metric_slug == "stability_confidence": conf = conversation.stability_confidence or 0.0 if conf < 0.25: return "Low certainty: treat this as a weak signal, not a conclusion." if conf < 0.6: - return "Moderate certainty: useful directional cue, still context-dependent." + return ( + "Moderate certainty: useful directional cue, still context-dependent." + ) return "High certainty: trend interpretation is likely reliable." if metric_slug in {"commitment_inbound", "commitment_outbound"}: inbound = conversation.commitment_inbound_score @@ -3119,7 +3117,7 @@ def _ai_detect_violations(user, plan, person, recent_rows, metric_context=None): "clarification": "proactive correction mapped to an artifact", "severity": "low|medium|high", } - ] + ], }, } prompt = [ @@ -3673,7 +3671,9 @@ class AIWorkspaceInformation(LoginRequiredMixin, View): latest_snapshot = conversation.metric_snapshots.first() directionality = _commitment_directionality_payload(conversation) commitment_graph_cards = [ - card for card in _all_graph_payload(conversation) if card["group"] == "commitment" + card + for card in _all_graph_payload(conversation) + if card["group"] == "commitment" ] graph_refs = [] diff --git a/docker-compose.yml b/docker-compose.yml index 630b5e9..8de9666 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -271,47 +271,10 @@ services: # memory: 0.25G #network_mode: host - # Optional watcher service to restart the runtime router (UR) when core code changes. - # This runs the `docker/watch_and_restart.py` script inside the same image and - # will restart the `ur_gia` container when files under `/code/core` change. - watch_ur: - image: xf/gia:prod - container_name: watch_ur_gia - build: - context: . - args: - OPERATION: ${OPERATION} - command: sh -c '. /venv/bin/activate && python docker/watch_and_restart.py' - volumes: - - ${REPO_DIR}:/code - - ${REPO_DIR}/docker/uwsgi.ini:/conf/uwsgi.ini - - ${APP_DATABASE_FILE}:/conf/db.sqlite3 - - type: bind - source: /code/vrun - target: /var/run - environment: - WATCH_PATHS: "/code/core" - TARGET_CONTAINER: "ur_gia" - - # Optional watcher service to restart the scheduling process when app code changes. - watch_scheduling: - image: xf/gia:prod - container_name: watch_scheduling_gia - build: - context: . - args: - OPERATION: ${OPERATION} - command: sh -c '. /venv/bin/activate && python docker/watch_and_restart.py' - volumes: - - ${REPO_DIR}:/code - - ${REPO_DIR}/docker/uwsgi.ini:/conf/uwsgi.ini - - ${APP_DATABASE_FILE}:/conf/db.sqlite3 - - type: bind - source: /code/vrun - target: /var/run - environment: - WATCH_PATHS: "/code/app" - TARGET_CONTAINER: "scheduling_gia" + # Watchers disabled - use manual restart for ur and scheduling services + # uWSGI auto-reload is enabled in uwsgi.ini for core code changes + # To restart ur after code changes: docker-compose restart ur + # To restart scheduling after code changes: docker-compose restart scheduling redis: image: redis diff --git a/docker/watch_and_restart.py b/docker/watch_and_restart.py index 1e683a6..abdd340 100644 --- a/docker/watch_and_restart.py +++ b/docker/watch_and_restart.py @@ -24,12 +24,13 @@ management command), you can "touch" any file under the watched path: The watcher ignores `__pycache__`, `.pyc` files and `.git` paths. """ import os +import subprocess import sys import time -import subprocess from pathlib import Path -from watchdog.observers import Observer + from watchdog.events import FileSystemEventHandler +from watchdog.observers import Observer class ChangeHandler(FileSystemEventHandler): @@ -48,7 +49,7 @@ class ChangeHandler(FileSystemEventHandler): def _check_and_restart(self, path): # Ignore pycache and compiled files - if '__pycache__' in path or '.pyc' in path or '.git' in path: + if "__pycache__" in path or ".pyc" in path or ".git" in path: return now = time.time() @@ -62,13 +63,16 @@ class ChangeHandler(FileSystemEventHandler): def _restart_ur(self): # Determine target container from environment (default `ur_gia`) - target = os.environ.get('TARGET_CONTAINER', 'ur_gia') + target = os.environ.get("TARGET_CONTAINER", "ur_gia") print(f'[{time.strftime("%H:%M:%S")}] Restarting {target}...', flush=True) # Try podman first (preferred in this setup), then docker cmd = f"podman restart {target} 2>/dev/null || docker restart {target} 2>/dev/null" result = subprocess.run(cmd, shell=True, capture_output=True) if result.returncode == 0: - print(f'[{time.strftime("%H:%M:%S")}] {target} restarted successfully', flush=True) + print( + f'[{time.strftime("%H:%M:%S")}] {target} restarted successfully', + flush=True, + ) else: print(f'[{time.strftime("%H:%M:%S")}] {target} restart failed', flush=True) time.sleep(1) @@ -80,17 +84,20 @@ def main(): # Allow overriding watched paths via environment variable `WATCH_PATHS`. # Default is `/code/core,/code/app` but you can set e.g. `WATCH_PATHS=/code/core` - watch_paths_env = os.environ.get('WATCH_PATHS', '/code/core,/code/app') - watch_paths = [p.strip() for p in watch_paths_env.split(',') if p.strip()] + watch_paths_env = os.environ.get("WATCH_PATHS", "/code/core,/code/app") + watch_paths = [p.strip() for p in watch_paths_env.split(",") if p.strip()] for path in watch_paths: if os.path.exists(path): observer.schedule(handler, path, recursive=True) - print(f'Watching: {path}', flush=True) + print(f"Watching: {path}", flush=True) else: - print(f'Not found (will not watch): {path}', flush=True) + print(f"Not found (will not watch): {path}", flush=True) observer.start() - print(f'[{time.strftime("%H:%M:%S")}] File watcher started. Monitoring for changes...', flush=True) + print( + f'[{time.strftime("%H:%M:%S")}] File watcher started. Monitoring for changes...', + flush=True, + ) try: while True: @@ -100,5 +107,5 @@ def main(): observer.join() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/docker/watch_simple.py b/docker/watch_simple.py index b3d3660..2ed9322 100644 --- a/docker/watch_simple.py +++ b/docker/watch_simple.py @@ -15,9 +15,9 @@ Touching a file under the watched path will trigger a restart of the target container; e.g. `touch /code/core/__restart__` will cause the watcher to act. """ import os +import subprocess import sys import time -import subprocess def get_mtime(path): @@ -25,9 +25,9 @@ def get_mtime(path): max_mtime = 0 for root, dirs, files in os.walk(path): # Skip pycache and hidden dirs - dirs[:] = [d for d in dirs if not d.startswith('.') and d != '__pycache__'] + dirs[:] = [d for d in dirs if not d.startswith(".") and d != "__pycache__"] for file in files: - if file.endswith(('.pyc', '.pyo')): + if file.endswith((".pyc", ".pyo")): continue try: mtime = os.path.getmtime(os.path.join(root, file)) @@ -39,9 +39,9 @@ def get_mtime(path): def restart_ur(): """Restart target container (defaults to `ur_gia`).""" - target = os.environ.get('TARGET_CONTAINER', 'ur_gia') + target = os.environ.get("TARGET_CONTAINER", "ur_gia") print(f'[{time.strftime("%H:%M:%S")}] Restarting {target}...', flush=True) - cmd = f'podman restart {target} 2>/dev/null || docker restart {target} 2>/dev/null' + cmd = f"podman restart {target} 2>/dev/null || docker restart {target} 2>/dev/null" result = subprocess.run(cmd, shell=True, capture_output=True) if result.returncode == 0: print(f'[{time.strftime("%H:%M:%S")}] {target} restarted', flush=True) @@ -53,16 +53,16 @@ def main(): # In the container the repository is mounted at /code # Allow overriding watched paths via environment variable `WATCH_PATHS`. # Default is `/code/core,/code/app`. - paths_env = os.environ.get('WATCH_PATHS', '/code/core,/code/app') - paths = [p.strip() for p in paths_env.split(',') if p.strip()] + paths_env = os.environ.get("WATCH_PATHS", "/code/core,/code/app") + paths = [p.strip() for p in paths_env.split(",") if p.strip()] last_mtimes = {} for path in paths: if os.path.exists(path): - print(f'Watching: {path}', flush=True) + print(f"Watching: {path}", flush=True) last_mtimes[path] = get_mtime(path) else: - print(f'Not found: {path}', flush=True) + print(f"Not found: {path}", flush=True) print(f'[{time.strftime("%H:%M:%S")}] Watcher started', flush=True) restart_debounce = 0 @@ -77,15 +77,17 @@ def main(): continue current_mtime = get_mtime(path) if current_mtime > last_mtimes.get(path, 0): - print(f'[{time.strftime("%H:%M:%S")}] Changes in {path}', flush=True) + print( + f'[{time.strftime("%H:%M:%S")}] Changes in {path}', flush=True + ) last_mtimes[path] = current_mtime if restart_debounce <= 0: restart_ur() restart_debounce = 5 # Don't restart more than every 5s except KeyboardInterrupt: - print('Watcher stopped', flush=True) + print("Watcher stopped", flush=True) sys.exit(0) -if __name__ == '__main__': +if __name__ == "__main__": main()