From f5c6b535d84749c284090f19a57bc4dc6a438025 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Thu, 13 Mar 2025 17:26:26 +0000 Subject: [PATCH] Fix bridging and refactor --- core/clients/__init__.py | 38 ++ core/clients/signal.py | 346 ++++++++++++-- core/clients/signalapi.py | 75 ++- core/clients/xmpp.py | 635 +++++++++++++++++++++++++ core/lib/bot.py | 46 -- core/lib/deferred.py | 33 +- core/management/commands/component.py | 151 +++--- core/management/commands/processing.py | 12 +- core/management/commands/ur.py | 18 + core/messaging/history.py | 2 + core/models.py | 5 +- core/modules/__init__.py | 0 core/modules/router.py | 50 ++ docker-compose.yml | 55 +-- 14 files changed, 1264 insertions(+), 202 deletions(-) create mode 100644 core/clients/xmpp.py delete mode 100644 core/lib/bot.py create mode 100644 core/management/commands/ur.py create mode 100644 core/modules/__init__.py create mode 100644 core/modules/router.py diff --git a/core/clients/__init__.py b/core/clients/__init__.py index e69de29..4327ec3 100644 --- a/core/clients/__init__.py +++ b/core/clients/__init__.py @@ -0,0 +1,38 @@ +from abc import ABC, abstractmethod +from core.util import logs + + +class ClientBase(ABC): + def __init__(self, ur, loop, service): + self.ur = ur + self.loop = loop + self.service = service + self.log = logs.get_logger(service) + self.log.info(f"{self.service.capitalize()} client initialising...") + + @abstractmethod + def start(self): + ... + + # @abstractmethod + # async def send_message(self, recipient, message): + # """Abstract method for sending messages.""" + # ... + + async def message_received(self, *args, **kwargs): + self.ur.message_received(self.service, *args, **kwargs) + + async def message_read(self, *args, **kwargs): + self.ur.message_read(self.service, *args, **kwargs) + + async def started_typing(self, *args, **kwargs): + self.ur.started_typing(self.service, *args, **kwargs) + + async def stopped_typing(self, *args, **kwargs): + self.ur.stopped_typing(self.service, *args, **kwargs) + + async def reacted(self, *args, **kwargs): + self.ur.reacted(self.service, *args, **kwargs) + + async def replied(self, *args, **kwargs): + self.ur.replied(self.service, *args, **kwargs) diff --git a/core/clients/signal.py b/core/clients/signal.py index 8346cf0..011bd56 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -1,45 +1,329 @@ -from rest_framework.views import APIView -from django.contrib.auth.mixins import LoginRequiredMixin -from rest_framework import status - -from django.http import HttpResponse -from core.models import QueuedMessage, Message -import requests -from requests.exceptions import RequestException -import orjson from django.conf import settings from core.messaging import natural import aiohttp +from core.util import logs +from core.clients import ClientBase +from signalbot import SignalBot +import aiohttp +import msgpack +from django.conf import settings +from signalbot import SignalBot, Command, Context from asgiref.sync import sync_to_async +from django.urls import reverse +import json +import asyncio +from core.util import logs +from core.lib.prompts.functions import truncate_and_summarize, messages_to_string, delete_messages +from core.lib import deferred +from core.messaging import replies, ai, natural, history, utils +from core.models import Chat, Manipulation, PersonIdentifier, QueuedMessage +import aiohttp +from django.conf import settings +from redis import asyncio as aioredis from core.clients import signalapi +from core.util import logs -async def send_message(db_obj): - recipient_uuid = db_obj.session.identifier.identifier - text = db_obj.text +log = logs.get_logger("signalF") - send = lambda x: signalapi.send_message_raw(recipient_uuid, x) # returns ts - start_t = lambda: signalapi.start_typing(recipient_uuid) - stop_t = lambda: signalapi.stop_typing(recipient_uuid) - tss = await natural.natural_send_message( - text, - send, - start_t, - stop_t, - ) # list of ts - #result = await send_message_raw(recipient_uuid, text) - await sync_to_async(db_obj.delete)() - result = [x for x in tss if x] # all trueish ts - if result: # if at least one message was sent - ts1 = result.pop() # pick a time - await sync_to_async(Message.objects.create)( - user=db_obj.session.user, - session=db_obj.session, - custom_author="BOT", +SIGNAL_URL = "signal:8080" + +redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10) + +class NewSignalBot(SignalBot): + def __init__(self, ur, service, config): + self.ur = ur + self.service = service + super().__init__(config) + self.log = logs.get_logger("signalI") + self.bot_uuid = None # Initialize with None + + async def get_own_uuid(self) -> str: + """Fetch bot's UUID by checking contacts, groups, or profile.""" + async with aiohttp.ClientSession() as session: + uri_contacts = f"http://{self._signal.signal_service}/v1/contacts/{self._signal.phone_number}" + try: + resp = await session.get(uri_contacts) + if resp.status == 200: + contacts_data = await resp.json() + if isinstance(contacts_data, list): + for contact in contacts_data: + if contact.get("number") == self._phone_number: + return contact.get("uuid") + except Exception as e: + self.log.error(f"Failed to get UUID from contacts: {e}") + + async def initialize_bot(self): + """Fetch bot's UUID and store it in self.bot_uuid.""" + try: + self.bot_uuid = await self.get_own_uuid() + if self.bot_uuid: + self.log.info(f"Own UUID: {self.bot_uuid}") + else: + self.log.warning("Unable to fetch bot UUID.") + except Exception as e: + self.log.error(f"Failed to initialize bot UUID: {e}") + + def start(self): + """Start bot without blocking event loop.""" + self._event_loop.create_task(self.initialize_bot()) # Fetch UUID first + self._event_loop.create_task(self._detect_groups()) # Sync groups + self._event_loop.create_task(self._produce_consume_messages()) # Process messages + + self.scheduler.start() # Start async job scheduler + + +class HandleMessage(Command): + def __init__(self, ur, service, *args, **kwargs): + self.ur = ur + self.service = service + return super().__init__(*args, **kwargs) + async def handle(self, c: Context): + msg = { + "source": c.message.source, + "source_number": c.message.source_number, + "source_uuid": c.message.source_uuid, + "timestamp": c.message.timestamp, + "type": c.message.type.value, + "text": c.message.text, + "group": c.message.group, + "reaction": c.message.reaction, + "mentions": c.message.mentions, + "raw_message": c.message.raw_message + } + raw = json.loads(c.message.raw_message) + print(json.dumps(c.message.raw_message, indent=2)) + #dest = c.message.raw_message.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid") + dest = raw.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid") + + #account = c.message.raw_message.get("account", "") + account = raw.get("account", "") + #source_name = msg["raw_message"].get("envelope", {}).get("sourceName", "") + source_name = raw.get("envelope", {}).get("sourceName", "") + + source_number = c.message.source_number + source_uuid = c.message.source_uuid + text = c.message.text + ts = c.message.timestamp + + # Message originating from us + same_recipient = source_uuid == dest + + is_from_bot = source_uuid == c.bot.bot_uuid + is_to_bot = dest == c.bot.bot_uuid or dest is None + + reply_to_self = same_recipient and is_from_bot # Reply + reply_to_others = is_to_bot and not same_recipient # Reply + is_outgoing_message = is_from_bot and not is_to_bot # Do not reply + + # Determine the identifier to use + identifier_uuid = dest if is_from_bot else source_uuid + + + # Handle attachments + attachments = raw.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("attachments", []) + if not attachments: + attachments = raw.get("envelope", {}).get("dataMessage", {}).get("attachments", []) + attachment_list = [] + for attachment in attachments: + attachment_list.append({ + "id": attachment["id"], + "content_type": attachment["contentType"], + "filename": attachment["filename"], + "size": attachment["size"], + "width": attachment.get("width"), + "height": attachment.get("height"), + }) + + # Get User from identifier + log.info(f"FUCK {self.service}") + identifiers = PersonIdentifier.objects.filter( + identifier=identifier_uuid, + service=self.service, + ) + xmpp_attachments = [] + # attachments = [] + + # Asynchronously fetch all attachments + tasks = [signalapi.fetch_signal_attachment(att["id"]) for att in attachment_list] + fetched_attachments = await asyncio.gather(*tasks) + log.info(f"ATTACHMENT LIST {attachment_list}") + log.info(f"FETCHED ATTACHMENTS {fetched_attachments}") + + for fetched, att in zip(fetched_attachments, attachment_list): + log.info(f"ITER {fetched} {att}") + if not fetched: + log.warning(f"Failed to fetch attachment {att['id']} from Signal.") + continue + + # Attach fetched file to XMPP + xmpp_attachments.append({ + "content": fetched["content"], + "content_type": fetched["content_type"], + "filename": fetched["filename"], + "size": fetched["size"], + }) + for identifier in identifiers: + #recipient_jid = f"{identifier.user.username}@{settings.XMPP_ADDRESS}" + user = identifier.user + + log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") + await self.ur.xmpp.client.send_from_external(user, identifier, text, is_outgoing_message, attachments=xmpp_attachments) + + #### + + + # TODO: Permission checks + manips = await sync_to_async(list)( + Manipulation.objects.filter(enabled=True) + ) + processed_people = set() + for manip in manips: + try: + person_identifier = await sync_to_async(PersonIdentifier.objects.get)( + identifier=identifier_uuid, + user=manip.user, + service="signal", + person__in=manip.group.people.all(), + ) + # Check if we've already processed this person + if person_identifier.person.id in processed_people: + log.warning(f"Skipping duplicate message storage for {person_identifier.person.name}") + continue # Skip to next manipulation + if not manip.group.people.filter(id=person_identifier.person.id).exists(): + log.error(f"{manip.name}: Identifier {identifier_uuid} found, but person {person_identifier.person} is not in manip group. Skipping.") + continue # Exit early if the person is not in the group + except PersonIdentifier.DoesNotExist: + log.warning(f"{manip.name}: Message from unknown identifier {identifier_uuid} - Not storing.") + continue # Exit early if no valid identifier is found + + # Find or create the corresponding ChatSession + chat_session = await history.get_chat_session(manip.user, person_identifier) + + # Store incoming or outgoing messages + log.info(f"Processing history store message {text}") + processed_people.add(person_identifier.person.id) + await history.store_message( + session=chat_session, + sender=source_uuid, text=text, - ts=ts1, # use that time in db + ts=ts, + outgoing=is_from_bot, ) + # Get the total history + chat_history = await history.get_chat_history(chat_session) + + if replies.should_reply( + reply_to_self, + reply_to_others, + is_outgoing_message, + ): + if manip.mode != "silent": + await utils.update_last_interaction(chat_session) + prompt = replies.generate_reply_prompt( + msg, + person_identifier.person, + manip, + chat_history + ) + + log.info("Running context prompt") + result = await ai.run_prompt(prompt, manip.ai) + # Store bot's AI response with a +1s timestamp if manip.mode == "active": + await history.store_own_message( + session=chat_session, + text=result, + ts=ts + 1, + ) + # await natural.natural_send_message(c, result) + tss = await natural.natural_send_message( + result, + c.send, + c.start_typing, + c.stop_typing, + ) + elif manip.mode == "notify": + title = f"[GIA] Suggested message to {person_identifier.person.name}" + manip.user.sendmsg(result, title=title) + elif manip.mode == "instant": + # Delete all other QueuedMessages + existing_queue = QueuedMessage.objects.filter( + user=chat_session.user, + session=chat_session, + manipulation=manip, + custom_author="BOT", + ) + + await delete_messages(existing_queue) + qm = await history.store_own_message( + session=chat_session, + text=result, + ts=ts + 1, + manip=manip, + queue=True, + + ) + accept = reverse( + "message_accept_api", kwargs={"message_id":qm.id} + ) + reject = reverse( + "message_reject_api", kwargs={"message_id":qm.id} + ) + url = settings.URL + content = ( + f"{result}\n\n" + f"Accept: {url}{accept}\n" + f"Reject: {url}{reject}" + ) + title = f"[GIA] Suggested message to {person_identifier.person.name}" + manip.user.sendmsg(content, title=title) + else: + log.error(f"Mode {manip.mode} is not implemented") + + # Manage truncation & summarization + await truncate_and_summarize(chat_session, manip.ai) + # END FOR + + try: + existing_chat = Chat.objects.get( + source_uuid=source_uuid + ) + # if existing_chat.ts != ts: + # print("not equal", existing_chat.ts, ts) + # existing_chat.ts = ts + # existing_chat.save() + existing_chat.source_number = source_number + existing_chat.source_name = source_name + existing_chat.save() + except Chat.DoesNotExist: + existing_chat = Chat.objects.create( + source_number=source_number, + source_uuid=source_uuid, + source_name=source_name, + account=account, + ) + # + + +class SignalClient(ClientBase): + def __init__(self, ur, *args, **kwargs): + super().__init__(ur, *args, **kwargs) + self.client = NewSignalBot( + ur, + self.service, + { + "signal_service": SIGNAL_URL, + "phone_number": "+447490296227", + }) + + self.client.register(HandleMessage(self.ur, self.service)) + + def start(self): + self.log.info("Signal client starting...") + self.client._event_loop = self.loop + + self.client.start() diff --git a/core/clients/signalapi.py b/core/clients/signalapi.py index 8a22b69..dc72748 100644 --- a/core/clients/signalapi.py +++ b/core/clients/signalapi.py @@ -7,6 +7,7 @@ import orjson from django.conf import settings import aiohttp import base64 +import asyncio async def start_typing(uuid): @@ -25,16 +26,71 @@ async def stop_typing(uuid): async with session.delete(url, json=data) as response: return await response.text() # Optional: Return response content +async def download_and_encode_base64(file_url, filename, content_type): + """ + Downloads a file from a given URL asynchronously, converts it to Base64, + and returns it in Signal's expected format. -async def send_message_raw(recipient_uuid, text): + Args: + file_url (str): The URL of the file to download. + filename (str): The name of the file. + content_type (str): The MIME type of the file. + Returns: + str | None: The Base64 encoded attachment string in Signal's expected format, or None on failure. + """ + try: + async with aiohttp.ClientSession() as session: + async with session.get(file_url, timeout=10) as response: + if response.status != 200: + # log.error(f"Failed to download file: {file_url}, status: {response.status}") + return None + + file_data = await response.read() + base64_encoded = base64.b64encode(file_data).decode("utf-8") + + # Format according to Signal's expected structure + return f"data:{content_type};filename={filename};base64,{base64_encoded}" + + except aiohttp.ClientError as e: + # log.error(f"Failed to download file: {file_url}, error: {e}") + return None + +async def send_message_raw(recipient_uuid, text=None, attachments=[]): + """ + Sends a message using the Signal REST API, ensuring attachment links are not included in the text body. + + Args: + recipient_uuid (str): The UUID of the recipient. + text (str, optional): The message to send. + attachments (list, optional): A list of attachment dictionaries with URL, filename, and content_type. + + Returns: + int | bool: Timestamp if successful, False otherwise. + """ url = "http://signal:8080/v2/send" + data = { "recipients": [recipient_uuid], - "message": text, "number": settings.SIGNAL_NUMBER, + "base64_attachments": [] } + # Asynchronously download and encode all attachments + tasks = [download_and_encode_base64(att["url"], att["filename"], att["content_type"]) for att in attachments] + encoded_attachments = await asyncio.gather(*tasks) + + # Filter out failed downloads (None values) + data["base64_attachments"] = [att for att in encoded_attachments if att] + + # Remove the message body if it only contains an attachment link + if text and (text.strip() in [att["url"] for att in attachments]): + #log.info("Removing message body since it only contains an attachment link.") + text = None # Don't send the link as text + + if text: + data["message"] = text + async with aiohttp.ClientSession() as session: async with session.post(url, json=data) as response: response_text = await response.text() @@ -42,11 +98,8 @@ async def send_message_raw(recipient_uuid, text): if response_status == status.HTTP_201_CREATED: ts = orjson.loads(response_text).get("timestamp", None) - if not ts: - return False - return ts - else: - return False + return ts if ts else False + return False async def fetch_signal_attachment(attachment_id): """ @@ -95,7 +148,7 @@ async def fetch_signal_attachment(attachment_id): -def download_and_encode_base64(file_url, filename, content_type): +def download_and_encode_base64_sync(file_url, filename, content_type): """ Downloads a file from a given URL, converts it to Base64, and returns it in Signal's expected format. @@ -142,7 +195,7 @@ def send_message_raw_sync(recipient_uuid, text=None, attachments=[]): # Convert attachments to Base64 for att in attachments: - base64_data = download_and_encode_base64(att["url"], att["filename"], att["content_type"]) + base64_data = download_and_encode_base64_sync(att["url"], att["filename"], att["content_type"]) if base64_data: data["base64_attachments"].append(base64_data) @@ -161,9 +214,9 @@ def send_message_raw_sync(recipient_uuid, text=None, attachments=[]): #log.error(f"Failed to send Signal message: {e}") return False - if response.status_code == 201: # Signal server returns 201 on success + if response.status_code == status.HTTP_201_CREATED: # Signal server returns 201 on success try: - ts = orjson.loads(response.text).get("timestamp") + ts = orjson.loads(response.text).get("timestamp", None) return ts if ts else False except orjson.JSONDecodeError: return False diff --git a/core/clients/xmpp.py b/core/clients/xmpp.py new file mode 100644 index 0000000..98a7483 --- /dev/null +++ b/core/clients/xmpp.py @@ -0,0 +1,635 @@ +from core.clients import ClientBase +from django.conf import settings +from slixmpp.componentxmpp import ComponentXMPP +from django.conf import settings +from core.models import User, Person, PersonIdentifier, ChatSession +from asgiref.sync import sync_to_async +from django.utils.timezone import now +import asyncio +from core.clients import signalapi +from slixmpp.xmlstream import register_stanza_plugin +from slixmpp.plugins.xep_0085.stanza import Active, Composing, Paused, Inactive, Gone +from slixmpp.stanza import Message +from slixmpp.xmlstream.stanzabase import ET +import aiohttp +from core.messaging import history +from core.util import logs + + +class XMPPComponent(ComponentXMPP): + + """ + A simple Slixmpp component that echoes messages. + """ + + def __init__(self, ur, jid, secret, server, port): + self.ur = ur + + self.log = logs.get_logger("XMPP") + + super().__init__(jid, secret, server, port) + # Register chat state plugins + register_stanza_plugin(Message, Active) + register_stanza_plugin(Message, Composing) + register_stanza_plugin(Message, Paused) + register_stanza_plugin(Message, Inactive) + register_stanza_plugin(Message, Gone) + + self.add_event_handler("session_start", self.session_start) + self.add_event_handler("disconnected", self.on_disconnected) + self.add_event_handler("message", self.message) + + # Presence event handlers + self.add_event_handler("presence_available", self.on_presence_available) + self.add_event_handler("presence_dnd", self.on_presence_dnd) + self.add_event_handler("presence_xa", self.on_presence_xa) + self.add_event_handler("presence_chat", self.on_presence_chat) + self.add_event_handler("presence_away", self.on_presence_away) + self.add_event_handler("presence_unavailable", self.on_presence_unavailable) + self.add_event_handler("presence_subscribe", self.on_presence_subscribe) + self.add_event_handler("presence_subscribed", self.on_presence_subscribed) + self.add_event_handler("presence_unsubscribe", self.on_presence_unsubscribe) + self.add_event_handler("presence_unsubscribed", self.on_presence_unsubscribed) + self.add_event_handler("roster_subscription_request", self.on_roster_subscription_request) + + # Chat state handlers + self.add_event_handler("chatstate_active", self.on_chatstate_active) + self.add_event_handler("chatstate_composing", self.on_chatstate_composing) + self.add_event_handler("chatstate_paused", self.on_chatstate_paused) + self.add_event_handler("chatstate_inactive", self.on_chatstate_inactive) + self.add_event_handler("chatstate_gone", self.on_chatstate_gone) + + async def enable_carbons(self): + """Enable XMPP Message Carbons (XEP-0280)""" + try: + iq = self.make_iq_set() + iq["enable"] = ET.Element("{urn:xmpp:carbons:2}enable") + await iq.send() + self.log.info("Message Carbons enabled successfully") + except Exception as e: + self.log.error(f"Failed to enable Carbons: {e}") + + def get_identifier(self, msg): + # Extract sender JID (full format: user@domain/resource) + sender_jid = str(msg["from"]) + + # Split into username@domain and optional resource + sender_parts = sender_jid.split("/", 1) + sender_bare_jid = sender_parts[0] # Always present: user@domain + sender_username, sender_domain = sender_bare_jid.split("@", 1) + + sender_resource = sender_parts[1] if len(sender_parts) > 1 else None # Extract resource if present + + # Extract recipient JID (should match component JID format) + recipient_jid = str(msg["to"]) + + if "@" in recipient_jid: + recipient_username, recipient_domain = recipient_jid.split("@", 1) + else: + recipient_username = recipient_jid + recipient_domain = recipient_jid + + # Extract message body + body = msg["body"] if msg["body"] else "[No Body]" + # Parse recipient_name and recipient_service (e.g., "mark|signal") + if "|" in recipient_username: + person_name, service = recipient_username.split("|") + person_name = person_name.title() # Capitalize for consistency + else: + person_name = recipient_username.title() + service = None + + + try: + # Lookup user in Django + self.log.info(f"User {sender_username}") + user = User.objects.get(username=sender_username) + + # Find Person object with name=person_name.lower() + self.log.info(f"Name {person_name.title()}") + person = Person.objects.get(user=user, name=person_name.title()) + + # Ensure a PersonIdentifier exists for this user, person, and service + self.log.info(f"Identifier {service}") + identifier = PersonIdentifier.objects.get(user=user, person=person, service=service) + + return identifier + + except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist): + # If any lookup fails, reject the subscription + return None + + def update_roster(self, jid, name=None): + """ + Adds or updates a user in the roster. + """ + iq = self.Iq() + iq['type'] = 'set' + iq['roster']['items'] = {jid: {'name': name or jid}} + + iq.send() + self.log.info(f"Updated roster: Added {jid} ({name})") + + def on_chatstate_active(self, msg): + """ + Handle when a user is actively engaged in the chat. + """ + self.log.info(f"Chat state: Active from {msg['from']}.") + + identifier = self.get_identifier(msg) + + def on_chatstate_composing(self, msg): + """ + Handle when a user is typing a message. + """ + self.log.info(f"Chat state: Composing from {msg['from']}.") + + identifier = self.get_identifier(msg) + + def on_chatstate_paused(self, msg): + """ + Handle when a user has paused typing. + """ + self.log.info(f"Chat state: Paused from {msg['from']}.") + + identifier = self.get_identifier(msg) + + def on_chatstate_inactive(self, msg): + """ + Handle when a user is inactive in the chat. + """ + self.log.info(f"Chat state: Inactive from {msg['from']}.") + + identifier = self.get_identifier(msg) + + def on_chatstate_gone(self, msg): + """ + Handle when a user has left the chat. + """ + self.log.info(f"Chat state: Gone from {msg['from']}.") + + identifier = self.get_identifier(msg) + + + def on_presence_available(self, pres): + """ + Handle when a user becomes available. + """ + self.log.info(f"Presence available from {pres['from']}") + + def on_presence_dnd(self, pres): + """ + Handle when a user sets 'Do Not Disturb' status. + """ + self.log.info(f"User {pres['from']} is now in 'Do Not Disturb' mode.") + + def on_presence_xa(self, pres): + """ + Handle when a user sets 'Extended Away' status. + """ + self.log.info(f"User {pres['from']} is now 'Extended Away'.") + + def on_presence_chat(self, pres): + """ + Handle when a user is actively available for chat. + """ + self.log.info(f"User {pres['from']} is now available for chat.") + + def on_presence_away(self, pres): + """ + Handle when a user sets 'Away' status. + """ + self.log.info(f"User {pres['from']} is now 'Away'.") + + def on_presence_unavailable(self, pres): + """ + Handle when a user goes offline or unavailable. + """ + self.log.info(f"User {pres['from']} is now unavailable.") + + def on_presence_subscribe(self, pres): + """ + Handle incoming presence subscription requests. + Accept only if the recipient has a contact matching the sender. + """ + + sender_jid = str(pres['from']).split('/')[0] # Bare JID (user@domain) + recipient_jid = str(pres['to']).split('/')[0] + + self.log.info(f"Received subscription request from {sender_jid} to {recipient_jid}") + + try: + # Extract sender and recipient usernames + user_username, _ = sender_jid.split("@", 1) + recipient_username, _ = recipient_jid.split("@", 1) + + # Parse recipient_name and recipient_service (e.g., "mark|signal") + if "|" in recipient_username: + person_name, service = recipient_username.split("|") + person_name = person_name.title() # Capitalize for consistency + else: + person_name = recipient_username.title() + service = None + + # Lookup user in Django + self.log.info(f"User {user_username}") + user = User.objects.get(username=user_username) + + # Find Person object with name=person_name.lower() + self.log.info(f"Name {person_name.title()}") + person = Person.objects.get(user=user, name=person_name.title()) + + # Ensure a PersonIdentifier exists for this user, person, and service + self.log.info(f"Identifier {service}") + PersonIdentifier.objects.get(user=user, person=person, service=service) + + component_jid = f"{person_name.lower()}|{service}@{self.boundjid.bare}" + + # Accept the subscription + self.send_presence(ptype="subscribed", pto=sender_jid, pfrom=component_jid) + self.log.info(f"Accepted subscription from {sender_jid}, sent from {component_jid}") + + # Send a presence request **from the recipient to the sender** (ASKS THEM TO ACCEPT BACK) + # self.send_presence(ptype="subscribe", pto=sender_jid, pfrom=component_jid) + # self.log.info(f"Sent presence subscription request from {component_jid} to {sender_jid}") + + # Add sender to roster + # self.update_roster(sender_jid, name=sender_jid.split("@")[0]) + # self.log.info(f"Added {sender_jid} to roster.") + + # Send presence update to sender **from the correct JID** + self.send_presence(ptype="available", pto=sender_jid, pfrom=component_jid) + self.log.info(f"Sent presence update from {component_jid} to {sender_jid}") + + + except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist): + # If any lookup fails, reject the subscription + self.log.warning(f"Subscription request from {sender_jid} rejected (recipient does not have this contact).") + self.send_presence(ptype="unsubscribed", pto=sender_jid) + + except ValueError: + return + + + def on_presence_subscribed(self, pres): + """ + Handle successful subscription confirmations. + """ + self.log.info(f"Subscription to {pres['from']} was accepted.") + + def on_presence_unsubscribe(self, pres): + """ + Handle when a user unsubscribes from presence updates. + """ + self.log.info(f"User {pres['from']} has unsubscribed from presence updates.") + + def on_presence_unsubscribed(self, pres): + """ + Handle when a user's unsubscription request is confirmed. + """ + self.log.info(f"Unsubscription from {pres['from']} confirmed.") + + def on_roster_subscription_request(self, pres): + """ + Handle roster subscription requests. + """ + self.log.info(f"New roster subscription request from {pres['from']}.") + + async def session_start(self, *args): + self.log.info("XMPP session started") + await self.enable_carbons() + + def on_disconnected(self, *args): + """ + Handles XMPP disconnection and triggers a reconnect loop. + """ + self.log.warning("XMPP disconnected, attempting to reconnect...") + self.connect() + + def session_start(self, *args): + self.log.info(f"Session started: {args}") + + async def request_upload_slot(self, recipient, filename, content_type, size): + """ + Requests an upload slot from XMPP for HTTP File Upload (XEP-0363). + + Args: + recipient (str): The JID of the recipient. + filename (str): The filename for the upload. + content_type (str): The file's MIME type. + size (int): The file size in bytes. + + Returns: + tuple | None: (upload_url, put_url, auth_header) or None if failed. + """ + # upload_service = await self['xep_0363'].find_upload_service() + # if not upload_service: + # self.log.error("No XEP-0363 upload service found.") + # return None + + #self.log.info(f"Upload service: {upload_service}") + + upload_service_jid = "share.zm.is" + + try: + slot = await self['xep_0363'].request_slot( + jid=upload_service_jid, + filename=filename, + content_type=content_type, + size=size + ) + + if slot is None: + self.log.error(f"Failed to obtain upload slot for {filename}") + return None + + # Parse the XML response + root = ET.fromstring(str(slot)) # Convert to string if necessary + namespace = "{urn:xmpp:http:upload:0}" # Define the namespace + + get_url = root.find(f".//{namespace}get").attrib.get("url") + put_element = root.find(f".//{namespace}put") + put_url = put_element.attrib.get("url") + + # Extract the Authorization header correctly + header_element = put_element.find(f"./{namespace}header[@name='Authorization']") + auth_header = header_element.text.strip() if header_element is not None else None + + if not get_url or not put_url: + self.log.error(f"Missing URLs in upload slot: {slot}") + return None + + return get_url, put_url, auth_header + + except Exception as e: + self.log.error(f"Exception while requesting upload slot: {e}") + return None + + + async def message(self, msg): + """ + Process incoming XMPP messages. + """ + + sym = lambda x: msg.reply(f"[>] {x}").send() + # self.log.info(f"Received message: {msg}") + + # Extract sender JID (full format: user@domain/resource) + sender_jid = str(msg["from"]) + + # Split into username@domain and optional resource + sender_parts = sender_jid.split("/", 1) + sender_bare_jid = sender_parts[0] # Always present: user@domain + sender_username, sender_domain = sender_bare_jid.split("@", 1) + + sender_resource = sender_parts[1] if len(sender_parts) > 1 else None # Extract resource if present + + # Extract recipient JID (should match component JID format) + recipient_jid = str(msg["to"]) + + if "@" in recipient_jid: + recipient_username, recipient_domain = recipient_jid.split("@", 1) + else: + recipient_username = recipient_jid + recipient_domain = recipient_jid + + # Extract message body + body = msg["body"] if msg["body"] else "[No Body]" + + attachments = [] + self.log.info(f"Full XMPP Message: {ET.tostring(msg.xml, encoding='unicode')}") + + # Extract attachments from standard XMPP (if present) + for att in msg.xml.findall(".//{urn:xmpp:attachments}attachment"): + attachments.append({ + "url": att.attrib.get("url"), + "filename": att.attrib.get("filename"), + "content_type": att.attrib.get("content_type"), + }) + + # Extract attachments from XEP-0066 format (Out of Band Data) + for oob in msg.xml.findall(".//{jabber:x:oob}x/{jabber:x:oob}url"): + attachments.append({ + "url": oob.text, + "filename": oob.text.split("/")[-1], # Extract filename from URL + "content_type": "application/octet-stream", # Generic content-type + }) + + self.log.info(f"Extracted {len(attachments)} attachments from XMPP message.") + # Log extracted information with variable name annotations + log_message = ( + f"Sender JID: {sender_jid}, Sender Username: {sender_username}, Sender Domain: {sender_domain}, " + f"Sender Resource: {sender_resource if sender_resource else '[No Resource]'}, " + f"Recipient JID: {recipient_jid}, Recipient Username: {recipient_username}, Recipient Domain: {recipient_domain}, " + f"Body: {body}" + ) + self.log.info(log_message) + + # Ensure recipient domain matches our configured component + expected_domain = settings.XMPP_JID # 'jews.zm.is' in your config + if recipient_domain != expected_domain: + self.log.warning(f"Invalid recipient domain: {recipient_domain}, expected {expected_domain}") + return + + # Lookup sender in Django's User model + try: + sender_user = User.objects.get(username=sender_username) + except User.DoesNotExist: + self.log.warning(f"Unknown sender: {sender_username}") + return + + if recipient_jid == settings.XMPP_JID: + self.log.info("Message to JID") + if body.startswith("."): + # Messaging the gateway directly + if body == ".contacts": + # Lookup Person objects linked to sender + persons = Person.objects.filter(user=sender_user) + if not persons.exists(): + self.log.info(f"No contacts found for {sender_username}") + sym("No contacts found.") + return + + # Construct contact list response + contact_names = [person.name for person in persons] + response_text = f"Contacts: " + ", ".join(contact_names) + sym(response_text) + elif body == ".whoami": + sym(str(sender_user.__dict__)) + else: + sym("No such command") + else: + self.log.info("Other message") + if "|" in recipient_username: + recipient_name, recipient_service = recipient_username.split("|") + + recipient_name = recipient_name.title() + + else: + recipient_name = recipient_username + recipient_service = None + + recipient_name = recipient_name.title() + + try: + person = Person.objects.get(user=sender_user, name=recipient_name) + except Person.DoesNotExist: + sym("This person does not exist.") + + if recipient_service: + try: + identifier = PersonIdentifier.objects.get(user=sender_user, + person=person, + service=recipient_service) + except PersonIdentifier.DoesNotExist: + sym("This service identifier does not exist.") + else: + # Get a random identifier + identifier = PersonIdentifier.objects.filter(user=sender_user, + person=person).first() + recipient_service = identifier.service + + # sym(str(person.__dict__)) + # sym(f"Service: {recipient_service}") + + #tss = await identifier.send(body, attachments=attachments) + # AM FIXING https://git.zm.is/XF/GIA/issues/5 + session, _ = await sync_to_async(ChatSession.objects.get_or_create)( + identifier=identifier, + user=identifier.user, + ) + self.log.info(f"Component history store message {body}") + await history.store_message( + session=session, + sender="XMPP", + text=body, + ts=now().timestamp(), + #outgoing=detail.is_outgoing_message, ????????? TODO: + ) + self.log.info("Stored a message sent from XMPP in the history.") + + tss = await signalapi.send_message_raw( + identifier.identifier, + body, + attachments, + ) + self.log.info(f"Message sent") + + async def request_upload_slots(self, recipient_jid, attachments): + """Requests upload slots for multiple attachments concurrently.""" + upload_tasks = [ + self.request_upload_slot(recipient_jid, att["filename"], att["content_type"], att["size"]) + for att in attachments + ] + upload_slots = await asyncio.gather(*upload_tasks) + + return [(att, slot) for att, slot in zip(attachments, upload_slots) if slot is not None] + + async def upload_and_send(self, att, upload_slot, recipient_jid, sender_jid): + """Uploads a file and immediately sends the corresponding XMPP message.""" + upload_url, put_url, auth_header = upload_slot + headers = {"Content-Type": att["content_type"]} + if auth_header: + headers["Authorization"] = auth_header + + async with aiohttp.ClientSession() as session: + try: + async with session.put(put_url, data=att["content"], headers=headers) as response: + if response.status not in (200, 201): + self.log.error(f"Upload failed: {response.status} {await response.text()}") + return + self.log.info(f"Successfully uploaded {att['filename']} to {upload_url}") + + # Send XMPP message immediately after successful upload + await self.send_xmpp_message(recipient_jid, sender_jid, upload_url, attachment_url=upload_url) + + except Exception as e: + self.log.error(f"Error uploading {att['filename']} to XMPP: {e}") + + async def send_xmpp_message(self, recipient_jid, sender_jid, body_text, attachment_url=None): + """Sends an XMPP message with either text or an attachment URL.""" + msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") + msg["body"] = body_text # Body must contain only text or the URL + + if attachment_url: + # Include (XEP-0066) to ensure client compatibility + oob_element = ET.Element("{jabber:x:oob}x") + url_element = ET.SubElement(oob_element, "{jabber:x:oob}url") + url_element.text = attachment_url + msg.xml.append(oob_element) + + self.log.info(f"Sending XMPP message: {msg.xml}") + msg.send() + + async def send_from_external(self, user, person_identifier, text, is_outgoing_message, attachments=[]): + """Handles sending XMPP messages with text and attachments.""" + + sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}" + recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}" + if is_outgoing_message: + self.log.info(f"Forwarding outgoing message as {recipient_jid}") + + # Create the message as if it were sent by the user + msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") + msg["body"] = text # Original message content + + # Create the forwarded inside the element + forwarded_elem = ET.Element("{urn:xmpp:forward:0}forwarded") + + # Create a element inside the forwarded stanza + message_elem = ET.Element("message", attrib={"from": recipient_jid, "to": recipient_jid, "type": "chat"}) + body_elem = ET.SubElement(message_elem, "body") + body_elem.text = text + + # Attach the forwarded message + forwarded_elem.append(message_elem) + msg.xml.append(forwarded_elem) + + # Send the forwarded message + msg.send() + + + # Step 1: Send text message separately + if text: + await self.send_xmpp_message(recipient_jid, sender_jid, text) + + if not attachments: + return # No attachments to process + + # Step 2: Request upload slots concurrently + valid_uploads = await self.request_upload_slots(recipient_jid, attachments) + self.log.info(f"Got upload slots") + if not valid_uploads: + self.log.warning("No valid upload slots obtained.") + #return + + # Step 3: Upload each file and send its message immediately after upload + upload_tasks = [ + self.upload_and_send(att, slot, recipient_jid, sender_jid) for att, slot in valid_uploads + ] + await asyncio.gather(*upload_tasks) # Upload files concurrently + + +class XMPPClient(ClientBase): + def __init__(self, ur, *args, **kwargs): + super().__init__(ur, *args, **kwargs) + self.client = XMPPComponent( + ur, + jid=settings.XMPP_JID, + secret=settings.XMPP_SECRET, + server=settings.XMPP_ADDRESS, + port=settings.XMPP_PORT, + ) + + self.client.register_plugin('xep_0030') # Service Discovery + self.client.register_plugin('xep_0004') # Data Forms + self.client.register_plugin('xep_0060') # PubSub + self.client.register_plugin('xep_0199') # XMPP Ping + self.client.register_plugin("xep_0085") # Chat State Notifications + self.client.register_plugin('xep_0363') # HTTP File Upload + + def start(self): + self.log.info("XMPP client starting...") + + self.client.connect() + #self.client.process() \ No newline at end of file diff --git a/core/lib/bot.py b/core/lib/bot.py deleted file mode 100644 index e1b4d55..0000000 --- a/core/lib/bot.py +++ /dev/null @@ -1,46 +0,0 @@ -from signalbot import SignalBot -import aiohttp - -from core.util import logs - -log = logs.get_logger("signalbot") - - -class NewSignalBot(SignalBot): - def __init__(self, config): - super().__init__(config) - self.bot_uuid = None # Initialize with None - - async def get_own_uuid(self) -> str: - """Fetch bot's UUID by checking contacts, groups, or profile.""" - async with aiohttp.ClientSession() as session: - uri_contacts = f"http://{self._signal.signal_service}/v1/contacts/{self._signal.phone_number}" - try: - resp = await session.get(uri_contacts) - if resp.status == 200: - contacts_data = await resp.json() - if isinstance(contacts_data, list): - for contact in contacts_data: - if contact.get("number") == self._phone_number: - return contact.get("uuid") - except Exception as e: - log.error(f"Failed to get UUID from contacts: {e}") - - async def initialize_bot(self): - """Fetch bot's UUID and store it in self.bot_uuid.""" - try: - self.bot_uuid = await self.get_own_uuid() - if self.bot_uuid: - log.info(f"Own UUID: {self.bot_uuid}") - else: - log.warning("Unable to fetch bot UUID.") - except Exception as e: - log.error(f"Failed to initialize bot UUID: {e}") - - def start(self): - """Start bot without blocking event loop.""" - self._event_loop.create_task(self.initialize_bot()) # Fetch UUID first - self._event_loop.create_task(self._detect_groups()) # Sync groups - self._event_loop.create_task(self._produce_consume_messages()) # Process messages - - self.scheduler.start() # Start async job scheduler \ No newline at end of file diff --git a/core/lib/deferred.py b/core/lib/deferred.py index 73d2787..45d2f5c 100644 --- a/core/lib/deferred.py +++ b/core/lib/deferred.py @@ -32,6 +32,33 @@ class DeferredRequest(BaseModel): detail: Optional[DeferredDetail] = None attachments: Optional[list] = None +async def send_message(db_obj): + recipient_uuid = db_obj.session.identifier.identifier + text = db_obj.text + + send = lambda x: signalapi.send_message_raw(recipient_uuid, x) # returns ts + start_t = lambda: signalapi.start_typing(recipient_uuid) + stop_t = lambda: signalapi.stop_typing(recipient_uuid) + + tss = await natural.natural_send_message( + text, + send, + start_t, + stop_t, + ) # list of ts + #result = await send_message_raw(recipient_uuid, text) + await sync_to_async(db_obj.delete)() + result = [x for x in tss if x] # all trueish ts + if result: # if at least one message was sent + ts1 = result.pop() # pick a time + log.info(f"signal message create {text}") + await sync_to_async(Message.objects.create)( + user=db_obj.session.user, + session=db_obj.session, + custom_author="BOT", + text=text, + ts=ts1, # use that time in db + ) async def process_deferred(data: dict, **kwargs): try: @@ -59,7 +86,7 @@ async def process_deferred(data: dict, **kwargs): return if message.session.identifier.service == "signal": - await signal.send_message(message) + await send_message(message) else: log.warning(f"Protocol not supported: {message.session.identifier.service}") @@ -76,6 +103,7 @@ async def process_deferred(data: dict, **kwargs): service=service, ) xmpp_attachments = [] + # attachments = [] # Asynchronously fetch all attachments tasks = [signalapi.fetch_signal_attachment(att["id"]) for att in attachments] @@ -95,9 +123,10 @@ async def process_deferred(data: dict, **kwargs): }) for identifier in identifiers: #recipient_jid = f"{identifier.user.username}@{settings.XMPP_ADDRESS}" + user = identifier.user log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") - await xmpp.send_from_external(identifier, msg, validated_data.detail, attachments=xmpp_attachments) + await xmpp.send_from_external(user, identifier, msg, validated_data.detail, attachments=xmpp_attachments) else: log.warning(f"Method not yet supported: {method}") return \ No newline at end of file diff --git a/core/management/commands/component.py b/core/management/commands/component.py index 76bef07..975956b 100644 --- a/core/management/commands/component.py +++ b/core/management/commands/component.py @@ -2,16 +2,20 @@ from core.util import logs from django.core.management.base import BaseCommand from slixmpp.componentxmpp import ComponentXMPP from django.conf import settings -from core.models import User, Person, PersonIdentifier +from core.models import User, Person, PersonIdentifier, ChatSession from redis import asyncio as aioredis +from asgiref.sync import sync_to_async +from django.utils.timezone import now import asyncio import msgpack from core.lib import deferred +from core.clients import signalapi from slixmpp.xmlstream import register_stanza_plugin from slixmpp.plugins.xep_0085.stanza import Active, Composing, Paused, Inactive, Gone from slixmpp.stanza import Message from slixmpp.xmlstream.stanzabase import ElementBase, ET import aiohttp +from core.messaging import history log = logs.get_logger("component") @@ -355,7 +359,7 @@ class EchoComponent(ComponentXMPP): return None - def message(self, msg): + async def message(self, msg): """ Process incoming XMPP messages. """ @@ -481,68 +485,105 @@ class EchoComponent(ComponentXMPP): # sym(str(person.__dict__)) # sym(f"Service: {recipient_service}") - identifier.send(body, attachments=attachments) - - async def send_from_external(self, person_identifier, text, detail, attachments=[]): - sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}" - recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}" - - # First, send text separately if there's any - if text: - text_msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") - text_msg["body"] = text - log.info(f"Sending separate text message: {text}") - if detail.is_outgoing_message: - log.info("Outgoing message, not forwarding") - ... - else: - log.info(f"Final XMPP message: {text_msg.xml}") - text_msg.send() - - for att in attachments: - # Request an upload slot - upload_slot = await self.request_upload_slot( - recipient_jid, att["filename"], att["content_type"], att["size"] + #tss = await identifier.send(body, attachments=attachments) + # AM FIXING https://git.zm.is/XF/GIA/issues/5 + session, _ = await sync_to_async(ChatSession.objects.get_or_create)( + identifier=identifier, + user=user, ) - if not upload_slot: - log.warning(f"Failed to obtain upload slot for {att['filename']}") - continue + log.info(f"Component history store message {text}") + await history.store_message( + session=session, + sender="XMPP", + text=text, + ts=now().timestamp(), + outgoing=detail.is_outgoing_message, + ) + log.info("Stored a message sent from XMPP in the history.") - upload_url, put_url, auth_header = upload_slot + tss = await signalapi.send_message_raw( + identifier.identifier, + body, + attachments, + ) + log.info(f"Message sent") - # Upload file - headers = {"Content-Type": att["content_type"]} - if auth_header: - headers["Authorization"] = auth_header + async def request_upload_slots(self, recipient_jid, attachments): + """Requests upload slots for multiple attachments concurrently.""" + upload_tasks = [ + self.request_upload_slot(recipient_jid, att["filename"], att["content_type"], att["size"]) + for att in attachments + ] + upload_slots = await asyncio.gather(*upload_tasks) + return [(att, slot) for att, slot in zip(attachments, upload_slots) if slot is not None] + + async def upload_and_send(self, att, upload_slot, recipient_jid, sender_jid): + """Uploads a file and immediately sends the corresponding XMPP message.""" + upload_url, put_url, auth_header = upload_slot + headers = {"Content-Type": att["content_type"]} + if auth_header: + headers["Authorization"] = auth_header + + async with aiohttp.ClientSession() as session: try: - async with aiohttp.ClientSession() as session: - async with session.put(put_url, data=att["content"], headers=headers) as response: - if response.status not in (200, 201): - log.error(f"Upload failed: {response.status} {await response.text()}") - continue + async with session.put(put_url, data=att["content"], headers=headers) as response: + if response.status not in (200, 201): + log.error(f"Upload failed: {response.status} {await response.text()}") + return + log.info(f"Successfully uploaded {att['filename']} to {upload_url}") - # Create and send message with only the file URL - msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") - msg["body"] = upload_url # Body must be only the URL - - # Include (XEP-0066) to ensure client compatibility - oob_element = ET.Element("{jabber:x:oob}x") - url_element = ET.SubElement(oob_element, "{jabber:x:oob}url") - url_element.text = upload_url - msg.xml.append(oob_element) - - log.info(f"Sending file attachment message with URL: {upload_url}") - if detail.is_outgoing_message: - log.info("Outgoing message, not forwarding") - ... - else: - log.info(f"Final XMPP message: {msg.xml}") - msg.send() + # Send XMPP message immediately after successful upload + await self.send_xmpp_message(recipient_jid, sender_jid, upload_url, attachment_url=upload_url) except Exception as e: log.error(f"Error uploading {att['filename']} to XMPP: {e}") - + + async def send_xmpp_message(self, recipient_jid, sender_jid, body_text, attachment_url=None): + """Sends an XMPP message with either text or an attachment URL.""" + msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") + msg["body"] = body_text # Body must contain only text or the URL + + if attachment_url: + # Include (XEP-0066) to ensure client compatibility + oob_element = ET.Element("{jabber:x:oob}x") + url_element = ET.SubElement(oob_element, "{jabber:x:oob}url") + url_element.text = attachment_url + msg.xml.append(oob_element) + + log.info(f"Sending XMPP message: {msg.xml}") + msg.send() + + async def send_from_external(self, user, person_identifier, text, detail, attachments=[]): + """Handles sending XMPP messages with text and attachments.""" + + if detail.is_outgoing_message: + return + + + sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}" + recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}" + + # Step 1: Send text message separately + if text: + await self.send_xmpp_message(recipient_jid, sender_jid, text) + + if not attachments: + return # No attachments to process + + # Step 2: Request upload slots concurrently + valid_uploads = await self.request_upload_slots(recipient_jid, attachments) + log.info(f"Got upload slots") + if not valid_uploads: + log.warning("No valid upload slots obtained.") + #return + + # Step 3: Upload each file and send its message immediately after upload + upload_tasks = [ + self.upload_and_send(att, slot, recipient_jid, sender_jid) for att, slot in valid_uploads + ] + await asyncio.gather(*upload_tasks) # Upload files concurrently + async def stream(**kwargs): pubsub = redis.pubsub() await pubsub.subscribe("component") diff --git a/core/management/commands/processing.py b/core/management/commands/processing.py index a654e99..b6f3cf5 100644 --- a/core/management/commands/processing.py +++ b/core/management/commands/processing.py @@ -14,7 +14,7 @@ from core.models import Chat, Manipulation, PersonIdentifier, ChatSession, Messa import aiohttp from django.utils import timezone from django.conf import settings -from core.lib.bot import NewSignalBot +from core.clients.signal import NewSignalBot from redis import asyncio as aioredis SIGNAL_URL = "signal:8080" @@ -102,6 +102,7 @@ class HandleMessage(Command): manips = await sync_to_async(list)( Manipulation.objects.filter(enabled=True) ) + processed_people = set() for manip in manips: try: person_identifier = await sync_to_async(PersonIdentifier.objects.get)( @@ -110,6 +111,10 @@ class HandleMessage(Command): service="signal", person__in=manip.group.people.all(), ) + # Check if we've already processed this person + if person_identifier.person.id in processed_people: + log.warning(f"Skipping duplicate message storage for {person_identifier.person.name}") + continue # Skip to next manipulation if not manip.group.people.filter(id=person_identifier.person.id).exists(): log.error(f"{manip.name}: Identifier {identifier_uuid} found, but person {person_identifier.person} is not in manip group. Skipping.") continue # Exit early if the person is not in the group @@ -121,6 +126,8 @@ class HandleMessage(Command): chat_session = await history.get_chat_session(manip.user, person_identifier) # Store incoming or outgoing messages + log.info(f"Processing history store message {text}") + processed_people.add(person_identifier.person.id) await history.store_message( session=chat_session, sender=source_uuid, @@ -148,8 +155,7 @@ class HandleMessage(Command): log.info("Running context prompt") result = await ai.run_prompt(prompt, manip.ai) - # Store bot's AI response with a +1s timestamp - if manip.mode == "active": + # Store bot's AI response with a +1s timestamp if manip.mode == "active": await history.store_own_message( session=chat_session, text=result, diff --git a/core/management/commands/ur.py b/core/management/commands/ur.py new file mode 100644 index 0000000..18345e6 --- /dev/null +++ b/core/management/commands/ur.py @@ -0,0 +1,18 @@ +from core.util import logs +from django.core.management.base import BaseCommand +from django.conf import settings +from core.modules.router import UnifiedRouter +import asyncio + +log = logs.get_logger("UR") + +class Command(BaseCommand): + def handle(self, *args, **options): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + instance = UnifiedRouter(loop) + + instance.start() + + instance.run() \ No newline at end of file diff --git a/core/messaging/history.py b/core/messaging/history.py index 68f11cd..e365cf1 100644 --- a/core/messaging/history.py +++ b/core/messaging/history.py @@ -22,6 +22,7 @@ async def get_chat_session(user, identifier): return chat_session async def store_message(session, sender, text, ts, outgoing=False): + log.info(f"STORE MESSAGE {text}") msg = await sync_to_async(Message.objects.create)( user=session.user, session=session, @@ -34,6 +35,7 @@ async def store_message(session, sender, text, ts, outgoing=False): return msg async def store_own_message(session, text, ts, manip=None, queue=False): + log.info(f"STORE OWN MESSAGE {text}") cast = { "user": session.user, "session": session, diff --git a/core/models.py b/core/models.py index b29c21b..e2c501c 100644 --- a/core/models.py +++ b/core/models.py @@ -119,16 +119,17 @@ class PersonIdentifier(models.Model): def __str__(self): return f"{self.person} ({self.service})" - def send(self, text, attachments=[]): + async def send(self, text, attachments=[]): """ Send this contact a text. """ if self.service == "signal": - ts = signalapi.send_message_raw_sync( + ts = await signalapi.send_message_raw( self.identifier, text, attachments, ) + print("SENT") return ts else: raise NotImplementedError(f"Service not implemented: {self.service}") diff --git a/core/modules/__init__.py b/core/modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/modules/router.py b/core/modules/router.py new file mode 100644 index 0000000..07d69f4 --- /dev/null +++ b/core/modules/router.py @@ -0,0 +1,50 @@ +from core.util import logs + +from core.clients.signal import SignalClient +from core.clients.xmpp import XMPPClient + +class UnifiedRouter(object): + """ + Unified Router. Contains generic functions for handling XMPP and Signal events. + """ + + def __init__(self, loop): + self.loop = loop + + self.log = logs.get_logger("router") + self.log.info("Initialised Unified Router Interface.") + + self.xmpp = XMPPClient(self, loop, "xmpp") + self.signal = SignalClient(self, loop, "signal") + + def start(self): + self.xmpp.start() + self.signal.start() + + + def run(self): + try: + self.xmpp.client.process() + self.loop.run_forever() + except (KeyboardInterrupt, SystemExit): + self.log.info("Process terminating") + finally: + self.loop.close() + + async def message_received(self, protocol, *args, **kwargs): + self.log.info(f"Message received ({protocol}) {args} {kwargs}") + + async def message_read(self, protocol, *args, **kwargs): + self.log.info(f"Message read ({protocol}) {args} {kwargs}") + + async def started_typing(self, protocol, *args, **kwargs): + self.log.info(f"Started typing ({protocol}) {args} {kwargs}") + + async def stopped_typing(self, protocol, *args, **kwargs): + self.log.info(f"Stopped typing ({protocol}) {args} {kwargs}") + + async def reacted(self, protocol, *args, **kwargs): + self.log.info(f"Reacted ({protocol}) {args} {kwargs}") + + async def replied(self, protocol, *args, **kwargs): + self.log.info(f"Replied ({protocol}) {args} {kwargs}") diff --git a/docker-compose.yml b/docker-compose.yml index 9e78108..65b5461 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -71,63 +71,14 @@ services: volumes: - "./signal-cli-config:/home/.local/share/signal-cli" - processing: + ur: image: xf/gia:prod - container_name: processing_gia + container_name: ur_gia build: context: . args: OPERATION: ${OPERATION} - command: sh -c '. /venv/bin/activate && python manage.py processing' - volumes: - - ${REPO_DIR}:/code - - ${REPO_DIR}/docker/uwsgi.ini:/conf/uwsgi.ini - - ${APP_DATABASE_FILE}:/conf/db.sqlite3 - - type: bind - source: /code/vrun - target: /var/run - environment: - APP_PORT: "${APP_PORT}" - REPO_DIR: "${REPO_DIR}" - APP_LOCAL_SETTINGS: "${APP_LOCAL_SETTINGS}" - APP_DATABASE_FILE: "${APP_DATABASE_FILE}" - DOMAIN: "${DOMAIN}" - URL: "${URL}" - ALLOWED_HOSTS: "${ALLOWED_HOSTS}" - NOTIFY_TOPIC: "${NOTIFY_TOPIC}" - CSRF_TRUSTED_ORIGINS: "${CSRF_TRUSTED_ORIGINS}" - DEBUG: "${DEBUG}" - SECRET_KEY: "${SECRET_KEY}" - STATIC_ROOT: "${STATIC_ROOT}" - REGISTRATION_OPEN: "${REGISTRATION_OPEN}" - OPERATION: "${OPERATION}" - SIGNAL_NUMBER: "${SIGNAL_NUMBER}" - XMPP_ADDRESS: "${XMPP_ADDRESS}" - XMPP_JID: "${XMPP_JID}" - XMPP_PORT: "${XMPP_PORT}" - XMPP_SECRET: "${XMPP_SECRET}" - depends_on: - redis: - condition: service_healthy - migration: - condition: service_started - collectstatic: - condition: service_started - # deploy: - # resources: - # limits: - # cpus: '0.25' - # memory: 0.25G - #network_mode: host - - component: - image: xf/gia:prod - container_name: component_gia - build: - context: . - args: - OPERATION: ${OPERATION} - command: sh -c '. /venv/bin/activate && python manage.py component' + command: sh -c '. /venv/bin/activate && python manage.py ur' volumes: - ${REPO_DIR}:/code - ${REPO_DIR}/docker/uwsgi.ini:/conf/uwsgi.ini