diff --git a/core/clients/signal.py b/core/clients/signal.py index 011bd56..7e0c372 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -170,8 +170,39 @@ class HandleMessage(Command): #recipient_jid = f"{identifier.user.username}@{settings.XMPP_ADDRESS}" user = identifier.user - log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") - await self.ur.xmpp.client.send_from_external(user, identifier, text, is_outgoing_message, attachments=xmpp_attachments) + manipulations = Manipulation.objects.filter( + group__people=identifier.person, + user=identifier.user, + #mode="mutate", + filter_enabled=True, + enabled=True, + ) + # chat_history = await history.get_chat_history(session) + # await utils.update_last_interaction(session) + if manipulations: + manip = manipulations.first() + prompt = replies.generate_mutate_reply_prompt( + text, + None, + manip, + None, + ) + + + log.info("Running Signal context prompt") + result = await ai.run_prompt(prompt, manip.ai) + log.info(f"RESULT {result}") + # await history.store_own_message( + # session=session, + # text=result, + # ts=int(now().timestamp() * 1000), + # ) + log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") + await self.ur.xmpp.client.send_from_external(user, identifier, result, is_outgoing_message, attachments=xmpp_attachments) + + if not manipulations.exists(): + log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") + await self.ur.xmpp.client.send_from_external(user, identifier, text, is_outgoing_message, attachments=xmpp_attachments) #### @@ -222,7 +253,7 @@ class HandleMessage(Command): reply_to_others, is_outgoing_message, ): - if manip.mode != "silent": + if manip.mode not in ["silent", "mutate"]: await utils.update_last_interaction(chat_session) prompt = replies.generate_reply_prompt( msg, @@ -240,6 +271,7 @@ class HandleMessage(Command): ts=ts + 1, ) # await natural.natural_send_message(c, result) + await self.ur.xmpp.client.send_from_external(manip.user, person_identifier, result, is_outgoing_message=True) tss = await natural.natural_send_message( result, c.send, diff --git a/core/clients/xmpp.py b/core/clients/xmpp.py index 98a7483..ad5a953 100644 --- a/core/clients/xmpp.py +++ b/core/clients/xmpp.py @@ -2,7 +2,7 @@ from core.clients import ClientBase from django.conf import settings from slixmpp.componentxmpp import ComponentXMPP from django.conf import settings -from core.models import User, Person, PersonIdentifier, ChatSession +from core.models import User, Person, PersonIdentifier, ChatSession, Manipulation from asgiref.sync import sync_to_async from django.utils.timezone import now import asyncio @@ -14,6 +14,7 @@ from slixmpp.xmlstream.stanzabase import ET import aiohttp from core.messaging import history from core.util import logs +from core.messaging import replies, utils, ai class XMPPComponent(ComponentXMPP): @@ -503,17 +504,52 @@ class XMPPComponent(ComponentXMPP): session=session, sender="XMPP", text=body, - ts=now().timestamp(), + ts=int(now().timestamp() * 1000), #outgoing=detail.is_outgoing_message, ????????? TODO: ) self.log.info("Stored a message sent from XMPP in the history.") + manipulations = Manipulation.objects.filter( + group__people=identifier.person, + user=identifier.user, + mode="mutate", + enabled=True, + ) + self.log.info(f"MANIP11 {manipulations}") + if not manipulations: + tss = await signalapi.send_message_raw( + identifier.identifier, + body, + attachments, + ) + self.log.info(f"Message sent unaltered") + return + + manip = manipulations.first() + chat_history = await history.get_chat_history(session) + await utils.update_last_interaction(session) + prompt = replies.generate_mutate_reply_prompt( + body, + identifier.person, + manip, + chat_history, + ) + self.log.info("Running XMPP context prompt") + result = await ai.run_prompt(prompt, manip.ai) + self.log.info(f"RESULT {result}") + await history.store_own_message( + session=session, + text=result, + ts=int(now().timestamp() * 1000), + ) tss = await signalapi.send_message_raw( identifier.identifier, - body, + result, attachments, ) - self.log.info(f"Message sent") + self.log.info(f"Message sent with modifications") + + async def request_upload_slots(self, recipient_jid, attachments): """Requests upload slots for multiple attachments concurrently.""" @@ -567,30 +603,10 @@ class XMPPComponent(ComponentXMPP): sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}" recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}" if is_outgoing_message: - self.log.info(f"Forwarding outgoing message as {recipient_jid}") - - # Create the message as if it were sent by the user - msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") - msg["body"] = text # Original message content - - # Create the forwarded inside the element - forwarded_elem = ET.Element("{urn:xmpp:forward:0}forwarded") - - # Create a element inside the forwarded stanza - message_elem = ET.Element("message", attrib={"from": recipient_jid, "to": recipient_jid, "type": "chat"}) - body_elem = ET.SubElement(message_elem, "body") - body_elem.text = text - - # Attach the forwarded message - forwarded_elem.append(message_elem) - msg.xml.append(forwarded_elem) - - # Send the forwarded message - msg.send() - + await self.send_xmpp_message(recipient_jid, sender_jid, f"YOU: {text}") # Step 1: Send text message separately - if text: + elif text: await self.send_xmpp_message(recipient_jid, sender_jid, text) if not attachments: diff --git a/core/forms.py b/core/forms.py index 6406c9e..c0ad540 100644 --- a/core/forms.py +++ b/core/forms.py @@ -131,7 +131,7 @@ class PersonaForm(RestrictedFormMixin, forms.ModelForm): class ManipulationForm(RestrictedFormMixin, forms.ModelForm): class Meta: model = Manipulation - fields = ("name", "group", "ai", "persona", "enabled", "mode") + fields = ("name", "group", "ai", "persona", "enabled", "mode", "filter_enabled") help_texts = { "name": "The name of this manipulation strategy.", "group": "The group involved in this manipulation strategy.", @@ -140,6 +140,7 @@ class ManipulationForm(RestrictedFormMixin, forms.ModelForm): "persona": "The persona used for this manipulation.", "enabled": "Whether this manipulation is enabled.", "mode": "Mode of operation.", + "filter_enabled": "Whether incoming messages will be filtered using the persona.", } diff --git a/core/management/commands/component.py b/core/management/commands/component.py index 975956b..f3d079e 100644 --- a/core/management/commands/component.py +++ b/core/management/commands/component.py @@ -27,562 +27,6 @@ class Attachment(ElementBase): plugin_attrib = "attachment" interfaces = {"url", "filename", "content_type"} -class EchoComponent(ComponentXMPP): - - """ - A simple Slixmpp component that echoes messages. - """ - - def __init__(self, jid, secret, server, port): - super().__init__(jid, secret, server, port) - # Register chat state plugins - register_stanza_plugin(Message, Active) - register_stanza_plugin(Message, Composing) - register_stanza_plugin(Message, Paused) - register_stanza_plugin(Message, Inactive) - register_stanza_plugin(Message, Gone) - - self.add_event_handler("session_start", self.session_start) - self.add_event_handler("disconnected", self.on_disconnected) - self.add_event_handler("message", self.message) - - # Presence event handlers - self.add_event_handler("presence_available", self.on_presence_available) - self.add_event_handler("presence_dnd", self.on_presence_dnd) - self.add_event_handler("presence_xa", self.on_presence_xa) - self.add_event_handler("presence_chat", self.on_presence_chat) - self.add_event_handler("presence_away", self.on_presence_away) - self.add_event_handler("presence_unavailable", self.on_presence_unavailable) - self.add_event_handler("presence_subscribe", self.on_presence_subscribe) - self.add_event_handler("presence_subscribed", self.on_presence_subscribed) - self.add_event_handler("presence_unsubscribe", self.on_presence_unsubscribe) - self.add_event_handler("presence_unsubscribed", self.on_presence_unsubscribed) - self.add_event_handler("roster_subscription_request", self.on_roster_subscription_request) - - # Chat state handlers - self.add_event_handler("chatstate_active", self.on_chatstate_active) - self.add_event_handler("chatstate_composing", self.on_chatstate_composing) - self.add_event_handler("chatstate_paused", self.on_chatstate_paused) - self.add_event_handler("chatstate_inactive", self.on_chatstate_inactive) - self.add_event_handler("chatstate_gone", self.on_chatstate_gone) - - def get_identifier(self, msg): - # Extract sender JID (full format: user@domain/resource) - sender_jid = str(msg["from"]) - - # Split into username@domain and optional resource - sender_parts = sender_jid.split("/", 1) - sender_bare_jid = sender_parts[0] # Always present: user@domain - sender_username, sender_domain = sender_bare_jid.split("@", 1) - - sender_resource = sender_parts[1] if len(sender_parts) > 1 else None # Extract resource if present - - # Extract recipient JID (should match component JID format) - recipient_jid = str(msg["to"]) - - if "@" in recipient_jid: - recipient_username, recipient_domain = recipient_jid.split("@", 1) - else: - recipient_username = recipient_jid - recipient_domain = recipient_jid - - # Extract message body - body = msg["body"] if msg["body"] else "[No Body]" - # Parse recipient_name and recipient_service (e.g., "mark|signal") - if "|" in recipient_username: - person_name, service = recipient_username.split("|") - person_name = person_name.title() # Capitalize for consistency - else: - person_name = recipient_username.title() - service = None - - - try: - # Lookup user in Django - log.info(f"User {sender_username}") - user = User.objects.get(username=sender_username) - - # Find Person object with name=person_name.lower() - log.info(f"Name {person_name.title()}") - person = Person.objects.get(user=user, name=person_name.title()) - - # Ensure a PersonIdentifier exists for this user, person, and service - log.info(f"Identifier {service}") - identifier = PersonIdentifier.objects.get(user=user, person=person, service=service) - - return identifier - - except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist): - # If any lookup fails, reject the subscription - return None - - def update_roster(self, jid, name=None): - """ - Adds or updates a user in the roster. - """ - iq = self.Iq() - iq['type'] = 'set' - iq['roster']['items'] = {jid: {'name': name or jid}} - - iq.send() - log.info(f"Updated roster: Added {jid} ({name})") - - def on_chatstate_active(self, msg): - """ - Handle when a user is actively engaged in the chat. - """ - log.info(f"Chat state: Active from {msg['from']}.") - - identifier = self.get_identifier(msg) - - def on_chatstate_composing(self, msg): - """ - Handle when a user is typing a message. - """ - log.info(f"Chat state: Composing from {msg['from']}.") - - identifier = self.get_identifier(msg) - - def on_chatstate_paused(self, msg): - """ - Handle when a user has paused typing. - """ - log.info(f"Chat state: Paused from {msg['from']}.") - - identifier = self.get_identifier(msg) - - def on_chatstate_inactive(self, msg): - """ - Handle when a user is inactive in the chat. - """ - log.info(f"Chat state: Inactive from {msg['from']}.") - - identifier = self.get_identifier(msg) - - def on_chatstate_gone(self, msg): - """ - Handle when a user has left the chat. - """ - log.info(f"Chat state: Gone from {msg['from']}.") - - identifier = self.get_identifier(msg) - - - def on_presence_available(self, pres): - """ - Handle when a user becomes available. - """ - log.info(f"Presence available from {pres['from']}") - - def on_presence_dnd(self, pres): - """ - Handle when a user sets 'Do Not Disturb' status. - """ - log.info(f"User {pres['from']} is now in 'Do Not Disturb' mode.") - - def on_presence_xa(self, pres): - """ - Handle when a user sets 'Extended Away' status. - """ - log.info(f"User {pres['from']} is now 'Extended Away'.") - - def on_presence_chat(self, pres): - """ - Handle when a user is actively available for chat. - """ - log.info(f"User {pres['from']} is now available for chat.") - - def on_presence_away(self, pres): - """ - Handle when a user sets 'Away' status. - """ - log.info(f"User {pres['from']} is now 'Away'.") - - def on_presence_unavailable(self, pres): - """ - Handle when a user goes offline or unavailable. - """ - log.info(f"User {pres['from']} is now unavailable.") - - def on_presence_subscribe(self, pres): - """ - Handle incoming presence subscription requests. - Accept only if the recipient has a contact matching the sender. - """ - - sender_jid = str(pres['from']).split('/')[0] # Bare JID (user@domain) - recipient_jid = str(pres['to']).split('/')[0] - - log.info(f"Received subscription request from {sender_jid} to {recipient_jid}") - - try: - # Extract sender and recipient usernames - user_username, _ = sender_jid.split("@", 1) - recipient_username, _ = recipient_jid.split("@", 1) - - # Parse recipient_name and recipient_service (e.g., "mark|signal") - if "|" in recipient_username: - person_name, service = recipient_username.split("|") - person_name = person_name.title() # Capitalize for consistency - else: - person_name = recipient_username.title() - service = None - - # Lookup user in Django - log.info(f"User {user_username}") - user = User.objects.get(username=user_username) - - # Find Person object with name=person_name.lower() - log.info(f"Name {person_name.title()}") - person = Person.objects.get(user=user, name=person_name.title()) - - # Ensure a PersonIdentifier exists for this user, person, and service - log.info(f"Identifier {service}") - PersonIdentifier.objects.get(user=user, person=person, service=service) - - component_jid = f"{person_name.lower()}|{service}@{self.boundjid.bare}" - - # Accept the subscription - self.send_presence(ptype="subscribed", pto=sender_jid, pfrom=component_jid) - log.info(f"Accepted subscription from {sender_jid}, sent from {component_jid}") - - # Send a presence request **from the recipient to the sender** (ASKS THEM TO ACCEPT BACK) - # self.send_presence(ptype="subscribe", pto=sender_jid, pfrom=component_jid) - # log.info(f"Sent presence subscription request from {component_jid} to {sender_jid}") - - # Add sender to roster - # self.update_roster(sender_jid, name=sender_jid.split("@")[0]) - # log.info(f"Added {sender_jid} to roster.") - - # Send presence update to sender **from the correct JID** - self.send_presence(ptype="available", pto=sender_jid, pfrom=component_jid) - log.info(f"Sent presence update from {component_jid} to {sender_jid}") - - - except (User.DoesNotExist, Person.DoesNotExist, PersonIdentifier.DoesNotExist): - # If any lookup fails, reject the subscription - log.warning(f"Subscription request from {sender_jid} rejected (recipient does not have this contact).") - self.send_presence(ptype="unsubscribed", pto=sender_jid) - - - def on_presence_subscribed(self, pres): - """ - Handle successful subscription confirmations. - """ - log.info(f"Subscription to {pres['from']} was accepted.") - - def on_presence_unsubscribe(self, pres): - """ - Handle when a user unsubscribes from presence updates. - """ - log.info(f"User {pres['from']} has unsubscribed from presence updates.") - - def on_presence_unsubscribed(self, pres): - """ - Handle when a user's unsubscription request is confirmed. - """ - log.info(f"Unsubscription from {pres['from']} confirmed.") - - def on_roster_subscription_request(self, pres): - """ - Handle roster subscription requests. - """ - log.info(f"New roster subscription request from {pres['from']}.") - - def session_start(self, *args): - log.info("XMPP session started") - - def on_disconnected(self, *args): - """ - Handles XMPP disconnection and triggers a reconnect loop. - """ - log.warning("XMPP disconnected, attempting to reconnect...") - self.connect() - - def session_start(self, *args): - log.info(f"Session started: {args}") - - async def request_upload_slot(self, recipient, filename, content_type, size): - """ - Requests an upload slot from XMPP for HTTP File Upload (XEP-0363). - - Args: - recipient (str): The JID of the recipient. - filename (str): The filename for the upload. - content_type (str): The file's MIME type. - size (int): The file size in bytes. - - Returns: - tuple | None: (upload_url, put_url, auth_header) or None if failed. - """ - # upload_service = await self['xep_0363'].find_upload_service() - # if not upload_service: - # log.error("No XEP-0363 upload service found.") - # return None - - #log.info(f"Upload service: {upload_service}") - - upload_service_jid = "share.zm.is" - - try: - slot = await self['xep_0363'].request_slot( - jid=upload_service_jid, - filename=filename, - content_type=content_type, - size=size - ) - - if slot is None: - log.error(f"Failed to obtain upload slot for {filename}") - return None - - # Parse the XML response - root = ET.fromstring(str(slot)) # Convert to string if necessary - namespace = "{urn:xmpp:http:upload:0}" # Define the namespace - - get_url = root.find(f".//{namespace}get").attrib.get("url") - put_element = root.find(f".//{namespace}put") - put_url = put_element.attrib.get("url") - - # Extract the Authorization header correctly - header_element = put_element.find(f"./{namespace}header[@name='Authorization']") - auth_header = header_element.text.strip() if header_element is not None else None - - if not get_url or not put_url: - log.error(f"Missing URLs in upload slot: {slot}") - return None - - return get_url, put_url, auth_header - - except Exception as e: - log.error(f"Exception while requesting upload slot: {e}") - return None - - - async def message(self, msg): - """ - Process incoming XMPP messages. - """ - - sym = lambda x: msg.reply(f"[>] {x}").send() - # log.info(f"Received message: {msg}") - - # Extract sender JID (full format: user@domain/resource) - sender_jid = str(msg["from"]) - - # Split into username@domain and optional resource - sender_parts = sender_jid.split("/", 1) - sender_bare_jid = sender_parts[0] # Always present: user@domain - sender_username, sender_domain = sender_bare_jid.split("@", 1) - - sender_resource = sender_parts[1] if len(sender_parts) > 1 else None # Extract resource if present - - # Extract recipient JID (should match component JID format) - recipient_jid = str(msg["to"]) - - if "@" in recipient_jid: - recipient_username, recipient_domain = recipient_jid.split("@", 1) - else: - recipient_username = recipient_jid - recipient_domain = recipient_jid - - # Extract message body - body = msg["body"] if msg["body"] else "[No Body]" - - attachments = [] - log.info(f"Full XMPP Message: {ET.tostring(msg.xml, encoding='unicode')}") - - # Extract attachments from standard XMPP (if present) - for att in msg.xml.findall(".//{urn:xmpp:attachments}attachment"): - attachments.append({ - "url": att.attrib.get("url"), - "filename": att.attrib.get("filename"), - "content_type": att.attrib.get("content_type"), - }) - - # Extract attachments from XEP-0066 format (Out of Band Data) - for oob in msg.xml.findall(".//{jabber:x:oob}x/{jabber:x:oob}url"): - attachments.append({ - "url": oob.text, - "filename": oob.text.split("/")[-1], # Extract filename from URL - "content_type": "application/octet-stream", # Generic content-type - }) - - log.info(f"Extracted {len(attachments)} attachments from XMPP message.") - # Log extracted information with variable name annotations - log_message = ( - f"Sender JID: {sender_jid}, Sender Username: {sender_username}, Sender Domain: {sender_domain}, " - f"Sender Resource: {sender_resource if sender_resource else '[No Resource]'}, " - f"Recipient JID: {recipient_jid}, Recipient Username: {recipient_username}, Recipient Domain: {recipient_domain}, " - f"Body: {body}" - ) - log.info(log_message) - - # Ensure recipient domain matches our configured component - expected_domain = settings.XMPP_JID # 'jews.zm.is' in your config - if recipient_domain != expected_domain: - log.warning(f"Invalid recipient domain: {recipient_domain}, expected {expected_domain}") - return - - # Lookup sender in Django's User model - try: - sender_user = User.objects.get(username=sender_username) - except User.DoesNotExist: - log.warning(f"Unknown sender: {sender_username}") - return - - if recipient_jid == settings.XMPP_JID: - log.info("Message to JID") - if body.startswith("."): - # Messaging the gateway directly - if body == ".contacts": - # Lookup Person objects linked to sender - persons = Person.objects.filter(user=sender_user) - if not persons.exists(): - log.info(f"No contacts found for {sender_username}") - sym("No contacts found.") - return - - # Construct contact list response - contact_names = [person.name for person in persons] - response_text = f"Contacts: " + ", ".join(contact_names) - sym(response_text) - elif body == ".whoami": - sym(str(sender_user.__dict__)) - else: - sym("No such command") - else: - log.info("Other message") - if "|" in recipient_username: - recipient_name, recipient_service = recipient_username.split("|") - - recipient_name = recipient_name.title() - - else: - recipient_name = recipient_username - recipient_service = None - - recipient_name = recipient_name.title() - - try: - person = Person.objects.get(user=sender_user, name=recipient_name) - except Person.DoesNotExist: - sym("This person does not exist.") - - if recipient_service: - try: - identifier = PersonIdentifier.objects.get(user=sender_user, - person=person, - service=recipient_service) - except PersonIdentifier.DoesNotExist: - sym("This service identifier does not exist.") - else: - # Get a random identifier - identifier = PersonIdentifier.objects.filter(user=sender_user, - person=person).first() - recipient_service = identifier.service - - # sym(str(person.__dict__)) - # sym(f"Service: {recipient_service}") - - #tss = await identifier.send(body, attachments=attachments) - # AM FIXING https://git.zm.is/XF/GIA/issues/5 - session, _ = await sync_to_async(ChatSession.objects.get_or_create)( - identifier=identifier, - user=user, - ) - log.info(f"Component history store message {text}") - await history.store_message( - session=session, - sender="XMPP", - text=text, - ts=now().timestamp(), - outgoing=detail.is_outgoing_message, - ) - log.info("Stored a message sent from XMPP in the history.") - - tss = await signalapi.send_message_raw( - identifier.identifier, - body, - attachments, - ) - log.info(f"Message sent") - - async def request_upload_slots(self, recipient_jid, attachments): - """Requests upload slots for multiple attachments concurrently.""" - upload_tasks = [ - self.request_upload_slot(recipient_jid, att["filename"], att["content_type"], att["size"]) - for att in attachments - ] - upload_slots = await asyncio.gather(*upload_tasks) - - return [(att, slot) for att, slot in zip(attachments, upload_slots) if slot is not None] - - async def upload_and_send(self, att, upload_slot, recipient_jid, sender_jid): - """Uploads a file and immediately sends the corresponding XMPP message.""" - upload_url, put_url, auth_header = upload_slot - headers = {"Content-Type": att["content_type"]} - if auth_header: - headers["Authorization"] = auth_header - - async with aiohttp.ClientSession() as session: - try: - async with session.put(put_url, data=att["content"], headers=headers) as response: - if response.status not in (200, 201): - log.error(f"Upload failed: {response.status} {await response.text()}") - return - log.info(f"Successfully uploaded {att['filename']} to {upload_url}") - - # Send XMPP message immediately after successful upload - await self.send_xmpp_message(recipient_jid, sender_jid, upload_url, attachment_url=upload_url) - - except Exception as e: - log.error(f"Error uploading {att['filename']} to XMPP: {e}") - - async def send_xmpp_message(self, recipient_jid, sender_jid, body_text, attachment_url=None): - """Sends an XMPP message with either text or an attachment URL.""" - msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") - msg["body"] = body_text # Body must contain only text or the URL - - if attachment_url: - # Include (XEP-0066) to ensure client compatibility - oob_element = ET.Element("{jabber:x:oob}x") - url_element = ET.SubElement(oob_element, "{jabber:x:oob}url") - url_element.text = attachment_url - msg.xml.append(oob_element) - - log.info(f"Sending XMPP message: {msg.xml}") - msg.send() - - async def send_from_external(self, user, person_identifier, text, detail, attachments=[]): - """Handles sending XMPP messages with text and attachments.""" - - if detail.is_outgoing_message: - return - - - sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}" - recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}" - - # Step 1: Send text message separately - if text: - await self.send_xmpp_message(recipient_jid, sender_jid, text) - - if not attachments: - return # No attachments to process - - # Step 2: Request upload slots concurrently - valid_uploads = await self.request_upload_slots(recipient_jid, attachments) - log.info(f"Got upload slots") - if not valid_uploads: - log.warning("No valid upload slots obtained.") - #return - - # Step 3: Upload each file and send its message immediately after upload - upload_tasks = [ - self.upload_and_send(att, slot, recipient_jid, sender_jid) for att, slot in valid_uploads - ] - await asyncio.gather(*upload_tasks) # Upload files concurrently async def stream(**kwargs): pubsub = redis.pubsub() diff --git a/core/management/commands/processing.py b/core/management/commands/processing.py index b6f3cf5..c88307d 100644 --- a/core/management/commands/processing.py +++ b/core/management/commands/processing.py @@ -25,210 +25,6 @@ log = logs.get_logger("processing") redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10) -class HandleMessage(Command): - async def handle(self, c: Context): - msg = { - "source": c.message.source, - "source_number": c.message.source_number, - "source_uuid": c.message.source_uuid, - "timestamp": c.message.timestamp, - "type": c.message.type.value, - "text": c.message.text, - "group": c.message.group, - "reaction": c.message.reaction, - "mentions": c.message.mentions, - "raw_message": c.message.raw_message - } - raw = json.loads(c.message.raw_message) - print(json.dumps(c.message.raw_message, indent=2)) - #dest = c.message.raw_message.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid") - dest = raw.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid") - - #account = c.message.raw_message.get("account", "") - account = raw.get("account", "") - #source_name = msg["raw_message"].get("envelope", {}).get("sourceName", "") - source_name = raw.get("envelope", {}).get("sourceName", "") - - source_number = c.message.source_number - source_uuid = c.message.source_uuid - text = c.message.text - ts = c.message.timestamp - - # Message originating from us - same_recipient = source_uuid == dest - - is_from_bot = source_uuid == c.bot.bot_uuid - is_to_bot = dest == c.bot.bot_uuid or dest is None - - reply_to_self = same_recipient and is_from_bot # Reply - reply_to_others = is_to_bot and not same_recipient # Reply - is_outgoing_message = is_from_bot and not is_to_bot # Do not reply - - # Determine the identifier to use - identifier_uuid = dest if is_from_bot else source_uuid - - - # Handle attachments - attachments = raw.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("attachments", []) - attachment_list = [] - for attachment in attachments: - attachment_list.append({ - "id": attachment["id"], - "content_type": attachment["contentType"], - "filename": attachment["filename"], - "size": attachment["size"], - "width": attachment.get("width"), - "height": attachment.get("height"), - }) - - cast = { - "type": "def", - "method": "xmpp", - "service": "signal", - # "sender": source_uuid, - "identifier": identifier_uuid, - "msg": text, - "attachments": attachment_list, - "detail": { - "reply_to_self": reply_to_self, - "reply_to_others": reply_to_others, - "is_outgoing_message": is_outgoing_message, - } - } - packed = msgpack.packb(cast, use_bin_type=True) - await redis.publish("component", packed) - - # TODO: Permission checks - manips = await sync_to_async(list)( - Manipulation.objects.filter(enabled=True) - ) - processed_people = set() - for manip in manips: - try: - person_identifier = await sync_to_async(PersonIdentifier.objects.get)( - identifier=identifier_uuid, - user=manip.user, - service="signal", - person__in=manip.group.people.all(), - ) - # Check if we've already processed this person - if person_identifier.person.id in processed_people: - log.warning(f"Skipping duplicate message storage for {person_identifier.person.name}") - continue # Skip to next manipulation - if not manip.group.people.filter(id=person_identifier.person.id).exists(): - log.error(f"{manip.name}: Identifier {identifier_uuid} found, but person {person_identifier.person} is not in manip group. Skipping.") - continue # Exit early if the person is not in the group - except PersonIdentifier.DoesNotExist: - log.warning(f"{manip.name}: Message from unknown identifier {identifier_uuid} - Not storing.") - continue # Exit early if no valid identifier is found - - # Find or create the corresponding ChatSession - chat_session = await history.get_chat_session(manip.user, person_identifier) - - # Store incoming or outgoing messages - log.info(f"Processing history store message {text}") - processed_people.add(person_identifier.person.id) - await history.store_message( - session=chat_session, - sender=source_uuid, - text=text, - ts=ts, - outgoing=is_from_bot, - ) - - # Get the total history - chat_history = await history.get_chat_history(chat_session) - - if replies.should_reply( - reply_to_self, - reply_to_others, - is_outgoing_message, - ): - if manip.mode != "silent": - await utils.update_last_interaction(chat_session) - prompt = replies.generate_reply_prompt( - msg, - person_identifier.person, - manip, - chat_history - ) - - log.info("Running context prompt") - result = await ai.run_prompt(prompt, manip.ai) - # Store bot's AI response with a +1s timestamp if manip.mode == "active": - await history.store_own_message( - session=chat_session, - text=result, - ts=ts + 1, - ) - # await natural.natural_send_message(c, result) - tss = await natural.natural_send_message( - result, - c.send, - c.start_typing, - c.stop_typing, - ) - elif manip.mode == "notify": - title = f"[GIA] Suggested message to {person_identifier.person.name}" - manip.user.sendmsg(result, title=title) - elif manip.mode == "instant": - # Delete all other QueuedMessages - existing_queue = QueuedMessage.objects.filter( - user=chat_session.user, - session=chat_session, - manipulation=manip, - custom_author="BOT", - ) - - await delete_messages(existing_queue) - qm = await history.store_own_message( - session=chat_session, - text=result, - ts=ts + 1, - manip=manip, - queue=True, - - ) - accept = reverse( - "message_accept_api", kwargs={"message_id":qm.id} - ) - reject = reverse( - "message_reject_api", kwargs={"message_id":qm.id} - ) - url = settings.URL - content = ( - f"{result}\n\n" - f"Accept: {url}{accept}\n" - f"Reject: {url}{reject}" - ) - title = f"[GIA] Suggested message to {person_identifier.person.name}" - manip.user.sendmsg(content, title=title) - else: - log.error(f"Mode {manip.mode} is not implemented") - - # Manage truncation & summarization - await truncate_and_summarize(chat_session, manip.ai) - # END FOR - - try: - existing_chat = Chat.objects.get( - source_uuid=source_uuid - ) - # if existing_chat.ts != ts: - # print("not equal", existing_chat.ts, ts) - # existing_chat.ts = ts - # existing_chat.save() - existing_chat.source_number = source_number - existing_chat.source_name = source_name - existing_chat.save() - except Chat.DoesNotExist: - existing_chat = Chat.objects.create( - source_number=source_number, - source_uuid=source_uuid, - source_name=source_name, - account=account, - ) - # async def stream(): pubsub = redis.pubsub() diff --git a/core/messaging/replies.py b/core/messaging/replies.py index 2e6fb1b..8261a9e 100644 --- a/core/messaging/replies.py +++ b/core/messaging/replies.py @@ -7,6 +7,8 @@ import asyncio from django.utils import timezone import random +log = logs.get_logger("replies") + def should_reply( reply_to_self, reply_to_others, @@ -24,7 +26,65 @@ def should_reply( return reply -def generate_reply_prompt(msg: dict, person: Person, manip: Manipulation, chat_history: str): +def generate_mutate_reply_prompt(msg: dict, person: Person, manip: Manipulation, chat_history: str, mutate: bool = False): + """ + Strictly rewrites the message in the persona’s tone and style + while keeping the original meaning. No added explanations. + """ + + persona = manip.persona + + # 🔹 **Strict Rules to Prevent AI from Adding Commentary** + strict_rules = ( + "- **DO NOT add explanations, comments, or meta-thoughts**.\n" + "- **DO NOT return multiple responses—return ONLY the rewritten message**.\n" + "- **DO NOT change the meaning, intent, or facts in the message**.\n" + "- **DO NOT soften insults unless the persona naturally would**.\n" + "- **DO NOT reframe as a question or suggestion—this is NOT a conversation**.\n" + "- **Rewrite as if the original sender wrote it this way, without extra commentary**.\n" + "- **Start immediately with the rewritten message—NO preface, intro, or context.**\n" + ) + + # 🔹 **What the AI SHOULD do** + transformation_guidelines = ( + "- **Rewrite the message in the persona’s unique tone and style**.\n" + "- **If the message is rude or harsh, reword it to match the persona’s confidence, cleverness, or wit**.\n" + "- **If the persona is sarcastic, teasing, or flirty, maintain that energy**.\n" + "- **Ensure the message feels natural, as if originally written that way**.\n" + "- **Preserve original sentence structure as much as possible, adjusting only for flow.**\n" + ) + + system_message = ( + "You are a text rewriter. Your task is to transform messages into a given persona’s unique style, " + "while keeping the original meaning intact.\n\n" + "### Persona Profile ###\n" + f"- **Tone:** {persona.tone} | **Humor:** {persona.humor_style}\n" + f"- **Core Values:** {persona.core_values}\n" + f"- **Communication Style:** {persona.communication_style}\n" + f"- **Flirting Style:** {persona.flirting_style}\n" + f"- **Likes:** {persona.likes} | **Dislikes:** {persona.dislikes}\n" + f"- **Response Tactics:** {persona.response_tactics}\n" + f"- **Persuasion Techniques:** {persona.persuasion_tactics}\n" + f"- **Boundaries:** {persona.boundaries} | **Adaptability:** {persona.adaptability}%\n\n" + + "### STRICT RULES ###\n" + f"{strict_rules}\n\n" + + "### TRANSFORMATION GUIDELINES ###\n" + f"{transformation_guidelines}\n\n" + + "### Original Message ###\n" + f"{msg}\n\n" + + "### Rewritten Message ###\n" + "(DO NOT include anything except the rewritten text. NO extra comments or formatting.)" + ) + + return [{"role": "system", "content": system_message}] + + + +def generate_reply_prompt(msg: dict, person: Person, manip: Manipulation, chat_history: str, mutate: bool = False): """ Generate a structured prompt using the attributes of the provided Person and Manipulation models. """ @@ -32,9 +92,23 @@ def generate_reply_prompt(msg: dict, person: Person, manip: Manipulation, chat_h now = timezone.now() persona = manip.persona + # 🔹 Define system instructions for filtering messages + filter_rules = ( + "- **Strict Filtering**: If the message includes topics that are in dislikes or boundaries, " + "modify or reject them completely to match the persona's values.\n" + "- **Rephrase Sensitively**: If the message contains something the persona dislikes but is not " + "a strict boundary, soften it or remove the negative elements.\n" + "- **Enforce Persona Style**: Modify the tone, humor, and engagement level based on the persona's " + "communication style.\n" + "- **Never Ask for Clarifications**: Always modify the message directly without requesting clarification.\n" + "- **Return Only the Modified Message**: Do not add any explanations or metadata.\n" + ) + system_message = ( "You are my digital persona, responding on my behalf while embodying my personality, preferences, and unique style.\n\n" - + "You must strictly apply the following persona-based filtering rules when modifying the message:\n\n" + f"{filter_rules}\n\n" + "### Persona Profile ###\n" f"- **MBTI:** {persona.mbti} ({persona.mbti_identity} balance)\n" f"- **Tone:** {persona.tone} | **Humor:** {persona.humor_style}\n" @@ -57,19 +131,19 @@ def generate_reply_prompt(msg: dict, person: Person, manip: Manipulation, chat_h "### Conversation Context ###\n" f"{chat_history if chat_history else 'No prior chat history.'}\n\n" - - "### Response Guidelines ###\n" - "- **Engagement**: Keep responses engaging, with a balance of wit, depth, and confidence.\n" - "- **Flirting**: Be direct, playful, and, when appropriate, subtly provocative—without hesitation.\n" - "- **Pauses**: Use double newlines (`\\n\\n`) to pause where it enhances realism.\n" - "- **Flow Awareness**: Maintain continuity, avoid redundancy, and adjust response length based on interaction.\n" ) - user_message = f"[{msg['timestamp']}] <{person.name}> {msg['text']}" + if not mutate: + user_message = f"[{msg['timestamp']}] <{person.name}> {msg['text']}" + log.info(f"User message: {user_message}") - return [ - {"role": "system", "content": system_message}, - {"role": "user", "content": user_message}, - ] - - \ No newline at end of file + return [ + {"role": "system", "content": system_message}, + {"role": "user", "content": user_message}, + ] + else: + user_message = f"Message to amend: {msg}" + return [ + {"role": "system", "content": system_message}, + {"role": "user", "content": user_message}, + ] diff --git a/core/migrations/0015_manipulation_filter_enabled_alter_manipulation_mode.py b/core/migrations/0015_manipulation_filter_enabled_alter_manipulation_mode.py new file mode 100644 index 0000000..d8587a7 --- /dev/null +++ b/core/migrations/0015_manipulation_filter_enabled_alter_manipulation_mode.py @@ -0,0 +1,23 @@ +# Generated by Django 5.1.6 on 2025-03-13 21:07 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0014_queuedmessage'), + ] + + operations = [ + migrations.AddField( + model_name='manipulation', + name='filter_enabled', + field=models.BooleanField(default=False), + ), + migrations.AlterField( + model_name='manipulation', + name='mode', + field=models.CharField(blank=True, choices=[('active', 'Send replies to messages'), ('instant', 'Click link to send reply'), ('prospective', 'Click link to open page'), ('notify', 'Send notification of ideal reply only'), ('mutate', 'Change messages sent on XMPP using the persona'), ('silent', 'Do not generate or send replies')], max_length=50, null=True), + ), + ] diff --git a/core/models.py b/core/models.py index e2c501c..59d3cfd 100644 --- a/core/models.py +++ b/core/models.py @@ -249,6 +249,7 @@ class Manipulation(models.Model): ai = models.ForeignKey(AI, on_delete=models.CASCADE) persona = models.ForeignKey(Persona, on_delete=models.CASCADE) enabled = models.BooleanField(default=False) + filter_enabled = models.BooleanField(default=False) mode = models.CharField( max_length=50, choices=[ @@ -256,6 +257,7 @@ class Manipulation(models.Model): ("instant", "Click link to send reply"), ("prospective", "Click link to open page"), ("notify", "Send notification of ideal reply only"), + ("mutate", "Change messages sent on XMPP using the persona"), ("silent", "Do not generate or send replies"), ], blank=True, null=True diff --git a/core/templates/partials/manipulation-list.html b/core/templates/partials/manipulation-list.html index ff2ed03..2184daf 100644 --- a/core/templates/partials/manipulation-list.html +++ b/core/templates/partials/manipulation-list.html @@ -18,6 +18,7 @@ persona enabled mode + filter actions {% for item in object_list %} @@ -47,6 +48,17 @@ {% endif %} {{ item.mode }} + + {% if item.filter %} + + + + {% else %} + + + + {% endif %} +