Files
GIA/core/lib/prompts/functions.py

297 lines
11 KiB
Python

from core.lib.prompts import bases
from openai import AsyncOpenAI
from asgiref.sync import sync_to_async
from core.models import Message, ChatSession, AI, Person, Manipulation
from core.util import logs
import json
import asyncio
from django.utils import timezone
import random
SUMMARIZE_WHEN_EXCEEDING = 10
SUMMARIZE_BY = 5
MAX_SUMMARIES = 3 # Keep last 5 summaries
log = logs.get_logger("prompts")
def gen_prompt(msg: dict, person: Person, manip: Manipulation, chat_history: str):
"""
Generate a structured prompt using the attributes of the provided Person and Manipulation models.
"""
now = timezone.now()
persona = manip.persona
system_message = (
"You are my digital persona, responding on my behalf while embodying my personality, preferences, and unique style.\n\n"
"### Persona Profile ###\n"
f"- **MBTI:** {persona.mbti} ({persona.mbti_identity} balance)\n"
f"- **Tone:** {persona.tone} | **Humor:** {persona.humor_style}\n"
f"- **Core Values:** {persona.core_values}\n"
f"- **Communication Style:** {persona.communication_style}\n"
f"- **Flirting Style:** {persona.flirting_style}\n"
f"- **Likes:** {persona.likes} | **Dislikes:** {persona.dislikes}\n"
f"- **Response Tactics:** {persona.response_tactics}\n"
f"- **Persuasion Techniques:** {persona.persuasion_tactics}\n"
f"- **Boundaries:** {persona.boundaries} | **Adaptability:** {persona.adaptability}%\n\n"
"### Contact Information ###\n"
f"- **Summary:** {person.summary or 'N/A'}\n"
f"- **Profile:** {person.profile or 'N/A'}\n"
f"- **Revealed Details:** {person.revealed or 'N/A'}\n"
f"- **Sentiment Score:** {person.sentiment}\n"
f"- **Timezone:** {person.timezone or 'N/A'}\n"
f"- **Last Interaction:** {person.last_interaction or 'Never'}\n"
f"- **Current Date/Time:** {now}\n\n"
"### Conversation Context ###\n"
f"{chat_history if chat_history else 'No prior chat history.'}\n\n"
"### Response Guidelines ###\n"
"- **Engagement**: Keep responses engaging, with a balance of wit, depth, and confidence.\n"
"- **Flirting**: Be direct, playful, and, when appropriate, subtly provocative—without hesitation.\n"
"- **Pauses**: Use double newlines (`\\n\\n`) to pause where it enhances realism.\n"
"- **Flow Awareness**: Maintain continuity, avoid redundancy, and adjust response length based on interaction.\n"
)
user_message = f"[{msg['timestamp']}] <{person.name}> {msg['text']}"
return [
{"role": "system", "content": system_message},
{"role": "user", "content": user_message},
]
async def run_context_prompt(
c,
prompt: list[str],
ai: AI,
):
cast = {"api_key": ai.api_key}
if ai.base_url is not None:
cast["api_key"] = ai.base_url
client = AsyncOpenAI(**cast)
await c.start_typing()
response = await client.chat.completions.create(
model=ai.model,
messages=prompt,
)
await c.stop_typing()
content = response.choices[0].message.content
return content
async def run_prompt(
prompt: list[str],
ai: AI,
):
cast = {"api_key": ai.api_key}
if ai.base_url is not None:
cast["api_key"] = ai.base_url
client = AsyncOpenAI(**cast)
response = await client.chat.completions.create(
model=ai.model,
messages=prompt,
)
content = response.choices[0].message.content
return content
async def delete_messages(queryset):
await sync_to_async(queryset.delete, thread_sensitive=True)()
async def truncate_and_summarize(
chat_session: ChatSession,
ai: AI,
):
"""
Summarizes messages in chunks to prevent unchecked growth.
- Summarizes only non-summary messages.
- Deletes older summaries if too many exist.
- Ensures only messages belonging to `chat_session.user` are modified.
"""
user = chat_session.user # Store the user for ownership checks
# 🔹 Get non-summary messages owned by the session's user
messages = await sync_to_async(list)(
Message.objects.filter(session=chat_session, user=user)
.exclude(custom_author="SUM")
.order_by("ts")
)
num_messages = len(messages)
log.info(f"num_messages for {chat_session.id}: {num_messages}")
if num_messages >= SUMMARIZE_WHEN_EXCEEDING:
log.info(f"Summarizing {SUMMARIZE_BY} messages for session {chat_session.id}")
# Get the first `SUMMARIZE_BY` non-summary messages
chunk_to_summarize = messages[:SUMMARIZE_BY]
if not chunk_to_summarize:
log.warning("No messages available to summarize (only summaries exist). Skipping summarization.")
return
last_ts = chunk_to_summarize[-1].ts # Preserve timestamp
# 🔹 Get past summaries, keeping only the last few (owned by the session user)
summary_messages = await sync_to_async(list)(
Message.objects.filter(session=chat_session, user=user, custom_author="SUM")
.order_by("ts")
)
# Delete old summaries if there are too many
log.info(f"Summaries: {len(summary_messages)}")
if len(summary_messages) >= MAX_SUMMARIES:
summary_text = await summarize_conversation(chat_session, summary_messages, ai, is_summary=True)
chat_session.summary = summary_text
await sync_to_async(chat_session.save)()
log.info(f"Updated ChatSession summary with {len(summary_messages)} summarized summaries.")
num_to_delete = len(summary_messages) - MAX_SUMMARIES
# await sync_to_async(
# Message.objects.filter(session=chat_session, user=user, id__in=[msg.id for msg in summary_messages[:num_to_delete]])
# .delete()
# )()
await delete_messages(
Message.objects.filter(
session=chat_session,
user=user,
id__in=[msg.id for msg in summary_messages[:num_to_delete]]
)
)
log.info(f"Deleted {num_to_delete} old summaries.")
# 🔹 Summarize conversation chunk
summary_text = await summarize_conversation(chat_session, chunk_to_summarize, ai)
# 🔹 Replace old messages with the summary
# await sync_to_async(
# Message.objects.filter(session=chat_session, user=user, id__in=[msg.id for msg in chunk_to_summarize])
# .delete()
# )()
log.info("About to delete messages1")
await delete_messages(Message.objects.filter(session=chat_session, user=user, id__in=[msg.id for msg in chunk_to_summarize]))
log.info(f"Deleted {len(chunk_to_summarize)} messages, replacing with summary.")
# 🔹 Store new summary message (ensuring session=user consistency)
await sync_to_async(Message.objects.create)(
user=user,
session=chat_session,
custom_author="SUM",
text=summary_text,
ts=last_ts, # Preserve timestamp
)
# 🔹 Update ChatSession summary with latest merged summary
# chat_session.summary = summary_text
# await sync_to_async(chat_session.save)()
log.info("✅ Summarization cycle complete.")
def messages_to_string(messages: list):
"""
Converts message objects to a formatted string, showing custom_author if set.
"""
message_texts = [
f"[{msg.ts}] <{msg.custom_author if msg.custom_author else msg.session.identifier.person.name}> {msg.text}"
for msg in messages
]
return "\n".join(message_texts)
async def summarize_conversation(
chat_session: ChatSession,
messages: list[Message],
ai,
is_summary=False,
):
"""
Summarizes all stored messages into a single summary.
- If `is_summary=True`, treats input as previous summaries and merges them while keeping detail.
- If `is_summary=False`, summarizes raw chat messages concisely.
"""
log.info(f"Summarizing messages for session {chat_session.id}")
# Convert messages to structured text format
message_texts = messages_to_string(messages)
#log.info(f"Raw messages to summarize:\n{message_texts}")
# Select appropriate summarization instruction
instruction = (
"Merge and refine these past summaries, keeping critical details and structure intact."
if is_summary
else "Summarize this conversation concisely, maintaining important details and tone."
)
summary_prompt = [
{"role": "system", "content": instruction},
{"role": "user", "content": f"Conversation:\n{message_texts}\n\nProvide a clear and structured summary:"},
]
# Generate AI-based summary
summary_text = await run_prompt(summary_prompt, ai)
#log.info(f"Generated Summary: {summary_text}")
return f"Summary: {summary_text}"
async def natural_send_message(chat_session, ts, c, text):
"""
Parses and sends messages with natural delays based on message length.
Args:
chat_session: The active chat session.
ts: Timestamp of the message.
c: The context or object with `.send()`, `.start_typing()`, and `.stop_typing()` methods.
text: A string containing multiple messages separated by double newlines (`\n\n`).
Behavior:
- Short messages are sent quickly with minimal delay.
- Longer messages include a "thinking" pause before typing.
- Typing indicator (`c.start_typing() / c.stop_typing()`) is used dynamically.
"""
await sync_to_async(Message.objects.create)(
user=chat_session.user,
session=chat_session,
custom_author="BOT",
text=text,
ts=ts + 1,
)
parts = text.split("\n\n") # Split into separate messages
log.info(f"Processing messages: {parts}")
for message in parts:
message = message.strip()
if not message:
continue
# Compute natural "thinking" delay based on message length
base_delay = 0.8 # Minimum delay
length_factor = len(message) / 25
# ~50 chars ≈ +1s processing
# ~25 chars ≈ +1s processing
natural_delay = min(base_delay + length_factor, 10) # Cap at 5s max
# Decide when to start thinking *before* typing
if natural_delay > 3.5: # Only delay if response is long
await asyncio.sleep(natural_delay - 3.5) # "Thinking" pause before typing
# Start typing
await c.start_typing()
await asyncio.sleep(natural_delay) # Finish remaining delay
await c.stop_typing()
# Send the message
await c.send(message)
# Optional: Small buffer between messages to prevent rapid-fire responses
await asyncio.sleep(0.5)