Files
GIA/core/clients/xmpp.py

651 lines
26 KiB
Python

from core.clients import ClientBase
from django.conf import settings
from slixmpp.componentxmpp import ComponentXMPP
from django.conf import settings
from core.models import User, Person, PersonIdentifier, ChatSession, Manipulation
from asgiref.sync import sync_to_async
from django.utils.timezone import now
import asyncio
from core.clients import signalapi
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins.xep_0085.stanza import Active, Composing, Paused, Inactive, Gone
from slixmpp.stanza import Message
from slixmpp.xmlstream.stanzabase import ET
import aiohttp
from core.messaging import history
from core.util import logs
from core.messaging import replies, utils, ai
class XMPPComponent(ComponentXMPP):
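"""
Slixmpp component that bridges XMPP users to their external contacts.

It answers presence subscription requests for known contacts, logs presence
and chat state changes, relays incoming XMPP messages through signalapi, and
delivers externally received messages (text and uploaded attachments) to
XMPP users.
"""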
def __init__(self, ur, jid, secret, server, port):
    """
    Set up the component and wire its stanza plugins and event handlers.

    Args:
        ur: Stored on the instance as ``self.ur``; not used further here.
        jid: Component JID, forwarded to ``ComponentXMPP``.
        secret: Component secret, forwarded to ``ComponentXMPP``.
        server: Server address, forwarded to ``ComponentXMPP``.
        port: Server port, forwarded to ``ComponentXMPP``.
    """
    self.ur = ur
    self.log = logs.get_logger("XMPP")
    super().__init__(jid, secret, server, port)

    # Register the chat state stanza plugins on Message.
    for chat_state in (Active, Composing, Paused, Inactive, Gone):
        register_stanza_plugin(Message, chat_state)

    # (event name, handler) pairs; registration order is preserved.
    subscriptions = (
        ("session_start", self.session_start),
        ("disconnected", self.on_disconnected),
        ("message", self.message),
        # Presence event handlers
        ("presence_available", self.on_presence_available),
        ("presence_dnd", self.on_presence_dnd),
        ("presence_xa", self.on_presence_xa),
        ("presence_chat", self.on_presence_chat),
        ("presence_away", self.on_presence_away),
        ("presence_unavailable", self.on_presence_unavailable),
        ("presence_subscribe", self.on_presence_subscribe),
        ("presence_subscribed", self.on_presence_subscribed),
        ("presence_unsubscribe", self.on_presence_unsubscribe),
        ("presence_unsubscribed", self.on_presence_unsubscribed),
        ("roster_subscription_request", self.on_roster_subscription_request),
        # Chat state handlers
        ("chatstate_active", self.on_chatstate_active),
        ("chatstate_composing", self.on_chatstate_composing),
        ("chatstate_paused", self.on_chatstate_paused),
        ("chatstate_inactive", self.on_chatstate_inactive),
        ("chatstate_gone", self.on_chatstate_gone),
    )
    for event_name, handler in subscriptions:
        self.add_event_handler(event_name, handler)
async def enable_carbons(self):
    """
    Enable XMPP Message Carbons (XEP-0280).

    Sends an IQ ``set`` whose payload is
    ``<enable xmlns="urn:xmpp:carbons:2"/>`` and logs the outcome. Any
    failure is logged and swallowed so that session start-up continues.
    """
    try:
        iq = self.make_iq_set()
        # Attach the payload with append(). Item assignment under a key that
        # is not a registered interface/plugin of the stanza ("enable" is
        # not one for Iq) is ignored by slixmpp, which left the IQ empty.
        iq.append(ET.Element("{urn:xmpp:carbons:2}enable"))
        # NOTE(review): carbons are negotiated per client session; confirm
        # the server accepts this request from a component.
        await iq.send()
        self.log.info("Message Carbons enabled successfully")
    except Exception as e:
        self.log.error(f"Failed to enable Carbons: {e}")
def get_identifier(self, msg):
    """
    Resolve the PersonIdentifier addressed by a stanza.

    The sender's localpart is used as the Django username. The recipient's
    localpart names one of that user's contacts, optionally followed by
    ``|service`` (e.g. ``mark|signal``); the contact name is title-cased
    before the lookup.

    Args:
        msg: Stanza (or mapping) exposing the ``"from"`` and ``"to"`` JIDs.

    Returns:
        The matching PersonIdentifier, or None when the sender JID has no
        localpart or when the user, person or identifier lookup fails.
    """
    sender_bare_jid = str(msg["from"]).split("/", 1)[0]
    sender_username, at_sign, _ = sender_bare_jid.partition("@")
    if not at_sign:
        # Domain-only senders (the server, other components) cannot map to
        # a user; unpacking a two-way split here used to raise ValueError.
        return None

    # Without an "@" the whole recipient JID is treated as the localpart.
    recipient_username = str(msg["to"]).partition("@")[0]

    # Split on the first "|" only: extra pipes end up in the service name
    # (and fail the lookup) instead of breaking tuple unpacking.
    person_name, pipe, service = recipient_username.partition("|")
    person_name = person_name.title()  # Capitalize for consistency
    if not pipe:
        service = None

    try:
        # Lookup user in Django
        self.log.info(f"User {sender_username}")
        user = User.objects.get(username=sender_username)
        # Find the Person whose name is the title-cased contact name
        self.log.info(f"Name {person_name}")
        person = Person.objects.get(user=user, name=person_name)
        # An identifier must exist for this user, person and service
        self.log.info(f"Identifier {service}")
        return PersonIdentifier.objects.get(user=user, person=person, service=service)
    except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist):
        # Any failed lookup means the stanza does not address a known contact
        return None
def update_roster(self, jid, name=None):
    """
    Send a roster ``set`` IQ that adds or updates a single entry.

    Args:
        jid: JID of the roster entry.
        name: Display name for the entry; falls back to ``jid`` when falsy.
    """
    display_name = name or jid
    entry = {jid: {'name': display_name}}

    iq = self.Iq()
    iq['type'] = 'set'
    iq['roster']['items'] = entry
    iq.send()

    # The log line reports the name exactly as it was passed in.
    self.log.info(f"Updated roster: Added {jid} ({name})")
def on_chatstate_active(self, msg):
    """
    React to an "active" chat state notification.

    Logs the sender and resolves the addressed identifier; the lookup
    result is not used yet.
    """
    notice = f"Chat state: Active from {msg['from']}."
    self.log.info(notice)
    self.get_identifier(msg)
def on_chatstate_composing(self, msg):
    """
    React to a "composing" (typing) chat state notification.

    Logs the sender and resolves the addressed identifier; the lookup
    result is not used yet.
    """
    notice = f"Chat state: Composing from {msg['from']}."
    self.log.info(notice)
    self.get_identifier(msg)
def on_chatstate_paused(self, msg):
    """
    React to a "paused" (stopped typing) chat state notification.

    Logs the sender and resolves the addressed identifier; the lookup
    result is not used yet.
    """
    notice = f"Chat state: Paused from {msg['from']}."
    self.log.info(notice)
    self.get_identifier(msg)
def on_chatstate_inactive(self, msg):
    """
    React to an "inactive" chat state notification.

    Logs the sender and resolves the addressed identifier; the lookup
    result is not used yet.
    """
    notice = f"Chat state: Inactive from {msg['from']}."
    self.log.info(notice)
    self.get_identifier(msg)
def on_chatstate_gone(self, msg):
    """
    React to a "gone" (left the chat) chat state notification.

    Logs the sender and resolves the addressed identifier; the lookup
    result is not used yet.
    """
    notice = f"Chat state: Gone from {msg['from']}."
    self.log.info(notice)
    self.get_identifier(msg)
def on_presence_available(self, pres):
    """Log that an "available" presence was received."""
    log_line = f"Presence available from {pres['from']}"
    self.log.info(log_line)
def on_presence_dnd(self, pres):
    """Log that the sender switched to 'Do Not Disturb'."""
    log_line = f"User {pres['from']} is now in 'Do Not Disturb' mode."
    self.log.info(log_line)
def on_presence_xa(self, pres):
    """Log that the sender switched to 'Extended Away'."""
    log_line = f"User {pres['from']} is now 'Extended Away'."
    self.log.info(log_line)
def on_presence_chat(self, pres):
    """Log that the sender announced availability for chat."""
    log_line = f"User {pres['from']} is now available for chat."
    self.log.info(log_line)
def on_presence_away(self, pres):
    """Log that the sender switched to 'Away'."""
    log_line = f"User {pres['from']} is now 'Away'."
    self.log.info(log_line)
def on_presence_unavailable(self, pres):
    """Log that the sender went offline or became unavailable."""
    log_line = f"User {pres['from']} is now unavailable."
    self.log.info(log_line)
def on_presence_subscribe(self, pres):
    """
    Handle an incoming presence subscription request.

    The request is accepted only when the sender's localpart is a known
    Django user who owns a contact, and an identifier for it, matching the
    recipient localpart (``name`` or ``name|service``). Accepted requests
    are answered with ``subscribed`` and then ``available`` presence sent
    from the contact's JID on this component. Requests for unknown
    contacts are answered with ``unsubscribed``. Requests whose sender or
    recipient JID has no localpart are ignored.
    """
    sender_jid = str(pres['from']).split('/')[0]  # Bare JID (user@domain)
    recipient_jid = str(pres['to']).split('/')[0]
    self.log.info(f"Received subscription request from {sender_jid} to {recipient_jid}")

    user_username, sender_at, _ = sender_jid.partition("@")
    recipient_username, recipient_at, _ = recipient_jid.partition("@")
    if not (sender_at and recipient_at):
        # Without both localparts there is no user or contact to look up.
        return

    # Parse "name" or "name|service" (e.g. "mark|signal"). Only the first
    # "|" separates the two parts, so malformed localparts are rejected by
    # the lookup below instead of raising during unpacking.
    person_name, pipe, service = recipient_username.partition("|")
    person_name = person_name.title()  # Capitalize for consistency
    if not pipe:
        service = None

    try:
        # Lookup user in Django
        self.log.info(f"User {user_username}")
        user = User.objects.get(username=user_username)
        # Find the Person whose name is the title-cased contact name
        self.log.info(f"Name {person_name}")
        person = Person.objects.get(user=user, name=person_name)
        # An identifier must exist for this user, person and service
        self.log.info(f"Identifier {service}")
        PersonIdentifier.objects.get(user=user, person=person, service=service)
    except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist):
        self.log.warning(f"Subscription request from {sender_jid} rejected (recipient does not have this contact).")
        # Reject from the JID that was asked, so the requester can match
        # the answer to its pending subscription.
        self.send_presence(ptype="unsubscribed", pto=sender_jid, pfrom=recipient_jid)
        return

    # Build the contact's JID on this component. The "|service" suffix is
    # only added when a service was given; formatting None into the JID
    # used to yield "name|None@...".
    localpart = person_name.lower()
    if service is not None:
        localpart = f"{localpart}|{service}"
    component_jid = f"{localpart}@{self.boundjid.bare}"

    # Accept the subscription
    self.send_presence(ptype="subscribed", pto=sender_jid, pfrom=component_jid)
    self.log.info(f"Accepted subscription from {sender_jid}, sent from {component_jid}")

    # NOTE: a reciprocal "subscribe" request and a roster update for the
    # sender were disabled in the original implementation and remain so.

    # Send presence update to sender from the contact's JID
    self.send_presence(ptype="available", pto=sender_jid, pfrom=component_jid)
    self.log.info(f"Sent presence update from {component_jid} to {sender_jid}")
def on_presence_subscribed(self, pres):
    """Log that a subscription was accepted by the sender."""
    log_line = f"Subscription to {pres['from']} was accepted."
    self.log.info(log_line)
def on_presence_unsubscribe(self, pres):
    """Log that the sender unsubscribed from presence updates."""
    log_line = f"User {pres['from']} has unsubscribed from presence updates."
    self.log.info(log_line)
def on_presence_unsubscribed(self, pres):
    """Log that an unsubscription was confirmed by the sender."""
    log_line = f"Unsubscription from {pres['from']} confirmed."
    self.log.info(log_line)
def on_roster_subscription_request(self, pres):
    """Log that a roster subscription request was received."""
    log_line = f"New roster subscription request from {pres['from']}."
    self.log.info(log_line)
async def session_start(self, *args):
    """Log that the session started, then request Message Carbons."""
    started_notice = "XMPP session started"
    self.log.info(started_notice)
    await self.enable_carbons()
def on_disconnected(self, *args):
    """
    Log the disconnection and immediately call ``connect()`` again.
    """
    notice = "XMPP disconnected, attempting to reconnect..."
    self.log.warning(notice)
    self.connect()
async def request_upload_slot(self, recipient, filename, content_type, size,
                              upload_service_jid="share.zm.is"):
    """
    Request an upload slot for HTTP File Upload (XEP-0363).

    Args:
        recipient (str): The JID of the recipient. Currently unused; kept
            so existing callers keep working.
        filename (str): The filename for the upload.
        content_type (str): The file's MIME type.
        size (int): The file size in bytes.
        upload_service_jid (str): JID of the upload service to ask.
            Defaults to the previously hard-coded service.

    Returns:
        tuple | None: ``(get_url, put_url, auth_header)``, where
        ``auth_header`` is None when the slot carries no non-empty
        Authorization header. None is returned on any failure.
    """
    # NOTE: service discovery via the xep_0363 plugin was disabled in the
    # original implementation; the service JID is passed in instead.
    namespace = "{urn:xmpp:http:upload:0}"
    try:
        slot = await self['xep_0363'].request_slot(
            jid=upload_service_jid,
            filename=filename,
            content_type=content_type,
            size=size,
        )
        if slot is None:
            self.log.error(f"Failed to obtain upload slot for {filename}")
            return None

        # Parse the serialized response rather than relying on stanza
        # plugin accessors.
        root = ET.fromstring(str(slot))
        get_element = root.find(f".//{namespace}get")
        put_element = root.find(f".//{namespace}put")
        get_url = get_element.get("url") if get_element is not None else None
        put_url = put_element.get("url") if put_element is not None else None
        if not get_url or not put_url:
            self.log.error(f"Missing URLs in upload slot: {slot}")
            return None

        # The Authorization header is optional. An empty element must not
        # invalidate an otherwise usable slot.
        header_element = put_element.find(f"./{namespace}header[@name='Authorization']")
        auth_header = None
        if header_element is not None and header_element.text:
            auth_header = header_element.text.strip() or None

        return get_url, put_url, auth_header
    except Exception as e:
        # Best effort: any failure is reported to the caller as "no slot".
        self.log.error(f"Exception while requesting upload slot: {e}")
        return None
async def message(self, msg):
    """
    Process an incoming XMPP message.

    Messages addressed to the gateway's own JID are treated as commands
    (``.contacts``, ``.whoami``). Messages addressed to ``name@gateway`` or
    ``name|service@gateway`` are stored in the chat history and relayed
    through ``signalapi.send_message_raw``. When an enabled "mutate"
    Manipulation applies to the contact, the text is rewritten by the AI
    prompt before it is relayed.

    Database access goes through ``sync_to_async`` because Django's ORM
    refuses synchronous queries issued from a running event loop.

    Args:
        msg: The incoming message stanza.
    """

    def reply(text):
        # Answer the sender with a gateway notice.
        msg.reply(f"[>] {text}").send()

    # Sender JID (full format: user@domain/resource)
    sender_jid = str(msg["from"])
    sender_bare_jid, _, sender_resource = sender_jid.partition("/")
    sender_username, at_sign, sender_domain = sender_bare_jid.partition("@")
    if not at_sign:
        # Domain-only senders (the server, other components) have no
        # localpart to map to a user; unpacking used to raise ValueError.
        self.log.warning(f"Ignoring message from JID without a localpart: {sender_jid}")
        return

    # Recipient JID. Checks use the bare JID so that a resource
    # ("name@gateway/phone") cannot defeat the domain comparison.
    recipient_jid = str(msg["to"])
    recipient_bare_jid = recipient_jid.partition("/")[0]
    recipient_username, at_sign, recipient_domain = recipient_bare_jid.partition("@")
    if not at_sign:
        recipient_domain = recipient_username

    # Extract message body
    body = msg["body"] or "[No Body]"

    self.log.info(f"Full XMPP Message: {ET.tostring(msg.xml, encoding='unicode')}")

    # Attachments from <attachment> elements (if present)
    attachments = [
        {
            "url": att.attrib.get("url"),
            "filename": att.attrib.get("filename"),
            "content_type": att.attrib.get("content_type"),
        }
        for att in msg.xml.findall(".//{urn:xmpp:attachments}attachment")
    ]
    # Attachments from XEP-0066 <x><url> (Out of Band Data)
    attachments.extend(
        {
            "url": oob.text,
            "filename": oob.text.split("/")[-1],  # Extract filename from URL
            "content_type": "application/octet-stream",  # Generic content-type
        }
        for oob in msg.xml.findall(".//{jabber:x:oob}x/{jabber:x:oob}url")
        if oob.text  # an empty <url/> carries nothing to relay
    )
    self.log.info(f"Extracted {len(attachments)} attachments from XMPP message.")

    # Log extracted information with variable name annotations
    log_message = (
        f"Sender JID: {sender_jid}, Sender Username: {sender_username}, Sender Domain: {sender_domain}, "
        f"Sender Resource: {sender_resource if sender_resource else '[No Resource]'}, "
        f"Recipient JID: {recipient_jid}, Recipient Username: {recipient_username}, Recipient Domain: {recipient_domain}, "
        f"Body: {body}"
    )
    self.log.info(log_message)

    # Ensure recipient domain matches our configured component
    expected_domain = settings.XMPP_JID
    if recipient_domain != expected_domain:
        self.log.warning(f"Invalid recipient domain: {recipient_domain}, expected {expected_domain}")
        return

    # Lookup sender in Django's User model
    try:
        sender_user = await sync_to_async(User.objects.get)(username=sender_username)
    except User.DoesNotExist:
        self.log.warning(f"Unknown sender: {sender_username}")
        return

    if recipient_bare_jid == settings.XMPP_JID:
        # Messaging the gateway directly: only "."-prefixed commands are
        # acted upon; anything else is ignored.
        self.log.info("Message to JID")
        if body.startswith("."):
            if body == ".contacts":
                contact_names = await sync_to_async(list)(
                    Person.objects.filter(user=sender_user).values_list("name", flat=True)
                )
                if not contact_names:
                    self.log.info(f"No contacts found for {sender_username}")
                    reply("No contacts found.")
                    return
                reply("Contacts: " + ", ".join(contact_names))
            elif body == ".whoami":
                # NOTE(review): this dumps every attribute of the user
                # object (presumably including the password hash) into
                # the chat; consider restricting the fields.
                reply(str(sender_user.__dict__))
            else:
                reply("No such command")
        return

    self.log.info("Other message")
    # Parse "name" or "name|service"; only the first "|" separates them.
    recipient_name, _, recipient_service = recipient_username.partition("|")
    recipient_name = recipient_name.title()

    try:
        person = await sync_to_async(Person.objects.get)(user=sender_user, name=recipient_name)
    except Person.DoesNotExist:
        reply("This person does not exist.")
        return  # without this, `person` is unbound below

    if recipient_service:
        try:
            identifier = await sync_to_async(PersonIdentifier.objects.get)(
                user=sender_user,
                person=person,
                service=recipient_service,
            )
        except PersonIdentifier.DoesNotExist:
            reply("This service identifier does not exist.")
            return  # without this, `identifier` is unbound below
    else:
        # No service given: use the first identifier of this person.
        identifier = await sync_to_async(
            PersonIdentifier.objects.filter(user=sender_user, person=person).first
        )()
        if identifier is None:
            reply("This person has no identifiers.")
            return

    # The identifier was looked up with user=sender_user and person=person,
    # so those objects are reused below instead of lazily loading
    # identifier.user / identifier.person from the event loop.
    session, _ = await sync_to_async(ChatSession.objects.get_or_create)(
        identifier=identifier,
        user=sender_user,
    )
    self.log.info(f"Component history store message {body}")
    await history.store_message(
        session=session,
        sender="XMPP",
        text=body,
        ts=int(now().timestamp() * 1000),
        # TODO: the `outgoing` flag is not passed yet.
    )
    self.log.info("Stored a message sent from XMPP in the history.")

    manipulations = Manipulation.objects.filter(
        group__people=person,
        user=sender_user,
        mode="mutate",
        enabled=True,
    )
    # repr() evaluates the queryset, so it has to run off the event loop.
    self.log.info(f"MANIP11 {await sync_to_async(repr)(manipulations)}")
    manip = await sync_to_async(manipulations.first)()
    if manip is None:
        await signalapi.send_message_raw(
            identifier.identifier,
            body,
            attachments,
        )
        self.log.info("Message sent unaltered")
        return

    chat_history = await history.get_chat_history(session)
    await utils.update_last_interaction(session)
    prompt = replies.generate_mutate_reply_prompt(
        body,
        person,
        manip,
        chat_history,
    )
    self.log.info("Running XMPP context prompt")
    # NOTE(review): if `manip.ai` is a lazily loaded relation, this access
    # also queries the database from the event loop - confirm against the
    # Manipulation model.
    result = await ai.run_prompt(prompt, manip.ai)
    self.log.info(f"RESULT {result}")
    await history.store_own_message(
        session=session,
        text=result,
        ts=int(now().timestamp() * 1000),
    )
    await signalapi.send_message_raw(
        identifier.identifier,
        result,
        attachments,
    )
    self.log.info("Message sent with modifications")
async def request_upload_slots(self, recipient_jid, attachments):
    """
    Request one upload slot per attachment, all at the same time.

    Returns:
        list: ``(attachment, slot)`` pairs in input order, leaving out
        every attachment whose slot request returned None.
    """
    pending = []
    for attachment in attachments:
        pending.append(
            self.request_upload_slot(
                recipient_jid,
                attachment["filename"],
                attachment["content_type"],
                attachment["size"],
            )
        )
    slots = await asyncio.gather(*pending)

    granted = []
    for attachment, slot in zip(attachments, slots):
        if slot is None:
            continue
        granted.append((attachment, slot))
    return granted
async def upload_and_send(self, att, upload_slot, recipient_jid, sender_jid):
    """
    PUT one attachment to its upload slot and announce it over XMPP.

    Args:
        att: Attachment mapping; ``content_type``, ``content`` and
            ``filename`` are read from it.
        upload_slot: ``(upload_url, put_url, auth_header)`` tuple.
        recipient_jid: JID that receives the resulting message.
        sender_jid: JID the resulting message is sent from.

    A non-200/201 response or any exception is logged, and no message is
    sent in that case.
    """
    upload_url, put_url, auth_header = upload_slot

    request_headers = {"Content-Type": att["content_type"]}
    if auth_header:
        request_headers["Authorization"] = auth_header

    async with aiohttp.ClientSession() as http:
        try:
            async with http.put(put_url, data=att["content"], headers=request_headers) as response:
                if response.status in (200, 201):
                    self.log.info(f"Successfully uploaded {att['filename']} to {upload_url}")
                    # The URL is sent both as the body and as the OOB attachment.
                    await self.send_xmpp_message(recipient_jid, sender_jid, upload_url, attachment_url=upload_url)
                else:
                    self.log.error(f"Upload failed: {response.status} {await response.text()}")
        except Exception as e:
            self.log.error(f"Error uploading {att['filename']} to XMPP: {e}")
async def send_xmpp_message(self, recipient_jid, sender_jid, body_text, attachment_url=None):
    """
    Build and send a ``chat`` message.

    Args:
        recipient_jid: JID the message is sent to.
        sender_jid: JID the message is sent from.
        body_text: Message body (plain text, or the URL for attachments).
        attachment_url: When truthy, also attached as an XEP-0066
            ``<x><url>`` element for client compatibility.
    """
    msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat")
    msg["body"] = body_text  # Body must contain only text or the URL

    if attachment_url:
        oob_wrapper = ET.Element("{jabber:x:oob}x")
        ET.SubElement(oob_wrapper, "{jabber:x:oob}url").text = attachment_url
        msg.xml.append(oob_wrapper)

    self.log.info(f"Sending XMPP message: {msg.xml}")
    msg.send()
async def send_from_external(self, user, person_identifier, text, is_outgoing_message, attachments=None):
    """
    Deliver an externally received message to the owning XMPP user.

    The message is sent from ``<person>|<service>@<component JID>`` to
    ``<username>@<XMPP address>``. Text goes first (prefixed with
    ``YOU: `` for outgoing messages); each attachment is then uploaded and
    announced in its own message.

    Args:
        user: Unused; kept for interface compatibility.
        person_identifier: Identifier supplying the person name, service
            and owning user.
        text: Message text; may be empty.
        is_outgoing_message: True when the message was sent by the user.
        attachments: Attachment mappings to upload. Defaults to none.
    """
    # A None default replaces the shared mutable `[]` default.
    attachments = attachments or []

    sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}"
    recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}"

    # Step 1: Send text message separately
    if is_outgoing_message:
        # NOTE(review): an outgoing message without text is still sent,
        # rendering as "YOU: " or "YOU: None" - confirm this is intended.
        await self.send_xmpp_message(recipient_jid, sender_jid, f"YOU: {text}")
    elif text:
        await self.send_xmpp_message(recipient_jid, sender_jid, text)

    if not attachments:
        return  # No attachments to process

    # Step 2: Request upload slots concurrently
    valid_uploads = await self.request_upload_slots(recipient_jid, attachments)
    self.log.info("Got upload slots")
    if not valid_uploads:
        self.log.warning("No valid upload slots obtained.")
        return  # nothing left to upload

    # Step 3: Upload each file and send its message immediately after upload
    await asyncio.gather(
        *(
            self.upload_and_send(att, slot, recipient_jid, sender_jid)
            for att, slot in valid_uploads
        )
    )
class XMPPClient(ClientBase):
    """
    Client wrapper that owns the XMPPComponent and starts its connection.
    """

    def __init__(self, ur, *args, **kwargs):
        """
        Build the component from the XMPP_* settings and enable its plugins.

        Args:
            ur: Passed to both ``ClientBase`` and the component.
            *args: Forwarded to ``ClientBase``.
            **kwargs: Forwarded to ``ClientBase``.
        """
        super().__init__(ur, *args, **kwargs)
        self.client = XMPPComponent(
            ur,
            jid=settings.XMPP_JID,
            secret=settings.XMPP_SECRET,
            server=settings.XMPP_ADDRESS,
            port=settings.XMPP_PORT,
        )
        enabled_plugins = (
            "xep_0030",  # Service Discovery
            "xep_0004",  # Data Forms
            "xep_0060",  # PubSub
            "xep_0199",  # XMPP Ping
            "xep_0085",  # Chat State Notifications
            "xep_0363",  # HTTP File Upload
        )
        for plugin_name in enabled_plugins:
            self.client.register_plugin(plugin_name)

    def start(self):
        """Log start-up, share this client's loop with the component, and connect."""
        self.log.info("XMPP client starting...")
        component = self.client
        # ensure slixmpp uses the same asyncio loop as the router
        component.loop = self.loop
        component.connect()
        # component.process() is left disabled, as in the original code.