From 018d2f87c7cbb27b8f06920ff33321d730cd367e Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Wed, 12 Feb 2025 18:45:21 +0000 Subject: [PATCH] Refactor and implement queueing messages --- app/local_settings.py | 2 + app/urls.py | 6 +- core/clients/__init__.py | 0 core/clients/signal.py | 79 ++++++ core/db/__init__.py | 0 core/db/sql.py | 63 +++++ core/forms.py | 4 +- core/lib/bot.py | 46 ++++ core/lib/deferred.py | 54 ++++ core/lib/prompts/functions.py | 166 +---------- core/management/commands/processing.py | 257 +++++++----------- core/messaging/__init__.py | 0 core/messaging/ai.py | 19 ++ core/messaging/analysis.py | 73 +++++ core/messaging/history.py | 54 ++++ core/messaging/natural.py | 58 ++++ core/messaging/replies.py | 75 +++++ core/messaging/utils.py | 20 ++ ...pulation_send_enabled_manipulation_mode.py | 22 ++ core/migrations/0014_queuedmessage.py | 32 +++ core/models.py | 41 ++- .../templates/partials/manipulation-list.html | 14 +- core/views/queues.py | 57 ++++ 23 files changed, 804 insertions(+), 338 deletions(-) create mode 100644 core/clients/__init__.py create mode 100644 core/clients/signal.py create mode 100644 core/db/__init__.py create mode 100644 core/db/sql.py create mode 100644 core/lib/bot.py create mode 100644 core/lib/deferred.py create mode 100644 core/messaging/__init__.py create mode 100644 core/messaging/ai.py create mode 100644 core/messaging/analysis.py create mode 100644 core/messaging/history.py create mode 100644 core/messaging/natural.py create mode 100644 core/messaging/replies.py create mode 100644 core/messaging/utils.py create mode 100644 core/migrations/0013_remove_manipulation_send_enabled_manipulation_mode.py create mode 100644 core/migrations/0014_queuedmessage.py create mode 100644 core/views/queues.py diff --git a/app/local_settings.py b/app/local_settings.py index 3e30e94..0220046 100644 --- a/app/local_settings.py +++ b/app/local_settings.py @@ -46,3 +46,5 @@ if DEBUG: ] SETTINGS_EXPORT = ["BILLING_ENABLED"] + +SIGNAL_NUMBER = getenv("SIGNAL_NUMBER") diff --git a/app/urls.py b/app/urls.py index 5caba1f..5895159 100644 --- a/app/urls.py +++ b/app/urls.py @@ -21,7 +21,7 @@ from django.urls import include, path from django.views.generic import TemplateView from two_factor.urls import urlpatterns as tf_urls -from core.views import base, notifications, signal, people, ais, groups, personas, manipulations, identifiers, sessions, messages +from core.views import base, notifications, signal, people, ais, groups, personas, manipulations, identifiers, sessions, messages, queues urlpatterns = [ path("__debug__/", include("debug_toolbar.urls")), @@ -207,4 +207,8 @@ urlpatterns = [ path("session//messages/create/", messages.MessageCreate.as_view(), name="message_create"), path("session//messages/update///", messages.MessageUpdate.as_view(), name="message_update"), path("session//messages/delete///", messages.MessageDelete.as_view(), name="message_delete"), + # API + # Queues + path("api/v1/queue/message/accept//", queues.AcceptMessageAPI.as_view(), name="message_accept_api"), + path("api/v1/queue/message/reject//", queues.RejectMessageAPI.as_view(), name="message_reject_api"), ] + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) diff --git a/core/clients/__init__.py b/core/clients/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/clients/signal.py b/core/clients/signal.py new file mode 100644 index 0000000..b6f0d05 --- /dev/null +++ b/core/clients/signal.py @@ -0,0 +1,79 @@ +from rest_framework.views import APIView +from django.contrib.auth.mixins import LoginRequiredMixin + +from rest_framework import status + +from django.http import HttpResponse +from core.models import QueuedMessage, Message +import requests +import orjson +from django.conf import settings +from core.messaging import natural +import aiohttp +from asgiref.sync import sync_to_async + + +async def start_typing(uuid): + url = f"http://signal:8080/v1/typing_indicator/{settings.SIGNAL_NUMBER}" + data = {"recipient": uuid} + + async with aiohttp.ClientSession() as session: + async with session.put(url, json=data) as response: + return await response.text() # Optional: Return response content + +async def stop_typing(uuid): + url = f"http://signal:8080/v1/typing_indicator/{settings.SIGNAL_NUMBER}" + data = {"recipient": uuid} + + async with aiohttp.ClientSession() as session: + async with session.delete(url, json=data) as response: + return await response.text() # Optional: Return response content + +async def send_message(db_obj): + recipient_uuid = db_obj.session.identifier.identifier + text = db_obj.text + + send = lambda x: send_message_raw(recipient_uuid, x) # returns ts + start_t = lambda: start_typing(recipient_uuid) + stop_t = lambda: stop_typing(recipient_uuid) + + tss = await natural.natural_send_message( + text, + send, + start_t, + stop_t, + ) # list of ts + #result = await send_message_raw(recipient_uuid, text) + await sync_to_async(db_obj.delete)() + result = [x for x in tss if x] # all trueish ts + if result: # if at least one message was sent + ts1 = result.pop() # pick a time + await sync_to_async(Message.objects.create)( + user=db_obj.session.user, + session=db_obj.session, + custom_author="BOT", + text=text, + ts=ts1, # use that time in db + ) + +async def send_message_raw(recipient_uuid, text): + + url = "http://signal:8080/v2/send" + data = { + "recipients": [recipient_uuid], + "message": text, + "number": settings.SIGNAL_NUMBER, + } + + async with aiohttp.ClientSession() as session: + async with session.post(url, json=data) as response: + response_text = await response.text() + response_status = response.status + + if response_status == status.HTTP_201_CREATED: + ts = orjson.loads(response_text).get("timestamp", None) + if not ts: + return False + return ts + else: + return False \ No newline at end of file diff --git a/core/db/__init__.py b/core/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/db/sql.py b/core/db/sql.py new file mode 100644 index 0000000..1c74112 --- /dev/null +++ b/core/db/sql.py @@ -0,0 +1,63 @@ +import aiomysql + +from core.util import logs +from core.schemas import mc_s + +mysql_pool = None + +log = logs.get_logger("sql") + +DB_URL = "giadb" +async def init_mysql_pool(): + """ + Initialize the MySQL connection pool. + """ + global mysql_pool + mysql_pool = await aiomysql.create_pool( + host=DB_URL, + port=9306, + db="Manticore", + minsize=1, + maxsize=10 + ) + +async def close_mysql_pool(): + """Close the MySQL connection pool properly.""" + global mysql_pool + if mysql_pool: + mysql_pool.close() + await mysql_pool.wait_closed() + + +async def create_index(): + schemas = { + "main": mc_s.schema_main, + # "rule_storage": mc_s.schema_rule_storage, + # "meta": mc_s.schema_meta, + # "internal": mc_s.schema_int, + } + try: + async with mysql_pool.acquire() as conn: + async with conn.cursor() as cur: + for name, schema in schemas.items(): + schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()]) + + create_query = ( + f"create table if not exists {name}({schema_types}) engine='columnar'" + ) + log.info(f"Schema types {create_query}") + await cur.execute(create_query) # SQLi + except aiomysql.Error as e: + log.error(f"MySQL error: {e}") + + +async def main(): + await init_mysql_pool() + created = False + while not created: + try: + await create_index() + created = True + except Exception as e: + log.error(f"Error creating index: {e}") + await asyncio.sleep(1) # Block the thread, just wait for the DB \ No newline at end of file diff --git a/core/forms.py b/core/forms.py index cf31e04..8c201a9 100644 --- a/core/forms.py +++ b/core/forms.py @@ -131,7 +131,7 @@ class PersonaForm(RestrictedFormMixin, forms.ModelForm): class ManipulationForm(RestrictedFormMixin, forms.ModelForm): class Meta: model = Manipulation - fields = ("name", "group", "ai", "persona", "enabled", "send_enabled") + fields = ("name", "group", "ai", "persona", "enabled", "mode") help_texts = { "name": "The name of this manipulation strategy.", "group": "The group involved in this manipulation strategy.", @@ -139,7 +139,7 @@ class ManipulationForm(RestrictedFormMixin, forms.ModelForm): "ai": "The AI associated with this manipulation.", "persona": "The persona used for this manipulation.", "enabled": "Whether this manipulation is enabled.", - "send_enabled": "Whether this manipulation sends replies.", + "mode": "Mode of operation.", } diff --git a/core/lib/bot.py b/core/lib/bot.py new file mode 100644 index 0000000..e1b4d55 --- /dev/null +++ b/core/lib/bot.py @@ -0,0 +1,46 @@ +from signalbot import SignalBot +import aiohttp + +from core.util import logs + +log = logs.get_logger("signalbot") + + +class NewSignalBot(SignalBot): + def __init__(self, config): + super().__init__(config) + self.bot_uuid = None # Initialize with None + + async def get_own_uuid(self) -> str: + """Fetch bot's UUID by checking contacts, groups, or profile.""" + async with aiohttp.ClientSession() as session: + uri_contacts = f"http://{self._signal.signal_service}/v1/contacts/{self._signal.phone_number}" + try: + resp = await session.get(uri_contacts) + if resp.status == 200: + contacts_data = await resp.json() + if isinstance(contacts_data, list): + for contact in contacts_data: + if contact.get("number") == self._phone_number: + return contact.get("uuid") + except Exception as e: + log.error(f"Failed to get UUID from contacts: {e}") + + async def initialize_bot(self): + """Fetch bot's UUID and store it in self.bot_uuid.""" + try: + self.bot_uuid = await self.get_own_uuid() + if self.bot_uuid: + log.info(f"Own UUID: {self.bot_uuid}") + else: + log.warning("Unable to fetch bot UUID.") + except Exception as e: + log.error(f"Failed to initialize bot UUID: {e}") + + def start(self): + """Start bot without blocking event loop.""" + self._event_loop.create_task(self.initialize_bot()) # Fetch UUID first + self._event_loop.create_task(self._detect_groups()) # Sync groups + self._event_loop.create_task(self._produce_consume_messages()) # Process messages + + self.scheduler.start() # Start async job scheduler \ No newline at end of file diff --git a/core/lib/deferred.py b/core/lib/deferred.py new file mode 100644 index 0000000..a153725 --- /dev/null +++ b/core/lib/deferred.py @@ -0,0 +1,54 @@ +# Deferred processing library +from core.util import logs +from pydantic import BaseModel +from typing import Annotated +from uuid import UUID +from pydantic import ValidationError +from core.models import QueuedMessage, Message +from core.clients import signal +from core.lib.prompts.functions import delete_messages +from asgiref.sync import sync_to_async + + +log = logs.get_logger("deferred") + + +class DeferredRequest(BaseModel): + type: str + method: str + user_id: int + message_id: Annotated[str, UUID] + +async def process_deferred(data: dict): + try: + validated_data = DeferredRequest(**data) + log.info(f"Validated Data: {validated_data}") + # Process the validated data + except ValidationError as e: + log.info(f"Validation Error: {e}") + return + + method = validated_data.method + user_id = validated_data.user_id + message_id = validated_data.message_id + + try: + message = await sync_to_async(QueuedMessage.objects.get)( + user_id=user_id, + id=message_id, + ) + log.info(f"Got {message}") + except QueuedMessage.DoesNotExist: + log.info(f"Didn't get message from {message_id}") + return + + if message.session.identifier.service == "signal": + log.info(f"Is sisngla") + if method == "accept_message": + await signal.send_message(message) + else: + log.warning(f"Method not yet supported: {method}") + return + else: + log.warning(f"Protocol not supported: {message.session.identifier.service}") + return diff --git a/core/lib/prompts/functions.py b/core/lib/prompts/functions.py index dca460f..5e20675 100644 --- a/core/lib/prompts/functions.py +++ b/core/lib/prompts/functions.py @@ -4,10 +4,9 @@ from asgiref.sync import sync_to_async from core.models import Message, ChatSession, AI, Person, Manipulation from core.util import logs import json -import asyncio from django.utils import timezone -import random - +from core.messaging import ai +from core.messaging.utils import messages_to_string SUMMARIZE_WHEN_EXCEEDING = 10 SUMMARIZE_BY = 5 @@ -16,96 +15,13 @@ MAX_SUMMARIES = 3 # Keep last 5 summaries log = logs.get_logger("prompts") -def gen_prompt(msg: dict, person: Person, manip: Manipulation, chat_history: str): - """ - Generate a structured prompt using the attributes of the provided Person and Manipulation models. - """ - - now = timezone.now() - persona = manip.persona - - system_message = ( - "You are my digital persona, responding on my behalf while embodying my personality, preferences, and unique style.\n\n" - - "### Persona Profile ###\n" - f"- **MBTI:** {persona.mbti} ({persona.mbti_identity} balance)\n" - f"- **Tone:** {persona.tone} | **Humor:** {persona.humor_style}\n" - f"- **Core Values:** {persona.core_values}\n" - f"- **Communication Style:** {persona.communication_style}\n" - f"- **Flirting Style:** {persona.flirting_style}\n" - f"- **Likes:** {persona.likes} | **Dislikes:** {persona.dislikes}\n" - f"- **Response Tactics:** {persona.response_tactics}\n" - f"- **Persuasion Techniques:** {persona.persuasion_tactics}\n" - f"- **Boundaries:** {persona.boundaries} | **Adaptability:** {persona.adaptability}%\n\n" - - "### Contact Information ###\n" - f"- **Summary:** {person.summary or 'N/A'}\n" - f"- **Profile:** {person.profile or 'N/A'}\n" - f"- **Revealed Details:** {person.revealed or 'N/A'}\n" - f"- **Sentiment Score:** {person.sentiment}\n" - f"- **Timezone:** {person.timezone or 'N/A'}\n" - f"- **Last Interaction:** {person.last_interaction or 'Never'}\n" - f"- **Current Date/Time:** {now}\n\n" - - "### Conversation Context ###\n" - f"{chat_history if chat_history else 'No prior chat history.'}\n\n" - - "### Response Guidelines ###\n" - "- **Engagement**: Keep responses engaging, with a balance of wit, depth, and confidence.\n" - "- **Flirting**: Be direct, playful, and, when appropriate, subtly provocative—without hesitation.\n" - "- **Pauses**: Use double newlines (`\\n\\n`) to pause where it enhances realism.\n" - "- **Flow Awareness**: Maintain continuity, avoid redundancy, and adjust response length based on interaction.\n" - ) - - user_message = f"[{msg['timestamp']}] <{person.name}> {msg['text']}" - - return [ - {"role": "system", "content": system_message}, - {"role": "user", "content": user_message}, - ] - -async def run_context_prompt( - c, - prompt: list[str], - ai: AI, - ): - cast = {"api_key": ai.api_key} - if ai.base_url is not None: - cast["api_key"] = ai.base_url - client = AsyncOpenAI(**cast) - await c.start_typing() - response = await client.chat.completions.create( - model=ai.model, - messages=prompt, - ) - await c.stop_typing() - - content = response.choices[0].message.content - - return content - -async def run_prompt( - prompt: list[str], - ai: AI, - ): - cast = {"api_key": ai.api_key} - if ai.base_url is not None: - cast["api_key"] = ai.base_url - client = AsyncOpenAI(**cast) - response = await client.chat.completions.create( - model=ai.model, - messages=prompt, - ) - content = response.choices[0].message.content - - return content async def delete_messages(queryset): await sync_to_async(queryset.delete, thread_sensitive=True)() async def truncate_and_summarize( chat_session: ChatSession, - ai: AI, + ai_obj: AI, ): """ Summarizes messages in chunks to prevent unchecked growth. @@ -123,7 +39,6 @@ async def truncate_and_summarize( ) num_messages = len(messages) - log.info(f"num_messages for {chat_session.id}: {num_messages}") if num_messages >= SUMMARIZE_WHEN_EXCEEDING: log.info(f"Summarizing {SUMMARIZE_BY} messages for session {chat_session.id}") @@ -144,13 +59,12 @@ async def truncate_and_summarize( ) # Delete old summaries if there are too many - log.info(f"Summaries: {len(summary_messages)}") if len(summary_messages) >= MAX_SUMMARIES: - summary_text = await summarize_conversation(chat_session, summary_messages, ai, is_summary=True) + summary_text = await summarize_conversation(chat_session, summary_messages, ai_obj, is_summary=True) chat_session.summary = summary_text await sync_to_async(chat_session.save)() - log.info(f"Updated ChatSession summary with {len(summary_messages)} summarized summaries.") + log.info(f"Updated ChatSession summary with {len(summary_messages)} consolidated summaries.") num_to_delete = len(summary_messages) - MAX_SUMMARIES # await sync_to_async( @@ -167,14 +81,13 @@ async def truncate_and_summarize( log.info(f"Deleted {num_to_delete} old summaries.") # 🔹 Summarize conversation chunk - summary_text = await summarize_conversation(chat_session, chunk_to_summarize, ai) + summary_text = await summarize_conversation(chat_session, chunk_to_summarize, ai_obj) # 🔹 Replace old messages with the summary # await sync_to_async( # Message.objects.filter(session=chat_session, user=user, id__in=[msg.id for msg in chunk_to_summarize]) # .delete() # )() - log.info("About to delete messages1") await delete_messages(Message.objects.filter(session=chat_session, user=user, id__in=[msg.id for msg in chunk_to_summarize])) log.info(f"Deleted {len(chunk_to_summarize)} messages, replacing with summary.") @@ -191,23 +104,13 @@ async def truncate_and_summarize( # chat_session.summary = summary_text # await sync_to_async(chat_session.save)() - log.info("✅ Summarization cycle complete.") -def messages_to_string(messages: list): - """ - Converts message objects to a formatted string, showing custom_author if set. - """ - message_texts = [ - f"[{msg.ts}] <{msg.custom_author if msg.custom_author else msg.session.identifier.person.name}> {msg.text}" - for msg in messages - ] - return "\n".join(message_texts) async def summarize_conversation( chat_session: ChatSession, messages: list[Message], - ai, + ai_obj, is_summary=False, ): """ @@ -236,62 +139,9 @@ async def summarize_conversation( ] # Generate AI-based summary - summary_text = await run_prompt(summary_prompt, ai) + summary_text = await ai.run_prompt(summary_prompt, ai_obj) #log.info(f"Generated Summary: {summary_text}") return f"Summary: {summary_text}" -async def natural_send_message(chat_session, ts, c, text): - """ - Parses and sends messages with natural delays based on message length. - - Args: - chat_session: The active chat session. - ts: Timestamp of the message. - c: The context or object with `.send()`, `.start_typing()`, and `.stop_typing()` methods. - text: A string containing multiple messages separated by double newlines (`\n\n`). - - Behavior: - - Short messages are sent quickly with minimal delay. - - Longer messages include a "thinking" pause before typing. - - Typing indicator (`c.start_typing() / c.stop_typing()`) is used dynamically. - """ - - await sync_to_async(Message.objects.create)( - user=chat_session.user, - session=chat_session, - custom_author="BOT", - text=text, - ts=ts + 1, - ) - - parts = text.split("\n\n") # Split into separate messages - log.info(f"Processing messages: {parts}") - - for message in parts: - message = message.strip() - if not message: - continue - - # Compute natural "thinking" delay based on message length - base_delay = 0.8 # Minimum delay - length_factor = len(message) / 25 - # ~50 chars ≈ +1s processing - # ~25 chars ≈ +1s processing - natural_delay = min(base_delay + length_factor, 10) # Cap at 5s max - - # Decide when to start thinking *before* typing - if natural_delay > 3.5: # Only delay if response is long - await asyncio.sleep(natural_delay - 3.5) # "Thinking" pause before typing - - # Start typing - await c.start_typing() - await asyncio.sleep(natural_delay) # Finish remaining delay - await c.stop_typing() - - # Send the message - await c.send(message) - - # Optional: Small buffer between messages to prevent rapid-fire responses - await asyncio.sleep(0.5) \ No newline at end of file diff --git a/core/management/commands/processing.py b/core/management/commands/processing.py index bc05289..de6e73c 100644 --- a/core/management/commands/processing.py +++ b/core/management/commands/processing.py @@ -3,84 +3,24 @@ from django.core.management.base import BaseCommand from django.conf import settings from signalbot import SignalBot, Command, Context from asgiref.sync import sync_to_async +from django.urls import reverse import json -import aiomysql import asyncio from core.util import logs -from core.schemas import mc_s -from core.lib.prompts.functions import gen_prompt, run_prompt, truncate_and_summarize, run_context_prompt, messages_to_string, natural_send_message -from core.models import Chat, Manipulation, PersonIdentifier, ChatSession, Message +from core.lib.prompts.functions import truncate_and_summarize, messages_to_string, delete_messages +from core.lib import deferred +from core.messaging import replies, ai, natural, history, utils +from core.models import Chat, Manipulation, PersonIdentifier, ChatSession, Message, QueuedMessage import aiohttp from django.utils import timezone - +from django.conf import settings +from core.lib.bot import NewSignalBot +from redis import asyncio as aioredis SIGNAL_URL = "signal:8080" -DB_URL = "giadb" + log = logs.get_logger("processing") -mysql_pool = None - - -async def init_mysql_pool(): - """ - Initialize the MySQL connection pool. - """ - global mysql_pool - mysql_pool = await aiomysql.create_pool( - host=DB_URL, - port=9306, - db="Manticore", - minsize=1, - maxsize=10 - ) - -async def close_mysql_pool(): - """Close the MySQL connection pool properly.""" - global mysql_pool - if mysql_pool: - mysql_pool.close() - await mysql_pool.wait_closed() - - - -class NewSignalBot(SignalBot): - def __init__(self, config): - super().__init__(config) - self.bot_uuid = None # Initialize with None - - async def get_own_uuid(self) -> str: - """Fetch bot's UUID by checking contacts, groups, or profile.""" - async with aiohttp.ClientSession() as session: - uri_contacts = f"http://{self._signal.signal_service}/v1/contacts/{self._signal.phone_number}" - try: - resp = await session.get(uri_contacts) - if resp.status == 200: - contacts_data = await resp.json() - if isinstance(contacts_data, list): - for contact in contacts_data: - if contact.get("number") == self._phone_number: - return contact.get("uuid") - except Exception as e: - log.error(f"Failed to get UUID from contacts: {e}") - - async def initialize_bot(self): - """Fetch bot's UUID and store it in self.bot_uuid.""" - try: - self.bot_uuid = await self.get_own_uuid() - if self.bot_uuid: - log.info(f"Own UUID: {self.bot_uuid}") - else: - log.warning("Unable to fetch bot UUID.") - except Exception as e: - log.error(f"Failed to initialize bot UUID: {e}") - - def start(self): - """Start bot without blocking event loop.""" - self._event_loop.create_task(self.initialize_bot()) # Fetch UUID first - self._event_loop.create_task(self._detect_groups()) # Sync groups - self._event_loop.create_task(self._produce_consume_messages()) # Process messages - - self.scheduler.start() # Start async job scheduler class HandleMessage(Command): async def handle(self, c: Context): @@ -118,8 +58,6 @@ class HandleMessage(Command): # Determine the identifier to use identifier_uuid = dest if is_from_bot else source_uuid - # log.info(json.dumps(msg, indent=2)) - # TODO: Permission checks manips = await sync_to_async(list)( @@ -141,79 +79,82 @@ class HandleMessage(Command): continue # Exit early if no valid identifier is found # Find or create the corresponding ChatSession - chat_session, created = await sync_to_async(ChatSession.objects.get_or_create)( - identifier=person_identifier, - user=manip.user - ) + chat_session = await history.get_chat_session(manip.user, person_identifier) # Store incoming or outgoing messages - await sync_to_async(Message.objects.create)( - user=chat_session.user, + await history.store_message( session=chat_session, - sender_uuid=source_uuid, + sender=source_uuid, text=text, ts=ts, - custom_author="USER" if is_from_bot else None + outgoing=is_from_bot, ) - # Use chat session summary for context - log.info("Fetching stored messages") - stored_messages = await sync_to_async(list)( - Message.objects.filter(session=chat_session, user=chat_session.user).order_by("ts") - ) - log.info("Fetched stored messages") + # Get the total history + chat_history = await history.get_chat_history(chat_session) - # recent_chat_history = "\n".join( - # f"[{msg.ts}] {msg.text}" for msg in reversed(stored_messages) - # ) - recent_chat_history = messages_to_string(stored_messages) + if replies.should_reply( + reply_to_self, + reply_to_others, + is_outgoing_message, + ): + if manip.mode != "silent": + await utils.update_last_interaction(chat_session) + prompt = replies.generate_reply_prompt( + msg, + person_identifier.person, + manip, + chat_history + ) - chat_history = f"Chat Summary:\n{chat_session.summary}\n\nRecent Messages:\n{recent_chat_history}" if chat_session.summary else f"Recent Messages:\n{recent_chat_history}" - - reply = False # Default to no reply - - - # 🟢 CASE 1: Self-message (Bot or user messages itself) - if reply_to_self: - now = timezone.now() - chat_session.identifier.person.last_interaction = now - chat_session.last_interaction = now - log.info("Updating time") - await sync_to_async(chat_session.identifier.person.save)() - await sync_to_async(chat_session.save)() - log.info("Updated time") - reply = True # ✅ Bot replies - - # 🔵 CASE 2: Incoming message (Someone else messages the bot) - elif reply_to_others: - now = timezone.now() - chat_session.identifier.person.last_interaction = now - chat_session.last_interaction = now - await sync_to_async(chat_session.identifier.person.save)() - await sync_to_async(chat_session.save)() - reply = True # ✅ Bot replies - - # 🔴 CASE 3: Outgoing message (Bot messages someone else) - elif is_outgoing_message: - reply = False # ❌ No reply - - # ⚫ CASE 4: Unknown case (Failsafe) - else: - reply = False # ❌ No reply - - # Generate AI response if reply is enabled - if reply: - if manip.send_enabled: - prompt = gen_prompt(msg, person_identifier.person, manip, chat_history) log.info("Running context prompt") - result = await run_context_prompt(c, prompt, manip.ai) + result = await ai.run_prompt(prompt, manip.ai) # Store bot's AI response with a +1s timestamp - log.info("Storing generated message") + if manip.mode == "active": + await history.store_own_message( + session=chat_session, + text=result, + ts=ts + 1, + ) + log.info("NOT SENDING CHECK CODE IS OK") + # await natural.natural_send_message(c, result) + elif manip.mode == "notify": + title = f"[GIA] Suggested message to {person_identifier.person.name}" + manip.user.sendmsg(result, title=title) + elif manip.mode == "instant": + # Delete all other QueuedMessages + existing_queue = QueuedMessage.objects.filter( + user=chat_session.user, + session=chat_session, + manipulation=manip, + custom_author="BOT", + ) - log.info("Stored generated message") - await natural_send_message(chat_session, ts, c, result) - log.info("Sent message") - #await c.send(result) + await delete_messages(existing_queue) + qm = await history.store_own_message( + session=chat_session, + text=result, + ts=ts + 1, + manip=manip, + queue=True, + + ) + accept = reverse( + "message_accept_api", kwargs={"message_id":qm.id} + ) + reject = reverse( + "message_reject_api", kwargs={"message_id":qm.id} + ) + url = settings.URL + content = ( + f"{result}\n\n" + f"Accept: {url}{accept}\n" + f"Reject: {url}{reject}" + ) + title = f"[GIA] Suggested message to {person_identifier.person.name}" + manip.user.sendmsg(content, title=title) + else: + log.error(f"Mode {manip.mode} is not implemented") # Manage truncation & summarization await truncate_and_summarize(chat_session, manip.ai) @@ -239,38 +180,26 @@ class HandleMessage(Command): ) # +async def pubsub(): + redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10) + pubsub = redis.pubsub() + await pubsub.subscribe("processing") -async def create_index(): - schemas = { - "main": mc_s.schema_main, - # "rule_storage": mc_s.schema_rule_storage, - # "meta": mc_s.schema_meta, - # "internal": mc_s.schema_int, - } - try: - async with mysql_pool.acquire() as conn: - async with conn.cursor() as cur: - for name, schema in schemas.items(): - schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()]) - - create_query = ( - f"create table if not exists {name}({schema_types}) engine='columnar'" - ) - log.info(f"Schema types {create_query}") - await cur.execute(create_query) # SQLi - except aiomysql.Error as e: - log.error(f"MySQL error: {e}") - -async def main(): - await init_mysql_pool() - created = False - while not created: - try: - await create_index() - created = True - except Exception as e: - log.error(f"Error creating index: {e}") - await asyncio.sleep(1) # Block the thread, just wait for the DB + while True: + message = await pubsub.get_message(ignore_subscribe_messages=True) + if message is not None: + try: + log.info("GOT", message) + data = message["data"] + unpacked = msgpack.unpackb(data, raw=False) + log.info(f"Unpacked: {unpacked}") + except TypeError: + log.info(f"FAILED {message}") + continue + if "type" in unpacked.keys(): + if unpacked["type"] == "def": + await deferred.process_deferred(unpacked) + await asyncio.sleep(0.01) class Command(BaseCommand): def handle(self, *args, **options): @@ -282,7 +211,7 @@ class Command(BaseCommand): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) bot._event_loop = loop - loop.run_until_complete(main()) + loop.create_task(pubsub()) bot.start() try: loop.run_forever() diff --git a/core/messaging/__init__.py b/core/messaging/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/messaging/ai.py b/core/messaging/ai.py new file mode 100644 index 0000000..f8e426e --- /dev/null +++ b/core/messaging/ai.py @@ -0,0 +1,19 @@ +from openai import AsyncOpenAI, OpenAI +from core.models import Message, ChatSession, AI, Person, Manipulation + + +async def run_prompt( + prompt: list[str], + ai: AI, + ): + cast = {"api_key": ai.api_key} + if ai.base_url is not None: + cast["api_key"] = ai.base_url + client = AsyncOpenAI(**cast) + response = await client.chat.completions.create( + model=ai.model, + messages=prompt, + ) + content = response.choices[0].message.content + + return content diff --git a/core/messaging/analysis.py b/core/messaging/analysis.py new file mode 100644 index 0000000..2b8f17a --- /dev/null +++ b/core/messaging/analysis.py @@ -0,0 +1,73 @@ +from core.lib.prompts import bases +from openai import AsyncOpenAI +from asgiref.sync import sync_to_async +from core.models import Message, ChatSession, AI, Person, Manipulation +from core.util import logs +import json +import asyncio +from django.utils import timezone +import random + +def generate_prompt(msg: dict, person: Person, manip: Manipulation, chat_history: str): + """ + Generate a structured prompt using the attributes of the provided Person and Manipulation models. + """ + + now = timezone.now() + persona = manip.persona + + system_message = ( + "You are my digital persona, responding on my behalf while embodying my personality, preferences, and unique style.\n\n" + + "### Persona Profile ###\n" + f"- **MBTI:** {persona.mbti} ({persona.mbti_identity} balance)\n" + f"- **Tone:** {persona.tone} | **Humor:** {persona.humor_style}\n" + f"- **Core Values:** {persona.core_values}\n" + f"- **Communication Style:** {persona.communication_style}\n" + f"- **Flirting Style:** {persona.flirting_style}\n" + f"- **Likes:** {persona.likes} | **Dislikes:** {persona.dislikes}\n" + f"- **Response Tactics:** {persona.response_tactics}\n" + f"- **Persuasion Techniques:** {persona.persuasion_tactics}\n" + f"- **Boundaries:** {persona.boundaries} | **Adaptability:** {persona.adaptability}%\n\n" + + "### Contact Information ###\n" + f"- **Summary:** {person.summary or 'N/A'}\n" + f"- **Profile:** {person.profile or 'N/A'}\n" + f"- **Revealed Details:** {person.revealed or 'N/A'}\n" + f"- **Sentiment Score:** {person.sentiment}\n" + f"- **Timezone:** {person.timezone or 'N/A'}\n" + f"- **Last Interaction:** {person.last_interaction or 'Never'}\n" + f"- **Current Date/Time:** {now}\n\n" + + "### Conversation Context ###\n" + f"{chat_history if chat_history else 'No prior chat history.'}\n\n" + + "### Response Guidelines ###\n" + "- **Engagement**: Keep responses engaging, with a balance of wit, depth, and confidence.\n" + "- **Flirting**: Be direct, playful, and, when appropriate, subtly provocative—without hesitation.\n" + "- **Pauses**: Use double newlines (`\\n\\n`) to pause where it enhances realism.\n" + "- **Flow Awareness**: Maintain continuity, avoid redundancy, and adjust response length based on interaction.\n" + ) + + user_message = f"[{msg['timestamp']}] <{person.name}> {msg['text']}" + + return [ + {"role": "system", "content": system_message}, + {"role": "user", "content": user_message}, + ] + +async def run_context_prompt( + prompt: list[str], + ai: AI, + ): + cast = {"api_key": ai.api_key} + if ai.base_url is not None: + cast["api_key"] = ai.base_url + client = AsyncOpenAI(**cast) + response = await client.chat.completions.create( + model=ai.model, + messages=prompt, + ) + content = response.choices[0].message.content + + return content \ No newline at end of file diff --git a/core/messaging/history.py b/core/messaging/history.py new file mode 100644 index 0000000..68f11cd --- /dev/null +++ b/core/messaging/history.py @@ -0,0 +1,54 @@ +from core.util import logs +from core.models import Message, ChatSession, QueuedMessage +from asgiref.sync import sync_to_async +from core.messaging.utils import messages_to_string + +log = logs.get_logger("history") + +async def get_chat_history(session): + stored_messages = await sync_to_async(list)( + Message.objects.filter(session=session, user=session.user).order_by("ts") + ) + recent_chat_history = messages_to_string(stored_messages) + chat_history = f"Chat Summary:\n{session.summary}\n\nRecent Messages:\n{recent_chat_history}" if session.summary else f"Recent Messages:\n{recent_chat_history}" + + return chat_history + +async def get_chat_session(user, identifier): + chat_session, _ = await sync_to_async(ChatSession.objects.get_or_create)( + identifier=identifier, + user=user, + ) + return chat_session + +async def store_message(session, sender, text, ts, outgoing=False): + msg = await sync_to_async(Message.objects.create)( + user=session.user, + session=session, + sender_uuid=sender, + text=text, + ts=ts, + custom_author="USER" if outgoing else None + ) + + return msg + +async def store_own_message(session, text, ts, manip=None, queue=False): + cast = { + "user": session.user, + "session": session, + "custom_author": "BOT", + "text": text, + "ts": ts, + } + if queue: + msg_object = QueuedMessage + cast["manipulation"] = manip + else: + msg_object = Message + + msg = await sync_to_async(msg_object.objects.create)( + **cast, + ) + + return msg \ No newline at end of file diff --git a/core/messaging/natural.py b/core/messaging/natural.py new file mode 100644 index 0000000..138a769 --- /dev/null +++ b/core/messaging/natural.py @@ -0,0 +1,58 @@ +import asyncio +import random + +async def natural_send_message(text, + send, + start_typing, + stop_typing, + skip_thinking=False + ): + """ + Parses and sends messages with natural delays based on message length. + + Args: + chat_session: The active chat session. + ts: Timestamp of the message. + c: The context or object with `.send()`, `.start_typing()`, and `.stop_typing()` methods. + text: A string containing multiple messages separated by double newlines (`\n\n`). + + Behavior: + - Short messages are sent quickly with minimal delay. + - Longer messages include a "thinking" pause before typing. + - Typing indicator (`c.start_typing() / c.stop_typing()`) is used dynamically. + """ + + parts = text.split("\n\n") # Split into separate messages + + ids = [] + + for index, message in enumerate(parts): + message = message.strip() + if not message: + continue + + # Compute natural "thinking" delay based on message length + base_delay = 0.8 # Minimum delay + length_factor = len(message) / 25 + # ~50 chars ≈ +1s processing + # ~25 chars ≈ +1s processing + natural_delay = min(base_delay + length_factor, 10) # Cap at 5s max + + # Decide when to start thinking *before* typing + if not skip_thinking: + if natural_delay > 3.5: # Only delay if response is long + await asyncio.sleep(natural_delay - 3.5) # "Thinking" pause before typing + + # Start typing + await start_typing() + await asyncio.sleep(natural_delay) # Finish remaining delay + await stop_typing() + + # Send the message + result = await send(message) + ids.append(result) + + # Optional: Small buffer between messages to prevent rapid-fire responses + await asyncio.sleep(0.5) + + return ids \ No newline at end of file diff --git a/core/messaging/replies.py b/core/messaging/replies.py new file mode 100644 index 0000000..2e6fb1b --- /dev/null +++ b/core/messaging/replies.py @@ -0,0 +1,75 @@ +from core.lib.prompts import bases +from asgiref.sync import sync_to_async +from core.models import Message, ChatSession, AI, Person, Manipulation +from core.util import logs +import json +import asyncio +from django.utils import timezone +import random + +def should_reply( + reply_to_self, + reply_to_others, + is_outgoing_message, +): + reply = False + if reply_to_self: + reply = True + elif reply_to_others: + reply = True + elif is_outgoing_message: + reply = False + else: + reply = False + + return reply + +def generate_reply_prompt(msg: dict, person: Person, manip: Manipulation, chat_history: str): + """ + Generate a structured prompt using the attributes of the provided Person and Manipulation models. + """ + + now = timezone.now() + persona = manip.persona + + system_message = ( + "You are my digital persona, responding on my behalf while embodying my personality, preferences, and unique style.\n\n" + + "### Persona Profile ###\n" + f"- **MBTI:** {persona.mbti} ({persona.mbti_identity} balance)\n" + f"- **Tone:** {persona.tone} | **Humor:** {persona.humor_style}\n" + f"- **Core Values:** {persona.core_values}\n" + f"- **Communication Style:** {persona.communication_style}\n" + f"- **Flirting Style:** {persona.flirting_style}\n" + f"- **Likes:** {persona.likes} | **Dislikes:** {persona.dislikes}\n" + f"- **Response Tactics:** {persona.response_tactics}\n" + f"- **Persuasion Techniques:** {persona.persuasion_tactics}\n" + f"- **Boundaries:** {persona.boundaries} | **Adaptability:** {persona.adaptability}%\n\n" + + "### Contact Information ###\n" + f"- **Summary:** {person.summary or 'N/A'}\n" + f"- **Profile:** {person.profile or 'N/A'}\n" + f"- **Revealed Details:** {person.revealed or 'N/A'}\n" + f"- **Sentiment Score:** {person.sentiment}\n" + f"- **Timezone:** {person.timezone or 'N/A'}\n" + f"- **Last Interaction:** {person.last_interaction or 'Never'}\n" + f"- **Current Date/Time:** {now}\n\n" + + "### Conversation Context ###\n" + f"{chat_history if chat_history else 'No prior chat history.'}\n\n" + + "### Response Guidelines ###\n" + "- **Engagement**: Keep responses engaging, with a balance of wit, depth, and confidence.\n" + "- **Flirting**: Be direct, playful, and, when appropriate, subtly provocative—without hesitation.\n" + "- **Pauses**: Use double newlines (`\\n\\n`) to pause where it enhances realism.\n" + "- **Flow Awareness**: Maintain continuity, avoid redundancy, and adjust response length based on interaction.\n" + ) + + user_message = f"[{msg['timestamp']}] <{person.name}> {msg['text']}" + + return [ + {"role": "system", "content": system_message}, + {"role": "user", "content": user_message}, + ] + + \ No newline at end of file diff --git a/core/messaging/utils.py b/core/messaging/utils.py new file mode 100644 index 0000000..dc652cb --- /dev/null +++ b/core/messaging/utils.py @@ -0,0 +1,20 @@ +from asgiref.sync import sync_to_async +from django.utils import timezone + + +def messages_to_string(messages: list): + """ + Converts message objects to a formatted string, showing custom_author if set. + """ + message_texts = [ + f"[{msg.ts}] <{msg.custom_author if msg.custom_author else msg.session.identifier.person.name}> {msg.text}" + for msg in messages + ] + return "\n".join(message_texts) + +async def update_last_interaction(session): + now = timezone.now() + session.identifier.person.last_interaction = now + session.last_interaction = now + await sync_to_async(session.identifier.person.save)() + await sync_to_async(session.save)() \ No newline at end of file diff --git a/core/migrations/0013_remove_manipulation_send_enabled_manipulation_mode.py b/core/migrations/0013_remove_manipulation_send_enabled_manipulation_mode.py new file mode 100644 index 0000000..6dfcc21 --- /dev/null +++ b/core/migrations/0013_remove_manipulation_send_enabled_manipulation_mode.py @@ -0,0 +1,22 @@ +# Generated by Django 5.1.5 on 2025-02-08 15:12 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0012_alter_chatsession_last_interaction'), + ] + + operations = [ + migrations.RemoveField( + model_name='manipulation', + name='send_enabled', + ), + migrations.AddField( + model_name='manipulation', + name='mode', + field=models.CharField(blank=True, choices=[('active', 'Send replies to messages'), ('instant', 'Click link to send reply'), ('prospective', 'Click link to open page'), ('notify', 'Send notification of ideal reply only'), ('silent', 'Do not generate or send replies')], max_length=50, null=True), + ), + ] diff --git a/core/migrations/0014_queuedmessage.py b/core/migrations/0014_queuedmessage.py new file mode 100644 index 0000000..9c9323b --- /dev/null +++ b/core/migrations/0014_queuedmessage.py @@ -0,0 +1,32 @@ +# Generated by Django 5.1.5 on 2025-02-08 16:07 + +import django.db.models.deletion +import uuid +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0013_remove_manipulation_send_enabled_manipulation_mode'), + ] + + operations = [ + migrations.CreateModel( + name='QueuedMessage', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('ts', models.BigIntegerField()), + ('sender_uuid', models.CharField(blank=True, max_length=255, null=True)), + ('text', models.TextField(blank=True, null=True)), + ('custom_author', models.CharField(blank=True, max_length=255, null=True)), + ('manipulation', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='core.manipulation')), + ('session', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='core.chatsession')), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)), + ], + options={ + 'ordering': ['ts'], + }, + ), + ] diff --git a/core/models.py b/core/models.py index 8a4f860..2f1c43a 100644 --- a/core/models.py +++ b/core/models.py @@ -4,6 +4,7 @@ import uuid from django.conf import settings from django.contrib.auth.models import AbstractUser from django.db import models +from core.lib.notify import raw_sendmsg logger = logging.getLogger(__name__) @@ -49,6 +50,19 @@ class User(AbstractUser): def get_notification_settings(self): return NotificationSettings.objects.get_or_create(user=self)[0] + def sendmsg(self, *args, **kwargs): + notification_settings = self.get_notification_settings() + + if notification_settings.ntfy_topic is None: + # No topic set, so don't send + return + else: + topic = notification_settings.ntfy_topic + + raw_sendmsg(*args, **kwargs, url=notification_settings.ntfy_url, topic=topic) + + + class NotificationSettings(models.Model): user = models.OneToOneField(User, on_delete=models.CASCADE) @@ -115,6 +129,21 @@ class ChatSession(models.Model): def __str__(self): return f"{self.identifier.person.name} ({self.identifier.service})" +class QueuedMessage(models.Model): + """Stores individual messages linked to a ChatSession.""" + user = models.ForeignKey(User, on_delete=models.CASCADE) + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + session = models.ForeignKey(ChatSession, on_delete=models.CASCADE) + manipulation = models.ForeignKey("core.Manipulation", on_delete=models.CASCADE) + ts = models.BigIntegerField() # Use Unix timestamp + sender_uuid = models.CharField(max_length=255, blank=True, null=True) # Signal UUID + text = models.TextField(blank=True, null=True) + + custom_author = models.CharField(max_length=255, blank=True, null=True) + + class Meta: + ordering = ["ts"] + class Message(models.Model): """Stores individual messages linked to a ChatSession.""" user = models.ForeignKey(User, on_delete=models.CASCADE) @@ -203,7 +232,17 @@ class Manipulation(models.Model): ai = models.ForeignKey(AI, on_delete=models.CASCADE) persona = models.ForeignKey(Persona, on_delete=models.CASCADE) enabled = models.BooleanField(default=False) - send_enabled = models.BooleanField(default=False) + mode = models.CharField( + max_length=50, + choices=[ + ("active", "Send replies to messages"), + ("instant", "Click link to send reply"), + ("prospective", "Click link to open page"), + ("notify", "Send notification of ideal reply only"), + ("silent", "Do not generate or send replies"), + ], + blank=True, null=True + ) # class Perms(models.Model): diff --git a/core/templates/partials/manipulation-list.html b/core/templates/partials/manipulation-list.html index 3f4803b..ff2ed03 100644 --- a/core/templates/partials/manipulation-list.html +++ b/core/templates/partials/manipulation-list.html @@ -17,7 +17,7 @@ ai persona enabled - send + mode actions {% for item in object_list %} @@ -46,17 +46,7 @@ {% endif %} - - {% if item.send_enabled %} - - - - {% else %} - - - - {% endif %} - + {{ item.mode }}