# Deferred processing library from core.util import logs from pydantic import BaseModel from typing import Annotated, Optional from uuid import UUID from pydantic import ValidationError from core.models import QueuedMessage, Message, PersonIdentifier, User from core.clients import signal from core.lib.prompts.functions import delete_messages from asgiref.sync import sync_to_async from django.conf import settings from core.clients import signalapi import asyncio log = logs.get_logger("deferred") class DeferredDetail(BaseModel): reply_to_self: bool reply_to_others: bool is_outgoing_message: bool class DeferredRequest(BaseModel): type: str method: str user_id: Optional[int] = None message_id: Optional[Annotated[str, UUID]] = None identifier: Optional[str] = None msg: Optional[str] = None service: Optional[str] = None detail: Optional[DeferredDetail] = None attachments: Optional[list] = None async def process_deferred(data: dict, **kwargs): try: validated_data = DeferredRequest(**data) log.info(f"Validated Data: {validated_data}") # Process the validated data except ValidationError as e: log.info(f"Validation Error: {e}") return method = validated_data.method user_id = validated_data.user_id message_id = validated_data.message_id if method == "accept_message": try: message = await sync_to_async(QueuedMessage.objects.get)( user_id=user_id, id=message_id, ) log.info(f"Got {message}") except QueuedMessage.DoesNotExist: log.info(f"Didn't get message from {message_id}") return if message.session.identifier.service == "signal": await signal.send_message(message) else: log.warning(f"Protocol not supported: {message.session.identifier.service}") return elif method == "xmpp": # send xmpp message xmpp = kwargs.get("xmpp") service = validated_data.service msg = validated_data.msg attachments = validated_data.attachments # Get User from identifier identifiers = PersonIdentifier.objects.filter( identifier=validated_data.identifier, service=service, ) xmpp_attachments = [] # Asynchronously fetch all attachments tasks = [signalapi.fetch_signal_attachment(att["id"]) for att in attachments] fetched_attachments = await asyncio.gather(*tasks) for fetched, att in zip(fetched_attachments, attachments): if not fetched: log.warning(f"Failed to fetch attachment {att['id']} from Signal.") continue # Attach fetched file to XMPP xmpp_attachments.append({ "content": fetched["content"], "content_type": fetched["content_type"], "filename": fetched["filename"], "size": fetched["size"], }) for identifier in identifiers: #recipient_jid = f"{identifier.user.username}@{settings.XMPP_ADDRESS}" log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") await xmpp.send_from_external(identifier, msg, validated_data.detail, attachments=xmpp_attachments) else: log.warning(f"Method not yet supported: {method}") return