import asyncio
import json
from urllib.parse import urlparse

import aiohttp
from asgiref.sync import sync_to_async
from django.conf import settings
from django.urls import reverse
from signalbot import Command, Context, SignalBot

from core.clients import ClientBase, signalapi
from core.messaging import ai, history, natural, replies, utils
from core.models import Chat, Manipulation, PersonIdentifier, QueuedMessage
from core.util import logs

log = logs.get_logger("signalF")


# Resolve the Signal REST endpoint. SIGNAL_HTTP_URL may be a full URL or a bare
# "host:port"; a missing scheme is assumed to be http so urlparse can split it.
# `or ""` tolerates the setting being explicitly None.
_signal_http_url = str(getattr(settings, "SIGNAL_HTTP_URL", "") or "").strip()
if _signal_http_url:
    parsed = urlparse(
        _signal_http_url if "://" in _signal_http_url else f"http://{_signal_http_url}"
    )
    SIGNAL_HOST = parsed.hostname or "signal"
    SIGNAL_PORT = parsed.port or 8080
else:
    if settings.DEBUG:
        SIGNAL_HOST = "127.0.0.1"
    else:
        SIGNAL_HOST = "signal"
    SIGNAL_PORT = 8080

SIGNAL_URL = f"{SIGNAL_HOST}:{SIGNAL_PORT}"

# Lower-cased `action` values recognised by _typing_started().
_TYPING_START_ACTIONS = frozenset({"started", "start", "typing", "composing"})
_TYPING_STOP_ACTIONS = frozenset({"stopped", "stop", "paused", "idle"})


def _get_nested(payload, path):
    """Follow *path* (an iterable of keys) through nested dicts.

    Returns the value at the end of the path, or None as soon as a
    non-dict is encountered or a key is missing.
    """
    current = payload
    for key in path:
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current


def _looks_like_signal_attachment(entry):
    """Return True if *entry* is a dict carrying any attachment-like key."""
    return isinstance(entry, dict) and (
        "id" in entry or "attachmentId" in entry or "contentType" in entry
    )


def _normalize_attachment(entry):
    """Map one raw attachment entry to the internal attachment dict.

    Returns None when *entry* is not a dict or has no usable
    ``id``/``attachmentId``. Otherwise returns a dict with the keys ``id``,
    ``content_type``, ``filename``, ``size``, ``width`` and ``height``.
    """
    # Entries from the fixed candidate paths are not pre-validated, so guard
    # against non-dict items instead of raising AttributeError on `.get`.
    if not isinstance(entry, dict):
        return None
    attachment_id = entry.get("id") or entry.get("attachmentId")
    if attachment_id is None:
        return None
    return {
        "id": attachment_id,
        "content_type": entry.get("contentType", "application/octet-stream"),
        "filename": entry.get("filename") or str(attachment_id),
        "size": entry.get("size") or 0,
        "width": entry.get("width"),
        "height": entry.get("height"),
    }


def _extract_attachments(raw_payload):
    """Collect normalized, de-duplicated attachments from a raw payload.

    The known attachment locations under ``envelope`` are tried first. If
    none of them yields anything, the whole envelope is scanned for lists
    whose items all look like attachments.

    Returns a list of dicts as produced by ``_normalize_attachment``.
    """
    envelope = raw_payload.get("envelope", {})
    candidate_paths = [
        ("dataMessage", "attachments"),
        ("syncMessage", "sentMessage", "attachments"),
        ("syncMessage", "editMessage", "dataMessage", "attachments"),
    ]
    results = []
    seen = set()

    def _collect(entries):
        # Normalize each entry and keep the first occurrence of every id.
        for entry in entries:
            normalized = _normalize_attachment(entry)
            if not normalized:
                continue
            key = str(normalized["id"])
            if key in seen:
                continue
            seen.add(key)
            results.append(normalized)

    for path in candidate_paths:
        found = _get_nested(envelope, path)
        if isinstance(found, list):
            _collect(found)

    # Fallback: scan for attachment-shaped lists under envelope.
    if not results:
        stack = [envelope]
        while stack:
            node = stack.pop()
            if isinstance(node, dict):
                stack.extend(node.values())
            elif isinstance(node, list):
                if node and all(_looks_like_signal_attachment(item) for item in node):
                    _collect(node)
                else:
                    stack.extend(node)
    return results


def _extract_receipt_timestamps(receipt_payload):
    """Return the receipt's timestamps as a list of ints.

    Reads ``timestamp`` and falls back to ``timestamps`` when that is absent.
    A list value is converted item by item, silently dropping items that are
    not int-convertible. A scalar yields a one-element list, or an empty list
    if it cannot be converted.
    """
    raw_ts = receipt_payload.get("timestamp")
    if raw_ts is None:
        raw_ts = receipt_payload.get("timestamps")
    if isinstance(raw_ts, list):
        out = []
        for item in raw_ts:
            try:
                out.append(int(item))
            except (TypeError, ValueError, OverflowError):
                continue
        return out
    if raw_ts is not None:
        try:
            return [int(raw_ts)]
        except (TypeError, ValueError, OverflowError):
            return []
    return []


def _typing_started(typing_payload):
    """Return True if the typing payload signals "started", False for "stopped".

    Resolution order:
      1. a recognised ``action`` string (case-insensitive),
      2. a boolean ``isTyping`` field,
      3. True when neither gives an answer.
    """
    action = str(typing_payload.get("action") or "").strip().lower()
    if action in _TYPING_START_ACTIONS:
        return True
    # An explicit stop action must not fall through to the "started" default,
    # otherwise stop events are reported as starts.
    if action in _TYPING_STOP_ACTIONS:
        return False
    explicit = typing_payload.get("isTyping")
    if isinstance(explicit, bool):
        return explicit
    return True


class NewSignalBot(SignalBot):
    """SignalBot subclass that records its own UUID and starts without blocking."""

    def __init__(self, ur, service, config):
        """Store routing references and connection details, then init SignalBot.

        *config* must contain ``signal_service`` and ``phone_number``.
        """
        self.ur = ur
        self.service = service
        self.signal_rest = config["signal_service"]  # keep your own copy
        self.phone_number = config["phone_number"]
        super().__init__(config)
        self.log = logs.get_logger("signalI")
        self.bot_uuid = None

    async def get_own_uuid(self) -> str | None:
        """Look up this account's UUID through the REST ``/v1/contacts`` endpoint.

        Returns the ``uuid`` of the contact whose ``number`` equals the bot's
        phone number, or None on a non-200 response, when no contact matches,
        or on any request/parse error (which is logged).
        """
        # config may be "signal:8080" -- ensure http://
        base = self.signal_rest
        if not base.startswith("http"):
            base = f"http://{base}"
        uri = f"{base}/v1/contacts/{self.phone_number}"
        try:
            async with aiohttp.ClientSession() as session:
                # `async with` releases the response on every exit path.
                async with session.get(uri) as resp:
                    if resp.status != 200:
                        self.log.error(
                            f"contacts lookup failed: {resp.status} {await resp.text()}"
                        )
                        return None
                    contacts_data = await resp.json()
            if isinstance(contacts_data, list):
                for contact in contacts_data:
                    # Skip malformed entries rather than aborting the lookup.
                    if not isinstance(contact, dict):
                        continue
                    if contact.get("number") == self.phone_number:
                        return contact.get("uuid")
            return None
        except Exception as e:
            # Best-effort lookup: the bot still starts without its UUID.
            self.log.error(f"Failed to get UUID from contacts: {e}")
            return None

    async def initialize_bot(self):
        """Fetch bot's UUID and store it in self.bot_uuid."""
        try:
            self.bot_uuid = await self.get_own_uuid()
            if self.bot_uuid:
                self.log.info(f"Own UUID: {self.bot_uuid}")
            else:
                self.log.warning("Unable to fetch bot UUID.")
        except Exception as e:
            self.log.error(f"Failed to initialize bot UUID: {e}")

    async def _async_post_init(self):
        """
        Preserve SignalBot startup flow so protocol auto-detection runs.
        This flips the client to plain HTTP/WS when HTTPS/WSS is unavailable.
        """
        await self._check_signal_service()
        await self.initialize_bot()
        await self._detect_groups()
        await self._resolve_commands()
        await self._produce_consume_messages()

    def start(self):
        """Start bot without blocking the caller's event loop."""
        task = self._event_loop.create_task(
            self._rerun_on_exception(self._async_post_init)
        )
        self._store_reference_to_task(task, self._running_tasks)
        self.scheduler.start()


class HandleMessage(Command):
    """Command that relays Signal events to XMPP and runs manipulations."""

    def __init__(self, ur, service, *args, **kwargs):
        """Keep the router (*ur*) and service name, then init the Command."""
        self.ur = ur
        self.service = service
        super().__init__(*args, **kwargs)

    async def _notify_typing(self, identifiers, typing_payload):
        """Report a typing start/stop event for every matching identifier."""
        started = _typing_started(typing_payload)  # same for all identifiers
        for identifier in identifiers:
            if started:
                await self.ur.started_typing(
                    self.service,
                    identifier=identifier,
                    payload=typing_payload,
                )
            else:
                await self.ur.stopped_typing(
                    self.service,
                    identifier=identifier,
                    payload=typing_payload,
                )

    async def _notify_read(
        self, identifiers, receipt_payload, envelope, fallback_ts, read_by
    ):
        """Report a receipt event for every matching identifier."""
        read_timestamps = _extract_receipt_timestamps(receipt_payload)
        read_ts = (
            envelope.get("timestamp")
            or envelope.get("serverReceivedTimestamp")
            or fallback_ts
        )
        for identifier in identifiers:
            await self.ur.message_read(
                self.service,
                identifier=identifier,
                message_timestamps=read_timestamps,
                read_ts=read_ts,
                payload=receipt_payload,
                read_by=read_by,
            )

    async def _fetch_xmpp_attachments(self, raw, envelope):
        """Fetch every attachment referenced by *raw* and shape it for XMPP.

        Attachments whose fetch returns a falsy result are logged and skipped.
        """
        # Handle attachments across multiple Signal payload variants.
        attachment_list = _extract_attachments(raw)
        xmpp_attachments = []

        # Asynchronously fetch all attachments
        log.info(f"ATTACHMENT LIST {attachment_list}")
        if attachment_list:
            tasks = [
                signalapi.fetch_signal_attachment(att["id"]) for att in attachment_list
            ]
            fetched_attachments = await asyncio.gather(*tasks)
        else:
            log.info(f"No attachments found. Envelope keys: {list(envelope.keys())}")
            fetched_attachments = []

        for fetched, att in zip(fetched_attachments, attachment_list):
            if not fetched:
                log.warning(f"Failed to fetch attachment {att['id']} from Signal.")
                continue

            # Attach fetched file to XMPP
            xmpp_attachments.append(
                {
                    "content": fetched["content"],
                    "content_type": fetched["content_type"],
                    "filename": fetched["filename"],
                    "size": fetched["size"],
                }
            )
        return xmpp_attachments

    async def _relay_to_xmpp(
        self, identifiers, text, is_outgoing_message, xmpp_attachments
    ):
        """Forward the message to XMPP, applying enabled "mutate" manipulations.

        When an identifier has mutate manipulations, one (AI-rewritten)
        message is sent per manipulation; otherwise the original text is sent.
        """
        for identifier in identifiers:
            user = identifier.user

            mutate_manips = await sync_to_async(list)(
                Manipulation.objects.filter(
                    group__people=identifier.person,
                    user=identifier.user,
                    mode="mutate",
                    filter_enabled=True,
                    enabled=True,
                )
            )
            if mutate_manips:
                for manip in mutate_manips:
                    prompt = replies.generate_mutate_reply_prompt(
                        text,
                        None,
                        manip,
                        None,
                    )
                    log.info("Running Signal mutate prompt")
                    result = await ai.run_prompt(prompt, manip.ai)
                    log.info(
                        f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP."
                    )
                    await self.ur.xmpp.client.send_from_external(
                        user,
                        identifier,
                        result,
                        is_outgoing_message,
                        attachments=xmpp_attachments,
                    )
            else:
                log.info(
                    f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP."
                )
                await self.ur.xmpp.client.send_from_external(
                    user,
                    identifier,
                    text,
                    is_outgoing_message,
                    attachments=xmpp_attachments,
                )

    async def handle(self, c: Context):
        """Process one incoming Signal event.

        Typing and receipt events are reported to the router and end the
        handler early. Any other event is forwarded to XMPP (with its
        attachments), stored in history, optionally answered according to
        each enabled manipulation's mode, and finally recorded as a Chat.
        """
        msg = {
            "source": c.message.source,
            "source_number": c.message.source_number,
            "source_uuid": c.message.source_uuid,
            "timestamp": c.message.timestamp,
            "type": c.message.type.value,
            "text": c.message.text,
            "group": c.message.group,
            "reaction": c.message.reaction,
            "mentions": c.message.mentions,
            "raw_message": c.message.raw_message,
        }
        raw = json.loads(c.message.raw_message)
        envelope = raw.get("envelope") or {}
        # _get_nested tolerates null/non-dict intermediate values where a
        # chained .get(..., {}) would raise.
        dest = _get_nested(
            envelope, ("syncMessage", "sentMessage", "destinationUuid")
        )

        account = raw.get("account", "")
        source_name = envelope.get("sourceName", "")

        source_number = c.message.source_number
        source_uuid = c.message.source_uuid
        text = c.message.text
        ts = c.message.timestamp

        # Message originating from us
        same_recipient = source_uuid == dest

        is_from_bot = source_uuid == c.bot.bot_uuid
        is_to_bot = dest == c.bot.bot_uuid or dest is None

        reply_to_self = same_recipient and is_from_bot  # Reply
        reply_to_others = is_to_bot and not same_recipient  # Reply
        is_outgoing_message = is_from_bot and not is_to_bot  # Do not reply

        # Determine the identifier to use
        identifier_uuid = dest if is_from_bot else source_uuid
        if not identifier_uuid:
            log.warning("No Signal identifier available for message routing.")
            return

        # Resolve person identifiers once for this event.
        identifiers = await sync_to_async(list)(
            PersonIdentifier.objects.filter(
                identifier=identifier_uuid,
                service=self.service,
            )
        )

        typing_payload = envelope.get("typingMessage")
        if isinstance(typing_payload, dict):
            await self._notify_typing(identifiers, typing_payload)
            return

        receipt_payload = envelope.get("receiptMessage")
        if isinstance(receipt_payload, dict):
            await self._notify_read(
                identifiers,
                receipt_payload,
                envelope,
                c.message.timestamp,
                source_uuid,
            )
            return

        xmpp_attachments = await self._fetch_xmpp_attachments(raw, envelope)

        # Forward incoming Signal messages to XMPP and apply mutate rules.
        await self._relay_to_xmpp(
            identifiers, text, is_outgoing_message, xmpp_attachments
        )

        # TODO: Permission checks
        manips = await sync_to_async(list)(Manipulation.objects.filter(enabled=True))
        session_cache = {}
        stored_messages = set()
        for manip in manips:
            try:
                person_identifier = await sync_to_async(PersonIdentifier.objects.get)(
                    identifier=identifier_uuid,
                    user=manip.user,
                    service="signal",
                    person__in=manip.group.people.all(),
                )
            except PersonIdentifier.DoesNotExist:
                log.warning(
                    f"{manip.name}: Message from unknown identifier {identifier_uuid}."
                )
                continue

            # Find/create ChatSession once per user/person.
            session_key = (manip.user.id, person_identifier.person.id)
            if session_key in session_cache:
                chat_session = session_cache[session_key]
            else:
                chat_session = await history.get_chat_session(
                    manip.user, person_identifier
                )
                session_cache[session_key] = chat_session

            # Store each incoming/outgoing event once per session.
            message_key = (chat_session.id, ts, source_uuid)
            if message_key not in stored_messages:
                log.info(f"Processing history store message {text}")
                await history.store_message(
                    session=chat_session,
                    sender=source_uuid,
                    text=text,
                    ts=ts,
                    outgoing=is_from_bot,
                )
                stored_messages.add(message_key)

            # Get the total history
            chat_history = await history.get_chat_history(chat_session)

            if not replies.should_reply(
                reply_to_self,
                reply_to_others,
                is_outgoing_message,
            ):
                continue

            # These modes never produce a reply from this loop.
            if manip.mode in ["silent", "mutate"]:
                continue
            if manip.mode not in ["active", "notify", "instant"]:
                log.error(f"Mode {manip.mode} is not implemented")
                continue

            await utils.update_last_interaction(chat_session)
            prompt = replies.generate_reply_prompt(
                msg, person_identifier.person, manip, chat_history
            )

            log.info("Running context prompt")
            result = await ai.run_prompt(prompt, manip.ai)

            if manip.mode == "active":
                # Send the generated reply straight away and mirror it to XMPP.
                await history.store_own_message(
                    session=chat_session,
                    text=result,
                    ts=ts + 1,
                )
                await self.ur.xmpp.client.send_from_external(
                    manip.user,
                    person_identifier,
                    result,
                    is_outgoing_message=True,
                )
                await natural.natural_send_message(
                    result,
                    c.send,
                    c.start_typing,
                    c.stop_typing,
                )
            elif manip.mode == "notify":
                # Only suggest the reply to the user; nothing is sent on Signal.
                title = f"[GIA] Suggested message to {person_identifier.person.name}"
                manip.user.sendmsg(result, title=title)
            elif manip.mode == "instant":
                # Replace any previously queued suggestion with this one and
                # send the user accept/reject links for it.
                existing_queue = QueuedMessage.objects.filter(
                    user=chat_session.user,
                    session=chat_session,
                    manipulation=manip,
                    custom_author="BOT",
                )
                await history.delete_queryset(existing_queue)
                qm = await history.store_own_message(
                    session=chat_session,
                    text=result,
                    ts=ts + 1,
                    manip=manip,
                    queue=True,
                )
                accept = reverse(
                    "message_accept_api", kwargs={"message_id": qm.id}
                )
                reject = reverse(
                    "message_reject_api", kwargs={"message_id": qm.id}
                )
                url = settings.URL
                content = (
                    f"{result}\n\n"
                    f"Accept: {url}{accept}\n"
                    f"Reject: {url}{reject}"
                )
                title = f"[GIA] Suggested message to {person_identifier.person.name}"
                manip.user.sendmsg(content, title=title)

        # Record/refresh the chat entry for the sender of this event.
        await sync_to_async(Chat.objects.update_or_create)(
            source_uuid=source_uuid,
            defaults={
                "source_number": source_number,
                "source_name": source_name,
                "account": account,
            },
        )


class SignalClient(ClientBase):
    """Client wrapper that owns a NewSignalBot and registers HandleMessage."""

    def __init__(self, ur, *args, **kwargs):
        """Create the bot against SIGNAL_URL and register the message handler."""
        super().__init__(ur, *args, **kwargs)
        self.client = NewSignalBot(
            ur,
            self.service,
            {
                "signal_service": SIGNAL_URL,
                # NOTE(review): account number is hard-coded; consider moving
                # it to settings.
                "phone_number": "+447490296227",
            },
        )

        self.client.register(HandleMessage(self.ur, self.service))

    def start(self):
        """Hand this client's event loop to the bot and start it."""
        self.log.info("Signal client starting...")
        self.client._event_loop = self.loop

        self.client.start()