import asyncio import mimetypes import re import time import uuid from urllib.parse import urlsplit import aiohttp from asgiref.sync import sync_to_async from django.conf import settings from django.utils.timezone import now from slixmpp.componentxmpp import ComponentXMPP from slixmpp.plugins.xep_0085.stanza import Active, Composing, Gone, Inactive, Paused from slixmpp.stanza import Message from slixmpp.xmlstream import register_stanza_plugin from slixmpp.xmlstream.stanzabase import ET from core.clients import ClientBase, transport from core.messaging import ai, history, replies, utils from core.models import ( ChatSession, Manipulation, PatternMitigationAutoSettings, PatternMitigationCorrection, PatternMitigationGame, PatternMitigationPlan, PatternMitigationRule, Person, PersonIdentifier, User, WorkspaceConversation, ) from core.util import logs URL_PATTERN = re.compile(r"https?://[^\s<>'\"\\]+") EMOJI_ONLY_PATTERN = re.compile( r"^[\U0001F300-\U0001FAFF\u2600-\u27BF\uFE0F\u200D\u2640-\u2642\u2764]+$" ) def _clean_url(value): return str(value or "").strip().rstrip(".,);:!?\"'") def _filename_from_url(url_value): path = urlsplit(str(url_value or "")).path name = path.rsplit("/", 1)[-1] return name or "attachment" def _content_type_from_filename_or_url(url_value, default="application/octet-stream"): filename = _filename_from_url(url_value) guessed, _ = mimetypes.guess_type(filename) return guessed or default def _extract_xml_attachment_urls(message_stanza): urls = [] def _add(candidate): cleaned = _clean_url(candidate) if not cleaned: return if not cleaned.startswith("http://") and not cleaned.startswith("https://"): return if cleaned not in urls: urls.append(cleaned) # Explicit attachments and OOB payloads. for node in message_stanza.xml.findall(".//{urn:xmpp:attachments}attachment"): _add(node.attrib.get("url")) for node in message_stanza.xml.findall(".//{jabber:x:oob}x/{jabber:x:oob}url"): _add(node.text) # XMPP references frequently carry attachment URIs. 
for node in message_stanza.xml.findall(".//{urn:xmpp:reference:0}reference"): _add(node.attrib.get("uri")) # Generic fallback for custom namespaces and rich message payloads. for node in message_stanza.xml.iter(): for key in ("url", "uri", "href", "src"): _add(node.attrib.get(key)) for match in URL_PATTERN.findall(str(node.text or "")): _add(match) return urls def _extract_xmpp_reaction(message_stanza): nodes = message_stanza.xml.findall(".//{urn:xmpp:reactions:0}reactions") if not nodes: return None node = nodes[0] target_id = str(node.attrib.get("id") or "").strip() emojis = [] for child in node.findall("{urn:xmpp:reactions:0}reaction"): value = str(child.text or "").strip() if value: emojis.append(value) return { "target_id": target_id, "emoji": emojis[0] if emojis else "", "remove": len(emojis) == 0, } def _extract_xmpp_reply_target_id(message_stanza): reply = message_stanza.xml.find(".//{urn:xmpp:reply:0}reply") if reply is None: return "" return str(reply.attrib.get("id") or reply.attrib.get("to") or "").strip() def _parse_greentext_reaction(body_text): lines = [line.strip() for line in str(body_text or "").splitlines() if line.strip()] if len(lines) != 2: return None if not lines[0].startswith(">"): return None quoted = lines[0][1:].strip() emoji = lines[1].strip() if not quoted or not emoji: return None if not EMOJI_ONLY_PATTERN.match(emoji): return None return {"quoted_text": quoted, "emoji": emoji} class XMPPComponent(ComponentXMPP): """ A simple Slixmpp component that echoes messages. 
""" def __init__(self, ur, jid, secret, server, port): self.ur = ur self._upload_config_warned = False self.log = logs.get_logger("XMPP") super().__init__(jid, secret, server, port) # Register chat state plugins register_stanza_plugin(Message, Active) register_stanza_plugin(Message, Composing) register_stanza_plugin(Message, Paused) register_stanza_plugin(Message, Inactive) register_stanza_plugin(Message, Gone) self.add_event_handler("session_start", self.session_start) self.add_event_handler("disconnected", self.on_disconnected) self.add_event_handler("message", self.message) # Presence event handlers self.add_event_handler("presence_available", self.on_presence_available) self.add_event_handler("presence_dnd", self.on_presence_dnd) self.add_event_handler("presence_xa", self.on_presence_xa) self.add_event_handler("presence_chat", self.on_presence_chat) self.add_event_handler("presence_away", self.on_presence_away) self.add_event_handler("presence_unavailable", self.on_presence_unavailable) self.add_event_handler("presence_subscribe", self.on_presence_subscribe) self.add_event_handler("presence_subscribed", self.on_presence_subscribed) self.add_event_handler("presence_unsubscribe", self.on_presence_unsubscribe) self.add_event_handler("presence_unsubscribed", self.on_presence_unsubscribed) self.add_event_handler( "roster_subscription_request", self.on_roster_subscription_request ) # Chat state handlers self.add_event_handler("chatstate_active", self.on_chatstate_active) self.add_event_handler("chatstate_composing", self.on_chatstate_composing) self.add_event_handler("chatstate_paused", self.on_chatstate_paused) self.add_event_handler("chatstate_inactive", self.on_chatstate_inactive) self.add_event_handler("chatstate_gone", self.on_chatstate_gone) async def enable_carbons(self): """Enable XMPP Message Carbons (XEP-0280)""" try: iq = self.make_iq_set() iq["enable"] = ET.Element("{urn:xmpp:carbons:2}enable") await iq.send() self.log.info("Message Carbons enabled 
def get_identifier(self, msg):
    """
    Resolve the PersonIdentifier addressed by an incoming stanza.

    The sender's JID localpart is looked up as a ``User.username``; the
    recipient localpart is read as ``<person>`` or ``<person>|<service>``
    (e.g. ``mark|signal``).

    Args:
        msg: Stanza-like object supporting ``msg["from"]`` and ``msg["to"]``.

    Returns:
        PersonIdentifier | None: The matching identifier, or None when a JID
        is malformed or any lookup fails (the reason is logged).
    """
    # Bare sender JID is user@domain; a JID without "@" (e.g. a server or
    # component address) has no user to resolve.
    sender_bare_jid = str(msg["from"]).split("/", 1)[0]
    sender_username, at_sign, _ = sender_bare_jid.partition("@")
    if not at_sign:
        self.log.warning(
            "Cannot resolve identifier: sender JID %s has no user part",
            sender_bare_jid,
        )
        return None

    # Recipient localpart carries the person name and optional service.
    recipient_username = str(msg["to"]).split("@", 1)[0]
    person_name, separator, service = recipient_username.partition("|")
    if "|" in service:
        self.log.warning(
            "Cannot resolve identifier: malformed recipient %s", recipient_username
        )
        return None
    person_name = person_name.title()  # Capitalize for consistency
    if not separator:
        service = None

    try:
        # Lookup user in Django
        self.log.debug("Resolving XMPP sender user=%s", sender_username)
        user = User.objects.get(username=sender_username)

        # Find the Person whose name is the title-cased recipient name
        self.log.debug("Resolving XMPP recipient person=%s", person_name)
        person = Person.objects.get(user=user, name=person_name)

        # Require a PersonIdentifier for this user, person, and service
        self.log.debug("Resolving XMPP identifier service=%s", service)
        return PersonIdentifier.objects.get(
            user=user, person=person, service=service
        )
    except Exception as e:
        self.log.error(f"Failed to resolve identifier from XMPP message: {e}")
        return None
title=f"{person.name} Workspace", defaults={"platform_thread_id": str(person.id)}, ) conversation.participants.add(person) return conversation def _get_or_create_plan(self, user, person): conversation = self._get_workspace_conversation(user, person) plan = conversation.mitigation_plans.order_by("-updated_at").first() if plan is None: plan = PatternMitigationPlan.objects.create( user=user, conversation=conversation, title=f"{person.name} Pattern Mitigation", objective="Mitigate repeated friction loops.", fundamental_items=[], creation_mode="guided", status="draft", ) PatternMitigationRule.objects.create( user=user, plan=plan, title="Safety Before Analysis", content="Prioritize de-escalation before analysis.", enabled=True, ) PatternMitigationGame.objects.create( user=user, plan=plan, title="Two-Turn Pause", instructions="Use two short turns then pause with a return time.", enabled=True, ) return plan async def _handle_mitigation_command(self, sender_user, body, sym): def parse_parts(raw): return [part.strip() for part in raw.split("|")] command = body.strip() if command == ".mitigation help": sym( "Mitigation commands: " ".mitigation list | " ".mitigation show | " ".mitigation rule-add ||<content> | " ".mitigation rule-del <person>|<title> | " ".mitigation game-add <person>|<title>|<instructions> | " ".mitigation game-del <person>|<title> | " ".mitigation correction-add <person>|<title>|<clarification> | " ".mitigation correction-del <person>|<title> | " ".mitigation fundamentals-set <person>|<item1;item2;...> | " ".mitigation plan-set <person>|<draft|active|archived>|<auto|guided> | " ".mitigation auto <person>|on|off | " ".mitigation auto-status <person>" ) return True if command == ".mitigation list": plans = await sync_to_async(list)( PatternMitigationPlan.objects.filter(user=sender_user) .select_related("conversation") .order_by("-updated_at")[:15] ) if not plans: sym("No mitigation plans found.") return True rows = [] for plan in plans: person_name = ( 
plan.conversation.participants.order_by("name").first().name if plan.conversation.participants.exists() else "Unknown" ) rows.append(f"{person_name}: {plan.title}") sym("Plans: " + " | ".join(rows)) return True if command.startswith(".mitigation show "): person_name = command.replace(".mitigation show ", "", 1).strip().title() person = await sync_to_async( lambda: Person.objects.filter( user=sender_user, name__iexact=person_name ).first() )() if not person: sym("Unknown person.") return True plan = await sync_to_async(self._get_or_create_plan)(sender_user, person) rule_count = await sync_to_async(plan.rules.count)() game_count = await sync_to_async(plan.games.count)() sym(f"{person.name}: {plan.title} | rules={rule_count} games={game_count}") return True if command.startswith(".mitigation rule-add "): payload = command.replace(".mitigation rule-add ", "", 1) parts = parse_parts(payload) if len(parts) < 3: sym("Usage: .mitigation rule-add <person>|<title>|<content>") return True person_name, title, content = ( parts[0].title(), parts[1], "|".join(parts[2:]), ) person = await sync_to_async( lambda: Person.objects.filter( user=sender_user, name__iexact=person_name ).first() )() if not person: sym("Unknown person.") return True plan = await sync_to_async(self._get_or_create_plan)(sender_user, person) await sync_to_async(PatternMitigationRule.objects.create)( user=sender_user, plan=plan, title=title[:255], content=content, enabled=True, ) sym("Rule added.") return True if command.startswith(".mitigation rule-del "): payload = command.replace(".mitigation rule-del ", "", 1) parts = parse_parts(payload) if len(parts) < 2: sym("Usage: .mitigation rule-del <person>|<title>") return True person_name, title = parts[0].title(), "|".join(parts[1:]) person = await sync_to_async( lambda: Person.objects.filter( user=sender_user, name__iexact=person_name ).first() )() if not person: sym("Unknown person.") return True plan = await sync_to_async(self._get_or_create_plan)(sender_user, 
person) deleted, _ = await sync_to_async( lambda: PatternMitigationRule.objects.filter( user=sender_user, plan=plan, title__iexact=title, ).delete() )() sym("Rule deleted." if deleted else "Rule not found.") return True if command.startswith(".mitigation game-add "): payload = command.replace(".mitigation game-add ", "", 1) parts = parse_parts(payload) if len(parts) < 3: sym("Usage: .mitigation game-add <person>|<title>|<instructions>") return True person_name, title, content = ( parts[0].title(), parts[1], "|".join(parts[2:]), ) person = await sync_to_async( lambda: Person.objects.filter( user=sender_user, name__iexact=person_name ).first() )() if not person: sym("Unknown person.") return True plan = await sync_to_async(self._get_or_create_plan)(sender_user, person) await sync_to_async(PatternMitigationGame.objects.create)( user=sender_user, plan=plan, title=title[:255], instructions=content, enabled=True, ) sym("Game added.") return True if command.startswith(".mitigation game-del "): payload = command.replace(".mitigation game-del ", "", 1) parts = parse_parts(payload) if len(parts) < 2: sym("Usage: .mitigation game-del <person>|<title>") return True person_name, title = parts[0].title(), "|".join(parts[1:]) person = await sync_to_async( lambda: Person.objects.filter( user=sender_user, name__iexact=person_name ).first() )() if not person: sym("Unknown person.") return True plan = await sync_to_async(self._get_or_create_plan)(sender_user, person) deleted, _ = await sync_to_async( lambda: PatternMitigationGame.objects.filter( user=sender_user, plan=plan, title__iexact=title, ).delete() )() sym("Game deleted." 
if deleted else "Game not found.") return True if command.startswith(".mitigation correction-add "): payload = command.replace(".mitigation correction-add ", "", 1) parts = parse_parts(payload) if len(parts) < 3: sym( "Usage: .mitigation correction-add <person>|<title>|<clarification>" ) return True person_name, title, clarification = ( parts[0].title(), parts[1], "|".join(parts[2:]), ) person = await sync_to_async( lambda: Person.objects.filter( user=sender_user, name__iexact=person_name ).first() )() if not person: sym("Unknown person.") return True plan = await sync_to_async(self._get_or_create_plan)(sender_user, person) await sync_to_async(PatternMitigationCorrection.objects.create)( user=sender_user, plan=plan, title=title[:255], clarification=clarification, source_phrase="", perspective="second_person", share_target="both", language_style="adapted", enabled=True, ) sym("Correction added.") return True if command.startswith(".mitigation correction-del "): payload = command.replace(".mitigation correction-del ", "", 1) parts = parse_parts(payload) if len(parts) < 2: sym("Usage: .mitigation correction-del <person>|<title>") return True person_name, title = parts[0].title(), "|".join(parts[1:]) person = await sync_to_async( lambda: Person.objects.filter( user=sender_user, name__iexact=person_name ).first() )() if not person: sym("Unknown person.") return True plan = await sync_to_async(self._get_or_create_plan)(sender_user, person) deleted, _ = await sync_to_async( lambda: PatternMitigationCorrection.objects.filter( user=sender_user, plan=plan, title__iexact=title, ).delete() )() sym("Correction deleted." 
if deleted else "Correction not found.") return True if command.startswith(".mitigation fundamentals-set "): payload = command.replace(".mitigation fundamentals-set ", "", 1) parts = parse_parts(payload) if len(parts) < 2: sym("Usage: .mitigation fundamentals-set <person>|<item1;item2;...>") return True person_name, values = parts[0].title(), "|".join(parts[1:]) person = await sync_to_async( lambda: Person.objects.filter( user=sender_user, name__iexact=person_name ).first() )() if not person: sym("Unknown person.") return True plan = await sync_to_async(self._get_or_create_plan)(sender_user, person) items = [item.strip() for item in values.split(";") if item.strip()] plan.fundamental_items = items await sync_to_async(plan.save)( update_fields=["fundamental_items", "updated_at"] ) sym(f"Fundamentals updated ({len(items)}).") return True if command.startswith(".mitigation plan-set "): payload = command.replace(".mitigation plan-set ", "", 1) parts = parse_parts(payload) if len(parts) < 3: sym( "Usage: .mitigation plan-set <person>|<draft|active|archived>|<auto|guided>" ) return True person_name, status_value, mode_value = ( parts[0].title(), parts[1].lower(), parts[2].lower(), ) person = await sync_to_async( lambda: Person.objects.filter( user=sender_user, name__iexact=person_name ).first() )() if not person: sym("Unknown person.") return True plan = await sync_to_async(self._get_or_create_plan)(sender_user, person) valid_status = {key for key, _ in PatternMitigationPlan.STATUS_CHOICES} valid_modes = { key for key, _ in PatternMitigationPlan.CREATION_MODE_CHOICES } if status_value in valid_status: plan.status = status_value if mode_value in valid_modes: plan.creation_mode = mode_value await sync_to_async(plan.save)( update_fields=["status", "creation_mode", "updated_at"] ) sym(f"Plan updated: status={plan.status}, mode={plan.creation_mode}") return True if command.startswith(".mitigation auto "): payload = command.replace(".mitigation auto ", "", 1) parts = 
parse_parts(payload) if len(parts) < 2: sym("Usage: .mitigation auto <person>|on|off") return True person_name, state = parts[0].title(), parts[1].lower() person = await sync_to_async( lambda: Person.objects.filter( user=sender_user, name__iexact=person_name ).first() )() if not person: sym("Unknown person.") return True conversation = await sync_to_async(self._get_workspace_conversation)( sender_user, person ) auto_obj, _ = await sync_to_async( PatternMitigationAutoSettings.objects.get_or_create )( user=sender_user, conversation=conversation, ) auto_obj.enabled = state in {"on", "true", "1", "yes"} await sync_to_async(auto_obj.save)(update_fields=["enabled", "updated_at"]) sym( f"Automation {'enabled' if auto_obj.enabled else 'disabled'} for {person.name}." ) return True if command.startswith(".mitigation auto-status "): person_name = ( command.replace(".mitigation auto-status ", "", 1).strip().title() ) person = await sync_to_async( lambda: Person.objects.filter( user=sender_user, name__iexact=person_name ).first() )() if not person: sym("Unknown person.") return True conversation = await sync_to_async(self._get_workspace_conversation)( sender_user, person ) auto_obj, _ = await sync_to_async( PatternMitigationAutoSettings.objects.get_or_create )( user=sender_user, conversation=conversation, ) sym( f"{person.name}: auto={'on' if auto_obj.enabled else 'off'}, " f"pattern={'on' if auto_obj.auto_pattern_recognition else 'off'}, " f"corrections={'on' if auto_obj.auto_create_corrections else 'off'}" ) return True return False def update_roster(self, jid, name=None): """ Adds or updates a user in the roster. """ iq = self.Iq() iq["type"] = "set" iq["roster"]["items"] = {jid: {"name": name or jid}} iq.send() self.log.debug("Updated roster: added %s (%s)", jid, name) def on_chatstate_active(self, msg): """ Handle when a user is actively engaged in the chat. 
""" self.log.debug("Chat state active from %s", msg["from"]) self.get_identifier(msg) def on_chatstate_composing(self, msg): """ Handle when a user is typing a message. """ self.log.debug("Chat state composing from %s", msg["from"]) identifier = self.get_identifier(msg) if identifier: asyncio.create_task( self.ur.started_typing( "xmpp", identifier=identifier, ) ) def on_chatstate_paused(self, msg): """ Handle when a user has paused typing. """ self.log.debug("Chat state paused from %s", msg["from"]) identifier = self.get_identifier(msg) if identifier: asyncio.create_task( self.ur.stopped_typing( "xmpp", identifier=identifier, ) ) def on_chatstate_inactive(self, msg): """ Handle when a user is inactive in the chat. """ self.log.debug("Chat state inactive from %s", msg["from"]) self.get_identifier(msg) def on_chatstate_gone(self, msg): """ Handle when a user has left the chat. """ self.log.debug("Chat state gone from %s", msg["from"]) self.get_identifier(msg) def on_presence_available(self, pres): """ Handle when a user becomes available. """ self.log.debug("Presence available from %s", pres["from"]) def on_presence_dnd(self, pres): """ Handle when a user sets 'Do Not Disturb' status. """ self.log.debug("Presence dnd from %s", pres["from"]) def on_presence_xa(self, pres): """ Handle when a user sets 'Extended Away' status. """ self.log.debug("Presence extended-away from %s", pres["from"]) def on_presence_chat(self, pres): """ Handle when a user is actively available for chat. """ self.log.debug("Presence chat-available from %s", pres["from"]) def on_presence_away(self, pres): """ Handle when a user sets 'Away' status. """ self.log.debug("Presence away from %s", pres["from"]) def on_presence_unavailable(self, pres): """ Handle when a user goes offline or unavailable. """ self.log.debug("Presence unavailable from %s", pres["from"]) def on_presence_subscribe(self, pres): """ Handle incoming presence subscription requests. 
def on_presence_subscribe(self, pres):
    """
    Handle incoming presence subscription requests.

    Accept only if the recipient has a contact matching the sender: the
    sender localpart must be a known user, and the recipient localpart
    (``<person>`` or ``<person>|<service>``) must match one of that user's
    PersonIdentifier rows. Requests with malformed JIDs are ignored.
    """
    sender_jid = str(pres["from"]).split("/")[0]  # Bare JID (user@domain)
    recipient_jid = str(pres["to"]).split("/")[0]

    self.log.debug(
        f"Received subscription request from {sender_jid} to {recipient_jid}"
    )

    # Parse both JIDs before touching the database so malformed input is
    # dropped with a trace in the log rather than silently.
    user_username, sender_at, _ = sender_jid.partition("@")
    recipient_username, recipient_at, _ = recipient_jid.partition("@")
    person_name, separator, service = recipient_username.partition("|")
    if not sender_at or not recipient_at or "|" in service:
        self.log.debug(
            "Ignoring subscription request with malformed JID: %s -> %s",
            sender_jid,
            recipient_jid,
        )
        return
    person_name = person_name.title()  # Capitalize for consistency
    if not separator:
        service = None

    try:
        # Lookup user in Django
        self.log.debug("Resolving subscription user=%s", user_username)
        user = User.objects.get(username=user_username)

        # Find the Person whose name is the title-cased recipient name
        self.log.debug("Resolving subscription person=%s", person_name)
        person = Person.objects.get(user=user, name=person_name)

        # Require a PersonIdentifier for this user, person, and service
        self.log.debug("Resolving subscription identifier service=%s", service)
        PersonIdentifier.objects.get(user=user, person=person, service=service)
    except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist):
        # If any lookup fails, reject the subscription
        self.log.warning(
            f"Subscription request from {sender_jid} rejected (recipient does not have this contact)."
        )
        # The rejection has to come from the JID that was subscribed to.
        self.send_presence(ptype="unsubscribed", pto=sender_jid, pfrom=recipient_jid)
        return

    # Only append "|<service>" when the request carried one; otherwise the
    # localpart would read "name|None".
    localpart = person_name.lower()
    if service is not None:
        localpart = f"{localpart}|{service}"
    component_jid = f"{localpart}@{self.boundjid.bare}"

    # Accept the subscription
    self.send_presence(ptype="subscribed", pto=sender_jid, pfrom=component_jid)
    self.log.debug(
        f"Accepted subscription from {sender_jid}, sent from {component_jid}"
    )

    # Send a presence request **from the recipient to the sender** (ASKS THEM TO ACCEPT BACK)
    # self.send_presence(ptype="subscribe", pto=sender_jid, pfrom=component_jid)

    # Add sender to roster
    # self.update_roster(sender_jid, name=sender_jid.split("@")[0])

    # Send presence update to sender **from the correct JID**
    self.send_presence(ptype="available", pto=sender_jid, pfrom=component_jid)
    self.log.debug("Sent presence update from %s to %s", component_jid, sender_jid)


def on_presence_subscribed(self, pres):
    """
    Handle successful subscription confirmations.
    """
    self.log.debug("Subscription to %s accepted", pres["from"])


def on_presence_unsubscribe(self, pres):
    """
    Handle when a user unsubscribes from presence updates.
    """
    self.log.debug("Presence unsubscribe from %s", pres["from"])


def on_presence_unsubscribed(self, pres):
    """
    Handle when a user's unsubscription request is confirmed.
    """
    self.log.debug("Presence unsubscribed confirmation from %s", pres["from"])


def on_roster_subscription_request(self, pres):
    """
    Handle roster subscription requests.
    """
    self.log.debug("Roster subscription request from %s", pres["from"])


async def session_start(self, *args):
    """Log the session start and enable message carbons."""
    self.log.info("XMPP session started")
    await self.enable_carbons()


def on_disconnected(self, *args):
    """
    Handles XMPP disconnection by immediately calling ``connect()`` again.
    """
    self.log.warning("XMPP disconnected, attempting to reconnect...")
    self.connect()


async def request_upload_slot(self, recipient, filename, content_type, size):
    """
    Requests an upload slot from XMPP for HTTP File Upload (XEP-0363).

    The upload service JID is taken from ``settings.XMPP_UPLOAD_SERVICE`` or
    ``settings.XMPP_UPLOAD_JID``; when neither is set, XEP-0363 service
    discovery is attempted.

    Args:
        recipient (str): The JID of the recipient (not used by this method).
        filename (str): The filename for the upload.
        content_type (str): The file's MIME type.
        size (int): The file size in bytes.

    Returns:
        tuple | None: ``(get_url, put_url, auth_header)`` where
        ``auth_header`` may be None, or None if no slot could be obtained.
    """
    upload_service_jid = str(
        getattr(settings, "XMPP_UPLOAD_SERVICE", "")
        or getattr(settings, "XMPP_UPLOAD_JID", "")
    ).strip()

    if not upload_service_jid:
        discovered = None
        try:
            discovered = await self["xep_0363"].find_upload_service()
        except Exception as exc:
            self.log.debug("XMPP upload service discovery failed: %s", exc)
        if discovered:
            # The discovery result is probed three ways: a ``jid`` attribute,
            # an XML string with a ``from`` attribute, or a plain JID string.
            discovered_jid = ""
            try:
                discovered_jid = str(getattr(discovered, "jid", "") or "").strip()
            except Exception:
                discovered_jid = ""
            if not discovered_jid:
                raw_discovered = str(discovered or "").strip()
                if raw_discovered.startswith("<"):
                    try:
                        node = ET.fromstring(raw_discovered)
                        discovered_jid = str(node.attrib.get("from") or "").strip()
                    except Exception:
                        discovered_jid = ""
                else:
                    discovered_jid = raw_discovered
            upload_service_jid = discovered_jid
        if upload_service_jid:
            self.log.info(
                "Discovered XMPP upload service via XEP-0363: %s",
                upload_service_jid,
            )
        else:
            # Warn once per component instance to avoid flooding the log.
            if not self._upload_config_warned:
                self.log.warning(
                    "XMPP upload service not configured/discoverable; skipping attachment upload. "
                    "Set XMPP_UPLOAD_SERVICE (or XMPP_UPLOAD_JID)."
                )
                self._upload_config_warned = True
            return None

    try:
        slot = await self["xep_0363"].request_slot(
            jid=upload_service_jid,
            filename=filename,
            content_type=content_type,
            size=size,
        )

        if slot is None:
            self.log.error(f"Failed to obtain upload slot for {filename}")
            return None

        # Parse the XML response
        root = ET.fromstring(str(slot))
        namespace = "{urn:xmpp:http:upload:0}"

        get_element = root.find(f".//{namespace}get")
        put_element = root.find(f".//{namespace}put")
        get_url = get_element.attrib.get("url") if get_element is not None else None
        put_url = put_element.attrib.get("url") if put_element is not None else None

        if not get_url or not put_url:
            self.log.error(f"Missing URLs in upload slot: {slot}")
            return None

        # The Authorization header is optional and may be present but empty.
        header_element = put_element.find(
            f"./{namespace}header[@name='Authorization']"
        )
        auth_header = None
        if header_element is not None:
            auth_header = (header_element.text or "").strip() or None

        return get_url, put_url, auth_header

    except Exception as e:
        self.log.error(f"Exception while requesting upload slot: {e}")
        return None
""" def sym(value): msg.reply(f"[>] {value}").send() xmpp_message_id = str(msg.get("id") or "").strip() # Extract sender JID (full format: user@domain/resource) sender_jid = str(msg["from"]) # Split into username@domain and optional resource sender_parts = sender_jid.split("/", 1) sender_bare_jid = sender_parts[0] # Always present: user@domain sender_username, sender_domain = sender_bare_jid.split("@", 1) sender_resource = ( sender_parts[1] if len(sender_parts) > 1 else None ) # Extract resource if present # Extract recipient JID (should match component JID format) recipient_jid = str(msg["to"]) if "@" in recipient_jid: recipient_username, recipient_domain = recipient_jid.split("@", 1) else: recipient_username = recipient_jid recipient_domain = recipient_jid # Extract message body body = msg["body"] if msg["body"] else "" parsed_reaction = _extract_xmpp_reaction(msg) parsed_reply_target = _extract_xmpp_reply_target_id(msg) greentext_reaction = _parse_greentext_reaction(body) attachments = [] self.log.debug( "Received XMPP stanza: %s", ET.tostring(msg.xml, encoding="unicode") ) # Extract attachments from standard XMPP payloads. for att in msg.xml.findall(".//{urn:xmpp:attachments}attachment"): url_value = _clean_url(att.attrib.get("url")) if not url_value: continue attachments.append( { "url": url_value, "filename": att.attrib.get("filename") or _filename_from_url(url_value), "content_type": att.attrib.get("content_type") or "application/octet-stream", } ) # Extract attachments from XEP-0066 OOB payloads. for oob in msg.xml.findall(".//{jabber:x:oob}x/{jabber:x:oob}url"): url_value = _clean_url(oob.text) if not url_value: continue guessed_content_type = _content_type_from_filename_or_url(url_value) attachments.append( { "url": url_value, "filename": _filename_from_url(url_value), "content_type": guessed_content_type, } ) # Fallback extraction for alternate attachment encodings. 
extracted_urls = _extract_xml_attachment_urls(msg) existing_urls = {str(item.get("url") or "").strip() for item in attachments} for url_value in extracted_urls: if url_value in existing_urls: continue guessed_content_type = _content_type_from_filename_or_url(url_value) attachments.append( { "url": url_value, "filename": _filename_from_url(url_value), "content_type": guessed_content_type, } ) if ( not body or body.strip().lower() in {"[no body]", "(no text)"} ) and attachments: attachment_urls = [ str(item.get("url") or "").strip() for item in attachments if str(item.get("url") or "").strip() ] if attachment_urls: body = "\n".join(attachment_urls) relay_body = body attachment_urls_for_body = [ str(item.get("url") or "").strip() for item in attachments if str(item.get("url") or "").strip() ] if attachment_urls_for_body: joined_urls = "\n".join(attachment_urls_for_body).strip() if str(relay_body or "").strip() == joined_urls: relay_body = "" self.log.debug("Extracted %s attachments from XMPP message", len(attachments)) # Log extracted information with variable name annotations log_message = ( f"Sender JID: {sender_jid}, Sender Username: {sender_username}, Sender Domain: {sender_domain}, " f"Sender Resource: {sender_resource if sender_resource else '[No Resource]'}, " f"Recipient JID: {recipient_jid}, Recipient Username: {recipient_username}, Recipient Domain: {recipient_domain}, " f"Body: {body or '[No Body]'}" ) self.log.debug(log_message) # Ensure recipient domain matches our configured component expected_domain = settings.XMPP_JID # 'jews.zm.is' in your config if recipient_domain != expected_domain: self.log.warning( f"Invalid recipient domain: {recipient_domain}, expected {expected_domain}" ) return # Lookup sender in Django's User model try: sender_user = User.objects.get(username=sender_username) except User.DoesNotExist: self.log.warning(f"Unknown sender: {sender_username}") return if recipient_jid == settings.XMPP_JID: self.log.debug("Handling command message 
sent to gateway JID") if body.startswith("."): # Messaging the gateway directly if body == ".contacts": # Lookup Person objects linked to sender persons = Person.objects.filter(user=sender_user) if not persons.exists(): self.log.debug("No contacts found for %s", sender_username) sym("No contacts found.") return # Construct contact list response contact_names = [person.name for person in persons] response_text = "Contacts: " + ", ".join(contact_names) sym(response_text) elif body == ".help": sym("Commands: .contacts, .whoami, .mitigation help") elif body.startswith(".mitigation"): handled = await self._handle_mitigation_command( sender_user, body, sym, ) if not handled: sym("Unknown mitigation command. Try .mitigation help") elif body == ".whoami": sym(str(sender_user.__dict__)) else: sym("No such command") else: self.log.debug("Handling routed message to contact") if "|" in recipient_username: recipient_name, recipient_service = recipient_username.split("|") recipient_name = recipient_name.title() else: recipient_name = recipient_username recipient_service = None recipient_name = recipient_name.title() try: person = Person.objects.get(user=sender_user, name=recipient_name) except Person.DoesNotExist: sym("This person does not exist.") if recipient_service: try: identifier = PersonIdentifier.objects.get( user=sender_user, person=person, service=recipient_service ) except PersonIdentifier.DoesNotExist: sym("This service identifier does not exist.") else: # Get a random identifier identifier = PersonIdentifier.objects.filter( user=sender_user, person=person ).first() recipient_service = identifier.service # sym(str(person.__dict__)) # sym(f"Service: {recipient_service}") if parsed_reaction or greentext_reaction: # TODO(web-ui-react): expose explicit web compose reaction actions # that call this same bridge path (without text heuristics). # TODO(edit-sync): extend bridge mapping to include edit message ids # and reconcile upstream edit capability differences in UI. 
# TODO(retract-sync): propagate delete/retract state through this # same mapping layer for protocol parity. reaction_payload = parsed_reaction or { "target_id": parsed_reply_target, "emoji": str((greentext_reaction or {}).get("emoji") or ""), "remove": False, } if not str(reaction_payload.get("target_id") or "").strip(): text_hint = str((greentext_reaction or {}).get("quoted_text") or "") hint_match = transport.resolve_bridge_from_text_hint( user_id=identifier.user_id, person_id=identifier.person_id, service=recipient_service, text_hint=text_hint, ) reaction_payload["target_id"] = str( (hint_match or {}).get("xmpp_message_id") or "" ) self.log.debug( "reaction-bridge xmpp-inbound actor=%s service=%s target_xmpp_id=%s emoji=%s remove=%s via=%s", sender_username, recipient_service, str(reaction_payload.get("target_id") or "") or "-", str(reaction_payload.get("emoji") or "") or "-", bool(reaction_payload.get("remove")), "xmpp:reactions" if parsed_reaction else "greentext", ) bridge = transport.resolve_bridge_from_xmpp( user_id=identifier.user_id, person_id=identifier.person_id, service=recipient_service, xmpp_message_id=str(reaction_payload.get("target_id") or ""), ) if not bridge: bridge = await history.resolve_bridge_ref( user=identifier.user, identifier=identifier, source_service=recipient_service, xmpp_message_id=str(reaction_payload.get("target_id") or ""), ) if not bridge: self.log.warning( "reaction-bridge xmpp-resolve-miss actor=%s service=%s target_xmpp_id=%s", sender_username, recipient_service, str(reaction_payload.get("target_id") or "") or "-", ) sym("Could not find upstream message for this reaction.") return sent_ok = await transport.send_reaction( recipient_service, identifier.identifier, emoji=str(reaction_payload.get("emoji") or ""), target_message_id=str( (bridge or {}).get("upstream_message_id") or "" ), target_timestamp=int((bridge or {}).get("upstream_ts") or 0), target_author=str((bridge or {}).get("upstream_author") or ""), 
remove=bool(reaction_payload.get("remove")), ) if not sent_ok: self.log.warning( "reaction-bridge upstream-send-failed actor=%s service=%s recipient=%s target_upstream_id=%s target_upstream_ts=%s", sender_username, recipient_service, identifier.identifier, str((bridge or {}).get("upstream_message_id") or "") or "-", int((bridge or {}).get("upstream_ts") or 0), ) sym("Upstream protocol did not accept this reaction.") return await history.apply_reaction( user=identifier.user, identifier=identifier, target_message_id=str((bridge or {}).get("local_message_id") or ""), target_ts=int((bridge or {}).get("upstream_ts") or 0), emoji=str(reaction_payload.get("emoji") or ""), source_service="xmpp", actor=sender_username, remove=bool(reaction_payload.get("remove")), payload={ "target_xmpp_id": str(reaction_payload.get("target_id") or ""), "xmpp_message_id": xmpp_message_id, }, ) self.log.debug( "reaction-bridge xmpp-apply-ok actor=%s service=%s local_message_id=%s", sender_username, recipient_service, str((bridge or {}).get("local_message_id") or "") or "-", ) return # tss = await identifier.send(body, attachments=attachments) # AM FIXING https://git.zm.is/XF/GIA/issues/5 session, _ = await sync_to_async(ChatSession.objects.get_or_create)( identifier=identifier, user=identifier.user, ) self.log.debug("Storing outbound XMPP message in history") local_message = await history.store_message( session=session, sender="XMPP", text=body, ts=int(now().timestamp() * 1000), outgoing=True, ) self.log.debug("Stored outbound XMPP message in history") manipulations = Manipulation.objects.filter( group__people=identifier.person, user=identifier.user, mode="mutate", enabled=True, ) self.log.debug("Found %s active manipulations", manipulations.count()) if not manipulations: await self.ur.stopped_typing( "xmpp", identifier=identifier, payload={"reason": "message_sent"}, ) await identifier.send( relay_body, attachments, metadata={ "xmpp_source_id": xmpp_message_id, "xmpp_source_ts": 
async def request_upload_slots(self, recipient_jid, attachments):
    """Ask for one upload slot per attachment, all requests in flight at once.

    Each attachment must provide ``filename``, ``content_type`` and ``size``.

    Returns:
        A list of ``(attachment, slot)`` pairs in input order, leaving out
        every attachment whose slot request came back as ``None``.
    """
    pending = []
    for attachment in attachments:
        pending.append(
            self.request_upload_slot(
                recipient_jid,
                attachment["filename"],
                attachment["content_type"],
                attachment["size"],
            )
        )
    slots = await asyncio.gather(*pending)

    granted = []
    for attachment, slot in zip(attachments, slots):
        if slot is None:
            continue
        granted.append((attachment, slot))
    return granted


async def upload_and_send(self, att, upload_slot, recipient_jid, sender_jid):
    """PUT one attachment to its slot, then send the resulting URL over XMPP.

    ``upload_slot`` is unpacked as ``(upload_url, put_url, auth_header)``;
    ``att`` must provide ``content_type``, ``content`` and ``filename``.

    Returns:
        ``{"url": ..., "xmpp_message_id": ...}`` on success, or ``None`` when
        the PUT answers with a status other than 200/201 or anything inside
        the upload/send sequence raises.
    """
    upload_url, put_url, auth_header = upload_slot

    request_headers = {"Content-Type": att["content_type"]}
    if auth_header:
        request_headers["Authorization"] = auth_header

    async with aiohttp.ClientSession() as http:
        try:
            async with http.put(
                put_url, data=att["content"], headers=request_headers
            ) as response:
                if response.status in (200, 201):
                    self.log.debug(
                        "Successfully uploaded %s to %s", att["filename"], upload_url
                    )
                    # The message goes out as soon as this file is stored; it
                    # does not wait for sibling uploads.
                    message_id = await self.send_xmpp_message(
                        recipient_jid,
                        sender_jid,
                        upload_url,
                        attachment_url=upload_url,
                    )
                    return {"url": upload_url, "xmpp_message_id": message_id}

                self.log.error(
                    f"Upload failed: {response.status} {await response.text()}"
                )
                return None
        except Exception as e:
            # Best-effort relay: a failed attachment is logged and dropped.
            self.log.error(f"Error uploading {att['filename']} to XMPP: {e}")
            return None


async def send_xmpp_message(
    self, recipient_jid, sender_jid, body_text, attachment_url=None
):
    """Send a ``chat`` message and return its stanza id.

    A random hex id is assigned when the freshly built stanza has none. When
    ``attachment_url`` is truthy, a ``jabber:x:oob`` ``<x><url>`` child
    (XEP-0066) carrying that URL is appended as well.
    """
    stanza = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat")
    if not stanza.get("id"):
        stanza["id"] = uuid.uuid4().hex
    stanza_id = str(stanza.get("id") or "").strip()

    # The body carries either plain text or the bare URL.
    stanza["body"] = body_text

    if attachment_url:
        oob = ET.Element("{jabber:x:oob}x")
        ET.SubElement(oob, "{jabber:x:oob}url").text = attachment_url
        stanza.xml.append(oob)

    self.log.debug("Sending XMPP message: %s", stanza.xml)
    stanza.send()
    return stanza_id


async def send_xmpp_reaction(
    self,
    recipient_jid,
    sender_jid,
    *,
    target_xmpp_id: str,
    emoji: str,
    remove: bool = False,
):
    """Send a ``urn:xmpp:reactions:0`` stanza aimed at ``target_xmpp_id``.

    The ``<reactions>`` element gets a single ``<reaction>`` child holding
    ``emoji`` unless ``remove`` is true or the emoji is blank, in which case
    it is sent empty. Returns the id of the stanza that was sent.
    """
    stanza = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat")
    if not stanza.get("id"):
        stanza["id"] = uuid.uuid4().hex
    stanza["body"] = ""

    target = str(target_xmpp_id or "").strip()
    container = ET.Element("{urn:xmpp:reactions:0}reactions", {"id": target})

    carries_emoji = (not remove) and bool(str(emoji or "").strip())
    if carries_emoji:
        ET.SubElement(
            container,
            "{urn:xmpp:reactions:0}reaction",
        ).text = str(emoji)

    stanza.xml.append(container)
    stanza.send()
    return str(stanza.get("id") or "").strip()
async def apply_external_reaction(
    self,
    user,
    person_identifier,
    *,
    source_service,
    emoji,
    remove,
    upstream_message_id="",
    upstream_ts=0,
    actor="",
    payload=None,
):
    """Mirror a reaction made on an upstream service onto the XMPP side.

    The upstream message is resolved to a bridge record, first through
    ``transport.resolve_bridge_from_upstream`` and then, if that finds
    nothing, through ``history.resolve_bridge_ref``. With a record in hand the
    reaction is sent to the user's XMPP account and stored via
    ``history.apply_reaction``.

    Returns:
        ``True`` once the reaction has been sent and stored; ``False`` when
        no bridge record is found or the record has no ``xmpp_message_id``.
    """
    upstream_id = str(upstream_message_id or "")
    upstream_time = int(upstream_ts or 0)
    emoji_text = str(emoji or "")
    removing = bool(remove)
    person_id = person_identifier.person_id

    self.log.debug(
        "reaction-bridge external-in source=%s user=%s person=%s upstream_id=%s upstream_ts=%s emoji=%s remove=%s",
        source_service,
        user.id,
        person_id,
        upstream_id or "-",
        upstream_time,
        emoji_text or "-",
        removing,
    )

    bridge = transport.resolve_bridge_from_upstream(
        user_id=user.id,
        person_id=person_id,
        service=source_service,
        upstream_message_id=upstream_id,
        upstream_ts=upstream_time,
    )
    if not bridge:
        # Nothing from the transport lookup; fall back to the history lookup.
        bridge = await history.resolve_bridge_ref(
            user=user,
            identifier=person_identifier,
            source_service=source_service,
            upstream_message_id=upstream_id,
            upstream_author=str(actor or ""),
            upstream_ts=upstream_time,
        )
        if not bridge:
            self.log.warning(
                "reaction-bridge external-resolve-miss source=%s user=%s person=%s upstream_id=%s upstream_ts=%s",
                source_service,
                user.id,
                person_id,
                upstream_id or "-",
                upstream_time,
            )
            return False

    target_xmpp_id = str(bridge.get("xmpp_message_id") or "").strip()
    if not target_xmpp_id:
        self.log.warning(
            "reaction-bridge external-target-missing source=%s user=%s person=%s",
            source_service,
            user.id,
            person_id,
        )
        return False

    # The reaction is sent from the contact's bridged JID to the user's own account.
    contact_local_part = (
        f"{person_identifier.person.name.lower()}|{person_identifier.service}"
    )
    sender_jid = f"{contact_local_part}@{settings.XMPP_JID}"
    recipient_jid = f"{user.username}@{settings.XMPP_ADDRESS}"

    await self.send_xmpp_reaction(
        recipient_jid,
        sender_jid,
        target_xmpp_id=target_xmpp_id,
        emoji=emoji_text,
        remove=removing,
    )
    await history.apply_reaction(
        user=user,
        identifier=person_identifier,
        target_message_id=str(bridge.get("local_message_id") or ""),
        target_ts=int(bridge.get("upstream_ts") or 0),
        emoji=emoji_text,
        source_service=source_service,
        actor=str(actor or person_identifier.identifier),
        remove=removing,
        payload=dict(payload or {}),
    )

    self.log.debug(
        "reaction-bridge external-apply-ok source=%s user=%s person=%s xmpp_id=%s local_message_id=%s",
        source_service,
        user.id,
        person_id,
        target_xmpp_id,
        str(bridge.get("local_message_id") or "") or "-",
    )
    return True
async def send_chat_state(self, recipient_jid, sender_jid, started):
    """Send a chat-state notification to the client.

    A truthy ``started`` sends ``composing``; anything else sends ``paused``.
    The state element is the only payload added to the message.
    """
    if started:
        state_tag = "composing"
    else:
        state_tag = "paused"

    stanza = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat")
    state_element = ET.Element(
        "{http://jabber.org/protocol/chatstates}" + state_tag
    )
    stanza.xml.append(state_element)

    self.log.debug(
        "Sending XMPP chat-state %s: %s -> %s",
        state_tag,
        sender_jid,
        recipient_jid,
    )
    stanza.send()


async def send_typing_for_person(self, user, person_identifier, started):
    """Send a typing notification from a contact's bridged JID to ``user``.

    The sender JID is ``<person name, lowercased>|<service>@<XMPP_JID>`` and
    the recipient is ``<username>@<XMPP_ADDRESS>``.
    """
    contact_local_part = (
        f"{person_identifier.person.name.lower()}|{person_identifier.service}"
    )
    sender_jid = f"{contact_local_part}@{settings.XMPP_JID}"
    recipient_jid = f"{user.username}@{settings.XMPP_ADDRESS}"
    await self.send_chat_state(recipient_jid, sender_jid, started)
async def _store_bridge_ref_for_xmpp_message(
    self,
    user,
    person_identifier,
    *,
    xmpp_message_id,
    text_preview,
    source_ref,
):
    """Register one relayed XMPP message with both bridge stores.

    Calls ``transport.record_bridge_mapping`` and then
    ``history.save_bridge_ref`` with the same ids, so every relayed message
    is registered identically whichever branch sent it.

    Args:
        user: Owner of the bridge; only ``user.id`` is read here.
        person_identifier: Contact identifier; ``person_id`` and ``service``
            are read here.
        xmpp_message_id: Id of the XMPP stanza that was just sent.
        text_preview: Text stored alongside the transport mapping.
        source_ref: Optional dict describing the upstream message. Keys read:
            ``upstream_message_id``, ``upstream_author``, ``upstream_ts``,
            ``legacy_message_id`` and ``xmpp_source_ts``.
    """
    ref = source_ref or {}
    upstream_message_id = str(ref.get("upstream_message_id") or "")
    upstream_author = str(ref.get("upstream_author") or "")
    upstream_ts = int(ref.get("upstream_ts") or 0)
    local_message_id = str(ref.get("legacy_message_id") or "")
    # One timestamp for both records so they agree with each other.
    now_ms = int(time.time() * 1000)

    transport.record_bridge_mapping(
        user_id=user.id,
        person_id=person_identifier.person_id,
        service=person_identifier.service,
        xmpp_message_id=xmpp_message_id,
        xmpp_ts=now_ms,
        upstream_message_id=upstream_message_id,
        upstream_author=upstream_author,
        upstream_ts=upstream_ts,
        text_preview=text_preview,
        local_message_id=local_message_id,
    )
    await history.save_bridge_ref(
        user=user,
        identifier=person_identifier,
        source_service=person_identifier.service,
        local_message_id=local_message_id,
        # Use the source-provided timestamp when there is one.
        local_ts=int(ref.get("xmpp_source_ts") or now_ms),
        xmpp_message_id=xmpp_message_id,
        upstream_message_id=upstream_message_id,
        upstream_author=upstream_author,
        upstream_ts=upstream_ts,
    )


async def send_from_external(
    self,
    user,
    person_identifier,
    text,
    is_outgoing_message,
    attachments=None,
    source_ref=None,
):
    """Relay a message from an external service to the user's XMPP account.

    The text part (if any) is sent first, then each attachment is uploaded
    and sent as its own message. Every XMPP message sent is registered with
    the bridge stores so it can later be matched to its upstream message.

    Args:
        user: Owner of the bridge; ``user.id`` is used for bridge records.
        person_identifier: Contact identifier. The sender JID is built from
            ``person.name`` and ``service``; the recipient JID from
            ``user.username``.
        text: Message text; may be empty.
        is_outgoing_message: When true the text is always sent, prefixed
            with ``"YOU: "``, even if ``text`` is empty.
        attachments: Optional iterable of attachment dicts; ``None`` (the
            default) or an empty value means there is nothing to upload.
        source_ref: Optional dict describing the upstream message; it is
            passed through to the bridge records.

    Returns:
        The non-empty URLs of the attachments that were uploaded and sent, or
        ``[]`` when there were no attachments or no upload slot was granted.
    """
    sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}"
    recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}"

    # Step 1: send the text message separately from any attachments.
    if is_outgoing_message:
        body = f"YOU: {text}"
    elif text:
        body = text
    else:
        body = None

    if body is not None:
        xmpp_id = await self.send_xmpp_message(recipient_jid, sender_jid, body)
        await self._store_bridge_ref_for_xmpp_message(
            user,
            person_identifier,
            xmpp_message_id=xmpp_id,
            text_preview=str(text or ""),
            source_ref=source_ref,
        )

    if not attachments:
        return []  # No attachments to process

    # Step 2: request upload slots concurrently.
    valid_uploads = await self.request_upload_slots(recipient_jid, attachments)
    self.log.debug("Got upload slots")
    if not valid_uploads:
        self.log.debug("No valid upload slots obtained; attachment relay skipped")
        return []

    # Step 3: upload the files concurrently; each sends its own message on success.
    uploaded_rows = await asyncio.gather(
        *(
            self.upload_and_send(att, slot, recipient_jid, sender_jid)
            for att, slot in valid_uploads
        )
    )
    # Failed uploads come back falsy and are dropped here.
    normalized_rows = [dict(row) for row in uploaded_rows if row]

    for row in normalized_rows:
        await self._store_bridge_ref_for_xmpp_message(
            user,
            person_identifier,
            xmpp_message_id=str(row.get("xmpp_message_id") or "").strip(),
            text_preview=str(row.get("url") or text or ""),
            source_ref=source_ref,
        )

    urls = (str(row.get("url") or "").strip() for row in normalized_rows)
    return [url for url in urls if url]
class XMPPClient(ClientBase):
    """Client wrapper that owns an ``XMPPComponent`` and starts it."""

    def __init__(self, ur, *args, **kwargs):
        """Build the component from Django settings and register its plugins."""
        super().__init__(ur, *args, **kwargs)

        component_config = {
            "jid": settings.XMPP_JID,
            "secret": settings.XMPP_SECRET,
            "server": settings.XMPP_ADDRESS,
            "port": settings.XMPP_PORT,
        }
        self.client = XMPPComponent(ur, **component_config)

        # Registration order is kept as-is.
        plugin_names = (
            "xep_0030",  # Service Discovery
            "xep_0004",  # Data Forms
            "xep_0060",  # PubSub
            "xep_0199",  # XMPP Ping
            "xep_0085",  # Chat State Notifications
            "xep_0363",  # HTTP File Upload
        )
        for plugin_name in plugin_names:
            self.client.register_plugin(plugin_name)

    def start(self):
        """Attach the component to this client's event loop and connect."""
        self.log.info("XMPP client starting...")

        component = self.client
        # ensure slixmpp uses the same asyncio loop as the router
        component.loop = self.loop
        component.connect()
        # self.client.process()

    async def start_typing_for_person(self, user, person_identifier):
        """Tell ``user`` over XMPP that the given contact started typing."""
        await self.client.send_typing_for_person(
            user, person_identifier, started=True
        )

    async def stop_typing_for_person(self, user, person_identifier):
        """Tell ``user`` over XMPP that the given contact stopped typing."""
        await self.client.send_typing_for_person(
            user, person_identifier, started=False
        )