from core.util import logs from django.core.management.base import BaseCommand from slixmpp.componentxmpp import ComponentXMPP from django.conf import settings from core.models import User, Person, PersonIdentifier from redis import asyncio as aioredis import asyncio import msgpack from core.lib import deferred from slixmpp.xmlstream import register_stanza_plugin from slixmpp.plugins.xep_0085.stanza import Active, Composing, Paused, Inactive, Gone from slixmpp.stanza import Message log = logs.get_logger("component") redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10) class EchoComponent(ComponentXMPP): """ A simple Slixmpp component that echoes messages. """ def __init__(self, jid, secret, server, port): super().__init__(jid, secret, server, port) # Register chat state plugins register_stanza_plugin(Message, Active) register_stanza_plugin(Message, Composing) register_stanza_plugin(Message, Paused) register_stanza_plugin(Message, Inactive) register_stanza_plugin(Message, Gone) self.add_event_handler("session_start", self.session_start) self.add_event_handler("disconnected", self.on_disconnected) self.add_event_handler("message", self.message) # Presence event handlers self.add_event_handler("presence_available", self.on_presence_available) self.add_event_handler("presence_dnd", self.on_presence_dnd) self.add_event_handler("presence_xa", self.on_presence_xa) self.add_event_handler("presence_chat", self.on_presence_chat) self.add_event_handler("presence_away", self.on_presence_away) self.add_event_handler("presence_unavailable", self.on_presence_unavailable) self.add_event_handler("presence_subscribe", self.on_presence_subscribe) self.add_event_handler("presence_subscribed", self.on_presence_subscribed) self.add_event_handler("presence_unsubscribe", self.on_presence_unsubscribe) self.add_event_handler("presence_unsubscribed", self.on_presence_unsubscribed) self.add_event_handler("roster_subscription_request", self.on_roster_subscription_request) # Chat state handlers self.add_event_handler("chatstate_active", self.on_chatstate_active) self.add_event_handler("chatstate_composing", self.on_chatstate_composing) self.add_event_handler("chatstate_paused", self.on_chatstate_paused) self.add_event_handler("chatstate_inactive", self.on_chatstate_inactive) self.add_event_handler("chatstate_gone", self.on_chatstate_gone) def get_identifier(self, msg): # Extract sender JID (full format: user@domain/resource) sender_jid = str(msg["from"]) # Split into username@domain and optional resource sender_parts = sender_jid.split("/", 1) sender_bare_jid = sender_parts[0] # Always present: user@domain sender_username, sender_domain = sender_bare_jid.split("@", 1) sender_resource = sender_parts[1] if len(sender_parts) > 1 else None # Extract resource if present # Extract recipient JID (should match component JID format) recipient_jid = str(msg["to"]) if "@" in recipient_jid: recipient_username, recipient_domain = recipient_jid.split("@", 1) else: recipient_username = recipient_jid recipient_domain = recipient_jid # Extract message body body = msg["body"] if msg["body"] else "[No Body]" # Parse recipient_name and recipient_service (e.g., "mark|signal") if "|" in recipient_username: person_name, service = recipient_username.split("|") person_name = person_name.title() # Capitalize for consistency else: person_name = recipient_username.title() service = None try: # Lookup user in Django log.info(f"User {sender_username}") user = User.objects.get(username=sender_username) # Find Person object with name=person_name.lower() log.info(f"Name {person_name.title()}") person = Person.objects.get(user=user, name=person_name.title()) # Ensure a PersonIdentifier exists for this user, person, and service log.info(f"Identifier {service}") identifier = PersonIdentifier.objects.get(user=user, person=person, service=service) return identifier except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist): # If any lookup fails, reject the subscription return None def on_chatstate_active(self, msg): """ Handle when a user is actively engaged in the chat. """ log.info(f"Chat state: Active from {msg['from']}.") identifier = self.get_identifier(msg) def on_chatstate_composing(self, msg): """ Handle when a user is typing a message. """ log.info(f"Chat state: Composing from {msg['from']}.") identifier = self.get_identifier(msg) def on_chatstate_paused(self, msg): """ Handle when a user has paused typing. """ log.info(f"Chat state: Paused from {msg['from']}.") identifier = self.get_identifier(msg) def on_chatstate_inactive(self, msg): """ Handle when a user is inactive in the chat. """ log.info(f"Chat state: Inactive from {msg['from']}.") identifier = self.get_identifier(msg) def on_chatstate_gone(self, msg): """ Handle when a user has left the chat. """ log.info(f"Chat state: Gone from {msg['from']}.") identifier = self.get_identifier(msg) def on_presence_available(self, pres): """ Handle when a user becomes available. """ log.info(f"Presence available from {pres['from']}") def on_presence_dnd(self, pres): """ Handle when a user sets 'Do Not Disturb' status. """ log.info(f"User {pres['from']} is now in 'Do Not Disturb' mode.") def on_presence_xa(self, pres): """ Handle when a user sets 'Extended Away' status. """ log.info(f"User {pres['from']} is now 'Extended Away'.") def on_presence_chat(self, pres): """ Handle when a user is actively available for chat. """ log.info(f"User {pres['from']} is now available for chat.") def on_presence_away(self, pres): """ Handle when a user sets 'Away' status. """ log.info(f"User {pres['from']} is now 'Away'.") def on_presence_unavailable(self, pres): """ Handle when a user goes offline or unavailable. """ log.info(f"User {pres['from']} is now unavailable.") def on_presence_subscribe(self, pres): """ Handle incoming presence subscription requests. Accept only if the recipient has a contact matching the sender. """ sender_jid = str(pres['from']).split('/')[0] # Bare JID (user@domain) recipient_jid = str(pres['to']).split('/')[0] log.info(f"Received subscription request from {sender_jid} to {recipient_jid}") try: # Extract sender and recipient usernames user_username, _ = sender_jid.split("@", 1) recipient_username, _ = recipient_jid.split("@", 1) # Parse recipient_name and recipient_service (e.g., "mark|signal") if "|" in recipient_username: person_name, service = recipient_username.split("|") person_name = person_name.title() # Capitalize for consistency else: person_name = recipient_username.title() service = None # Lookup user in Django log.info(f"User {user_username}") user = User.objects.get(username=user_username) # Find Person object with name=person_name.lower() log.info(f"Name {person_name.title()}") person = Person.objects.get(user=user, name=person_name.title()) # Ensure a PersonIdentifier exists for this user, person, and service log.info(f"Identifier {service}") PersonIdentifier.objects.get(user=user, person=person, service=service) # If all checks pass, accept the subscription self.send_presence(ptype="subscribed", pto=sender_jid) log.info(f"Subscription request from {sender_jid} accepted for {recipient_jid}.") except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist): # If any lookup fails, reject the subscription log.warning(f"Subscription request from {sender_jid} rejected (recipient does not have this contact).") self.send_presence(ptype="unsubscribed", pto=sender_jid) def on_presence_subscribed(self, pres): """ Handle successful subscription confirmations. """ log.info(f"Subscription to {pres['from']} was accepted.") def on_presence_unsubscribe(self, pres): """ Handle when a user unsubscribes from presence updates. """ log.info(f"User {pres['from']} has unsubscribed from presence updates.") def on_presence_unsubscribed(self, pres): """ Handle when a user's unsubscription request is confirmed. """ log.info(f"Unsubscription from {pres['from']} confirmed.") def on_roster_subscription_request(self, pres): """ Handle roster subscription requests. """ log.info(f"New roster subscription request from {pres['from']}.") def session_start(self, *args): log.info("XMPP session started") def on_disconnected(self, *args): """ Handles XMPP disconnection and triggers a reconnect loop. """ log.warning("XMPP disconnected, attempting to reconnect...") self.connect() def session_start(self, *args): log.info(f"START {args}") def message(self, msg): """ Process incoming XMPP messages. """ sym = lambda x: msg.reply(f"[>] {x}").send() # log.info(f"Received message: {msg}") # Extract sender JID (full format: user@domain/resource) sender_jid = str(msg["from"]) # Split into username@domain and optional resource sender_parts = sender_jid.split("/", 1) sender_bare_jid = sender_parts[0] # Always present: user@domain sender_username, sender_domain = sender_bare_jid.split("@", 1) sender_resource = sender_parts[1] if len(sender_parts) > 1 else None # Extract resource if present # Extract recipient JID (should match component JID format) recipient_jid = str(msg["to"]) if "@" in recipient_jid: recipient_username, recipient_domain = recipient_jid.split("@", 1) else: recipient_username = recipient_jid recipient_domain = recipient_jid # Extract message body body = msg["body"] if msg["body"] else "[No Body]" # Log extracted information with variable name annotations log_message = ( f"Sender JID: {sender_jid}, Sender Username: {sender_username}, Sender Domain: {sender_domain}, " f"Sender Resource: {sender_resource if sender_resource else '[No Resource]'}, " f"Recipient JID: {recipient_jid}, Recipient Username: {recipient_username}, Recipient Domain: {recipient_domain}, " f"Body: {body}" ) log.info(log_message) # Ensure recipient domain matches our configured component expected_domain = settings.XMPP_JID # 'jews.zm.is' in your config if recipient_domain != expected_domain: log.warning(f"Invalid recipient domain: {recipient_domain}, expected {expected_domain}") return # Lookup sender in Django's User model try: sender_user = User.objects.get(username=sender_username) except User.DoesNotExist: log.warning(f"Unknown sender: {sender_username}") return if recipient_jid == settings.XMPP_JID: log.info("Message to JID") if body.startswith("."): # Messaging the gateway directly if body == ".contacts": # Lookup Person objects linked to sender persons = Person.objects.filter(user=sender_user) if not persons.exists(): log.info(f"No contacts found for {sender_username}") sym("No contacts found.") return # Construct contact list response contact_names = [person.name for person in persons] response_text = f"Contacts: " + ", ".join(contact_names) sym(response_text) elif body == ".whoami": sym(str(sender_user.__dict__)) else: sym("No such command") else: log.info("Other message") if "|" in recipient_username: recipient_name, recipient_service = recipient_username.split("|") recipient_name = recipient_name.title() else: recipient_name = recipient_username recipient_service = None recipient_name = recipient_name.title() try: person = Person.objects.get(user=sender_user, name=recipient_name) except Person.DoesNotExist: sym("This person does not exist.") if recipient_service: try: identifier = PersonIdentifier.objects.get(user=sender_user, person=person, service=recipient_service) except PersonIdentifier.DoesNotExist: sym("This service identifier does not exist.") else: # Get a random identifier identifier = PersonIdentifier.objects.filter(user=sender_user, person=person).first() recipient_service = identifier.service # sym(str(person.__dict__)) # sym(f"Service: {recipient_service}") identifier.send(body) def send_from_external(self, person_identifier, text, detail): """ This method will send an incoming external message to the correct XMPP user. """ sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}" recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}" if detail.is_outgoing_message: carbon_msg = f""" {text} """ log.info(f"Sending Carbon: {carbon_msg}") self.send_raw(carbon_msg) else: log.info(f"Forwarding message from external service: {sender_jid} -> {recipient_jid}: {text}") self.send_message(mto=recipient_jid, mfrom=sender_jid, mbody=text, mtype="chat") async def stream(**kwargs): pubsub = redis.pubsub() await pubsub.subscribe("component") while True: message = await pubsub.get_message(ignore_subscribe_messages=True) if message is not None: try: log.info("GOT", message) data = message["data"] unpacked = msgpack.unpackb(data, raw=False) log.info(f"Unpacked: {unpacked}") except TypeError: log.info(f"FAILED {message}") continue if "type" in unpacked.keys(): if unpacked["type"] == "def": await deferred.process_deferred( unpacked, **kwargs ) await asyncio.sleep(0.01) class Command(BaseCommand): def handle(self, *args, **options): xmpp = EchoComponent( jid=settings.XMPP_JID, secret=settings.XMPP_SECRET, server=settings.XMPP_ADDRESS, port=settings.XMPP_PORT, ) xmpp.register_plugin('xep_0030') # Service Discovery xmpp.register_plugin('xep_0004') # Data Forms xmpp.register_plugin('xep_0060') # PubSub xmpp.register_plugin('xep_0199') # XMPP Ping xmpp.register_plugin("xep_0085") # Chat State Notifications loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.create_task(stream(xmpp=xmpp)) # Connect to the XMPP server and start processing XMPP stanzas. xmpp.connect() xmpp.process() try: while True: pass # Keep the component running except (KeyboardInterrupt, SystemExit): log.info("XMPP Component terminating")