From b2b44c31cc4cc7d828941bccfa7dc5cc5183535b Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Sun, 23 Feb 2025 18:34:03 +0000 Subject: [PATCH] Implement bridging Signal and XMPP --- core/clients/signalapi.py | 116 ++++++++++++-- core/lib/deferred.py | 32 +++- core/management/commands/component.py | 203 +++++++++++++++++++++---- core/management/commands/processing.py | 15 ++ core/models.py | 5 +- 5 files changed, 325 insertions(+), 46 deletions(-) diff --git a/core/clients/signalapi.py b/core/clients/signalapi.py index 6fc06ec..8a22b69 100644 --- a/core/clients/signalapi.py +++ b/core/clients/signalapi.py @@ -6,6 +6,7 @@ from requests.exceptions import RequestException import orjson from django.conf import settings import aiohttp +import base64 async def start_typing(uuid): @@ -46,14 +47,88 @@ async def send_message_raw(recipient_uuid, text): return ts else: return False - -def send_message_raw_sync(recipient_uuid, text): + +async def fetch_signal_attachment(attachment_id): """ - Sends a message using the Signal REST API in a synchronous manner. - + Asynchronously fetches an attachment from Signal. + + Args: + attachment_id (str): The Signal attachment ID. + + Returns: + dict | None: + { + "content": , + "content_type": , + "filename": + } + or None if the request fails. 
+ """ + url = f"http://signal:8080/v1/attachments/{attachment_id}" + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, timeout=10) as response: + if response.status != 200: + return None # Failed request + + content_type = response.headers.get("Content-Type", "application/octet-stream") + content = await response.read() + size = int(response.headers.get("Content-Length", len(content))) + + filename = attachment_id # Default fallback filename + content_disposition = response.headers.get("Content-Disposition") + if content_disposition: + parts = content_disposition.split(";") + for part in parts: + if "filename=" in part: + filename = part.split("=", 1)[1].strip().strip('"') + + return { + "content": content, + "content_type": content_type, + "filename": filename, + "size": size, + } + except aiohttp.ClientError: + return None # Network error + + + +def download_and_encode_base64(file_url, filename, content_type): + """ + Downloads a file from a given URL, converts it to Base64, and returns it in Signal's expected format. + + Args: + file_url (str): The URL of the file to download. + filename (str): The name of the file. + content_type (str): The MIME type of the file. + + Returns: + str: The Base64 encoded attachment string in Signal's expected format. + """ + try: + response = requests.get(file_url, timeout=10) + response.raise_for_status() + + file_data = response.content + base64_encoded = base64.b64encode(file_data).decode("utf-8") + + # Format according to Signal's expected structure + return f"data:{content_type};filename={filename};base64,{base64_encoded}" + except requests.RequestException as e: + #log.error(f"Failed to download file: {file_url}, error: {e}") + return None + + +def send_message_raw_sync(recipient_uuid, text=None, attachments=[]): + """ + Sends a message using the Signal REST API, ensuring attachment links are not included in the text body. + Args: recipient_uuid (str): The UUID of the recipient. 
def send_message_raw_sync(recipient_uuid, text=None, attachments=None):
    """Send a message through the Signal REST API synchronously.

    Attachments are downloaded and sent as base64 payloads. If the text is
    nothing but the URL of an attachment that was downloaded successfully, the
    text is omitted so the recipient does not get the link twice.

    Args:
        recipient_uuid (str): The UUID of the recipient.
        text (str, optional): The message to send.
        attachments (list, optional): Attachment dicts with the keys
            "url", "filename" and "content_type".

    Returns:
        int | bool: Timestamp if successful, False otherwise (including when
        there is neither text nor a usable attachment to send).
    """
    attachments = attachments or []

    # Convert attachments to Base64, remembering which URLs made it through.
    encoded_attachments = []
    delivered_urls = set()
    for att in attachments:
        att_url = att.get("url")
        encoded = download_and_encode_base64(
            att_url, att.get("filename"), att.get("content_type")
        )
        if encoded:
            encoded_attachments.append(encoded)
            delivered_urls.add(att_url)

    # Drop the body only when the link it carries was actually attached;
    # if the download failed, the link is all the recipient will get.
    if text and text.strip() in delivered_urls:
        text = None

    if not text and not encoded_attachments:
        return False  # Nothing to send

    payload = {
        "recipients": [recipient_uuid],
        "number": settings.SIGNAL_NUMBER,
        "base64_attachments": encoded_attachments,
    }
    if text:
        payload["message"] = text

    try:
        response = requests.post("http://signal:8080/v2/send", json=payload, timeout=10)
        response.raise_for_status()
    except requests.RequestException:
        return False

    if response.status_code != 201:  # Signal server returns 201 on success
        return False

    try:
        body = orjson.loads(response.text)
    except orjson.JSONDecodeError:
        return False

    ts = body.get("timestamp") if isinstance(body, dict) else None
    return ts if ts else False
from core.clients import signal from core.lib.prompts.functions import delete_messages from asgiref.sync import sync_to_async from django.conf import settings +from core.clients import signalapi +import asyncio + log = logs.get_logger("deferred") @@ -27,6 +30,7 @@ class DeferredRequest(BaseModel): msg: Optional[str] = None service: Optional[str] = None detail: Optional[DeferredDetail] = None + attachments: Optional[list] = None async def process_deferred(data: dict, **kwargs): @@ -60,22 +64,40 @@ async def process_deferred(data: dict, **kwargs): else: log.warning(f"Protocol not supported: {message.session.identifier.service}") return - elif method == "xmpp": + elif method == "xmpp": # send xmpp message xmpp = kwargs.get("xmpp") service = validated_data.service msg = validated_data.msg + attachments = validated_data.attachments # Get User from identifier identifiers = PersonIdentifier.objects.filter( identifier=validated_data.identifier, service=service, ) + xmpp_attachments = [] + # Asynchronously fetch all attachments + tasks = [signalapi.fetch_signal_attachment(att["id"]) for att in attachments] + fetched_attachments = await asyncio.gather(*tasks) + + for fetched, att in zip(fetched_attachments, attachments): + if not fetched: + log.warning(f"Failed to fetch attachment {att['id']} from Signal.") + continue + + # Attach fetched file to XMPP + xmpp_attachments.append({ + "content": fetched["content"], + "content_type": fetched["content_type"], + "filename": fetched["filename"], + "size": fetched["size"], + }) for identifier in identifiers: - # Fair is fair, we can have multiple - log.info(f"Sending {msg} from {identifier}") - xmpp.send_from_external(identifier, msg, validated_data.detail) - + #recipient_jid = f"{identifier.user.username}@{settings.XMPP_ADDRESS}" + + log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") + await xmpp.send_from_external(identifier, msg, validated_data.detail, attachments=xmpp_attachments) else: 
log.warning(f"Method not yet supported: {method}") return \ No newline at end of file diff --git a/core/management/commands/component.py b/core/management/commands/component.py index 144066a..4385ee8 100644 --- a/core/management/commands/component.py +++ b/core/management/commands/component.py @@ -10,11 +10,19 @@ from core.lib import deferred from slixmpp.xmlstream import register_stanza_plugin from slixmpp.plugins.xep_0085.stanza import Active, Composing, Paused, Inactive, Gone from slixmpp.stanza import Message +from slixmpp.xmlstream.stanzabase import ElementBase, ET +import aiohttp log = logs.get_logger("component") redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10) +class Attachment(ElementBase): + name = "attachment" + namespace = "urn:xmpp:attachments" + plugin_attrib = "attachment" + interfaces = {"url", "filename", "content_type"} + class EchoComponent(ComponentXMPP): """ @@ -104,6 +112,17 @@ class EchoComponent(ComponentXMPP): # If any lookup fails, reject the subscription return None + def update_roster(self, jid, name=None): + """ + Adds or updates a user in the roster. + """ + iq = self.Iq() + iq['type'] = 'set' + iq['roster']['items'] = {jid: {'name': name or jid}} + + iq.send() + log.info(f"Updated roster: Added {jid} ({name})") + def on_chatstate_active(self, msg): """ Handle when a user is actively engaged in the chat. 
@@ -217,9 +236,24 @@ class EchoComponent(ComponentXMPP): log.info(f"Identifier {service}") PersonIdentifier.objects.get(user=user, person=person, service=service) - # If all checks pass, accept the subscription - self.send_presence(ptype="subscribed", pto=sender_jid) - log.info(f"Subscription request from {sender_jid} accepted for {recipient_jid}.") + component_jid = f"{person_name.lower()}|{service}@{self.boundjid.bare}" + + # Accept the subscription + self.send_presence(ptype="subscribed", pto=sender_jid, pfrom=component_jid) + log.info(f"Accepted subscription from {sender_jid}, sent from {component_jid}") + + # Send a presence request **from the recipient to the sender** (ASKS THEM TO ACCEPT BACK) + # self.send_presence(ptype="subscribe", pto=sender_jid, pfrom=component_jid) + # log.info(f"Sent presence subscription request from {component_jid} to {sender_jid}") + + # Add sender to roster + # self.update_roster(sender_jid, name=sender_jid.split("@")[0]) + # log.info(f"Added {sender_jid} to roster.") + + # Send presence update to sender **from the correct JID** + self.send_presence(ptype="available", pto=sender_jid, pfrom=component_jid) + log.info(f"Sent presence update from {component_jid} to {sender_jid}") + except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist): # If any lookup fails, reject the subscription @@ -264,6 +298,69 @@ class EchoComponent(ComponentXMPP): def session_start(self, *args): log.info(f"START {args}") + async def request_upload_slot(self, recipient, filename, content_type, size): + """ + Requests an upload slot from XMPP for HTTP File Upload (XEP-0363). + + Args: + recipient (str): The JID of the recipient. + filename (str): The filename for the upload. + content_type (str): The file's MIME type. + size (int): The file size in bytes. + + Returns: + tuple | None: (upload_url, put_url, auth_header) or None if failed. 
+ """ + # upload_service = await self['xep_0363'].find_upload_service() + # if not upload_service: + # log.error("No XEP-0363 upload service found.") + # return None + + #log.info(f"Upload service: {upload_service}") + + upload_service_jid = "share.zm.is" + + try: + slot = await self['xep_0363'].request_slot( + jid=upload_service_jid, + filename=filename, + content_type=content_type, + size=size + ) + + if slot is None: + log.error(f"Failed to obtain upload slot for {filename}") + return None + + log.info(f"Slot: {slot}") + + # Parse the XML response + root = ET.fromstring(str(slot)) # Convert to string if necessary + namespace = "{urn:xmpp:http:upload:0}" # Define the namespace + + get_url = root.find(f".//{namespace}get").attrib.get("url") + put_element = root.find(f".//{namespace}put") + put_url = put_element.attrib.get("url") + + log.info(f"Put element: {ET.tostring(put_element, encoding='unicode')}") # Debugging + + # Extract the Authorization header correctly + header_element = put_element.find(f"./{namespace}header[@name='Authorization']") + auth_header = header_element.text.strip() if header_element is not None else None + + if not get_url or not put_url: + log.error(f"Missing URLs in upload slot: {slot}") + return None + + log.info(f"Got upload slot: {get_url}, Authorization: {auth_header}") + + return get_url, put_url, auth_header + + except Exception as e: + log.error(f"Exception while requesting upload slot: {e}") + return None + + def message(self, msg): """ Process incoming XMPP messages. 
@@ -294,6 +391,26 @@ class EchoComponent(ComponentXMPP): # Extract message body body = msg["body"] if msg["body"] else "[No Body]" + attachments = [] + log.info(f"Full XMPP Message: {ET.tostring(msg.xml, encoding='unicode')}") + + # Extract attachments from standard XMPP (if present) + for att in msg.xml.findall(".//{urn:xmpp:attachments}attachment"): + attachments.append({ + "url": att.attrib.get("url"), + "filename": att.attrib.get("filename"), + "content_type": att.attrib.get("content_type"), + }) + + # Extract attachments from XEP-0066 format (Out of Band Data) + for oob in msg.xml.findall(".//{jabber:x:oob}x/{jabber:x:oob}url"): + attachments.append({ + "url": oob.text, + "filename": oob.text.split("/")[-1], # Extract filename from URL + "content_type": "application/octet-stream", # Generic content-type + }) + + log.info(f"Extracted {len(attachments)} attachments from XMPP message.") # Log extracted information with variable name annotations log_message = ( f"Sender JID: {sender_jid}, Sender Username: {sender_username}, Sender Domain: {sender_domain}, " @@ -370,36 +487,69 @@ class EchoComponent(ComponentXMPP): # sym(str(person.__dict__)) # sym(f"Service: {recipient_service}") - identifier.send(body) - - def send_from_external(self, person_identifier, text, detail): - """ - This method will send an incoming external message to the correct XMPP user. 
async def send_from_external(self, person_identifier, text, detail, attachments=None):
    """Relay a message from an external service to the matching XMPP user.

    The text (if any) is sent first as its own chat message. Each attachment
    is then uploaded through an HTTP upload slot and announced in a separate
    message whose body is only the download URL, with an out-of-band
    (jabber:x:oob) element carrying the same URL.

    Args:
        person_identifier: Identifier whose person, service and user determine
            the sender and recipient JIDs.
        text (str | None): Message text to relay.
        detail: Message detail; ``is_outgoing_message`` marks messages sent by
            the user on the external service.
        attachments (list, optional): Dicts with the keys "content",
            "content_type", "filename" and "size".
    """
    attachments = attachments or []

    sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}"
    recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}"

    def deliver(stanza):
        """Send *stanza*, unless it mirrors an outgoing message."""
        if detail.is_outgoing_message:
            # TODO: outgoing messages are not forwarded yet; they are dropped.
            log.info("OUT")
            return
        log.info(f"Final XMPP message: {stanza.xml}")
        log.info("Sending message")
        stanza.send()

    # First, send text separately if there's any
    if text:
        text_msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat")
        text_msg["body"] = text
        log.info(f"Sending separate text message: {text}")
        deliver(text_msg)

    if not attachments:
        return

    if detail.is_outgoing_message:
        # The attachment messages would be dropped anyway; skip the uploads
        # so no orphaned files are left on the upload service.
        log.info("OUT")
        return

    # One HTTP session is shared by all uploads of this message.
    async with aiohttp.ClientSession() as session:
        for att in attachments:
            # Request an upload slot
            upload_slot = await self.request_upload_slot(
                recipient_jid, att["filename"], att["content_type"], att["size"]
            )
            if not upload_slot:
                log.warning(f"Failed to obtain upload slot for {att['filename']}")
                continue

            upload_url, put_url, auth_header = upload_slot

            # Upload file
            headers = {"Content-Type": att["content_type"]}
            if auth_header:
                headers["Authorization"] = auth_header

            try:
                async with session.put(put_url, data=att["content"], headers=headers) as response:
                    if response.status not in (200, 201):
                        log.error(f"Upload failed: {response.status} {await response.text()}")
                        continue

                # Create and send message with only the file URL
                msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat")
                msg["body"] = upload_url  # Body must be only the URL

                # Include the out-of-band element (XEP-0066) for client compatibility
                oob_element = ET.Element("{jabber:x:oob}x")
                url_element = ET.SubElement(oob_element, "{jabber:x:oob}url")
                url_element.text = upload_url
                msg.xml.append(oob_element)

                log.info(f"Sending file attachment message with URL: {upload_url}")
                deliver(msg)
            except Exception as e:
                # Best effort: one failed attachment must not stop the rest.
                log.error(f"Error uploading {att['filename']} to XMPP: {e}")
a/core/models.py +++ b/core/models.py @@ -119,14 +119,15 @@ class PersonIdentifier(models.Model): def __str__(self): return f"{self.person} ({self.service})" - def send(self, text): + def send(self, text, attachments=[]): """ Send this contact a text. """ if self.service == "signal": ts = signalapi.send_message_raw_sync( self.identifier, - text + text, + attachments, ) return ts else: