from core.clients import ClientBase from django.conf import settings from slixmpp.componentxmpp import ComponentXMPP from django.conf import settings from core.models import User, Person, PersonIdentifier, ChatSession from asgiref.sync import sync_to_async from django.utils.timezone import now import asyncio from core.clients import signalapi from slixmpp.xmlstream import register_stanza_plugin from slixmpp.plugins.xep_0085.stanza import Active, Composing, Paused, Inactive, Gone from slixmpp.stanza import Message from slixmpp.xmlstream.stanzabase import ET import aiohttp from core.messaging import history from core.util import logs class XMPPComponent(ComponentXMPP): """ A simple Slixmpp component that echoes messages. """ def __init__(self, ur, jid, secret, server, port): self.ur = ur self.log = logs.get_logger("XMPP") super().__init__(jid, secret, server, port) # Register chat state plugins register_stanza_plugin(Message, Active) register_stanza_plugin(Message, Composing) register_stanza_plugin(Message, Paused) register_stanza_plugin(Message, Inactive) register_stanza_plugin(Message, Gone) self.add_event_handler("session_start", self.session_start) self.add_event_handler("disconnected", self.on_disconnected) self.add_event_handler("message", self.message) # Presence event handlers self.add_event_handler("presence_available", self.on_presence_available) self.add_event_handler("presence_dnd", self.on_presence_dnd) self.add_event_handler("presence_xa", self.on_presence_xa) self.add_event_handler("presence_chat", self.on_presence_chat) self.add_event_handler("presence_away", self.on_presence_away) self.add_event_handler("presence_unavailable", self.on_presence_unavailable) self.add_event_handler("presence_subscribe", self.on_presence_subscribe) self.add_event_handler("presence_subscribed", self.on_presence_subscribed) self.add_event_handler("presence_unsubscribe", self.on_presence_unsubscribe) self.add_event_handler("presence_unsubscribed", self.on_presence_unsubscribed) self.add_event_handler("roster_subscription_request", self.on_roster_subscription_request) # Chat state handlers self.add_event_handler("chatstate_active", self.on_chatstate_active) self.add_event_handler("chatstate_composing", self.on_chatstate_composing) self.add_event_handler("chatstate_paused", self.on_chatstate_paused) self.add_event_handler("chatstate_inactive", self.on_chatstate_inactive) self.add_event_handler("chatstate_gone", self.on_chatstate_gone) async def enable_carbons(self): """Enable XMPP Message Carbons (XEP-0280)""" try: iq = self.make_iq_set() iq["enable"] = ET.Element("{urn:xmpp:carbons:2}enable") await iq.send() self.log.info("Message Carbons enabled successfully") except Exception as e: self.log.error(f"Failed to enable Carbons: {e}") def get_identifier(self, msg): # Extract sender JID (full format: user@domain/resource) sender_jid = str(msg["from"]) # Split into username@domain and optional resource sender_parts = sender_jid.split("/", 1) sender_bare_jid = sender_parts[0] # Always present: user@domain sender_username, sender_domain = sender_bare_jid.split("@", 1) sender_resource = sender_parts[1] if len(sender_parts) > 1 else None # Extract resource if present # Extract recipient JID (should match component JID format) recipient_jid = str(msg["to"]) if "@" in recipient_jid: recipient_username, recipient_domain = recipient_jid.split("@", 1) else: recipient_username = recipient_jid recipient_domain = recipient_jid # Extract message body body = msg["body"] if msg["body"] else "[No Body]" # Parse recipient_name and recipient_service (e.g., "mark|signal") if "|" in recipient_username: person_name, service = recipient_username.split("|") person_name = person_name.title() # Capitalize for consistency else: person_name = recipient_username.title() service = None try: # Lookup user in Django self.log.info(f"User {sender_username}") user = User.objects.get(username=sender_username) # Find Person object with name=person_name.lower() self.log.info(f"Name {person_name.title()}") person = Person.objects.get(user=user, name=person_name.title()) # Ensure a PersonIdentifier exists for this user, person, and service self.log.info(f"Identifier {service}") identifier = PersonIdentifier.objects.get(user=user, person=person, service=service) return identifier except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist): # If any lookup fails, reject the subscription return None def update_roster(self, jid, name=None): """ Adds or updates a user in the roster. """ iq = self.Iq() iq['type'] = 'set' iq['roster']['items'] = {jid: {'name': name or jid}} iq.send() self.log.info(f"Updated roster: Added {jid} ({name})") def on_chatstate_active(self, msg): """ Handle when a user is actively engaged in the chat. """ self.log.info(f"Chat state: Active from {msg['from']}.") identifier = self.get_identifier(msg) def on_chatstate_composing(self, msg): """ Handle when a user is typing a message. """ self.log.info(f"Chat state: Composing from {msg['from']}.") identifier = self.get_identifier(msg) def on_chatstate_paused(self, msg): """ Handle when a user has paused typing. """ self.log.info(f"Chat state: Paused from {msg['from']}.") identifier = self.get_identifier(msg) def on_chatstate_inactive(self, msg): """ Handle when a user is inactive in the chat. """ self.log.info(f"Chat state: Inactive from {msg['from']}.") identifier = self.get_identifier(msg) def on_chatstate_gone(self, msg): """ Handle when a user has left the chat. """ self.log.info(f"Chat state: Gone from {msg['from']}.") identifier = self.get_identifier(msg) def on_presence_available(self, pres): """ Handle when a user becomes available. """ self.log.info(f"Presence available from {pres['from']}") def on_presence_dnd(self, pres): """ Handle when a user sets 'Do Not Disturb' status. """ self.log.info(f"User {pres['from']} is now in 'Do Not Disturb' mode.") def on_presence_xa(self, pres): """ Handle when a user sets 'Extended Away' status. """ self.log.info(f"User {pres['from']} is now 'Extended Away'.") def on_presence_chat(self, pres): """ Handle when a user is actively available for chat. """ self.log.info(f"User {pres['from']} is now available for chat.") def on_presence_away(self, pres): """ Handle when a user sets 'Away' status. """ self.log.info(f"User {pres['from']} is now 'Away'.") def on_presence_unavailable(self, pres): """ Handle when a user goes offline or unavailable. """ self.log.info(f"User {pres['from']} is now unavailable.") def on_presence_subscribe(self, pres): """ Handle incoming presence subscription requests. Accept only if the recipient has a contact matching the sender. """ sender_jid = str(pres['from']).split('/')[0] # Bare JID (user@domain) recipient_jid = str(pres['to']).split('/')[0] self.log.info(f"Received subscription request from {sender_jid} to {recipient_jid}") try: # Extract sender and recipient usernames user_username, _ = sender_jid.split("@", 1) recipient_username, _ = recipient_jid.split("@", 1) # Parse recipient_name and recipient_service (e.g., "mark|signal") if "|" in recipient_username: person_name, service = recipient_username.split("|") person_name = person_name.title() # Capitalize for consistency else: person_name = recipient_username.title() service = None # Lookup user in Django self.log.info(f"User {user_username}") user = User.objects.get(username=user_username) # Find Person object with name=person_name.lower() self.log.info(f"Name {person_name.title()}") person = Person.objects.get(user=user, name=person_name.title()) # Ensure a PersonIdentifier exists for this user, person, and service self.log.info(f"Identifier {service}") PersonIdentifier.objects.get(user=user, person=person, service=service) component_jid = f"{person_name.lower()}|{service}@{self.boundjid.bare}" # Accept the subscription self.send_presence(ptype="subscribed", pto=sender_jid, pfrom=component_jid) self.log.info(f"Accepted subscription from {sender_jid}, sent from {component_jid}") # Send a presence request **from the recipient to the sender** (ASKS THEM TO ACCEPT BACK) # self.send_presence(ptype="subscribe", pto=sender_jid, pfrom=component_jid) # self.log.info(f"Sent presence subscription request from {component_jid} to {sender_jid}") # Add sender to roster # self.update_roster(sender_jid, name=sender_jid.split("@")[0]) # self.log.info(f"Added {sender_jid} to roster.") # Send presence update to sender **from the correct JID** self.send_presence(ptype="available", pto=sender_jid, pfrom=component_jid) self.log.info(f"Sent presence update from {component_jid} to {sender_jid}") except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist): # If any lookup fails, reject the subscription self.log.warning(f"Subscription request from {sender_jid} rejected (recipient does not have this contact).") self.send_presence(ptype="unsubscribed", pto=sender_jid) except ValueError: return def on_presence_subscribed(self, pres): """ Handle successful subscription confirmations. """ self.log.info(f"Subscription to {pres['from']} was accepted.") def on_presence_unsubscribe(self, pres): """ Handle when a user unsubscribes from presence updates. """ self.log.info(f"User {pres['from']} has unsubscribed from presence updates.") def on_presence_unsubscribed(self, pres): """ Handle when a user's unsubscription request is confirmed. """ self.log.info(f"Unsubscription from {pres['from']} confirmed.") def on_roster_subscription_request(self, pres): """ Handle roster subscription requests. """ self.log.info(f"New roster subscription request from {pres['from']}.") async def session_start(self, *args): self.log.info("XMPP session started") await self.enable_carbons() def on_disconnected(self, *args): """ Handles XMPP disconnection and triggers a reconnect loop. """ self.log.warning("XMPP disconnected, attempting to reconnect...") self.connect() def session_start(self, *args): self.log.info(f"Session started: {args}") async def request_upload_slot(self, recipient, filename, content_type, size): """ Requests an upload slot from XMPP for HTTP File Upload (XEP-0363). Args: recipient (str): The JID of the recipient. filename (str): The filename for the upload. content_type (str): The file's MIME type. size (int): The file size in bytes. Returns: tuple | None: (upload_url, put_url, auth_header) or None if failed. """ # upload_service = await self['xep_0363'].find_upload_service() # if not upload_service: # self.log.error("No XEP-0363 upload service found.") # return None #self.log.info(f"Upload service: {upload_service}") upload_service_jid = "share.zm.is" try: slot = await self['xep_0363'].request_slot( jid=upload_service_jid, filename=filename, content_type=content_type, size=size ) if slot is None: self.log.error(f"Failed to obtain upload slot for {filename}") return None # Parse the XML response root = ET.fromstring(str(slot)) # Convert to string if necessary namespace = "{urn:xmpp:http:upload:0}" # Define the namespace get_url = root.find(f".//{namespace}get").attrib.get("url") put_element = root.find(f".//{namespace}put") put_url = put_element.attrib.get("url") # Extract the Authorization header correctly header_element = put_element.find(f"./{namespace}header[@name='Authorization']") auth_header = header_element.text.strip() if header_element is not None else None if not get_url or not put_url: self.log.error(f"Missing URLs in upload slot: {slot}") return None return get_url, put_url, auth_header except Exception as e: self.log.error(f"Exception while requesting upload slot: {e}") return None async def message(self, msg): """ Process incoming XMPP messages. """ sym = lambda x: msg.reply(f"[>] {x}").send() # self.log.info(f"Received message: {msg}") # Extract sender JID (full format: user@domain/resource) sender_jid = str(msg["from"]) # Split into username@domain and optional resource sender_parts = sender_jid.split("/", 1) sender_bare_jid = sender_parts[0] # Always present: user@domain sender_username, sender_domain = sender_bare_jid.split("@", 1) sender_resource = sender_parts[1] if len(sender_parts) > 1 else None # Extract resource if present # Extract recipient JID (should match component JID format) recipient_jid = str(msg["to"]) if "@" in recipient_jid: recipient_username, recipient_domain = recipient_jid.split("@", 1) else: recipient_username = recipient_jid recipient_domain = recipient_jid # Extract message body body = msg["body"] if msg["body"] else "[No Body]" attachments = [] self.log.info(f"Full XMPP Message: {ET.tostring(msg.xml, encoding='unicode')}") # Extract attachments from standard XMPP (if present) for att in msg.xml.findall(".//{urn:xmpp:attachments}attachment"): attachments.append({ "url": att.attrib.get("url"), "filename": att.attrib.get("filename"), "content_type": att.attrib.get("content_type"), }) # Extract attachments from XEP-0066 format (Out of Band Data) for oob in msg.xml.findall(".//{jabber:x:oob}x/{jabber:x:oob}url"): attachments.append({ "url": oob.text, "filename": oob.text.split("/")[-1], # Extract filename from URL "content_type": "application/octet-stream", # Generic content-type }) self.log.info(f"Extracted {len(attachments)} attachments from XMPP message.") # Log extracted information with variable name annotations log_message = ( f"Sender JID: {sender_jid}, Sender Username: {sender_username}, Sender Domain: {sender_domain}, " f"Sender Resource: {sender_resource if sender_resource else '[No Resource]'}, " f"Recipient JID: {recipient_jid}, Recipient Username: {recipient_username}, Recipient Domain: {recipient_domain}, " f"Body: {body}" ) self.log.info(log_message) # Ensure recipient domain matches our configured component expected_domain = settings.XMPP_JID # 'jews.zm.is' in your config if recipient_domain != expected_domain: self.log.warning(f"Invalid recipient domain: {recipient_domain}, expected {expected_domain}") return # Lookup sender in Django's User model try: sender_user = User.objects.get(username=sender_username) except User.DoesNotExist: self.log.warning(f"Unknown sender: {sender_username}") return if recipient_jid == settings.XMPP_JID: self.log.info("Message to JID") if body.startswith("."): # Messaging the gateway directly if body == ".contacts": # Lookup Person objects linked to sender persons = Person.objects.filter(user=sender_user) if not persons.exists(): self.log.info(f"No contacts found for {sender_username}") sym("No contacts found.") return # Construct contact list response contact_names = [person.name for person in persons] response_text = f"Contacts: " + ", ".join(contact_names) sym(response_text) elif body == ".whoami": sym(str(sender_user.__dict__)) else: sym("No such command") else: self.log.info("Other message") if "|" in recipient_username: recipient_name, recipient_service = recipient_username.split("|") recipient_name = recipient_name.title() else: recipient_name = recipient_username recipient_service = None recipient_name = recipient_name.title() try: person = Person.objects.get(user=sender_user, name=recipient_name) except Person.DoesNotExist: sym("This person does not exist.") if recipient_service: try: identifier = PersonIdentifier.objects.get(user=sender_user, person=person, service=recipient_service) except PersonIdentifier.DoesNotExist: sym("This service identifier does not exist.") else: # Get a random identifier identifier = PersonIdentifier.objects.filter(user=sender_user, person=person).first() recipient_service = identifier.service # sym(str(person.__dict__)) # sym(f"Service: {recipient_service}") #tss = await identifier.send(body, attachments=attachments) # AM FIXING https://git.zm.is/XF/GIA/issues/5 session, _ = await sync_to_async(ChatSession.objects.get_or_create)( identifier=identifier, user=identifier.user, ) self.log.info(f"Component history store message {body}") await history.store_message( session=session, sender="XMPP", text=body, ts=now().timestamp(), #outgoing=detail.is_outgoing_message, ????????? TODO: ) self.log.info("Stored a message sent from XMPP in the history.") tss = await signalapi.send_message_raw( identifier.identifier, body, attachments, ) self.log.info(f"Message sent") async def request_upload_slots(self, recipient_jid, attachments): """Requests upload slots for multiple attachments concurrently.""" upload_tasks = [ self.request_upload_slot(recipient_jid, att["filename"], att["content_type"], att["size"]) for att in attachments ] upload_slots = await asyncio.gather(*upload_tasks) return [(att, slot) for att, slot in zip(attachments, upload_slots) if slot is not None] async def upload_and_send(self, att, upload_slot, recipient_jid, sender_jid): """Uploads a file and immediately sends the corresponding XMPP message.""" upload_url, put_url, auth_header = upload_slot headers = {"Content-Type": att["content_type"]} if auth_header: headers["Authorization"] = auth_header async with aiohttp.ClientSession() as session: try: async with session.put(put_url, data=att["content"], headers=headers) as response: if response.status not in (200, 201): self.log.error(f"Upload failed: {response.status} {await response.text()}") return self.log.info(f"Successfully uploaded {att['filename']} to {upload_url}") # Send XMPP message immediately after successful upload await self.send_xmpp_message(recipient_jid, sender_jid, upload_url, attachment_url=upload_url) except Exception as e: self.log.error(f"Error uploading {att['filename']} to XMPP: {e}") async def send_xmpp_message(self, recipient_jid, sender_jid, body_text, attachment_url=None): """Sends an XMPP message with either text or an attachment URL.""" msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") msg["body"] = body_text # Body must contain only text or the URL if attachment_url: # Include (XEP-0066) to ensure client compatibility oob_element = ET.Element("{jabber:x:oob}x") url_element = ET.SubElement(oob_element, "{jabber:x:oob}url") url_element.text = attachment_url msg.xml.append(oob_element) self.log.info(f"Sending XMPP message: {msg.xml}") msg.send() async def send_from_external(self, user, person_identifier, text, is_outgoing_message, attachments=[]): """Handles sending XMPP messages with text and attachments.""" sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}" recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}" if is_outgoing_message: self.log.info(f"Forwarding outgoing message as {recipient_jid}") # Create the message as if it were sent by the user msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") msg["body"] = text # Original message content # Create the forwarded inside the element forwarded_elem = ET.Element("{urn:xmpp:forward:0}forwarded") # Create a element inside the forwarded stanza message_elem = ET.Element("message", attrib={"from": recipient_jid, "to": recipient_jid, "type": "chat"}) body_elem = ET.SubElement(message_elem, "body") body_elem.text = text # Attach the forwarded message forwarded_elem.append(message_elem) msg.xml.append(forwarded_elem) # Send the forwarded message msg.send() # Step 1: Send text message separately if text: await self.send_xmpp_message(recipient_jid, sender_jid, text) if not attachments: return # No attachments to process # Step 2: Request upload slots concurrently valid_uploads = await self.request_upload_slots(recipient_jid, attachments) self.log.info(f"Got upload slots") if not valid_uploads: self.log.warning("No valid upload slots obtained.") #return # Step 3: Upload each file and send its message immediately after upload upload_tasks = [ self.upload_and_send(att, slot, recipient_jid, sender_jid) for att, slot in valid_uploads ] await asyncio.gather(*upload_tasks) # Upload files concurrently class XMPPClient(ClientBase): def __init__(self, ur, *args, **kwargs): super().__init__(ur, *args, **kwargs) self.client = XMPPComponent( ur, jid=settings.XMPP_JID, secret=settings.XMPP_SECRET, server=settings.XMPP_ADDRESS, port=settings.XMPP_PORT, ) self.client.register_plugin('xep_0030') # Service Discovery self.client.register_plugin('xep_0004') # Data Forms self.client.register_plugin('xep_0060') # PubSub self.client.register_plugin('xep_0199') # XMPP Ping self.client.register_plugin("xep_0085") # Chat State Notifications self.client.register_plugin('xep_0363') # HTTP File Upload def start(self): self.log.info("XMPP client starting...") self.client.connect() #self.client.process()