Fix bridging and refactor

This commit is contained in:
2025-03-13 17:26:26 +00:00
parent 7fa76cd4ef
commit f5c6b535d8
14 changed files with 1264 additions and 202 deletions

View File

@@ -2,16 +2,20 @@ from core.util import logs
from django.core.management.base import BaseCommand
from slixmpp.componentxmpp import ComponentXMPP
from django.conf import settings
from core.models import User, Person, PersonIdentifier
from core.models import User, Person, PersonIdentifier, ChatSession
from redis import asyncio as aioredis
from asgiref.sync import sync_to_async
from django.utils.timezone import now
import asyncio
import msgpack
from core.lib import deferred
from core.clients import signalapi
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins.xep_0085.stanza import Active, Composing, Paused, Inactive, Gone
from slixmpp.stanza import Message
from slixmpp.xmlstream.stanzabase import ElementBase, ET
import aiohttp
from core.messaging import history
log = logs.get_logger("component")
@@ -355,7 +359,7 @@ class EchoComponent(ComponentXMPP):
return None
def message(self, msg):
async def message(self, msg):
"""
Process incoming XMPP messages.
"""
@@ -481,68 +485,105 @@ class EchoComponent(ComponentXMPP):
# sym(str(person.__dict__))
# sym(f"Service: {recipient_service}")
identifier.send(body, attachments=attachments)
async def send_from_external(self, person_identifier, text, detail, attachments=[]):
sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}"
recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}"
# First, send text separately if there's any
if text:
text_msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat")
text_msg["body"] = text
log.info(f"Sending separate text message: {text}")
if detail.is_outgoing_message:
log.info("Outgoing message, not forwarding")
...
else:
log.info(f"Final XMPP message: {text_msg.xml}")
text_msg.send()
for att in attachments:
# Request an upload slot
upload_slot = await self.request_upload_slot(
recipient_jid, att["filename"], att["content_type"], att["size"]
#tss = await identifier.send(body, attachments=attachments)
# AM FIXING https://git.zm.is/XF/GIA/issues/5
session, _ = await sync_to_async(ChatSession.objects.get_or_create)(
identifier=identifier,
user=user,
)
if not upload_slot:
log.warning(f"Failed to obtain upload slot for {att['filename']}")
continue
log.info(f"Component history store message {text}")
await history.store_message(
session=session,
sender="XMPP",
text=text,
ts=now().timestamp(),
outgoing=detail.is_outgoing_message,
)
log.info("Stored a message sent from XMPP in the history.")
upload_url, put_url, auth_header = upload_slot
tss = await signalapi.send_message_raw(
identifier.identifier,
body,
attachments,
)
log.info(f"Message sent")
# Upload file
headers = {"Content-Type": att["content_type"]}
if auth_header:
headers["Authorization"] = auth_header
async def request_upload_slots(self, recipient_jid, attachments):
"""Requests upload slots for multiple attachments concurrently."""
upload_tasks = [
self.request_upload_slot(recipient_jid, att["filename"], att["content_type"], att["size"])
for att in attachments
]
upload_slots = await asyncio.gather(*upload_tasks)
return [(att, slot) for att, slot in zip(attachments, upload_slots) if slot is not None]
async def upload_and_send(self, att, upload_slot, recipient_jid, sender_jid):
"""Uploads a file and immediately sends the corresponding XMPP message."""
upload_url, put_url, auth_header = upload_slot
headers = {"Content-Type": att["content_type"]}
if auth_header:
headers["Authorization"] = auth_header
async with aiohttp.ClientSession() as session:
try:
async with aiohttp.ClientSession() as session:
async with session.put(put_url, data=att["content"], headers=headers) as response:
if response.status not in (200, 201):
log.error(f"Upload failed: {response.status} {await response.text()}")
continue
async with session.put(put_url, data=att["content"], headers=headers) as response:
if response.status not in (200, 201):
log.error(f"Upload failed: {response.status} {await response.text()}")
return
log.info(f"Successfully uploaded {att['filename']} to {upload_url}")
# Create and send message with only the file URL
msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat")
msg["body"] = upload_url # Body must be only the URL
# Include <x><url> (XEP-0066) to ensure client compatibility
oob_element = ET.Element("{jabber:x:oob}x")
url_element = ET.SubElement(oob_element, "{jabber:x:oob}url")
url_element.text = upload_url
msg.xml.append(oob_element)
log.info(f"Sending file attachment message with URL: {upload_url}")
if detail.is_outgoing_message:
log.info("Outgoing message, not forwarding")
...
else:
log.info(f"Final XMPP message: {msg.xml}")
msg.send()
# Send XMPP message immediately after successful upload
await self.send_xmpp_message(recipient_jid, sender_jid, upload_url, attachment_url=upload_url)
except Exception as e:
log.error(f"Error uploading {att['filename']} to XMPP: {e}")
async def send_xmpp_message(self, recipient_jid, sender_jid, body_text, attachment_url=None):
"""Sends an XMPP message with either text or an attachment URL."""
msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat")
msg["body"] = body_text # Body must contain only text or the URL
if attachment_url:
# Include <x><url> (XEP-0066) to ensure client compatibility
oob_element = ET.Element("{jabber:x:oob}x")
url_element = ET.SubElement(oob_element, "{jabber:x:oob}url")
url_element.text = attachment_url
msg.xml.append(oob_element)
log.info(f"Sending XMPP message: {msg.xml}")
msg.send()
async def send_from_external(self, user, person_identifier, text, detail, attachments=[]):
"""Handles sending XMPP messages with text and attachments."""
if detail.is_outgoing_message:
return
sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}"
recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}"
# Step 1: Send text message separately
if text:
await self.send_xmpp_message(recipient_jid, sender_jid, text)
if not attachments:
return # No attachments to process
# Step 2: Request upload slots concurrently
valid_uploads = await self.request_upload_slots(recipient_jid, attachments)
log.info(f"Got upload slots")
if not valid_uploads:
log.warning("No valid upload slots obtained.")
#return
# Step 3: Upload each file and send its message immediately after upload
upload_tasks = [
self.upload_and_send(att, slot, recipient_jid, sender_jid) for att, slot in valid_uploads
]
await asyncio.gather(*upload_tasks) # Upload files concurrently
async def stream(**kwargs):
pubsub = redis.pubsub()
await pubsub.subscribe("component")

View File

@@ -14,7 +14,7 @@ from core.models import Chat, Manipulation, PersonIdentifier, ChatSession, Messa
import aiohttp
from django.utils import timezone
from django.conf import settings
from core.lib.bot import NewSignalBot
from core.clients.signal import NewSignalBot
from redis import asyncio as aioredis
SIGNAL_URL = "signal:8080"
@@ -102,6 +102,7 @@ class HandleMessage(Command):
manips = await sync_to_async(list)(
Manipulation.objects.filter(enabled=True)
)
processed_people = set()
for manip in manips:
try:
person_identifier = await sync_to_async(PersonIdentifier.objects.get)(
@@ -110,6 +111,10 @@ class HandleMessage(Command):
service="signal",
person__in=manip.group.people.all(),
)
# Check if we've already processed this person
if person_identifier.person.id in processed_people:
log.warning(f"Skipping duplicate message storage for {person_identifier.person.name}")
continue # Skip to next manipulation
if not manip.group.people.filter(id=person_identifier.person.id).exists():
log.error(f"{manip.name}: Identifier {identifier_uuid} found, but person {person_identifier.person} is not in manip group. Skipping.")
continue # Exit early if the person is not in the group
@@ -121,6 +126,8 @@ class HandleMessage(Command):
chat_session = await history.get_chat_session(manip.user, person_identifier)
# Store incoming or outgoing messages
log.info(f"Processing history store message {text}")
processed_people.add(person_identifier.person.id)
await history.store_message(
session=chat_session,
sender=source_uuid,
@@ -148,8 +155,7 @@ class HandleMessage(Command):
log.info("Running context prompt")
result = await ai.run_prompt(prompt, manip.ai)
# Store bot's AI response with a +1s timestamp
if manip.mode == "active":
# Store bot's AI response with a +1s timestamp if manip.mode == "active":
await history.store_own_message(
session=chat_session,
text=result,

View File

@@ -0,0 +1,18 @@
from core.util import logs
from django.core.management.base import BaseCommand
from django.conf import settings
from core.modules.router import UnifiedRouter
import asyncio
log = logs.get_logger("UR")
class Command(BaseCommand):
def handle(self, *args, **options):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
instance = UnifiedRouter(loop)
instance.start()
instance.run()