# Deferred processing library
import asyncio
from typing import Annotated, Optional
from uuid import UUID

from asgiref.sync import sync_to_async
from django.conf import settings
from pydantic import BaseModel
from pydantic import ValidationError

from core.util import logs
from core.models import QueuedMessage, Message, PersonIdentifier, User
from core.clients import signal
from core.lib.prompts.functions import delete_messages
from core.clients import signalapi

log = logs.get_logger("deferred")


class DeferredDetail(BaseModel):
    """Boolean flags attached to a deferred request.

    This module does not interpret the flags; the whole object is forwarded
    unchanged to ``xmpp.send_from_external``.
    """

    reply_to_self: bool
    reply_to_others: bool
    is_outgoing_message: bool


class DeferredRequest(BaseModel):
    """Validated payload accepted by ``process_deferred``."""

    # Not read anywhere in this module.
    type: str
    # Selects the handler: "accept_message" or "xmpp".
    method: str
    # user_id / message_id identify the QueuedMessage for "accept_message".
    user_id: Optional[int] = None
    # NOTE(review): ``UUID`` is only Annotated metadata here, so this looks
    # like it validates as a plain optional string -- confirm that is intended.
    message_id: Optional[Annotated[str, UUID]] = None
    # identifier / service select the PersonIdentifier rows for "xmpp".
    identifier: Optional[str] = None
    # msg / detail / attachments are relayed to XMPP by the "xmpp" method.
    msg: Optional[str] = None
    service: Optional[str] = None
    detail: Optional[DeferredDetail] = None
    attachments: Optional[list] = None


async def send_message(db_obj):
    """Send a queued message via Signal and record it as a ``Message``.

    The text of ``db_obj`` is passed to ``natural.natural_send_message``
    together with send / start-typing / stop-typing callbacks bound to the
    recipient taken from ``db_obj.session.identifier.identifier``. The queued
    row is then deleted. If at least one truthy timestamp came back, a
    ``Message`` authored by "BOT" is created using one of those timestamps.

    Args:
        db_obj: The queued message; must expose ``text`` and ``session``
            (with ``identifier`` and ``user``).
    """
    session = db_obj.session
    recipient_uuid = session.identifier.identifier
    text = db_obj.text

    def send(chunk):
        return signalapi.send_message_raw(recipient_uuid, chunk)  # returns ts

    def start_typing():
        return signalapi.start_typing(recipient_uuid)

    def stop_typing():
        return signalapi.stop_typing(recipient_uuid)

    # NOTE(review): ``natural`` is not imported anywhere in the visible
    # module, so this call looks like it raises NameError -- confirm where
    # ``natural`` is supposed to come from and add the import.
    timestamps = await natural.natural_send_message(
        text,
        send,
        start_typing,
        stop_typing,
    )  # list of ts

    # The queued row is removed whether or not anything was actually sent.
    await sync_to_async(db_obj.delete)()

    sent = [ts for ts in timestamps if ts]  # all trueish ts
    if not sent:
        # Nothing was delivered, so there is nothing to record.
        return

    sent_ts = sent.pop()  # pick a time
    log.info(f"signal message create {text}")
    await sync_to_async(Message.objects.create)(
        user=session.user,
        session=session,
        custom_author="BOT",
        text=text,
        ts=sent_ts,  # use that time in db
    )


async def _accept_message(request):
    """Look up the queued message named by ``request`` and send it."""
    # Pull the relations read below (and in send_message) in the same query;
    # lazy relation loading would run a blocking query on the event loop.
    queued = QueuedMessage.objects.select_related(
        "session__identifier",
        "session__user",
    )
    try:
        message = await sync_to_async(queued.get)(
            user_id=request.user_id,
            id=request.message_id,
        )
    except QueuedMessage.DoesNotExist:
        log.info(f"Didn't get message from {request.message_id}")
        return
    log.info(f"Got {message}")

    service = message.session.identifier.service
    if service == "signal":
        await send_message(message)
    else:
        log.warning(f"Protocol not supported: {service}")


async def _relay_to_xmpp(request, xmpp):
    """Fetch the request's Signal attachments and forward everything to XMPP."""
    if xmpp is None:
        log.warning("No XMPP client supplied, cannot relay message.")
        return

    # ``attachments`` is optional on the request model; treat None as empty.
    attachments = request.attachments or []

    # Asynchronously fetch all attachments
    fetched_attachments = await asyncio.gather(
        *(signalapi.fetch_signal_attachment(att["id"]) for att in attachments)
    )

    xmpp_attachments = []
    for fetched, att in zip(fetched_attachments, attachments):
        if not fetched:
            log.warning(f"Failed to fetch attachment {att['id']} from Signal.")
            continue
        # Attach fetched file to XMPP
        xmpp_attachments.append({
            "content": fetched["content"],
            "content_type": fetched["content_type"],
            "filename": fetched["filename"],
            "size": fetched["size"],
        })

    # Evaluate the queryset in a worker thread, with ``user`` joined in, so
    # no ORM query is issued from the event loop while iterating below.
    identifiers = await sync_to_async(list)(
        PersonIdentifier.objects.filter(
            identifier=request.identifier,
            service=request.service,
        ).select_related("user")
    )

    for identifier in identifiers:
        log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.")
        await xmpp.send_from_external(
            identifier.user,
            identifier,
            request.msg,
            request.detail,
            attachments=xmpp_attachments,
        )


async def process_deferred(data: dict, **kwargs):
    """Validate a deferred request and dispatch it by ``method``.

    Args:
        data: Raw payload; validated against ``DeferredRequest``. A payload
            that fails validation is logged and dropped.
        **kwargs: ``xmpp`` -- the object whose ``send_from_external`` is
            awaited for the "xmpp" method.

    Supported methods:
        "accept_message": send the QueuedMessage matching ``user_id`` and
            ``message_id`` (Signal only).
        "xmpp": relay ``msg`` and any fetched attachments to every
            PersonIdentifier matching ``identifier`` / ``service``.

    Any other method is logged as unsupported.
    """
    try:
        validated_data = DeferredRequest(**data)
    except ValidationError as e:
        log.info(f"Validation Error: {e}")
        return
    log.info(f"Validated Data: {validated_data}")

    method = validated_data.method
    if method == "accept_message":
        await _accept_message(validated_data)
    elif method == "xmpp":
        await _relay_to_xmpp(validated_data, kwargs.get("xmpp"))
    else:
        log.warning(f"Method not yet supported: {method}")
        return