# Deferred processing library import asyncio from typing import Annotated, Optional from uuid import UUID from asgiref.sync import sync_to_async from django.conf import settings from pydantic import BaseModel, ValidationError from core.clients import signal, signalapi from core.lib.prompts.functions import delete_messages from core.models import Message, PersonIdentifier, QueuedMessage, User from core.util import logs log = logs.get_logger("deferred") class DeferredDetail(BaseModel): reply_to_self: bool reply_to_others: bool is_outgoing_message: bool class DeferredRequest(BaseModel): type: str method: str user_id: Optional[int] = None message_id: Optional[Annotated[str, UUID]] = None identifier: Optional[str] = None msg: Optional[str] = None service: Optional[str] = None detail: Optional[DeferredDetail] = None attachments: Optional[list] = None async def send_message(db_obj): recipient_uuid = db_obj.session.identifier.identifier text = db_obj.text send = lambda x: signalapi.send_message_raw(recipient_uuid, x) # returns ts start_t = lambda: signalapi.start_typing(recipient_uuid) stop_t = lambda: signalapi.stop_typing(recipient_uuid) tss = await natural.natural_send_message( text, send, start_t, stop_t, ) # list of ts # result = await send_message_raw(recipient_uuid, text) await sync_to_async(db_obj.delete)() result = [x for x in tss if x] # all trueish ts if result: # if at least one message was sent ts1 = result.pop() # pick a time log.info(f"signal message create {text}") await sync_to_async(Message.objects.create)( user=db_obj.session.user, session=db_obj.session, custom_author="BOT", text=text, ts=ts1, # use that time in db ) async def process_deferred(data: dict, **kwargs): try: validated_data = DeferredRequest(**data) log.info(f"Validated Data: {validated_data}") # Process the validated data except ValidationError as e: log.info(f"Validation Error: {e}") return method = validated_data.method user_id = validated_data.user_id message_id = validated_data.message_id if method == "accept_message": try: message = await sync_to_async(QueuedMessage.objects.get)( user_id=user_id, id=message_id, ) log.info(f"Got {message}") except QueuedMessage.DoesNotExist: log.info(f"Didn't get message from {message_id}") return if message.session.identifier.service == "signal": await send_message(message) else: log.warning(f"Protocol not supported: {message.session.identifier.service}") return elif method == "xmpp": # send xmpp message xmpp = kwargs.get("xmpp") service = validated_data.service msg = validated_data.msg attachments = validated_data.attachments # Get User from identifier identifiers = PersonIdentifier.objects.filter( identifier=validated_data.identifier, service=service, ) xmpp_attachments = [] # attachments = [] # Asynchronously fetch all attachments tasks = [signalapi.fetch_signal_attachment(att["id"]) for att in attachments] fetched_attachments = await asyncio.gather(*tasks) for fetched, att in zip(fetched_attachments, attachments): if not fetched: log.warning(f"Failed to fetch attachment {att['id']} from Signal.") continue # Attach fetched file to XMPP xmpp_attachments.append( { "content": fetched["content"], "content_type": fetched["content_type"], "filename": fetched["filename"], "size": fetched["size"], } ) for identifier in identifiers: # recipient_jid = f"{identifier.user.username}@{settings.XMPP_ADDRESS}" user = identifier.user log.info( f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP." ) await xmpp.send_from_external( user, identifier, msg, validated_data.detail, attachments=xmpp_attachments, ) else: log.warning(f"Method not yet supported: {method}") return