"""Signal client: bridges incoming Signal messages to XMPP and AI replies."""
import asyncio
import json
from typing import Optional

import aiohttp
import msgpack
from asgiref.sync import sync_to_async
from django.conf import settings
from django.urls import reverse
from redis import asyncio as aioredis
from signalbot import SignalBot, Command, Context

from core.messaging import natural, replies, ai, history, utils
from core.util import logs
from core.clients import ClientBase
from core.lib.prompts.functions import (
    truncate_and_summarize,
    messages_to_string,
    delete_messages,
)
from core.lib import deferred
from core.models import Chat, Manipulation, PersonIdentifier, QueuedMessage
from core.clients import signalapi

log = logs.get_logger("signalF")

SIGNAL_URL = "signal:8080"
# Phone number used when settings does not define SIGNAL_NUMBER.
DEFAULT_SIGNAL_NUMBER = "+447490296227"

redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10)


class NewSignalBot(SignalBot):
    """SignalBot subclass that remembers its own UUID and starts without blocking."""

    def __init__(self, ur, service, config):
        self.ur = ur
        self.service = service
        super().__init__(config)
        self.log = logs.get_logger("signalI")
        # Filled in by initialize_bot(); stays None until the lookup succeeds.
        self.bot_uuid = None

    async def get_own_uuid(self) -> Optional[str]:
        """Look up the bot's own UUID in the account's contact list.

        Returns the ``uuid`` of the contact whose ``number`` equals the bot's
        phone number. Returns None when the request fails, the status is not
        200, the payload is not a list, or no contact matches.
        """
        uri_contacts = (
            f"http://{self._signal.signal_service}"
            f"/v1/contacts/{self._signal.phone_number}"
        )
        async with aiohttp.ClientSession() as session:
            try:
                # Context manager releases the response even on early return.
                async with session.get(uri_contacts) as resp:
                    if resp.status == 200:
                        contacts_data = await resp.json()
                        if isinstance(contacts_data, list):
                            for contact in contacts_data:
                                if contact.get("number") == self._phone_number:
                                    return contact.get("uuid")
            except Exception as e:
                self.log.error(f"Failed to get UUID from contacts: {e}")
        return None

    async def initialize_bot(self):
        """Fetch the bot's UUID and store it in ``self.bot_uuid``."""
        try:
            self.bot_uuid = await self.get_own_uuid()
            if self.bot_uuid:
                self.log.info(f"Own UUID: {self.bot_uuid}")
            else:
                self.log.warning("Unable to fetch bot UUID.")
        except Exception as e:
            self.log.error(f"Failed to initialize bot UUID: {e}")

    def start(self):
        """Schedule the bot's tasks on ``self._event_loop`` without blocking it."""
        self._event_loop.create_task(self.initialize_bot())  # Fetch UUID first
        self._event_loop.create_task(self._detect_groups())  # Sync groups
        self._event_loop.create_task(self._produce_consume_messages())  # Process messages
        self.scheduler.start()  # Start async job scheduler


class HandleMessage(Command):
    """Command that relays each Signal message to XMPP, stores it and replies."""

    def __init__(self, ur, service, *args, **kwargs):
        self.ur = ur
        self.service = service
        super().__init__(*args, **kwargs)

    @staticmethod
    def _extract_attachments(envelope, sent_message):
        """Return attachment metadata from the sync message, else the data message."""
        attachments = sent_message.get("attachments") or []
        if not attachments:
            data_message = envelope.get("dataMessage") or {}
            attachments = data_message.get("attachments") or []
        return [
            {
                # "id" is required to download the file; the rest is optional
                # metadata and is read with .get so a missing key cannot crash.
                "id": attachment["id"],
                "content_type": attachment.get("contentType"),
                "filename": attachment.get("filename"),
                "size": attachment.get("size"),
                "width": attachment.get("width"),
                "height": attachment.get("height"),
            }
            for attachment in attachments
        ]

    @staticmethod
    async def _fetch_attachments(attachment_list):
        """Download all attachments concurrently; skip the ones that fail."""
        tasks = [
            signalapi.fetch_signal_attachment(att["id"]) for att in attachment_list
        ]
        # return_exceptions keeps one failed download from aborting the handler.
        fetched_attachments = await asyncio.gather(*tasks, return_exceptions=True)
        log.debug(
            f"Fetched {len(fetched_attachments)} of "
            f"{len(attachment_list)} attachments"
        )

        xmpp_attachments = []
        for fetched, att in zip(fetched_attachments, attachment_list):
            if isinstance(fetched, BaseException) or not fetched:
                log.warning(f"Failed to fetch attachment {att['id']} from Signal.")
                continue
            xmpp_attachments.append({
                "content": fetched["content"],
                "content_type": fetched["content_type"],
                "filename": fetched["filename"],
                "size": fetched["size"],
            })
        return xmpp_attachments

    def _lookup_relay_targets(self, identifier_uuid):
        """Return ``(identifier, user, manip, ai_config)`` for each identifier.

        Synchronous on purpose: it is called through ``sync_to_async`` so the
        ORM queries and relation lookups do not run inside the event loop.
        ``manip`` and ``ai_config`` are None when the identifier has no
        enabled filter manipulation.
        """
        targets = []
        identifiers = PersonIdentifier.objects.filter(
            identifier=identifier_uuid,
            service=self.service,
        )
        for identifier in identifiers:
            user = identifier.user
            manip = Manipulation.objects.filter(
                group__people=identifier.person,
                user=user,
                filter_enabled=True,
                enabled=True,
            ).first()
            ai_config = manip.ai if manip is not None else None
            targets.append((identifier, user, manip, ai_config))
        return targets

    async def _relay_to_xmpp(
        self, identifier_uuid, text, is_outgoing_message, xmpp_attachments
    ):
        """Forward the message to XMPP, rewritten by the AI when a filter applies."""
        log.debug(f"Looking up identifiers for service {self.service}")
        targets = await sync_to_async(self._lookup_relay_targets)(identifier_uuid)
        for identifier, user, manip, ai_config in targets:
            if manip is not None:
                prompt = replies.generate_mutate_reply_prompt(
                    text,
                    None,
                    manip,
                    None,
                )
                log.info("Running Signal context prompt")
                outgoing_text = await ai.run_prompt(prompt, ai_config)
                log.info(f"RESULT {outgoing_text}")
            else:
                outgoing_text = text
            log.info(
                f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP."
            )
            await self.ur.xmpp.client.send_from_external(
                user,
                identifier,
                outgoing_text,
                is_outgoing_message,
                attachments=xmpp_attachments,
            )

    async def _deliver_reply(
        self, c, manip, chat_session, person_identifier, result, ts
    ):
        """Deliver an AI-generated reply according to ``manip.mode``."""
        if manip.mode == "active":
            # ts + 1 keeps the bot's reply ordered after the incoming message.
            await history.store_own_message(
                session=chat_session,
                text=result,
                ts=ts + 1,
            )
            await self.ur.xmpp.client.send_from_external(
                manip.user, person_identifier, result, is_outgoing_message=True
            )
            await natural.natural_send_message(
                result,
                c.send,
                c.start_typing,
                c.stop_typing,
            )
        elif manip.mode == "notify":
            title = f"[GIA] Suggested message to {person_identifier.person.name}"
            manip.user.sendmsg(result, title=title)
        elif manip.mode == "instant":
            # Delete all other QueuedMessages
            existing_queue = QueuedMessage.objects.filter(
                user=chat_session.user,
                session=chat_session,
                manipulation=manip,
                custom_author="BOT",
            )
            await delete_messages(existing_queue)
            qm = await history.store_own_message(
                session=chat_session,
                text=result,
                ts=ts + 1,
                manip=manip,
                queue=True,
            )
            accept = reverse("message_accept_api", kwargs={"message_id": qm.id})
            reject = reverse("message_reject_api", kwargs={"message_id": qm.id})
            url = settings.URL
            content = (
                f"{result}\n\n"
                f"Accept: {url}{accept}\n"
                f"Reject: {url}{reject}"
            )
            title = f"[GIA] Suggested message to {person_identifier.person.name}"
            manip.user.sendmsg(content, title=title)
        else:
            log.error(f"Mode {manip.mode} is not implemented")

    @staticmethod
    def _upsert_chat(source_uuid, source_number, source_name, account):
        """Update the Chat row for ``source_uuid``, creating it when missing.

        Synchronous on purpose: called through ``sync_to_async``.
        """
        try:
            existing_chat = Chat.objects.get(source_uuid=source_uuid)
        except Chat.DoesNotExist:
            Chat.objects.create(
                source_number=source_number,
                source_uuid=source_uuid,
                source_name=source_name,
                account=account,
            )
        else:
            existing_chat.source_number = source_number
            existing_chat.source_name = source_name
            existing_chat.save()

    async def handle(self, c: Context):
        """Process one Signal message.

        Steps, in order: relay the message (and attachments) to XMPP, store it
        in the history of every matching manipulation, generate and deliver a
        reply where ``replies.should_reply`` allows it, then update the Chat
        record for the sender.
        """
        message = c.message
        msg = {
            "source": message.source,
            "source_number": message.source_number,
            "source_uuid": message.source_uuid,
            "timestamp": message.timestamp,
            "type": message.type.value,
            "text": message.text,
            "group": message.group,
            "reaction": message.reaction,
            "mentions": message.mentions,
            "raw_message": message.raw_message,
        }
        raw = json.loads(message.raw_message)
        log.debug(json.dumps(raw, indent=2))

        envelope = raw.get("envelope") or {}
        sync_message = envelope.get("syncMessage") or {}
        sent_message = sync_message.get("sentMessage") or {}

        dest = sent_message.get("destinationUuid")
        account = raw.get("account", "")
        source_name = envelope.get("sourceName", "")

        source_number = message.source_number
        source_uuid = message.source_uuid
        text = message.text
        ts = message.timestamp

        # Message originating from us
        same_recipient = source_uuid == dest

        is_from_bot = source_uuid == c.bot.bot_uuid
        is_to_bot = dest == c.bot.bot_uuid or dest is None

        reply_to_self = same_recipient and is_from_bot  # Reply
        reply_to_others = is_to_bot and not same_recipient  # Reply
        is_outgoing_message = is_from_bot and not is_to_bot  # Do not reply

        # The other party: the destination for our own messages, else the sender.
        identifier_uuid = dest if is_from_bot else source_uuid

        attachment_list = self._extract_attachments(envelope, sent_message)
        xmpp_attachments = await self._fetch_attachments(attachment_list)

        await self._relay_to_xmpp(
            identifier_uuid, text, is_outgoing_message, xmpp_attachments
        )

        # TODO: Permission checks
        manips = await sync_to_async(list)(
            Manipulation.objects.filter(enabled=True)
        )
        # People whose message was already stored during this call.
        processed_people = set()
        for manip in manips:
            try:
                person_identifier = await sync_to_async(PersonIdentifier.objects.get)(
                    identifier=identifier_uuid,
                    user=manip.user,
                    service="signal",
                    person__in=manip.group.people.all(),
                )
            except PersonIdentifier.DoesNotExist:
                log.warning(
                    f"{manip.name}: Message from unknown identifier "
                    f"{identifier_uuid} - Not storing."
                )
                continue

            # Check if we've already processed this person
            if person_identifier.person.id in processed_people:
                log.warning(
                    "Skipping duplicate message storage for "
                    f"{person_identifier.person.name}"
                )
                continue

            if not manip.group.people.filter(id=person_identifier.person.id).exists():
                log.error(
                    f"{manip.name}: Identifier {identifier_uuid} found, but person "
                    f"{person_identifier.person} is not in manip group. Skipping."
                )
                continue

            # Find or create the corresponding ChatSession
            chat_session = await history.get_chat_session(manip.user, person_identifier)

            # Store incoming or outgoing messages
            log.info(f"Processing history store message {text}")
            processed_people.add(person_identifier.person.id)
            await history.store_message(
                session=chat_session,
                sender=source_uuid,
                text=text,
                ts=ts,
                outgoing=is_from_bot,
            )

            # Get the total history
            chat_history = await history.get_chat_history(chat_session)

            if replies.should_reply(
                reply_to_self,
                reply_to_others,
                is_outgoing_message,
            ):
                # NOTE(review): the delivery step is nested here because it
                # needs `result`, which only exists for modes other than
                # "silent"/"mutate" - confirm this matches the intended nesting.
                if manip.mode not in ["silent", "mutate"]:
                    await utils.update_last_interaction(chat_session)
                    prompt = replies.generate_reply_prompt(
                        msg,
                        person_identifier.person,
                        manip,
                        chat_history,
                    )

                    log.info("Running context prompt")
                    result = await ai.run_prompt(prompt, manip.ai)
                    await self._deliver_reply(
                        c, manip, chat_session, person_identifier, result, ts
                    )

            # Manage truncation & summarization
            await truncate_and_summarize(chat_session, manip.ai)

        await sync_to_async(self._upsert_chat)(
            source_uuid, source_number, source_name, account
        )


class SignalClient(ClientBase):
    """Client wrapper that owns the Signal bot and registers the message handler."""

    def __init__(self, ur, *args, **kwargs):
        super().__init__(ur, *args, **kwargs)
        self.client = NewSignalBot(
            ur,
            self.service,
            {
                "signal_service": SIGNAL_URL,
                # Optional settings.SIGNAL_NUMBER overrides the built-in default.
                "phone_number": getattr(
                    settings, "SIGNAL_NUMBER", DEFAULT_SIGNAL_NUMBER
                ),
            },
        )
        self.client.register(HandleMessage(self.ur, self.service))

    def start(self):
        """Attach the bot to this client's event loop and start it."""
        self.log.info("Signal client starting...")
        self.client._event_loop = self.loop
        self.client.start()