From 4cf75b9923342219801f46644692ff5593368de9 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Sun, 15 Feb 2026 18:58:58 +0000 Subject: [PATCH] Implement attachment view --- core/clients/xmpp.py | 136 +++++++- .../commands/backfill_xmpp_attachment_urls.py | 162 ++++++++++ core/modules/router.py | 85 +++++ core/realtime/compose_ws.py | 65 +++- core/realtime/typing_state.py | 74 +++++ core/templates/partials/compose-panel.html | 300 ++++++++++++++++-- .../partials/compose-send-status.html | 4 +- core/views/compose.py | 157 +++++++-- 8 files changed, 914 insertions(+), 69 deletions(-) create mode 100644 core/management/commands/backfill_xmpp_attachment_urls.py create mode 100644 core/realtime/typing_state.py diff --git a/core/clients/xmpp.py b/core/clients/xmpp.py index 0e6fc1c..f275054 100644 --- a/core/clients/xmpp.py +++ b/core/clients/xmpp.py @@ -1,4 +1,6 @@ import asyncio +import re +from urllib.parse import urlsplit import aiohttp from asgiref.sync import sync_to_async @@ -27,6 +29,50 @@ from core.models import ( ) from core.util import logs +URL_PATTERN = re.compile(r"https?://[^\s<>'\"\\]+") + + +def _clean_url(value): + return str(value or "").strip().rstrip(".,);:!?\"'") + + +def _filename_from_url(url_value): + path = urlsplit(str(url_value or "")).path + name = path.rsplit("/", 1)[-1] + return name or "attachment" + + +def _extract_xml_attachment_urls(message_stanza): + urls = [] + + def _add(candidate): + cleaned = _clean_url(candidate) + if not cleaned: + return + if not cleaned.startswith("http://") and not cleaned.startswith("https://"): + return + if cleaned not in urls: + urls.append(cleaned) + + # Explicit attachments and OOB payloads. + for node in message_stanza.xml.findall(".//{urn:xmpp:attachments}attachment"): + _add(node.attrib.get("url")) + for node in message_stanza.xml.findall(".//{jabber:x:oob}x/{jabber:x:oob}url"): + _add(node.text) + + # XMPP references frequently carry attachment URIs. + for node in message_stanza.xml.findall(".//{urn:xmpp:reference:0}reference"): + _add(node.attrib.get("uri")) + + # Generic fallback for custom namespaces and rich message payloads. + for node in message_stanza.xml.iter(): + for key in ("url", "uri", "href", "src"): + _add(node.attrib.get(key)) + for match in URL_PATTERN.findall(str(node.text or "")): + _add(match) + + return urls + class XMPPComponent(ComponentXMPP): @@ -821,38 +867,69 @@ class XMPPComponent(ComponentXMPP): recipient_domain = recipient_jid # Extract message body - body = msg["body"] if msg["body"] else "[No Body]" + body = msg["body"] if msg["body"] else "" attachments = [] self.log.info(f"Full XMPP Message: {ET.tostring(msg.xml, encoding='unicode')}") - # Extract attachments from standard XMPP (if present) + # Extract attachments from standard XMPP payloads. for att in msg.xml.findall(".//{urn:xmpp:attachments}attachment"): + url_value = _clean_url(att.attrib.get("url")) + if not url_value: + continue attachments.append( { - "url": att.attrib.get("url"), - "filename": att.attrib.get("filename"), - "content_type": att.attrib.get("content_type"), + "url": url_value, + "filename": att.attrib.get("filename") + or _filename_from_url(url_value), + "content_type": att.attrib.get("content_type") + or "application/octet-stream", } ) - # Extract attachments from XEP-0066 format (Out of Band Data) + # Extract attachments from XEP-0066 OOB payloads. for oob in msg.xml.findall(".//{jabber:x:oob}x/{jabber:x:oob}url"): + url_value = _clean_url(oob.text) + if not url_value: + continue attachments.append( { - "url": oob.text, - "filename": oob.text.split("/")[-1], # Extract filename from URL - "content_type": "application/octet-stream", # Generic content-type + "url": url_value, + "filename": _filename_from_url(url_value), + "content_type": "application/octet-stream", } ) + # Fallback extraction for alternate attachment encodings. + extracted_urls = _extract_xml_attachment_urls(msg) + existing_urls = {str(item.get("url") or "").strip() for item in attachments} + for url_value in extracted_urls: + if url_value in existing_urls: + continue + attachments.append( + { + "url": url_value, + "filename": _filename_from_url(url_value), + "content_type": "application/octet-stream", + } + ) + + if (not body or body.strip().lower() in {"[no body]", "(no text)"}) and attachments: + attachment_urls = [ + str(item.get("url") or "").strip() + for item in attachments + if str(item.get("url") or "").strip() + ] + if attachment_urls: + body = "\n".join(attachment_urls) + self.log.info(f"Extracted {len(attachments)} attachments from XMPP message.") # Log extracted information with variable name annotations log_message = ( f"Sender JID: {sender_jid}, Sender Username: {sender_username}, Sender Domain: {sender_domain}, " f"Sender Resource: {sender_resource if sender_resource else '[No Resource]'}, " f"Recipient JID: {recipient_jid}, Recipient Username: {recipient_username}, Recipient Domain: {recipient_domain}, " - f"Body: {body}" + f"Body: {body or '[No Body]'}" ) self.log.info(log_message) @@ -960,6 +1037,11 @@ class XMPPComponent(ComponentXMPP): ) self.log.info(f"MANIP11 {manipulations}") if not manipulations: + await self.ur.stopped_typing( + "xmpp", + identifier=identifier, + payload={"reason": "message_sent"}, + ) await identifier.send( body, attachments, @@ -984,6 +1066,11 @@ class XMPPComponent(ComponentXMPP): text=result, ts=int(now().timestamp() * 1000), ) + await self.ur.stopped_typing( + "xmpp", + identifier=identifier, + payload={"reason": "message_sent"}, + ) await identifier.send( result, attachments, @@ -1052,6 +1139,29 @@ class XMPPComponent(ComponentXMPP): self.log.info(f"Sending XMPP message: {msg.xml}") msg.send() + async def send_chat_state(self, recipient_jid, sender_jid, started): + """Send XMPP chat-state update to the client.""" + msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") + state_tag = "composing" if started else "paused" + msg.xml.append( + ET.Element(f"{{http://jabber.org/protocol/chatstates}}{state_tag}") + ) + self.log.info( + "Sending XMPP chat-state %s: %s -> %s", + state_tag, + sender_jid, + recipient_jid, + ) + msg.send() + + async def send_typing_for_person(self, user, person_identifier, started): + sender_jid = ( + f"{person_identifier.person.name.lower()}|" + f"{person_identifier.service}@{settings.XMPP_JID}" + ) + recipient_jid = f"{user.username}@{settings.XMPP_ADDRESS}" + await self.send_chat_state(recipient_jid, sender_jid, started) + async def send_from_external( self, user, person_identifier, text, is_outgoing_message, attachments=[] ): @@ -1110,3 +1220,9 @@ class XMPPClient(ClientBase): self.client.connect() # self.client.process() + + async def start_typing_for_person(self, user, person_identifier): + await self.client.send_typing_for_person(user, person_identifier, True) + + async def stop_typing_for_person(self, user, person_identifier): + await self.client.send_typing_for_person(user, person_identifier, False) diff --git a/core/management/commands/backfill_xmpp_attachment_urls.py b/core/management/commands/backfill_xmpp_attachment_urls.py new file mode 100644 index 0000000..cd7c6f6 --- /dev/null +++ b/core/management/commands/backfill_xmpp_attachment_urls.py @@ -0,0 +1,162 @@ +from django.core.management.base import BaseCommand +from django.db.models import Q + +from core.models import Message, MessageEvent + + +EMPTY_TEXT_VALUES = { + "", + "[No Body]", + "[no body]", +} + + +def _normalize_url(value): + if not value: + return "" + text = str(value).strip().rstrip(".,);:!?\"'") + if text.startswith("http://") or text.startswith("https://"): + return text + return "" + + +def _extract_urls_from_attachment_blob(blob): + urls = [] + if isinstance(blob, str): + normalized = _normalize_url(blob) + if normalized: + urls.append(normalized) + return urls + + if isinstance(blob, dict): + for key in ("url", "source_url", "download_url", "proxy_url", "href"): + normalized = _normalize_url(blob.get(key)) + if normalized: + urls.append(normalized) + nested = blob.get("attachments") + if isinstance(nested, list): + for row in nested: + urls.extend(_extract_urls_from_attachment_blob(row)) + return urls + + if isinstance(blob, list): + for row in blob: + urls.extend(_extract_urls_from_attachment_blob(row)) + return urls + + +def _uniq(values): + seen = set() + output = [] + for value in values: + if value in seen: + continue + seen.add(value) + output.append(value) + return output + + +class Command(BaseCommand): + help = ( + "Backfill empty Message.text rows originating from XMPP by recovering " + "attachment URLs from MessageEvent metadata." + ) + + def add_arguments(self, parser): + parser.add_argument( + "--apply", + action="store_true", + help="Persist updates. Without this flag, runs as dry-run.", + ) + parser.add_argument( + "--user-id", + type=int, + default=None, + help="Limit processing to one user ID.", + ) + parser.add_argument( + "--limit", + type=int, + default=0, + help="Maximum number of candidate rows to inspect (0 = no limit).", + ) + + def _candidate_events(self, message): + linked = MessageEvent.objects.filter( + user=message.user, + raw_payload_ref__legacy_message_id=str(message.id), + ) + if linked.exists(): + return linked + + # Fallback heuristic for older rows with missing legacy refs. + window = 2000 + return MessageEvent.objects.filter( + user=message.user, + source_system="xmpp", + ts__gte=int(message.ts) - window, + ts__lte=int(message.ts) + window, + ).exclude(attachments=[]) + + def handle(self, *args, **options): + apply_changes = bool(options.get("apply")) + user_id = options.get("user_id") + limit = int(options.get("limit") or 0) + + queryset = Message.objects.filter( + sender_uuid__iexact="xmpp", + ).filter( + Q(text__isnull=True) | Q(text__exact="") | Q(text__iexact="[No Body]") + ) + if user_id: + queryset = queryset.filter(user_id=user_id) + queryset = queryset.order_by("ts", "id") + if limit > 0: + queryset = queryset[:limit] + + inspected = 0 + recoverable = 0 + updated = 0 + unrecoverable = 0 + + for message in queryset.iterator(): + inspected += 1 + current_text = str(message.text or "").strip() + if current_text not in EMPTY_TEXT_VALUES: + continue + + urls = [] + for event in self._candidate_events(message): + urls.extend(_extract_urls_from_attachment_blob(event.attachments)) + urls.extend( + _extract_urls_from_attachment_blob(event.raw_payload_ref or {}) + ) + if urls: + break + + urls = _uniq(urls) + if not urls: + unrecoverable += 1 + continue + + recoverable += 1 + new_text = "\n".join(urls) + if apply_changes: + message.text = new_text + message.save(update_fields=["text"]) + updated += 1 + else: + self.stdout.write( + f"[dry-run] {message.id}: would set {len(urls)} URL(s)" + ) + + mode = "apply" if apply_changes else "dry-run" + self.stdout.write( + self.style.SUCCESS( + "XMPP attachment URL backfill complete " + f"({mode}): inspected={inspected}, " + f"recoverable={recoverable}, " + f"updated={updated}, " + f"unrecoverable={unrecoverable}" + ) + ) diff --git a/core/modules/router.py b/core/modules/router.py index dcda649..b81cd42 100644 --- a/core/modules/router.py +++ b/core/modules/router.py @@ -1,4 +1,7 @@ +import asyncio + from asgiref.sync import sync_to_async +from django.conf import settings from core.clients import transport from core.clients.instagram import InstagramClient @@ -7,6 +10,7 @@ from core.clients.whatsapp import WhatsAppClient from core.clients.xmpp import XMPPClient from core.messaging import history from core.models import PersonIdentifier +from core.realtime.typing_state import set_person_typing_state from core.util import logs @@ -17,6 +21,10 @@ class UnifiedRouter(object): def __init__(self, loop): self.loop = loop + self.typing_auto_stop_seconds = int( + getattr(settings, "XMPP_TYPING_AUTO_STOP_SECONDS", 3) + ) + self._typing_stop_tasks = {} self.log = logs.get_logger("router") self.log.info("Initialised Unified Router Interface.") @@ -26,6 +34,42 @@ class UnifiedRouter(object): self.whatsapp = WhatsAppClient(self, loop, "whatsapp") self.instagram = InstagramClient(self, loop, "instagram") + def _typing_task_key(self, target): + return ( + int(target.user_id), + int(target.person_id), + str(target.service), + str(target.identifier), + ) + + def _cancel_typing_timer(self, key): + existing = self._typing_stop_tasks.pop(key, None) + if existing and not existing.done(): + existing.cancel() + + def _schedule_typing_auto_stop(self, target): + key = self._typing_task_key(target) + self._cancel_typing_timer(key) + delay = max(1, int(self.typing_auto_stop_seconds)) + + async def _timer(): + try: + await asyncio.sleep(delay) + await transport.stop_typing(target.service, target.identifier) + except asyncio.CancelledError: + return + except Exception as exc: + self.log.warning( + "Typing auto-stop failed for %s/%s: %s", + target.service, + target.identifier, + exc, + ) + finally: + self._typing_stop_tasks.pop(key, None) + + self._typing_stop_tasks[key] = self.loop.create_task(_timer()) + def _start(self): print("UR _start") self.xmpp.start() @@ -86,6 +130,23 @@ class UnifiedRouter(object): identifier = kwargs.get("identifier") identifiers = await self._resolve_identifier_objects(protocol, identifier) for src in identifiers: + if protocol != "xmpp": + set_person_typing_state( + user_id=src.user_id, + person_id=src.person_id, + started=True, + source_service=protocol, + display_name=src.person.name, + ) + try: + await self.xmpp.start_typing_for_person(src.user, src) + except Exception as exc: + self.log.warning( + "Failed to relay typing-start to XMPP for %s: %s", + src.identifier, + exc, + ) + targets = await sync_to_async(list)( PersonIdentifier.objects.filter( user=src.user, @@ -93,13 +154,34 @@ class UnifiedRouter(object): ).exclude(service=protocol) ) for target in targets: + if target.service == "xmpp": + continue await transport.start_typing(target.service, target.identifier) + if protocol == "xmpp": + self._schedule_typing_auto_stop(target) async def stopped_typing(self, protocol, *args, **kwargs): self.log.info(f"Stopped typing ({protocol}) {args} {kwargs}") identifier = kwargs.get("identifier") identifiers = await self._resolve_identifier_objects(protocol, identifier) for src in identifiers: + if protocol != "xmpp": + set_person_typing_state( + user_id=src.user_id, + person_id=src.person_id, + started=False, + source_service=protocol, + display_name=src.person.name, + ) + try: + await self.xmpp.stop_typing_for_person(src.user, src) + except Exception as exc: + self.log.warning( + "Failed to relay typing-stop to XMPP for %s: %s", + src.identifier, + exc, + ) + targets = await sync_to_async(list)( PersonIdentifier.objects.filter( user=src.user, @@ -107,6 +189,9 @@ class UnifiedRouter(object): ).exclude(service=protocol) ) for target in targets: + if target.service == "xmpp": + continue + self._cancel_typing_timer(self._typing_task_key(target)) await transport.stop_typing(target.service, target.identifier) async def reacted(self, protocol, *args, **kwargs): diff --git a/core/realtime/compose_ws.py b/core/realtime/compose_ws.py index b9ce47d..860f7d2 100644 --- a/core/realtime/compose_ws.py +++ b/core/realtime/compose_ws.py @@ -8,7 +8,13 @@ from asgiref.sync import sync_to_async from django.core import signing from core.models import ChatSession, Message, PersonIdentifier -from core.views.compose import COMPOSE_WS_TOKEN_SALT +from core.realtime.typing_state import get_person_typing_state +from core.views.compose import ( + COMPOSE_WS_TOKEN_SALT, + _image_urls_from_text, + _is_url_only_text, + _looks_like_image_url, +) def _safe_int(value, default=0): @@ -28,11 +34,24 @@ def _fmt_ts(ts_value): def _serialize_message(msg): author = str(msg.custom_author or "").strip() + text_value = str(msg.text or "") + image_urls = _image_urls_from_text(text_value) + image_url = image_urls[0] if image_urls else "" + hide_text = bool( + image_urls + and _is_url_only_text(text_value) + and all(_looks_like_image_url(url) for url in image_urls) + ) + display_text = text_value if text_value.strip() else ("(no text)" if not image_url else "") return { "id": str(msg.id), "ts": int(msg.ts or 0), "display_ts": _fmt_ts(msg.ts), - "text": str(msg.text or ""), + "text": text_value, + "display_text": display_text, + "image_url": image_url, + "image_urls": image_urls, + "hide_text": hide_text, "author": author, "outgoing": author.upper() in {"USER", "BOT"}, } @@ -59,14 +78,18 @@ def _load_since(user_id, service, identifier, person_id, after_ts, limit): identifier=identifier, ).first() if person_identifier is None: - return {"messages": [], "last_ts": after_ts} + return {"messages": [], "last_ts": after_ts, "person_id": 0} session = ChatSession.objects.filter( user_id=user_id, identifier=person_identifier, ).first() if session is None: - return {"messages": [], "last_ts": after_ts} + return { + "messages": [], + "last_ts": after_ts, + "person_id": int(person_identifier.person_id), + } qs = Message.objects.filter( user_id=user_id, @@ -85,6 +108,7 @@ def _load_since(user_id, service, identifier, person_id, after_ts, limit): return { "messages": [_serialize_message(row) for row in rows], "last_ts": int(newest or after_ts or 0), + "person_id": int(person_identifier.person_id), } @@ -109,6 +133,7 @@ async def compose_ws_application(scope, receive, send): service = str(payload.get("s") or "").strip() identifier = str(payload.get("i") or "").strip() person_id = str(payload.get("p") or "").strip() + resolved_person_id = _safe_int(person_id) if user_id <= 0 or (not identifier and not person_id): await send({"type": "websocket.close", "code": 4401}) @@ -117,6 +142,7 @@ async def compose_ws_application(scope, receive, send): await send({"type": "websocket.accept"}) last_ts = 0 limit = 100 + last_typing_key = "" while True: event = None @@ -145,18 +171,33 @@ async def compose_ws_application(scope, receive, send): ) messages = payload.get("messages") or [] latest = _safe_int(payload.get("last_ts"), last_ts) + if resolved_person_id <= 0: + resolved_person_id = _safe_int(payload.get("person_id"), 0) + typing_state = get_person_typing_state( + user_id=user_id, + person_id=resolved_person_id, + ) + typing_key = json.dumps(typing_state, sort_keys=True) + typing_changed = typing_key != last_typing_key + if typing_changed: + last_typing_key = typing_key + + outgoing_payload = {} if messages: last_ts = max(last_ts, latest) + outgoing_payload["messages"] = messages + outgoing_payload["last_ts"] = last_ts + else: + last_ts = max(last_ts, latest) + outgoing_payload["last_ts"] = last_ts + + if typing_changed: + outgoing_payload["typing"] = typing_state + + if messages or typing_changed: await send( { "type": "websocket.send", - "text": json.dumps( - { - "messages": messages, - "last_ts": last_ts, - } - ), + "text": json.dumps(outgoing_payload), } ) - else: - last_ts = max(last_ts, latest) diff --git a/core/realtime/typing_state.py b/core/realtime/typing_state.py new file mode 100644 index 0000000..0590261 --- /dev/null +++ b/core/realtime/typing_state.py @@ -0,0 +1,74 @@ +import time + +from django.core.cache import cache + +TYPING_TTL_SECONDS = 12 + + +def _person_key(user_id, person_id): + return f"compose:typing:user:{int(user_id)}:person:{int(person_id)}" + + +def set_person_typing_state( + *, + user_id, + person_id, + started, + source_service="", + display_name="", +): + if not user_id or not person_id: + return + now_ms = int(time.time() * 1000) + state = { + "typing": bool(started), + "source_service": str(source_service or ""), + "display_name": str(display_name or ""), + "updated_ts": now_ms, + "expires_ts": ( + now_ms + (TYPING_TTL_SECONDS * 1000) if started else now_ms + ), + } + cache.set( + _person_key(user_id, person_id), + state, + timeout=max(TYPING_TTL_SECONDS * 2, 30), + ) + + +def get_person_typing_state(*, user_id, person_id): + if not user_id or not person_id: + return { + "typing": False, + "source_service": "", + "display_name": "", + "updated_ts": 0, + "expires_ts": 0, + } + + key = _person_key(user_id, person_id) + state = dict(cache.get(key) or {}) + if not state: + return { + "typing": False, + "source_service": "", + "display_name": "", + "updated_ts": 0, + "expires_ts": 0, + } + + now_ms = int(time.time() * 1000) + is_typing = bool(state.get("typing")) + expires_ts = int(state.get("expires_ts") or 0) + if is_typing and expires_ts and now_ms > expires_ts: + state["typing"] = False + state["updated_ts"] = now_ms + cache.set(key, state, timeout=max(TYPING_TTL_SECONDS * 2, 30)) + + return { + "typing": bool(state.get("typing")), + "source_service": str(state.get("source_service") or ""), + "display_name": str(state.get("display_name") or ""), + "updated_ts": int(state.get("updated_ts") or 0), + "expires_ts": int(state.get("expires_ts") or 0), + } diff --git a/core/templates/partials/compose-panel.html b/core/templates/partials/compose-panel.html index 7b72bc5..01fc860 100644 --- a/core/templates/partials/compose-panel.html +++ b/core/templates/partials/compose-panel.html @@ -62,7 +62,7 @@