diff --git a/core/clients/signal.py b/core/clients/signal.py index 7e0c372..10c37c0 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -1,60 +1,63 @@ - -from django.conf import settings -from core.messaging import natural -import aiohttp -from core.util import logs -from core.clients import ClientBase -from signalbot import SignalBot -import aiohttp -import msgpack -from django.conf import settings -from signalbot import SignalBot, Command, Context -from asgiref.sync import sync_to_async -from django.urls import reverse -import json import asyncio -from core.util import logs -from core.lib.prompts.functions import truncate_and_summarize, messages_to_string, delete_messages -from core.lib import deferred -from core.messaging import replies, ai, natural, history, utils -from core.models import Chat, Manipulation, PersonIdentifier, QueuedMessage +import json + import aiohttp +from asgiref.sync import sync_to_async from django.conf import settings -from redis import asyncio as aioredis -from core.clients import signalapi - +from django.urls import reverse +from signalbot import Command, Context, SignalBot +from core.clients import ClientBase, signalapi +from core.lib.prompts.functions import delete_messages, truncate_and_summarize +from core.messaging import ai, history, natural, replies, utils +from core.models import Chat, Manipulation, PersonIdentifier, QueuedMessage from core.util import logs log = logs.get_logger("signalF") -SIGNAL_URL = "signal:8080" +if settings.DEBUG: + SIGNAL_HOST = "127.0.0.1" +else: + SIGNAL_HOST = "signal" -redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10) +SIGNAL_PORT = 8080 + +SIGNAL_URL = f"{SIGNAL_HOST}:{SIGNAL_PORT}" class NewSignalBot(SignalBot): def __init__(self, ur, service, config): self.ur = ur self.service = service + self.signal_rest = config["signal_service"] # keep your own copy + self.phone_number = config["phone_number"] super().__init__(config) self.log = logs.get_logger("signalI") - self.bot_uuid = None # Initialize with None + self.bot_uuid = None - async def get_own_uuid(self) -> str: - """Fetch bot's UUID by checking contacts, groups, or profile.""" + async def get_own_uuid(self) -> str | None: async with aiohttp.ClientSession() as session: - uri_contacts = f"http://{self._signal.signal_service}/v1/contacts/{self._signal.phone_number}" + # config may be "signal:8080" -- ensure http:// + base = self.signal_rest + if not base.startswith("http"): + base = f"http://{base}" + + uri = f"{base}/v1/contacts/{self.phone_number}" try: - resp = await session.get(uri_contacts) - if resp.status == 200: - contacts_data = await resp.json() - if isinstance(contacts_data, list): - for contact in contacts_data: - if contact.get("number") == self._phone_number: - return contact.get("uuid") + resp = await session.get(uri) + if resp.status != 200: + self.log.error(f"contacts lookup failed: {resp.status} {await resp.text()}") + return None + + contacts_data = await resp.json() + if isinstance(contacts_data, list): + for contact in contacts_data: + if contact.get("number") == self.phone_number: + return contact.get("uuid") + return None except Exception as e: self.log.error(f"Failed to get UUID from contacts: {e}") + return None async def initialize_bot(self): """Fetch bot's UUID and store it in self.bot_uuid.""" @@ -67,13 +70,24 @@ class NewSignalBot(SignalBot): except Exception as e: self.log.error(f"Failed to initialize bot UUID: {e}") - def start(self): - """Start bot without blocking event loop.""" - self._event_loop.create_task(self.initialize_bot()) # Fetch UUID first - self._event_loop.create_task(self._detect_groups()) # Sync groups - self._event_loop.create_task(self._produce_consume_messages()) # Process messages + async def _async_post_init(self): + """ + Preserve SignalBot startup flow so protocol auto-detection runs. + This flips the client to plain HTTP/WS when HTTPS/WSS is unavailable. + """ + await self._check_signal_service() + await self.initialize_bot() + await self._detect_groups() + await self._resolve_commands() + await self._produce_consume_messages() - self.scheduler.start() # Start async job scheduler + def start(self): + """Start bot without blocking the caller's event loop.""" + task = self._event_loop.create_task( + self._rerun_on_exception(self._async_post_init) + ) + self._store_reference_to_task(task, self._running_tasks) + self.scheduler.start() class HandleMessage(Command): @@ -95,13 +109,9 @@ class HandleMessage(Command): "raw_message": c.message.raw_message } raw = json.loads(c.message.raw_message) - print(json.dumps(c.message.raw_message, indent=2)) - #dest = c.message.raw_message.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid") dest = raw.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid") - #account = c.message.raw_message.get("account", "") account = raw.get("account", "") - #source_name = msg["raw_message"].get("envelope", {}).get("sourceName", "") source_name = raw.get("envelope", {}).get("sourceName", "") source_number = c.message.source_number @@ -121,7 +131,9 @@ class HandleMessage(Command): # Determine the identifier to use identifier_uuid = dest if is_from_bot else source_uuid - + if not identifier_uuid: + log.warning("No Signal identifier available for message routing.") + return # Handle attachments attachments = raw.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("attachments", []) @@ -138,23 +150,21 @@ class HandleMessage(Command): "height": attachment.get("height"), }) - # Get User from identifier - log.info(f"FUCK {self.service}") - identifiers = PersonIdentifier.objects.filter( - identifier=identifier_uuid, - service=self.service, + # Get users/person identifiers for this Signal sender/recipient. + identifiers = await sync_to_async(list)( + PersonIdentifier.objects.filter( + identifier=identifier_uuid, + service=self.service, + ) ) xmpp_attachments = [] - # attachments = [] # Asynchronously fetch all attachments tasks = [signalapi.fetch_signal_attachment(att["id"]) for att in attachment_list] fetched_attachments = await asyncio.gather(*tasks) log.info(f"ATTACHMENT LIST {attachment_list}") - log.info(f"FETCHED ATTACHMENTS {fetched_attachments}") for fetched, att in zip(fetched_attachments, attachment_list): - log.info(f"ITER {fetched} {att}") if not fetched: log.warning(f"Failed to fetch attachment {att['id']} from Signal.") continue @@ -166,52 +176,54 @@ class HandleMessage(Command): "filename": fetched["filename"], "size": fetched["size"], }) + + # Forward incoming Signal messages to XMPP and apply mutate rules. for identifier in identifiers: - #recipient_jid = f"{identifier.user.username}@{settings.XMPP_ADDRESS}" user = identifier.user - manipulations = Manipulation.objects.filter( - group__people=identifier.person, - user=identifier.user, - #mode="mutate", - filter_enabled=True, - enabled=True, - ) - # chat_history = await history.get_chat_history(session) - # await utils.update_last_interaction(session) - if manipulations: - manip = manipulations.first() - prompt = replies.generate_mutate_reply_prompt( - text, - None, - manip, - None, - ) - - - log.info("Running Signal context prompt") - result = await ai.run_prompt(prompt, manip.ai) - log.info(f"RESULT {result}") - # await history.store_own_message( - # session=session, - # text=result, - # ts=int(now().timestamp() * 1000), - # ) + mutate_manips = await sync_to_async(list)( + Manipulation.objects.filter( + group__people=identifier.person, + user=identifier.user, + mode="mutate", + filter_enabled=True, + enabled=True, + ) + ) + if mutate_manips: + for manip in mutate_manips: + prompt = replies.generate_mutate_reply_prompt( + text, + None, + manip, + None, + ) + log.info("Running Signal mutate prompt") + result = await ai.run_prompt(prompt, manip.ai) + log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") + await self.ur.xmpp.client.send_from_external( + user, + identifier, + result, + is_outgoing_message, + attachments=xmpp_attachments, + ) + else: log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") - await self.ur.xmpp.client.send_from_external(user, identifier, result, is_outgoing_message, attachments=xmpp_attachments) - - if not manipulations.exists(): - log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") - await self.ur.xmpp.client.send_from_external(user, identifier, text, is_outgoing_message, attachments=xmpp_attachments) - - #### - + await self.ur.xmpp.client.send_from_external( + user, + identifier, + text, + is_outgoing_message, + attachments=xmpp_attachments, + ) # TODO: Permission checks manips = await sync_to_async(list)( Manipulation.objects.filter(enabled=True) ) - processed_people = set() + session_cache = {} + stored_messages = set() for manip in manips: try: person_identifier = await sync_to_async(PersonIdentifier.objects.get)( @@ -220,30 +232,30 @@ class HandleMessage(Command): service="signal", person__in=manip.group.people.all(), ) - # Check if we've already processed this person - if person_identifier.person.id in processed_people: - log.warning(f"Skipping duplicate message storage for {person_identifier.person.name}") - continue # Skip to next manipulation - if not manip.group.people.filter(id=person_identifier.person.id).exists(): - log.error(f"{manip.name}: Identifier {identifier_uuid} found, but person {person_identifier.person} is not in manip group. Skipping.") - continue # Exit early if the person is not in the group except PersonIdentifier.DoesNotExist: - log.warning(f"{manip.name}: Message from unknown identifier {identifier_uuid} - Not storing.") - continue # Exit early if no valid identifier is found + log.warning(f"{manip.name}: Message from unknown identifier {identifier_uuid}.") + continue - # Find or create the corresponding ChatSession - chat_session = await history.get_chat_session(manip.user, person_identifier) + # Find/create ChatSession once per user/person. + session_key = (manip.user.id, person_identifier.person.id) + if session_key in session_cache: + chat_session = session_cache[session_key] + else: + chat_session = await history.get_chat_session(manip.user, person_identifier) + session_cache[session_key] = chat_session - # Store incoming or outgoing messages - log.info(f"Processing history store message {text}") - processed_people.add(person_identifier.person.id) - await history.store_message( - session=chat_session, - sender=source_uuid, - text=text, - ts=ts, - outgoing=is_from_bot, - ) + # Store each incoming/outgoing event once per session. + message_key = (chat_session.id, ts, source_uuid) + if message_key not in stored_messages: + log.info(f"Processing history store message {text}") + await history.store_message( + session=chat_session, + sender=source_uuid, + text=text, + ts=ts, + outgoing=is_from_bot, + ) + stored_messages.add(message_key) # Get the total history chat_history = await history.get_chat_history(chat_session) @@ -253,7 +265,9 @@ class HandleMessage(Command): reply_to_others, is_outgoing_message, ): - if manip.mode not in ["silent", "mutate"]: + if manip.mode in ["silent", "mutate"]: + pass + elif manip.mode in ["active", "notify", "instant"]: await utils.update_last_interaction(chat_session) prompt = replies.generate_reply_prompt( msg, @@ -264,81 +278,72 @@ class HandleMessage(Command): log.info("Running context prompt") result = await ai.run_prompt(prompt, manip.ai) - # Store bot's AI response with a +1s timestamp if manip.mode == "active": - await history.store_own_message( - session=chat_session, - text=result, - ts=ts + 1, - ) - # await natural.natural_send_message(c, result) - await self.ur.xmpp.client.send_from_external(manip.user, person_identifier, result, is_outgoing_message=True) - tss = await natural.natural_send_message( - result, - c.send, - c.start_typing, - c.stop_typing, - ) - elif manip.mode == "notify": - title = f"[GIA] Suggested message to {person_identifier.person.name}" - manip.user.sendmsg(result, title=title) - elif manip.mode == "instant": - # Delete all other QueuedMessages - existing_queue = QueuedMessage.objects.filter( - user=chat_session.user, - session=chat_session, - manipulation=manip, - custom_author="BOT", - ) + if manip.mode == "active": + await history.store_own_message( + session=chat_session, + text=result, + ts=ts + 1, + ) + await self.ur.xmpp.client.send_from_external( + manip.user, + person_identifier, + result, + is_outgoing_message=True, + ) + await natural.natural_send_message( + result, + c.send, + c.start_typing, + c.stop_typing, + ) + elif manip.mode == "notify": + title = f"[GIA] Suggested message to {person_identifier.person.name}" + manip.user.sendmsg(result, title=title) + elif manip.mode == "instant": + existing_queue = QueuedMessage.objects.filter( + user=chat_session.user, + session=chat_session, + manipulation=manip, + custom_author="BOT", + ) - await delete_messages(existing_queue) - qm = await history.store_own_message( - session=chat_session, - text=result, - ts=ts + 1, - manip=manip, - queue=True, + await delete_messages(existing_queue) + qm = await history.store_own_message( + session=chat_session, + text=result, + ts=ts + 1, + manip=manip, + queue=True, - ) - accept = reverse( - "message_accept_api", kwargs={"message_id":qm.id} - ) - reject = reverse( - "message_reject_api", kwargs={"message_id":qm.id} - ) - url = settings.URL - content = ( - f"{result}\n\n" - f"Accept: {url}{accept}\n" - f"Reject: {url}{reject}" - ) - title = f"[GIA] Suggested message to {person_identifier.person.name}" - manip.user.sendmsg(content, title=title) + ) + accept = reverse( + "message_accept_api", kwargs={"message_id": qm.id} + ) + reject = reverse( + "message_reject_api", kwargs={"message_id": qm.id} + ) + url = settings.URL + content = ( + f"{result}\n\n" + f"Accept: {url}{accept}\n" + f"Reject: {url}{reject}" + ) + title = f"[GIA] Suggested message to {person_identifier.person.name}" + manip.user.sendmsg(content, title=title) else: log.error(f"Mode {manip.mode} is not implemented") # Manage truncation & summarization await truncate_and_summarize(chat_session, manip.ai) - # END FOR - try: - existing_chat = Chat.objects.get( - source_uuid=source_uuid - ) - # if existing_chat.ts != ts: - # print("not equal", existing_chat.ts, ts) - # existing_chat.ts = ts - # existing_chat.save() - existing_chat.source_number = source_number - existing_chat.source_name = source_name - existing_chat.save() - except Chat.DoesNotExist: - existing_chat = Chat.objects.create( - source_number=source_number, - source_uuid=source_uuid, - source_name=source_name, - account=account, - ) - # + await sync_to_async(Chat.objects.update_or_create)( + source_uuid=source_uuid, + defaults={ + "source_number": source_number, + "source_name": source_name, + "account": account, + }, + ) class SignalClient(ClientBase):