"""Django management command that runs the Signal bot.

Incoming Signal messages are matched against enabled ``Manipulation``
records, stored per ``ChatSession``, optionally answered with an AI-generated
reply, and mirrored into the ``Chat`` table.
"""
import asyncio
import json
from typing import Optional

import aiohttp
import aiomysql
import msgpack
from asgiref.sync import sync_to_async
from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import timezone
from signalbot import SignalBot, Command, Context

from core.lib.prompts.functions import (
    gen_prompt,
    run_prompt,
    truncate_and_summarize,
    run_context_prompt,
    messages_to_string,
    natural_send_message,
)
from core.models import Chat, Manipulation, PersonIdentifier, ChatSession, Message
from core.schemas import mc_s
from core.util import logs

SIGNAL_URL = "signal:8080"
DB_URL = "giadb"

log = logs.get_logger("processing")

# Shared aiomysql pool; created by init_mysql_pool(), released by
# close_mysql_pool().
mysql_pool = None


async def init_mysql_pool():
    """Initialize the module-level MySQL connection pool."""
    global mysql_pool
    mysql_pool = await aiomysql.create_pool(
        host=DB_URL,
        port=9306,
        db="Manticore",
        minsize=1,
        maxsize=10,
    )


async def close_mysql_pool():
    """Close the MySQL connection pool, if one was created, and forget it."""
    global mysql_pool
    if mysql_pool:
        mysql_pool.close()
        await mysql_pool.wait_closed()
        # Drop the reference so a closed pool can never be reused by accident.
        mysql_pool = None


class NewSignalBot(SignalBot):
    """SignalBot that also discovers and remembers its own account UUID."""

    def __init__(self, config):
        super().__init__(config)
        # Filled in asynchronously by initialize_bot(); stays None on failure.
        self.bot_uuid = None

    async def get_own_uuid(self) -> Optional[str]:
        """Look up the bot's own UUID in the Signal service's contact list.

        Returns:
            The UUID of the contact whose number equals the bot's phone
            number, or ``None`` when the request fails, returns a non-200
            status, returns something other than a list, or no contact
            matches.
        """
        uri_contacts = (
            f"http://{self._signal.signal_service}"
            f"/v1/contacts/{self._signal.phone_number}"
        )
        async with aiohttp.ClientSession() as session:
            try:
                # The context manager releases the response on every path.
                async with session.get(uri_contacts) as resp:
                    if resp.status != 200:
                        return None
                    contacts_data = await resp.json()
                if isinstance(contacts_data, list):
                    for contact in contacts_data:
                        if contact.get("number") == self._phone_number:
                            return contact.get("uuid")
            except Exception as e:
                # Best effort: the caller treats None as "UUID unavailable".
                log.error(f"Failed to get UUID from contacts: {e}")
        return None

    async def initialize_bot(self):
        """Fetch the bot's UUID and store it in ``self.bot_uuid``."""
        try:
            self.bot_uuid = await self.get_own_uuid()
            if self.bot_uuid:
                log.info(f"Own UUID: {self.bot_uuid}")
            else:
                log.warning("Unable to fetch bot UUID.")
        except Exception as e:
            log.error(f"Failed to initialize bot UUID: {e}")

    def start(self):
        """Schedule the bot's tasks on its event loop without running the loop.

        The caller is responsible for running ``self._event_loop``.
        """
        self._event_loop.create_task(self.initialize_bot())  # Fetch UUID first
        self._event_loop.create_task(self._detect_groups())  # Sync groups
        self._event_loop.create_task(self._produce_consume_messages())  # Process messages
        self.scheduler.start()  # Start async job scheduler


def _load_manip_context(manip, identifier_uuid):
    """Resolve the identifier, chat session and AI config for one manipulation.

    Synchronous on purpose: it must be run through ``sync_to_async`` so that
    the queries and the attribute reads on ``manip`` (which may load related
    rows) happen outside the event loop.

    Returns:
        A ``(person_identifier, chat_session, ai)`` tuple.

    Raises:
        PersonIdentifier.DoesNotExist: no Signal identifier with this UUID
            belongs to ``manip.user`` and to a person in ``manip.group``.
    """
    # The person__in filter already restricts the match to people in the
    # manipulation's group, so no separate membership check is needed.
    person_identifier = PersonIdentifier.objects.select_related("person").get(
        identifier=identifier_uuid,
        user=manip.user,
        service="signal",
        person__in=manip.group.people.all(),
    )
    chat_session, _created = ChatSession.objects.get_or_create(
        identifier=person_identifier,
        user=manip.user,
    )
    return person_identifier, chat_session, manip.ai


def _store_message(chat_session, **fields):
    """Create a ``Message`` in ``chat_session`` owned by the session's user.

    Synchronous; run through ``sync_to_async``.
    """
    return Message.objects.create(
        user=chat_session.user,
        session=chat_session,
        **fields,
    )


def _touch_last_interaction(chat_session):
    """Stamp "now" on the session and on its person, and save both.

    Synchronous; run through ``sync_to_async``.
    """
    now = timezone.now()
    person = chat_session.identifier.person
    person.last_interaction = now
    chat_session.last_interaction = now
    person.save()
    chat_session.save()


def _upsert_chat(source_uuid, source_number, source_name, account):
    """Update the ``Chat`` row for ``source_uuid`` or create it if missing.

    An existing row only has its number and name refreshed; ``account`` is
    written only when the row is created. Synchronous; run through
    ``sync_to_async``.
    """
    try:
        existing_chat = Chat.objects.get(source_uuid=source_uuid)
    except Chat.DoesNotExist:
        Chat.objects.create(
            source_number=source_number,
            source_uuid=source_uuid,
            source_name=source_name,
            account=account,
        )
        return
    existing_chat.source_number = source_number
    existing_chat.source_name = source_name
    existing_chat.save()


# NOTE: ``Command`` here is signalbot's command base class. The name is
# rebound further down to the Django management command, so this class must
# stay above that definition.
class HandleMessage(Command):
    """Store every Signal message and reply to it where a manipulation allows."""

    async def handle(self, c: Context):
        """Process one Signal message.

        For every enabled ``Manipulation`` whose group contains the sender
        (or, for messages sent from the bot's account, the recipient): store
        the message, refresh the session summary, and, when the message is
        addressed to the bot and sending is enabled, generate, store and send
        an AI reply. Finally the sender's ``Chat`` row is created or updated.
        """
        msg = {
            "source": c.message.source,
            "source_number": c.message.source_number,
            "source_uuid": c.message.source_uuid,
            "timestamp": c.message.timestamp,
            "type": c.message.type.value,
            "text": c.message.text,
            "group": c.message.group,
            "reaction": c.message.reaction,
            "mentions": c.message.mentions,
            "raw_message": c.message.raw_message,
        }
        envelope = c.message.raw_message.get("envelope", {})
        # Only present on sync messages, i.e. messages sent from our account.
        dest = (
            envelope.get("syncMessage", {})
            .get("sentMessage", {})
            .get("destinationUuid")
        )
        account = c.message.raw_message.get("account", "")
        source_name = envelope.get("sourceName", "")

        source_number = c.message.source_number
        source_uuid = c.message.source_uuid
        text = c.message.text
        ts = c.message.timestamp

        # Classify the message direction.
        same_recipient = source_uuid == dest
        is_from_bot = source_uuid == c.bot.bot_uuid
        is_to_bot = dest == c.bot.bot_uuid or dest is None

        reply_to_self = same_recipient and is_from_bot  # Self-message
        reply_to_others = is_to_bot and not same_recipient  # Incoming message
        # Everything else (outgoing message from the bot's account to someone
        # else, or an unrecognised combination) never gets a reply.
        should_reply = reply_to_self or reply_to_others

        # For messages we sent, the conversation partner is the destination.
        identifier_uuid = dest if is_from_bot else source_uuid

        # TODO: Permission checks
        manips = await sync_to_async(list)(
            Manipulation.objects.filter(enabled=True)
        )
        for manip in manips:
            try:
                person_identifier, chat_session, ai = await sync_to_async(
                    _load_manip_context
                )(manip, identifier_uuid)
            except PersonIdentifier.DoesNotExist:
                log.warning(f"{manip.name}: Message from unknown identifier {identifier_uuid} - Not storing.")
                continue

            # Store incoming or outgoing messages
            await sync_to_async(_store_message)(
                chat_session,
                sender_uuid=source_uuid,
                text=text,
                ts=ts,
                custom_author="USER" if is_from_bot else None,
            )

            # Manage truncation & summarization
            await truncate_and_summarize(chat_session, ai)

            # Use chat session summary for context
            stored_messages = await sync_to_async(list)(
                Message.objects.filter(session=chat_session).order_by("ts")
            )
            recent_chat_history = messages_to_string(stored_messages)
            if chat_session.summary:
                chat_history = f"Chat Summary:\n{chat_session.summary}\n\nRecent Messages:\n{recent_chat_history}"
            else:
                chat_history = f"Recent Messages:\n{recent_chat_history}"

            if not should_reply:
                continue

            # The interaction is recorded even when sending is disabled.
            await sync_to_async(_touch_last_interaction)(chat_session)

            if not manip.send_enabled:
                continue

            prompt = gen_prompt(msg, person_identifier.person, manip, chat_history)
            result = await run_context_prompt(c, prompt, ai)

            # Store the bot's reply one timestamp unit after the triggering
            # message so it sorts directly behind it (presumably Signal
            # timestamps are in milliseconds - TODO confirm).
            await sync_to_async(_store_message)(
                chat_session,
                custom_author="BOT",
                text=result,
                ts=ts + 1,
            )
            await natural_send_message(c, result)

        await sync_to_async(_upsert_chat)(
            source_uuid, source_number, source_name, account
        )


async def create_index():
    """Create the Manticore tables described in ``mc_s`` if they are missing.

    MySQL-level errors are logged and swallowed, so callers only ever see
    non-MySQL exceptions from this function.
    """
    schemas = {
        "main": mc_s.schema_main,
        # "rule_storage": mc_s.schema_rule_storage,
        # "meta": mc_s.schema_meta,
        # "internal": mc_s.schema_int,
    }
    try:
        async with mysql_pool.acquire() as conn:
            async with conn.cursor() as cur:
                for name, schema in schemas.items():
                    schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
                    # Table and column names are interpolated directly into
                    # the statement. They come from the project's own schema
                    # module here; never feed external input into this.
                    create_query = (
                        f"create table if not exists {name}({schema_types}) engine='columnar'"
                    )
                    log.info(f"Schema types {create_query}")
                    await cur.execute(create_query)
    except aiomysql.Error as e:
        log.error(f"MySQL error: {e}")


async def main():
    """Open the MySQL pool, then retry index creation until it stops raising."""
    await init_mysql_pool()
    created = False
    while not created:
        try:
            await create_index()
            created = True
        except Exception as e:
            log.error(f"Error creating index: {e}")
            await asyncio.sleep(1)  # Just wait for the DB before retrying


class Command(BaseCommand):
    """Management command entry point: set up storage and run the bot forever."""

    def handle(self, *args, **options):
        """Create the event loop, prepare the database and run the bot."""
        # Install the loop before constructing the bot so that anything the
        # bot binds to the current event loop during construction uses this
        # loop (NOTE(review): presumably its scheduler/queue - confirm against
        # the signalbot version in use).
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        bot = NewSignalBot({
            "signal_service": SIGNAL_URL,
            "phone_number": "+447490296227",
        })
        bot.register(HandleMessage())
        bot._event_loop = loop

        try:
            loop.run_until_complete(main())
            bot.start()
            loop.run_forever()
        except (KeyboardInterrupt, SystemExit):
            log.info("Process terminating")
        finally:
            try:
                # Release pooled DB connections before the loop goes away.
                loop.run_until_complete(close_mysql_pool())
            finally:
                loop.close()