Refactor and implement queueing messages

This commit is contained in:
2025-02-12 18:45:21 +00:00
parent 213df9176e
commit 018d2f87c7
23 changed files with 804 additions and 338 deletions

View File

@@ -3,84 +3,24 @@ from django.core.management.base import BaseCommand
from django.conf import settings
from signalbot import SignalBot, Command, Context
from asgiref.sync import sync_to_async
from django.urls import reverse
import json
import aiomysql
import asyncio
from core.util import logs
from core.schemas import mc_s
from core.lib.prompts.functions import gen_prompt, run_prompt, truncate_and_summarize, run_context_prompt, messages_to_string, natural_send_message
from core.models import Chat, Manipulation, PersonIdentifier, ChatSession, Message
from core.lib.prompts.functions import truncate_and_summarize, messages_to_string, delete_messages
from core.lib import deferred
from core.messaging import replies, ai, natural, history, utils
from core.models import Chat, Manipulation, PersonIdentifier, ChatSession, Message, QueuedMessage
import aiohttp
from django.utils import timezone
from django.conf import settings
from core.lib.bot import NewSignalBot
from redis import asyncio as aioredis
SIGNAL_URL = "signal:8080"
DB_URL = "giadb"
log = logs.get_logger("processing")
mysql_pool = None
async def init_mysql_pool():
"""
Initialize the MySQL connection pool.
"""
global mysql_pool
mysql_pool = await aiomysql.create_pool(
host=DB_URL,
port=9306,
db="Manticore",
minsize=1,
maxsize=10
)
async def close_mysql_pool():
"""Close the MySQL connection pool properly."""
global mysql_pool
if mysql_pool:
mysql_pool.close()
await mysql_pool.wait_closed()
class NewSignalBot(SignalBot):
def __init__(self, config):
super().__init__(config)
self.bot_uuid = None # Initialize with None
async def get_own_uuid(self) -> str:
"""Fetch bot's UUID by checking contacts, groups, or profile."""
async with aiohttp.ClientSession() as session:
uri_contacts = f"http://{self._signal.signal_service}/v1/contacts/{self._signal.phone_number}"
try:
resp = await session.get(uri_contacts)
if resp.status == 200:
contacts_data = await resp.json()
if isinstance(contacts_data, list):
for contact in contacts_data:
if contact.get("number") == self._phone_number:
return contact.get("uuid")
except Exception as e:
log.error(f"Failed to get UUID from contacts: {e}")
async def initialize_bot(self):
"""Fetch bot's UUID and store it in self.bot_uuid."""
try:
self.bot_uuid = await self.get_own_uuid()
if self.bot_uuid:
log.info(f"Own UUID: {self.bot_uuid}")
else:
log.warning("Unable to fetch bot UUID.")
except Exception as e:
log.error(f"Failed to initialize bot UUID: {e}")
def start(self):
"""Start bot without blocking event loop."""
self._event_loop.create_task(self.initialize_bot()) # Fetch UUID first
self._event_loop.create_task(self._detect_groups()) # Sync groups
self._event_loop.create_task(self._produce_consume_messages()) # Process messages
self.scheduler.start() # Start async job scheduler
class HandleMessage(Command):
async def handle(self, c: Context):
@@ -118,8 +58,6 @@ class HandleMessage(Command):
# Determine the identifier to use
identifier_uuid = dest if is_from_bot else source_uuid
# log.info(json.dumps(msg, indent=2))
# TODO: Permission checks
manips = await sync_to_async(list)(
@@ -141,79 +79,82 @@ class HandleMessage(Command):
continue # Exit early if no valid identifier is found
# Find or create the corresponding ChatSession
chat_session, created = await sync_to_async(ChatSession.objects.get_or_create)(
identifier=person_identifier,
user=manip.user
)
chat_session = await history.get_chat_session(manip.user, person_identifier)
# Store incoming or outgoing messages
await sync_to_async(Message.objects.create)(
user=chat_session.user,
await history.store_message(
session=chat_session,
sender_uuid=source_uuid,
sender=source_uuid,
text=text,
ts=ts,
custom_author="USER" if is_from_bot else None
outgoing=is_from_bot,
)
# Use chat session summary for context
log.info("Fetching stored messages")
stored_messages = await sync_to_async(list)(
Message.objects.filter(session=chat_session, user=chat_session.user).order_by("ts")
)
log.info("Fetched stored messages")
# Get the total history
chat_history = await history.get_chat_history(chat_session)
# recent_chat_history = "\n".join(
# f"[{msg.ts}] {msg.text}" for msg in reversed(stored_messages)
# )
recent_chat_history = messages_to_string(stored_messages)
if replies.should_reply(
reply_to_self,
reply_to_others,
is_outgoing_message,
):
if manip.mode != "silent":
await utils.update_last_interaction(chat_session)
prompt = replies.generate_reply_prompt(
msg,
person_identifier.person,
manip,
chat_history
)
chat_history = f"Chat Summary:\n{chat_session.summary}\n\nRecent Messages:\n{recent_chat_history}" if chat_session.summary else f"Recent Messages:\n{recent_chat_history}"
reply = False # Default to no reply
# 🟢 CASE 1: Self-message (Bot or user messages itself)
if reply_to_self:
now = timezone.now()
chat_session.identifier.person.last_interaction = now
chat_session.last_interaction = now
log.info("Updating time")
await sync_to_async(chat_session.identifier.person.save)()
await sync_to_async(chat_session.save)()
log.info("Updated time")
reply = True # ✅ Bot replies
# 🔵 CASE 2: Incoming message (Someone else messages the bot)
elif reply_to_others:
now = timezone.now()
chat_session.identifier.person.last_interaction = now
chat_session.last_interaction = now
await sync_to_async(chat_session.identifier.person.save)()
await sync_to_async(chat_session.save)()
reply = True # ✅ Bot replies
# 🔴 CASE 3: Outgoing message (Bot messages someone else)
elif is_outgoing_message:
reply = False # ❌ No reply
# ⚫ CASE 4: Unknown case (Failsafe)
else:
reply = False # ❌ No reply
# Generate AI response if reply is enabled
if reply:
if manip.send_enabled:
prompt = gen_prompt(msg, person_identifier.person, manip, chat_history)
log.info("Running context prompt")
result = await run_context_prompt(c, prompt, manip.ai)
result = await ai.run_prompt(prompt, manip.ai)
# Store bot's AI response with a +1s timestamp
log.info("Storing generated message")
if manip.mode == "active":
await history.store_own_message(
session=chat_session,
text=result,
ts=ts + 1,
)
log.info("NOT SENDING CHECK CODE IS OK")
# await natural.natural_send_message(c, result)
elif manip.mode == "notify":
title = f"[GIA] Suggested message to {person_identifier.person.name}"
manip.user.sendmsg(result, title=title)
elif manip.mode == "instant":
# Delete all other QueuedMessages
existing_queue = QueuedMessage.objects.filter(
user=chat_session.user,
session=chat_session,
manipulation=manip,
custom_author="BOT",
)
log.info("Stored generated message")
await natural_send_message(chat_session, ts, c, result)
log.info("Sent message")
#await c.send(result)
await delete_messages(existing_queue)
qm = await history.store_own_message(
session=chat_session,
text=result,
ts=ts + 1,
manip=manip,
queue=True,
)
accept = reverse(
"message_accept_api", kwargs={"message_id":qm.id}
)
reject = reverse(
"message_reject_api", kwargs={"message_id":qm.id}
)
url = settings.URL
content = (
f"{result}\n\n"
f"Accept: {url}{accept}\n"
f"Reject: {url}{reject}"
)
title = f"[GIA] Suggested message to {person_identifier.person.name}"
manip.user.sendmsg(content, title=title)
else:
log.error(f"Mode {manip.mode} is not implemented")
# Manage truncation & summarization
await truncate_and_summarize(chat_session, manip.ai)
@@ -239,38 +180,26 @@ class HandleMessage(Command):
)
#
async def pubsub():
redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10)
pubsub = redis.pubsub()
await pubsub.subscribe("processing")
async def create_index():
schemas = {
"main": mc_s.schema_main,
# "rule_storage": mc_s.schema_rule_storage,
# "meta": mc_s.schema_meta,
# "internal": mc_s.schema_int,
}
try:
async with mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
for name, schema in schemas.items():
schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
create_query = (
f"create table if not exists {name}({schema_types}) engine='columnar'"
)
log.info(f"Schema types {create_query}")
await cur.execute(create_query) # SQLi
except aiomysql.Error as e:
log.error(f"MySQL error: {e}")
async def main():
await init_mysql_pool()
created = False
while not created:
try:
await create_index()
created = True
except Exception as e:
log.error(f"Error creating index: {e}")
await asyncio.sleep(1) # Block the thread, just wait for the DB
while True:
message = await pubsub.get_message(ignore_subscribe_messages=True)
if message is not None:
try:
log.info("GOT", message)
data = message["data"]
unpacked = msgpack.unpackb(data, raw=False)
log.info(f"Unpacked: {unpacked}")
except TypeError:
log.info(f"FAILED {message}")
continue
if "type" in unpacked.keys():
if unpacked["type"] == "def":
await deferred.process_deferred(unpacked)
await asyncio.sleep(0.01)
class Command(BaseCommand):
def handle(self, *args, **options):
@@ -282,7 +211,7 @@ class Command(BaseCommand):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
bot._event_loop = loop
loop.run_until_complete(main())
loop.create_task(pubsub())
bot.start()
try:
loop.run_forever()