"""Management command bridging Signal messages into the processing pipeline.

For every message delivered by the Signal bot this module:

* publishes a msgpack-encoded summary on the Redis ``component`` channel,
* stores the message against every enabled ``Manipulation`` whose group
  contains the sender/recipient, optionally generating an AI reply, and
* records or refreshes the ``Chat`` row for the sender.

A background task also listens on the Redis ``processing`` channel and hands
payloads of type ``"def"`` to ``deferred.process_deferred``.
"""
import asyncio
import json

import aiohttp
import msgpack
from asgiref.sync import sync_to_async
from django.conf import settings
from django.core.management.base import BaseCommand
from django.urls import reverse
from django.utils import timezone
from redis import asyncio as aioredis
# NOTE: ``Command`` here is signalbot's base class; the Django management
# command class defined at the bottom of this file rebinds the same name.
from signalbot import SignalBot, Command, Context

from core.lib import deferred
from core.lib.bot import NewSignalBot
from core.lib.prompts.functions import truncate_and_summarize, messages_to_string, delete_messages
from core.messaging import replies, ai, natural, history, utils
from core.models import Chat, Manipulation, PersonIdentifier, ChatSession, Message, QueuedMessage
from core.util import logs

SIGNAL_URL = "signal:8080"
SIGNAL_NUMBER = "+447490296227"

log = logs.get_logger("processing")

# A unix-socket URL needs an empty authority, i.e. three slashes. With
# "unix://var/run/..." the URL parser treats "var" as the host and only
# "/run/gia-redis.sock" as the socket path.
redis = aioredis.from_url("unix:///var/run/gia-redis.sock", db=10)


def _extract_attachments(sent_message):
    """Return simplified attachment dicts for a ``sentMessage`` payload."""
    return [
        {
            "id": attachment["id"],
            "content_type": attachment["contentType"],
            # Treated as optional so an attachment without a filename key
            # does not abort handling of the whole message.
            "filename": attachment.get("filename"),
            "size": attachment["size"],
            "width": attachment.get("width"),
            "height": attachment.get("height"),
        }
        for attachment in sent_message.get("attachments") or []
    ]


def _get_person_identifier(manip, identifier_uuid):
    """Fetch the Signal ``PersonIdentifier`` for *identifier_uuid*.

    Synchronous on purpose: it touches ``manip.user`` / ``manip.group`` and
    runs the query, so it must be called through ``sync_to_async``. The
    ``person__in`` filter already restricts the result to people in the
    manipulation's group, so no separate membership check is required.
    ``select_related("person")`` loads the person up front so later attribute
    access from async code does not trigger a lazy query.

    Raises:
        PersonIdentifier.DoesNotExist: no matching identifier in the group.
    """
    return PersonIdentifier.objects.select_related("person").get(
        identifier=identifier_uuid,
        user=manip.user,
        service="signal",
        person__in=manip.group.people.all(),
    )


def _upsert_chat(source_uuid, source_number, source_name, account):
    """Create the ``Chat`` for *source_uuid* or refresh its number and name.

    Synchronous ORM code; call through ``sync_to_async``. ``account`` is only
    written when the row is created.
    """
    try:
        chat = Chat.objects.get(source_uuid=source_uuid)
    except Chat.DoesNotExist:
        Chat.objects.create(
            source_number=source_number,
            source_uuid=source_uuid,
            source_name=source_name,
            account=account,
        )
        return
    chat.source_number = source_number
    chat.source_name = source_name
    chat.save()


async def _send_reply(c, manip, chat_session, person_identifier, msg, chat_history, ts):
    """Generate an AI reply and deliver it according to ``manip.mode``.

    * ``active``  - store the reply and send it to the chat.
    * ``notify``  - send the suggested reply to ``manip.user`` only.
    * ``instant`` - queue the reply and send ``manip.user`` accept/reject links.

    Any other mode is logged as not implemented.
    """
    await utils.update_last_interaction(chat_session)
    prompt = replies.generate_reply_prompt(
        msg,
        person_identifier.person,
        manip,
        chat_history,
    )

    log.info("Running context prompt")
    result = await ai.run_prompt(prompt, manip.ai)

    if manip.mode == "active":
        # ts + 1 keeps the reply ordered after the message that triggered it.
        # NOTE(review): the original comment called this "+1s"; confirm the
        # unit of the Signal timestamp.
        await history.store_own_message(
            session=chat_session,
            text=result,
            ts=ts + 1,
        )
        await natural.natural_send_message(
            result,
            c.send,
            c.start_typing,
            c.stop_typing,
        )
    elif manip.mode == "notify":
        title = f"[GIA] Suggested message to {person_identifier.person.name}"
        manip.user.sendmsg(result, title=title)
    elif manip.mode == "instant":
        # Delete all other QueuedMessages so only the newest suggestion is
        # pending for this session.
        existing_queue = QueuedMessage.objects.filter(
            user=chat_session.user,
            session=chat_session,
            manipulation=manip,
            custom_author="BOT",
        )
        await delete_messages(existing_queue)
        qm = await history.store_own_message(
            session=chat_session,
            text=result,
            ts=ts + 1,
            manip=manip,
            queue=True,
        )
        accept = reverse("message_accept_api", kwargs={"message_id": qm.id})
        reject = reverse("message_reject_api", kwargs={"message_id": qm.id})
        url = settings.URL
        content = (
            f"{result}\n\n"
            f"Accept: {url}{accept}\n"
            f"Reject: {url}{reject}"
        )
        title = f"[GIA] Suggested message to {person_identifier.person.name}"
        manip.user.sendmsg(content, title=title)
    else:
        log.error(f"Mode {manip.mode} is not implemented")


class HandleMessage(Command):
    """signalbot command that records and optionally answers each message."""

    async def handle(self, c: Context):
        """Process one Signal message.

        Args:
            c: signalbot context carrying the message and the send/typing
                callbacks.
        """
        raw = json.loads(c.message.raw_message)
        log.debug(json.dumps(raw, indent=2))

        msg = {
            "source": c.message.source,
            "source_number": c.message.source_number,
            "source_uuid": c.message.source_uuid,
            "timestamp": c.message.timestamp,
            "type": c.message.type.value,
            "text": c.message.text,
            "group": c.message.group,
            "reaction": c.message.reaction,
            "mentions": c.message.mentions,
            "raw_message": c.message.raw_message,
        }

        # "or {}" also covers keys that are present but null in the JSON.
        envelope = raw.get("envelope") or {}
        sent_message = (envelope.get("syncMessage") or {}).get("sentMessage") or {}

        dest = sent_message.get("destinationUuid")
        account = raw.get("account", "")
        source_name = envelope.get("sourceName", "")

        source_number = c.message.source_number
        source_uuid = c.message.source_uuid
        text = c.message.text
        ts = c.message.timestamp

        # Classify the message relative to the bot's own account.
        bot_uuid = c.bot.bot_uuid
        same_recipient = source_uuid == dest
        is_from_bot = source_uuid == bot_uuid
        is_to_bot = dest == bot_uuid or dest is None

        reply_to_self = same_recipient and is_from_bot  # Reply
        reply_to_others = is_to_bot and not same_recipient  # Reply
        is_outgoing_message = is_from_bot and not is_to_bot  # Do not reply

        # For messages sent by the bot the conversation partner is the
        # destination; otherwise it is the sender.
        identifier_uuid = dest if is_from_bot else source_uuid

        # NOTE(review): attachments are only read from syncMessage.sentMessage;
        # confirm whether attachments on received messages are needed too.
        attachment_list = _extract_attachments(sent_message)

        cast = {
            "type": "def",
            "method": "xmpp",
            "service": "signal",
            "identifier": identifier_uuid,
            "msg": text,
            "attachments": attachment_list,
            "detail": {
                "reply_to_self": reply_to_self,
                "reply_to_others": reply_to_others,
                "is_outgoing_message": is_outgoing_message,
            },
        }
        packed = msgpack.packb(cast, use_bin_type=True)
        await redis.publish("component", packed)

        # TODO: Permission checks
        manips = await sync_to_async(list)(
            Manipulation.objects.filter(enabled=True)
        )
        for manip in manips:
            try:
                person_identifier = await sync_to_async(_get_person_identifier)(
                    manip, identifier_uuid
                )
            except PersonIdentifier.DoesNotExist:
                log.warning(f"{manip.name}: Message from unknown identifier {identifier_uuid} - Not storing.")
                continue

            # Find or create the corresponding ChatSession
            chat_session = await history.get_chat_session(manip.user, person_identifier)

            # Store incoming or outgoing messages
            await history.store_message(
                session=chat_session,
                sender=source_uuid,
                text=text,
                ts=ts,
                outgoing=is_from_bot,
            )

            # Get the total history
            chat_history = await history.get_chat_history(chat_session)

            wants_reply = replies.should_reply(
                reply_to_self,
                reply_to_others,
                is_outgoing_message,
            )
            if wants_reply and manip.mode != "silent":
                await _send_reply(
                    c, manip, chat_session, person_identifier, msg, chat_history, ts
                )

            # Manage truncation & summarization
            await truncate_and_summarize(chat_session, manip.ai)

        # ORM access is synchronous, so keep it off the event loop thread.
        await sync_to_async(_upsert_chat)(
            source_uuid, source_number, source_name, account
        )


async def _handle_stream_message(message):
    """Decode one pub/sub message and dispatch it if it is a deferred job."""
    try:
        unpacked = msgpack.unpackb(message["data"], raw=False)
    except (TypeError, ValueError) as exc:
        # ValueError covers malformed, truncated or trailing msgpack data.
        log.warning(f"Discarding undecodable message on processing channel: {exc}")
        return

    if not isinstance(unpacked, dict) or unpacked.get("type") != "def":
        return

    try:
        await deferred.process_deferred(unpacked)
    except Exception as exc:
        # One failing job must not terminate the long-running listener.
        log.error(f"Deferred processing failed: {exc!r}")


async def stream():
    """Listen on the Redis ``processing`` channel forever.

    Polls the subscription and forwards every decoded payload whose ``type``
    is ``"def"`` to ``deferred.process_deferred``.
    """
    pubsub = redis.pubsub()
    await pubsub.subscribe("processing")

    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True)
        if message is not None:
            await _handle_stream_message(message)
        await asyncio.sleep(0.01)


class Command(BaseCommand):
    """Django management command that runs the Signal bot and Redis listener."""

    def handle(self, *args, **options):
        """Start the bot and the pub/sub listener on a fresh event loop."""
        bot = NewSignalBot({
            "signal_service": SIGNAL_URL,
            "phone_number": SIGNAL_NUMBER,
        })
        bot.register(HandleMessage())

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        bot._event_loop = loop

        # Keep a reference: the loop only holds weak references to tasks, so
        # an unreferenced task may be garbage-collected while still running.
        stream_task = loop.create_task(stream())
        bot.start()
        try:
            loop.run_forever()
        except (KeyboardInterrupt, SystemExit):
            log.info("Process terminating")
        finally:
            loop.close()