From 8d2f28f57110cf781615231ec9f4c7490757bafe Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Fri, 21 Feb 2025 21:34:47 +0000 Subject: [PATCH] Implement XMPP relaying --- app/local_settings.py | 5 + app/urls.py | 20 ++ auth_django.py | 75 ++++ auth_django.sh | 2 + core/clients/signal.py | 44 +-- core/clients/signalapi.py | 80 +++++ core/forms.py | 12 +- core/lib/deferred.py | 71 ++-- core/management/commands/component.py | 454 ++++++++++++++++++++++++ core/management/commands/processing.py | 46 ++- core/models.py | 18 + core/templates/base.html | 5 +- core/templates/partials/queue-list.html | 71 ++++ core/views/queues.py | 39 +- core/views/signal.py | 6 - docker-compose.yml | 69 ++++ requirements.txt | 1 + 17 files changed, 941 insertions(+), 77 deletions(-) create mode 100755 auth_django.py create mode 100755 auth_django.sh create mode 100644 core/clients/signalapi.py create mode 100644 core/management/commands/component.py create mode 100644 core/templates/partials/queue-list.html diff --git a/app/local_settings.py b/app/local_settings.py index 0220046..f5b999f 100644 --- a/app/local_settings.py +++ b/app/local_settings.py @@ -48,3 +48,8 @@ if DEBUG: SETTINGS_EXPORT = ["BILLING_ENABLED"] SIGNAL_NUMBER = getenv("SIGNAL_NUMBER") + +XMPP_ADDRESS = getenv("XMPP_ADDRESS") +XMPP_JID = getenv("XMPP_JID") +XMPP_PORT = getenv("XMPP_PORT") +XMPP_SECRET = getenv("XMPP_SECRET") \ No newline at end of file diff --git a/app/urls.py b/app/urls.py index 5895159..c7ade4e 100644 --- a/app/urls.py +++ b/app/urls.py @@ -211,4 +211,24 @@ urlpatterns = [ # Queues path("api/v1/queue/message/accept//", queues.AcceptMessageAPI.as_view(), name="message_accept_api"), path("api/v1/queue/message/reject//", queues.RejectMessageAPI.as_view(), name="message_reject_api"), + path( + "queue//", + queues.QueueList.as_view(), + name="queues", + ), + path( + "queue//create/", + queues.QueueCreate.as_view(), + name="queue_create", + ), + path( + "queue//update//", + queues.QueueUpdate.as_view(), + name="queue_update", + ), + path( + "queue//delete//", + queues.QueueDelete.as_view(), + name="queue_delete", + ), ] + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) diff --git a/auth_django.py b/auth_django.py new file mode 100755 index 0000000..6c08eb9 --- /dev/null +++ b/auth_django.py @@ -0,0 +1,75 @@ + +# Create a debug log to confirm script execution + +import sys +import django +import os + +LOG_PATH = "auth_debug.log" + +def log(data): + with open(LOG_PATH, "a") as f: + f.write(f"{data}\n") + +# Set up Django environment +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings") # Adjust if needed +django.setup() + +from django.contrib.auth import authenticate +from django.contrib.auth.models import User + +def check_credentials(username, password): + """Authenticate user via Django""" + user = authenticate(username=username, password=password) + return user is not None and user.is_active + +def main(): + """Process authentication requests from Prosody""" + while True: + try: + # Read a single line from stdin + line = sys.stdin.readline().strip() + if not line: + break # Exit if input is empty (EOF) + + # Log received command (for debugging) + # log(f"Received: {line}") + + parts = line.split(":") + if len(parts) < 3: + log("Sending 0") + print("0", flush=True) # Invalid format, return failure + continue + + command, username, domain = parts[:3] + password = ":".join(parts[3:]) if len(parts) > 3 else None # Reconstruct password + + if command == "auth": + if password and check_credentials(username, password): + log(f"Authentication success") + log("Sent 1") + print("1", flush=True) # Success + else: + log(f"Authentication failure") + log("Sent 0") + print("0", flush=True) # Failure + + elif command == "isuser": + if User.objects.filter(username=username).exists(): + print("1", flush=True) # User exists + else: + print("0", flush=True) # User does not exist + + elif command == "setpass": + print("0", flush=True) # Not supported + + else: + print("0", flush=True) # Unknown command, return failure + + except Exception as e: + # Log any unexpected errors + log(f"Error: {str(e)}\n") + print("0", flush=True) # Return failure for any error + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/auth_django.sh b/auth_django.sh new file mode 100755 index 0000000..e9278aa --- /dev/null +++ b/auth_django.sh @@ -0,0 +1,2 @@ +#!/bin/sh +podman exec -i gia sh -c "cd /code && . /venv/bin/activate && python auth_django.py" diff --git a/core/clients/signal.py b/core/clients/signal.py index b6f0d05..8346cf0 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -6,36 +6,23 @@ from rest_framework import status from django.http import HttpResponse from core.models import QueuedMessage, Message import requests +from requests.exceptions import RequestException import orjson from django.conf import settings from core.messaging import natural import aiohttp from asgiref.sync import sync_to_async +from core.clients import signalapi -async def start_typing(uuid): - url = f"http://signal:8080/v1/typing_indicator/{settings.SIGNAL_NUMBER}" - data = {"recipient": uuid} - - async with aiohttp.ClientSession() as session: - async with session.put(url, json=data) as response: - return await response.text() # Optional: Return response content - -async def stop_typing(uuid): - url = f"http://signal:8080/v1/typing_indicator/{settings.SIGNAL_NUMBER}" - data = {"recipient": uuid} - - async with aiohttp.ClientSession() as session: - async with session.delete(url, json=data) as response: - return await response.text() # Optional: Return response content async def send_message(db_obj): recipient_uuid = db_obj.session.identifier.identifier text = db_obj.text - send = lambda x: send_message_raw(recipient_uuid, x) # returns ts - start_t = lambda: start_typing(recipient_uuid) - stop_t = lambda: stop_typing(recipient_uuid) + send = lambda x: signalapi.send_message_raw(recipient_uuid, x) # returns ts + start_t = lambda: signalapi.start_typing(recipient_uuid) + stop_t = lambda: signalapi.stop_typing(recipient_uuid) tss = await natural.natural_send_message( text, @@ -56,24 +43,3 @@ async def send_message(db_obj): ts=ts1, # use that time in db ) -async def send_message_raw(recipient_uuid, text): - - url = "http://signal:8080/v2/send" - data = { - "recipients": [recipient_uuid], - "message": text, - "number": settings.SIGNAL_NUMBER, - } - - async with aiohttp.ClientSession() as session: - async with session.post(url, json=data) as response: - response_text = await response.text() - response_status = response.status - - if response_status == status.HTTP_201_CREATED: - ts = orjson.loads(response_text).get("timestamp", None) - if not ts: - return False - return ts - else: - return False \ No newline at end of file diff --git a/core/clients/signalapi.py b/core/clients/signalapi.py new file mode 100644 index 0000000..6fc06ec --- /dev/null +++ b/core/clients/signalapi.py @@ -0,0 +1,80 @@ + +from rest_framework import status + +import requests +from requests.exceptions import RequestException +import orjson +from django.conf import settings +import aiohttp + + +async def start_typing(uuid): + url = f"http://signal:8080/v1/typing_indicator/{settings.SIGNAL_NUMBER}" + data = {"recipient": uuid} + + async with aiohttp.ClientSession() as session: + async with session.put(url, json=data) as response: + return await response.text() # Optional: Return response content + +async def stop_typing(uuid): + url = f"http://signal:8080/v1/typing_indicator/{settings.SIGNAL_NUMBER}" + data = {"recipient": uuid} + + async with aiohttp.ClientSession() as session: + async with session.delete(url, json=data) as response: + return await response.text() # Optional: Return response content + + +async def send_message_raw(recipient_uuid, text): + + url = "http://signal:8080/v2/send" + data = { + "recipients": [recipient_uuid], + "message": text, + "number": settings.SIGNAL_NUMBER, + } + + async with aiohttp.ClientSession() as session: + async with session.post(url, json=data) as response: + response_text = await response.text() + response_status = response.status + + if response_status == status.HTTP_201_CREATED: + ts = orjson.loads(response_text).get("timestamp", None) + if not ts: + return False + return ts + else: + return False + +def send_message_raw_sync(recipient_uuid, text): + """ + Sends a message using the Signal REST API in a synchronous manner. + + Args: + recipient_uuid (str): The UUID of the recipient. + text (str): The message to send. + + Returns: + int | bool: Timestamp if successful, False otherwise. + """ + url = "http://signal:8080/v2/send" + data = { + "recipients": [recipient_uuid], + "message": text, + "number": settings.SIGNAL_NUMBER, + } + + try: + response = requests.post(url, json=data, timeout=10) # 10s timeout for safety + response.raise_for_status() # Raise an error for non-200 responses + except RequestException as e: + return False # Network or request error + + if response.status_code == status.HTTP_201_CREATED: # Signal server returns 201 on success + try: + ts = orjson.loads(response.text).get("timestamp") + return ts if ts else False + except orjson.JSONDecodeError: + return False + return False # If response status is not 201 \ No newline at end of file diff --git a/core/forms.py b/core/forms.py index 8c201a9..6406c9e 100644 --- a/core/forms.py +++ b/core/forms.py @@ -3,7 +3,7 @@ from django.contrib.auth.forms import UserCreationForm from django.forms import ModelForm from mixins.restrictions import RestrictedFormMixin -from .models import NotificationSettings, User, AI, PersonIdentifier, Person, Group, Persona, Manipulation, ChatSession, Message +from .models import NotificationSettings, User, AI, PersonIdentifier, Person, Group, Persona, Manipulation, ChatSession, Message, QueuedMessage # Create your forms here. @@ -162,3 +162,13 @@ class MessageForm(RestrictedFormMixin, forms.ModelForm): "text": "Content of the message.", "custom_author": "For detecting USER and BOT messages.", } + +class QueueForm(RestrictedFormMixin, forms.ModelForm): + class Meta: + model = QueuedMessage + fields = ("session", "manipulation", "text") + help_texts = { + "session": "Chat session this message will be sent in.", + "manipulation": "Manipulation that generated the message.", + "text": "Content of the proposed message.", + } \ No newline at end of file diff --git a/core/lib/deferred.py b/core/lib/deferred.py index a153725..19c114e 100644 --- a/core/lib/deferred.py +++ b/core/lib/deferred.py @@ -1,25 +1,35 @@ # Deferred processing library from core.util import logs from pydantic import BaseModel -from typing import Annotated +from typing import Annotated, Optional from uuid import UUID from pydantic import ValidationError -from core.models import QueuedMessage, Message +from core.models import QueuedMessage, Message, PersonIdentifier, User from core.clients import signal from core.lib.prompts.functions import delete_messages from asgiref.sync import sync_to_async +from django.conf import settings log = logs.get_logger("deferred") +class DeferredDetail(BaseModel): + reply_to_self: bool + reply_to_others: bool + is_outgoing_message: bool class DeferredRequest(BaseModel): type: str method: str - user_id: int - message_id: Annotated[str, UUID] + user_id: Optional[int] = None + message_id: Optional[Annotated[str, UUID]] = None + identifier: Optional[str] = None + msg: Optional[str] = None + service: Optional[str] = None + detail: Optional[DeferredDetail] = None -async def process_deferred(data: dict): + +async def process_deferred(data: dict, **kwargs): try: validated_data = DeferredRequest(**data) log.info(f"Validated Data: {validated_data}") @@ -32,23 +42,40 @@ async def process_deferred(data: dict): user_id = validated_data.user_id message_id = validated_data.message_id - try: - message = await sync_to_async(QueuedMessage.objects.get)( - user_id=user_id, - id=message_id, - ) - log.info(f"Got {message}") - except QueuedMessage.DoesNotExist: - log.info(f"Didn't get message from {message_id}") - return - if message.session.identifier.service == "signal": - log.info(f"Is sisngla") - if method == "accept_message": - await signal.send_message(message) - else: - log.warning(f"Method not yet supported: {method}") + if method == "accept_message": + try: + message = await sync_to_async(QueuedMessage.objects.get)( + user_id=user_id, + id=message_id, + ) + log.info(f"Got {message}") + except QueuedMessage.DoesNotExist: + log.info(f"Didn't get message from {message_id}") return + + if message.session.identifier.service == "signal": + await signal.send_message(message) + + else: + log.warning(f"Protocol not supported: {message.session.identifier.service}") + return + elif method == "xmpp": + xmpp = kwargs.get("xmpp") + service = validated_data.service + msg = validated_data.msg + + # Get User from identifier + identifiers = PersonIdentifier.objects.filter( + identifier=validated_data.identifier, + service=service, + ) + + for identifier in identifiers: + # Fair is fair, we can have multiple + log.info(f"Sending {msg} from {identifier}") + xmpp.send_from_external(identifier, msg, validated_data.detail) + else: - log.warning(f"Protocol not supported: {message.session.identifier.service}") - return + log.warning(f"Method not yet supported: {method}") + return \ No newline at end of file diff --git a/core/management/commands/component.py b/core/management/commands/component.py new file mode 100644 index 0000000..144066a --- /dev/null +++ b/core/management/commands/component.py @@ -0,0 +1,454 @@ +from core.util import logs +from django.core.management.base import BaseCommand +from slixmpp.componentxmpp import ComponentXMPP +from django.conf import settings +from core.models import User, Person, PersonIdentifier +from redis import asyncio as aioredis +import asyncio +import msgpack +from core.lib import deferred +from slixmpp.xmlstream import register_stanza_plugin +from slixmpp.plugins.xep_0085.stanza import Active, Composing, Paused, Inactive, Gone +from slixmpp.stanza import Message + +log = logs.get_logger("component") + +redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10) + +class EchoComponent(ComponentXMPP): + + """ + A simple Slixmpp component that echoes messages. + """ + + def __init__(self, jid, secret, server, port): + super().__init__(jid, secret, server, port) + # Register chat state plugins + register_stanza_plugin(Message, Active) + register_stanza_plugin(Message, Composing) + register_stanza_plugin(Message, Paused) + register_stanza_plugin(Message, Inactive) + register_stanza_plugin(Message, Gone) + + self.add_event_handler("session_start", self.session_start) + self.add_event_handler("disconnected", self.on_disconnected) + self.add_event_handler("message", self.message) + + # Presence event handlers + self.add_event_handler("presence_available", self.on_presence_available) + self.add_event_handler("presence_dnd", self.on_presence_dnd) + self.add_event_handler("presence_xa", self.on_presence_xa) + self.add_event_handler("presence_chat", self.on_presence_chat) + self.add_event_handler("presence_away", self.on_presence_away) + self.add_event_handler("presence_unavailable", self.on_presence_unavailable) + self.add_event_handler("presence_subscribe", self.on_presence_subscribe) + self.add_event_handler("presence_subscribed", self.on_presence_subscribed) + self.add_event_handler("presence_unsubscribe", self.on_presence_unsubscribe) + self.add_event_handler("presence_unsubscribed", self.on_presence_unsubscribed) + self.add_event_handler("roster_subscription_request", self.on_roster_subscription_request) + + # Chat state handlers + self.add_event_handler("chatstate_active", self.on_chatstate_active) + self.add_event_handler("chatstate_composing", self.on_chatstate_composing) + self.add_event_handler("chatstate_paused", self.on_chatstate_paused) + self.add_event_handler("chatstate_inactive", self.on_chatstate_inactive) + self.add_event_handler("chatstate_gone", self.on_chatstate_gone) + + def get_identifier(self, msg): + # Extract sender JID (full format: user@domain/resource) + sender_jid = str(msg["from"]) + + # Split into username@domain and optional resource + sender_parts = sender_jid.split("/", 1) + sender_bare_jid = sender_parts[0] # Always present: user@domain + sender_username, sender_domain = sender_bare_jid.split("@", 1) + + sender_resource = sender_parts[1] if len(sender_parts) > 1 else None # Extract resource if present + + # Extract recipient JID (should match component JID format) + recipient_jid = str(msg["to"]) + + if "@" in recipient_jid: + recipient_username, recipient_domain = recipient_jid.split("@", 1) + else: + recipient_username = recipient_jid + recipient_domain = recipient_jid + + # Extract message body + body = msg["body"] if msg["body"] else "[No Body]" + # Parse recipient_name and recipient_service (e.g., "mark|signal") + if "|" in recipient_username: + person_name, service = recipient_username.split("|") + person_name = person_name.title() # Capitalize for consistency + else: + person_name = recipient_username.title() + service = None + + + try: + # Lookup user in Django + log.info(f"User {sender_username}") + user = User.objects.get(username=sender_username) + + # Find Person object with name=person_name.lower() + log.info(f"Name {person_name.title()}") + person = Person.objects.get(user=user, name=person_name.title()) + + # Ensure a PersonIdentifier exists for this user, person, and service + log.info(f"Identifier {service}") + identifier = PersonIdentifier.objects.get(user=user, person=person, service=service) + + return identifier + + except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist): + # If any lookup fails, reject the subscription + return None + + def on_chatstate_active(self, msg): + """ + Handle when a user is actively engaged in the chat. + """ + log.info(f"Chat state: Active from {msg['from']}.") + + identifier = self.get_identifier(msg) + + def on_chatstate_composing(self, msg): + """ + Handle when a user is typing a message. + """ + log.info(f"Chat state: Composing from {msg['from']}.") + + identifier = self.get_identifier(msg) + + def on_chatstate_paused(self, msg): + """ + Handle when a user has paused typing. + """ + log.info(f"Chat state: Paused from {msg['from']}.") + + identifier = self.get_identifier(msg) + + def on_chatstate_inactive(self, msg): + """ + Handle when a user is inactive in the chat. + """ + log.info(f"Chat state: Inactive from {msg['from']}.") + + identifier = self.get_identifier(msg) + + def on_chatstate_gone(self, msg): + """ + Handle when a user has left the chat. + """ + log.info(f"Chat state: Gone from {msg['from']}.") + + identifier = self.get_identifier(msg) + + + def on_presence_available(self, pres): + """ + Handle when a user becomes available. + """ + log.info(f"Presence available from {pres['from']}") + + def on_presence_dnd(self, pres): + """ + Handle when a user sets 'Do Not Disturb' status. + """ + log.info(f"User {pres['from']} is now in 'Do Not Disturb' mode.") + + def on_presence_xa(self, pres): + """ + Handle when a user sets 'Extended Away' status. + """ + log.info(f"User {pres['from']} is now 'Extended Away'.") + + def on_presence_chat(self, pres): + """ + Handle when a user is actively available for chat. + """ + log.info(f"User {pres['from']} is now available for chat.") + + def on_presence_away(self, pres): + """ + Handle when a user sets 'Away' status. + """ + log.info(f"User {pres['from']} is now 'Away'.") + + def on_presence_unavailable(self, pres): + """ + Handle when a user goes offline or unavailable. + """ + log.info(f"User {pres['from']} is now unavailable.") + + def on_presence_subscribe(self, pres): + """ + Handle incoming presence subscription requests. + Accept only if the recipient has a contact matching the sender. + """ + + sender_jid = str(pres['from']).split('/')[0] # Bare JID (user@domain) + recipient_jid = str(pres['to']).split('/')[0] + + log.info(f"Received subscription request from {sender_jid} to {recipient_jid}") + + try: + # Extract sender and recipient usernames + user_username, _ = sender_jid.split("@", 1) + recipient_username, _ = recipient_jid.split("@", 1) + + # Parse recipient_name and recipient_service (e.g., "mark|signal") + if "|" in recipient_username: + person_name, service = recipient_username.split("|") + person_name = person_name.title() # Capitalize for consistency + else: + person_name = recipient_username.title() + service = None + + # Lookup user in Django + log.info(f"User {user_username}") + user = User.objects.get(username=user_username) + + # Find Person object with name=person_name.lower() + log.info(f"Name {person_name.title()}") + person = Person.objects.get(user=user, name=person_name.title()) + + # Ensure a PersonIdentifier exists for this user, person, and service + log.info(f"Identifier {service}") + PersonIdentifier.objects.get(user=user, person=person, service=service) + + # If all checks pass, accept the subscription + self.send_presence(ptype="subscribed", pto=sender_jid) + log.info(f"Subscription request from {sender_jid} accepted for {recipient_jid}.") + + except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist): + # If any lookup fails, reject the subscription + log.warning(f"Subscription request from {sender_jid} rejected (recipient does not have this contact).") + self.send_presence(ptype="unsubscribed", pto=sender_jid) + + + def on_presence_subscribed(self, pres): + """ + Handle successful subscription confirmations. + """ + log.info(f"Subscription to {pres['from']} was accepted.") + + def on_presence_unsubscribe(self, pres): + """ + Handle when a user unsubscribes from presence updates. + """ + log.info(f"User {pres['from']} has unsubscribed from presence updates.") + + def on_presence_unsubscribed(self, pres): + """ + Handle when a user's unsubscription request is confirmed. + """ + log.info(f"Unsubscription from {pres['from']} confirmed.") + + def on_roster_subscription_request(self, pres): + """ + Handle roster subscription requests. + """ + log.info(f"New roster subscription request from {pres['from']}.") + + def session_start(self, *args): + log.info("XMPP session started") + + def on_disconnected(self, *args): + """ + Handles XMPP disconnection and triggers a reconnect loop. + """ + log.warning("XMPP disconnected, attempting to reconnect...") + self.connect() + + def session_start(self, *args): + log.info(f"START {args}") + + def message(self, msg): + """ + Process incoming XMPP messages. + """ + + sym = lambda x: msg.reply(f"[>] {x}").send() + # log.info(f"Received message: {msg}") + + # Extract sender JID (full format: user@domain/resource) + sender_jid = str(msg["from"]) + + # Split into username@domain and optional resource + sender_parts = sender_jid.split("/", 1) + sender_bare_jid = sender_parts[0] # Always present: user@domain + sender_username, sender_domain = sender_bare_jid.split("@", 1) + + sender_resource = sender_parts[1] if len(sender_parts) > 1 else None # Extract resource if present + + # Extract recipient JID (should match component JID format) + recipient_jid = str(msg["to"]) + + if "@" in recipient_jid: + recipient_username, recipient_domain = recipient_jid.split("@", 1) + else: + recipient_username = recipient_jid + recipient_domain = recipient_jid + + # Extract message body + body = msg["body"] if msg["body"] else "[No Body]" + + # Log extracted information with variable name annotations + log_message = ( + f"Sender JID: {sender_jid}, Sender Username: {sender_username}, Sender Domain: {sender_domain}, " + f"Sender Resource: {sender_resource if sender_resource else '[No Resource]'}, " + f"Recipient JID: {recipient_jid}, Recipient Username: {recipient_username}, Recipient Domain: {recipient_domain}, " + f"Body: {body}" + ) + log.info(log_message) + + # Ensure recipient domain matches our configured component + expected_domain = settings.XMPP_JID # 'jews.zm.is' in your config + if recipient_domain != expected_domain: + log.warning(f"Invalid recipient domain: {recipient_domain}, expected {expected_domain}") + return + + # Lookup sender in Django's User model + try: + sender_user = User.objects.get(username=sender_username) + except User.DoesNotExist: + log.warning(f"Unknown sender: {sender_username}") + return + + if recipient_jid == settings.XMPP_JID: + log.info("Message to JID") + if body.startswith("."): + # Messaging the gateway directly + if body == ".contacts": + # Lookup Person objects linked to sender + persons = Person.objects.filter(user=sender_user) + if not persons.exists(): + log.info(f"No contacts found for {sender_username}") + sym("No contacts found.") + return + + # Construct contact list response + contact_names = [person.name for person in persons] + response_text = f"Contacts: " + ", ".join(contact_names) + sym(response_text) + elif body == ".whoami": + sym(str(sender_user.__dict__)) + else: + sym("No such command") + else: + log.info("Other message") + if "|" in recipient_username: + recipient_name, recipient_service = recipient_username.split("|") + + recipient_name = recipient_name.title() + + else: + recipient_name = recipient_username + recipient_service = None + + recipient_name = recipient_name.title() + + try: + person = Person.objects.get(user=sender_user, name=recipient_name) + except Person.DoesNotExist: + sym("This person does not exist.") + + if recipient_service: + try: + identifier = PersonIdentifier.objects.get(user=sender_user, + person=person, + service=recipient_service) + except PersonIdentifier.DoesNotExist: + sym("This service identifier does not exist.") + else: + # Get a random identifier + identifier = PersonIdentifier.objects.filter(user=sender_user, + person=person).first() + recipient_service = identifier.service + + # sym(str(person.__dict__)) + # sym(f"Service: {recipient_service}") + + identifier.send(body) + + def send_from_external(self, person_identifier, text, detail): + """ + This method will send an incoming external message to the correct XMPP user. + """ + + sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}" + recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}" + + if detail.is_outgoing_message: + carbon_msg = f""" + + + + + {text} + + + + + """ + log.info(f"Sending Carbon: {carbon_msg}") + self.send_raw(carbon_msg) + else: + + + log.info(f"Forwarding message from external service: {sender_jid} -> {recipient_jid}: {text}") + self.send_message(mto=recipient_jid, mfrom=sender_jid, mbody=text, mtype="chat") + + +async def stream(**kwargs): + pubsub = redis.pubsub() + await pubsub.subscribe("component") + + while True: + message = await pubsub.get_message(ignore_subscribe_messages=True) + if message is not None: + try: + log.info("GOT", message) + data = message["data"] + unpacked = msgpack.unpackb(data, raw=False) + log.info(f"Unpacked: {unpacked}") + except TypeError: + log.info(f"FAILED {message}") + continue + if "type" in unpacked.keys(): + if unpacked["type"] == "def": + await deferred.process_deferred( + unpacked, + **kwargs + ) + await asyncio.sleep(0.01) + +class Command(BaseCommand): + def handle(self, *args, **options): + xmpp = EchoComponent( + jid=settings.XMPP_JID, + secret=settings.XMPP_SECRET, + server=settings.XMPP_ADDRESS, + port=settings.XMPP_PORT, + ) + xmpp.register_plugin('xep_0030') # Service Discovery + xmpp.register_plugin('xep_0004') # Data Forms + xmpp.register_plugin('xep_0060') # PubSub + xmpp.register_plugin('xep_0199') # XMPP Ping + xmpp.register_plugin("xep_0085") # Chat State Notifications + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.create_task(stream(xmpp=xmpp)) + + # Connect to the XMPP server and start processing XMPP stanzas. + xmpp.connect() + xmpp.process() + + + try: + while True: + pass # Keep the component running + except (KeyboardInterrupt, SystemExit): + log.info("XMPP Component terminating") diff --git a/core/management/commands/processing.py b/core/management/commands/processing.py index de6e73c..f79122a 100644 --- a/core/management/commands/processing.py +++ b/core/management/commands/processing.py @@ -22,6 +22,9 @@ SIGNAL_URL = "signal:8080" log = logs.get_logger("processing") +redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10) + + class HandleMessage(Command): async def handle(self, c: Context): msg = { @@ -36,9 +39,18 @@ class HandleMessage(Command): "mentions": c.message.mentions, "raw_message": c.message.raw_message } - dest = c.message.raw_message.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid") - account = c.message.raw_message.get("account", "") - source_name = msg["raw_message"].get("envelope", {}).get("sourceName", "") + log.info("1") + raw = json.loads(c.message.raw_message) + print(json.dumps(c.message.raw_message, indent=2)) + #dest = c.message.raw_message.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid") + dest = raw.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid") + + #account = c.message.raw_message.get("account", "") + account = raw.get("account", "") + log.info("2") + #source_name = msg["raw_message"].get("envelope", {}).get("sourceName", "") + source_name = raw.get("envelope", {}).get("sourceName", "") + log.info("3") source_number = c.message.source_number source_uuid = c.message.source_uuid @@ -58,6 +70,21 @@ class HandleMessage(Command): # Determine the identifier to use identifier_uuid = dest if is_from_bot else source_uuid + cast = { + "type": "def", + "method": "xmpp", + "service": "signal", + # "sender": source_uuid, + "identifier": identifier_uuid, + "msg": text, + "detail": { + "reply_to_self": reply_to_self, + "reply_to_others": reply_to_others, + "is_outgoing_message": is_outgoing_message, + } + } + packed = msgpack.packb(cast, use_bin_type=True) + await redis.publish("component", packed) # TODO: Permission checks manips = await sync_to_async(list)( @@ -116,8 +143,14 @@ class HandleMessage(Command): text=result, ts=ts + 1, ) - log.info("NOT SENDING CHECK CODE IS OK") + # log.info("NOT SENDING CHECK CODE IS OK") # await natural.natural_send_message(c, result) + tss = await natural.natural_send_message( + result, + c.send, + c.start_typing, + c.stop_typing, + ) elif manip.mode == "notify": title = f"[GIA] Suggested message to {person_identifier.person.name}" manip.user.sendmsg(result, title=title) @@ -180,8 +213,7 @@ class HandleMessage(Command): ) # -async def pubsub(): - redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10) +async def stream(): pubsub = redis.pubsub() await pubsub.subscribe("processing") @@ -211,7 +243,7 @@ class Command(BaseCommand): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) bot._event_loop = loop - loop.create_task(pubsub()) + loop.create_task(stream()) bot.start() try: loop.run_forever() diff --git a/core/models.py b/core/models.py index 2f1c43a..b101126 100644 --- a/core/models.py +++ b/core/models.py @@ -5,6 +5,7 @@ from django.conf import settings from django.contrib.auth.models import AbstractUser from django.db import models from core.lib.notify import raw_sendmsg +from core.clients import signalapi logger = logging.getLogger(__name__) @@ -118,6 +119,20 @@ class PersonIdentifier(models.Model): def __str__(self): return f"{self.person} ({self.service})" + def send(self, text): + """ + Send this contact a text. + """ + if self.service == "signal": + ts = signalapi.send_message_raw_sync( + self.identifier, + text + ) + return ts + else: + raise NotImplementedError(f"Service not implemented: {self.service}") + + class ChatSession(models.Model): """Represents an ongoing chat session, stores summarized history.""" id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) @@ -244,6 +259,9 @@ class Manipulation(models.Model): blank=True, null=True ) + def __str__(self): + return f"{self.name} [{self.group}]" + # class Perms(models.Model): # class Meta: diff --git a/core/templates/base.html b/core/templates/base.html index 4a215c8..92d5489 100644 --- a/core/templates/base.html +++ b/core/templates/base.html @@ -277,13 +277,16 @@ diff --git a/core/templates/partials/queue-list.html b/core/templates/partials/queue-list.html new file mode 100644 index 0000000..48f99f0 --- /dev/null +++ b/core/templates/partials/queue-list.html @@ -0,0 +1,71 @@ +{% load cache %} +{% load cachalot cache %} +{% get_last_invalidation 'core.QueuedMessage' as last %} +{% include 'mixins/partials/notify.html' %} +{% cache 600 objects_queue request.user.id object_list type last %} + + + + + + + + + + {% for item in object_list %} + + + + + + + + + {% endfor %} + +
idsessionmanipulationtstextactions
+ + + + + + {{ item.session }}{{ item.manipulation }}{{ item.ts }}{{ item.text.length }} +
+ + +
+
+{% endcache %} \ No newline at end of file diff --git a/core/views/queues.py b/core/views/queues.py index 6f98ae7..444b0a8 100644 --- a/core/views/queues.py +++ b/core/views/queues.py @@ -5,12 +5,21 @@ from rest_framework import status from django.http import HttpResponse from core.models import QueuedMessage, Message +from core.forms import QueueForm + import requests import orjson from django.conf import settings import redis import msgpack +from mixins.views import ( + ObjectCreate, + ObjectDelete, + ObjectList, + ObjectUpdate, +) + # def start_typing(uuid): # url = f"http://signal:8080/v1/typing_indicator/{settings.SIGNAL_NUMBER}" # data = { @@ -54,4 +63,32 @@ class RejectMessageAPI(LoginRequiredMixin, APIView): message.delete() - return HttpResponse(status=status.HTTP_200_OK) \ No newline at end of file + return HttpResponse(status=status.HTTP_200_OK) + +class QueueList(LoginRequiredMixin, ObjectList): + list_template = "partials/queue-list.html" + model = QueuedMessage + page_title = "Queues" + + list_url_name = "queues" + list_url_args = ["type"] + + submit_url_name = "queue_create" + + +class QueueCreate(LoginRequiredMixin, ObjectCreate): + model = QueuedMessage + form_class = QueueForm + + submit_url_name = "queue_create" + + +class QueueUpdate(LoginRequiredMixin, ObjectUpdate): + model = QueuedMessage + form_class = QueueForm + + submit_url_name = "queue_update" + + +class QueueDelete(LoginRequiredMixin, ObjectDelete): + model = QueuedMessage diff --git a/core/views/signal.py b/core/views/signal.py index 448914c..f868000 100644 --- a/core/views/signal.py +++ b/core/views/signal.py @@ -33,7 +33,6 @@ class SignalAccounts(SuperUserRequiredMixin, ObjectList): url = f"http://signal:8080/v1/accounts" response = requests.get(url) accounts = orjson.loads(response.text) - print("ACCOUNTS", accounts) return accounts @@ -49,8 +48,6 @@ class SignalContactsList(SuperUserRequiredMixin, ObjectList): def get_queryset(self, *args, **kwargs): # url = signal:8080/v1/accounts - print("GET", self.request.GET) - print("KWARGS", self.kwargs) # /v1/configuration/{number}/settings # /v1/identities/{number} # /v1/contacts/{number} @@ -63,9 +60,6 @@ class SignalContactsList(SuperUserRequiredMixin, ObjectList): response = requests.get(f"http://signal:8080/v1/contacts/{self.kwargs['pk']}") contacts = orjson.loads(response.text) - print("identities", identities) - print("contacts", contacts) - # add identities to contacts for contact in contacts: for identity in identities: diff --git a/docker-compose.yml b/docker-compose.yml index 1de4aba..9e78108 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,6 +31,10 @@ services: REGISTRATION_OPEN: "${REGISTRATION_OPEN}" OPERATION: "${OPERATION}" SIGNAL_NUMBER: "${SIGNAL_NUMBER}" + XMPP_ADDRESS: "${XMPP_ADDRESS}" + XMPP_JID: "${XMPP_JID}" + XMPP_PORT: "${XMPP_PORT}" + XMPP_SECRET: "${XMPP_SECRET}" depends_on: redis: condition: service_healthy @@ -98,6 +102,59 @@ services: REGISTRATION_OPEN: "${REGISTRATION_OPEN}" OPERATION: "${OPERATION}" SIGNAL_NUMBER: "${SIGNAL_NUMBER}" + XMPP_ADDRESS: "${XMPP_ADDRESS}" + XMPP_JID: "${XMPP_JID}" + XMPP_PORT: "${XMPP_PORT}" + XMPP_SECRET: "${XMPP_SECRET}" + depends_on: + redis: + condition: service_healthy + migration: + condition: service_started + collectstatic: + condition: service_started + # deploy: + # resources: + # limits: + # cpus: '0.25' + # memory: 0.25G + #network_mode: host + + component: + image: xf/gia:prod + container_name: component_gia + build: + context: . + args: + OPERATION: ${OPERATION} + command: sh -c '. /venv/bin/activate && python manage.py component' + volumes: + - ${REPO_DIR}:/code + - ${REPO_DIR}/docker/uwsgi.ini:/conf/uwsgi.ini + - ${APP_DATABASE_FILE}:/conf/db.sqlite3 + - type: bind + source: /code/vrun + target: /var/run + environment: + APP_PORT: "${APP_PORT}" + REPO_DIR: "${REPO_DIR}" + APP_LOCAL_SETTINGS: "${APP_LOCAL_SETTINGS}" + APP_DATABASE_FILE: "${APP_DATABASE_FILE}" + DOMAIN: "${DOMAIN}" + URL: "${URL}" + ALLOWED_HOSTS: "${ALLOWED_HOSTS}" + NOTIFY_TOPIC: "${NOTIFY_TOPIC}" + CSRF_TRUSTED_ORIGINS: "${CSRF_TRUSTED_ORIGINS}" + DEBUG: "${DEBUG}" + SECRET_KEY: "${SECRET_KEY}" + STATIC_ROOT: "${STATIC_ROOT}" + REGISTRATION_OPEN: "${REGISTRATION_OPEN}" + OPERATION: "${OPERATION}" + SIGNAL_NUMBER: "${SIGNAL_NUMBER}" + XMPP_ADDRESS: "${XMPP_ADDRESS}" + XMPP_JID: "${XMPP_JID}" + XMPP_PORT: "${XMPP_PORT}" + XMPP_SECRET: "${XMPP_SECRET}" depends_on: redis: condition: service_healthy @@ -143,6 +200,10 @@ services: REGISTRATION_OPEN: "${REGISTRATION_OPEN}" OPERATION: "${OPERATION}" SIGNAL_NUMBER: "${SIGNAL_NUMBER}" + XMPP_ADDRESS: "${XMPP_ADDRESS}" + XMPP_JID: "${XMPP_JID}" + XMPP_PORT: "${XMPP_PORT}" + XMPP_SECRET: "${XMPP_SECRET}" depends_on: redis: condition: service_healthy @@ -187,6 +248,10 @@ services: REGISTRATION_OPEN: "${REGISTRATION_OPEN}" OPERATION: "${OPERATION}" SIGNAL_NUMBER: "${SIGNAL_NUMBER}" + XMPP_ADDRESS: "${XMPP_ADDRESS}" + XMPP_JID: "${XMPP_JID}" + XMPP_PORT: "${XMPP_PORT}" + XMPP_SECRET: "${XMPP_SECRET}" # deploy: # resources: # limits: @@ -224,6 +289,10 @@ services: REGISTRATION_OPEN: "${REGISTRATION_OPEN}" OPERATION: "${OPERATION}" SIGNAL_NUMBER: "${SIGNAL_NUMBER}" + XMPP_ADDRESS: "${XMPP_ADDRESS}" + XMPP_JID: "${XMPP_JID}" + XMPP_PORT: "${XMPP_PORT}" + XMPP_SECRET: "${XMPP_SECRET}" # deploy: # resources: # limits: diff --git a/requirements.txt b/requirements.txt index 44f3fdb..27b7fd3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,3 +35,4 @@ signalbot openai aiograpi aiomysql +slixmpp