Fix Signal protocol startup and manipulation iteration dedupe

This commit is contained in:
2026-02-14 22:21:41 +00:00
parent f0c4b350a9
commit d22eb8c811

View File

@@ -1,60 +1,63 @@
from django.conf import settings
from core.messaging import natural
import aiohttp
from core.util import logs
from core.clients import ClientBase
from signalbot import SignalBot
import aiohttp
import msgpack
from django.conf import settings
from signalbot import SignalBot, Command, Context
from asgiref.sync import sync_to_async
from django.urls import reverse
import json
import asyncio import asyncio
from core.util import logs import json
from core.lib.prompts.functions import truncate_and_summarize, messages_to_string, delete_messages
from core.lib import deferred
from core.messaging import replies, ai, natural, history, utils
from core.models import Chat, Manipulation, PersonIdentifier, QueuedMessage
import aiohttp import aiohttp
from asgiref.sync import sync_to_async
from django.conf import settings from django.conf import settings
from redis import asyncio as aioredis from django.urls import reverse
from core.clients import signalapi from signalbot import Command, Context, SignalBot
from core.clients import ClientBase, signalapi
from core.lib.prompts.functions import delete_messages, truncate_and_summarize
from core.messaging import ai, history, natural, replies, utils
from core.models import Chat, Manipulation, PersonIdentifier, QueuedMessage
from core.util import logs from core.util import logs
log = logs.get_logger("signalF") log = logs.get_logger("signalF")
SIGNAL_URL = "signal:8080" if settings.DEBUG:
SIGNAL_HOST = "127.0.0.1"
else:
SIGNAL_HOST = "signal"
redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10) SIGNAL_PORT = 8080
SIGNAL_URL = f"{SIGNAL_HOST}:{SIGNAL_PORT}"
class NewSignalBot(SignalBot): class NewSignalBot(SignalBot):
def __init__(self, ur, service, config): def __init__(self, ur, service, config):
self.ur = ur self.ur = ur
self.service = service self.service = service
self.signal_rest = config["signal_service"] # keep your own copy
self.phone_number = config["phone_number"]
super().__init__(config) super().__init__(config)
self.log = logs.get_logger("signalI") self.log = logs.get_logger("signalI")
self.bot_uuid = None # Initialize with None self.bot_uuid = None
async def get_own_uuid(self) -> str: async def get_own_uuid(self) -> str | None:
"""Fetch bot's UUID by checking contacts, groups, or profile."""
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
uri_contacts = f"http://{self._signal.signal_service}/v1/contacts/{self._signal.phone_number}" # config may be "signal:8080" -- ensure http://
base = self.signal_rest
if not base.startswith("http"):
base = f"http://{base}"
uri = f"{base}/v1/contacts/{self.phone_number}"
try: try:
resp = await session.get(uri_contacts) resp = await session.get(uri)
if resp.status == 200: if resp.status != 200:
contacts_data = await resp.json() self.log.error(f"contacts lookup failed: {resp.status} {await resp.text()}")
if isinstance(contacts_data, list): return None
for contact in contacts_data:
if contact.get("number") == self._phone_number: contacts_data = await resp.json()
return contact.get("uuid") if isinstance(contacts_data, list):
for contact in contacts_data:
if contact.get("number") == self.phone_number:
return contact.get("uuid")
return None
except Exception as e: except Exception as e:
self.log.error(f"Failed to get UUID from contacts: {e}") self.log.error(f"Failed to get UUID from contacts: {e}")
return None
async def initialize_bot(self): async def initialize_bot(self):
"""Fetch bot's UUID and store it in self.bot_uuid.""" """Fetch bot's UUID and store it in self.bot_uuid."""
@@ -67,13 +70,24 @@ class NewSignalBot(SignalBot):
except Exception as e: except Exception as e:
self.log.error(f"Failed to initialize bot UUID: {e}") self.log.error(f"Failed to initialize bot UUID: {e}")
def start(self): async def _async_post_init(self):
"""Start bot without blocking event loop.""" """
self._event_loop.create_task(self.initialize_bot()) # Fetch UUID first Preserve SignalBot startup flow so protocol auto-detection runs.
self._event_loop.create_task(self._detect_groups()) # Sync groups This flips the client to plain HTTP/WS when HTTPS/WSS is unavailable.
self._event_loop.create_task(self._produce_consume_messages()) # Process messages """
await self._check_signal_service()
await self.initialize_bot()
await self._detect_groups()
await self._resolve_commands()
await self._produce_consume_messages()
self.scheduler.start() # Start async job scheduler def start(self):
"""Start bot without blocking the caller's event loop."""
task = self._event_loop.create_task(
self._rerun_on_exception(self._async_post_init)
)
self._store_reference_to_task(task, self._running_tasks)
self.scheduler.start()
class HandleMessage(Command): class HandleMessage(Command):
@@ -95,13 +109,9 @@ class HandleMessage(Command):
"raw_message": c.message.raw_message "raw_message": c.message.raw_message
} }
raw = json.loads(c.message.raw_message) raw = json.loads(c.message.raw_message)
print(json.dumps(c.message.raw_message, indent=2))
#dest = c.message.raw_message.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid")
dest = raw.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid") dest = raw.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("destinationUuid")
#account = c.message.raw_message.get("account", "")
account = raw.get("account", "") account = raw.get("account", "")
#source_name = msg["raw_message"].get("envelope", {}).get("sourceName", "")
source_name = raw.get("envelope", {}).get("sourceName", "") source_name = raw.get("envelope", {}).get("sourceName", "")
source_number = c.message.source_number source_number = c.message.source_number
@@ -121,7 +131,9 @@ class HandleMessage(Command):
# Determine the identifier to use # Determine the identifier to use
identifier_uuid = dest if is_from_bot else source_uuid identifier_uuid = dest if is_from_bot else source_uuid
if not identifier_uuid:
log.warning("No Signal identifier available for message routing.")
return
# Handle attachments # Handle attachments
attachments = raw.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("attachments", []) attachments = raw.get("envelope", {}).get("syncMessage", {}).get("sentMessage", {}).get("attachments", [])
@@ -138,23 +150,21 @@ class HandleMessage(Command):
"height": attachment.get("height"), "height": attachment.get("height"),
}) })
# Get User from identifier # Get users/person identifiers for this Signal sender/recipient.
log.info(f"FUCK {self.service}") identifiers = await sync_to_async(list)(
identifiers = PersonIdentifier.objects.filter( PersonIdentifier.objects.filter(
identifier=identifier_uuid, identifier=identifier_uuid,
service=self.service, service=self.service,
)
) )
xmpp_attachments = [] xmpp_attachments = []
# attachments = []
# Asynchronously fetch all attachments # Asynchronously fetch all attachments
tasks = [signalapi.fetch_signal_attachment(att["id"]) for att in attachment_list] tasks = [signalapi.fetch_signal_attachment(att["id"]) for att in attachment_list]
fetched_attachments = await asyncio.gather(*tasks) fetched_attachments = await asyncio.gather(*tasks)
log.info(f"ATTACHMENT LIST {attachment_list}") log.info(f"ATTACHMENT LIST {attachment_list}")
log.info(f"FETCHED ATTACHMENTS {fetched_attachments}")
for fetched, att in zip(fetched_attachments, attachment_list): for fetched, att in zip(fetched_attachments, attachment_list):
log.info(f"ITER {fetched} {att}")
if not fetched: if not fetched:
log.warning(f"Failed to fetch attachment {att['id']} from Signal.") log.warning(f"Failed to fetch attachment {att['id']} from Signal.")
continue continue
@@ -166,52 +176,54 @@ class HandleMessage(Command):
"filename": fetched["filename"], "filename": fetched["filename"],
"size": fetched["size"], "size": fetched["size"],
}) })
# Forward incoming Signal messages to XMPP and apply mutate rules.
for identifier in identifiers: for identifier in identifiers:
#recipient_jid = f"{identifier.user.username}@{settings.XMPP_ADDRESS}"
user = identifier.user user = identifier.user
manipulations = Manipulation.objects.filter( mutate_manips = await sync_to_async(list)(
group__people=identifier.person, Manipulation.objects.filter(
user=identifier.user, group__people=identifier.person,
#mode="mutate", user=identifier.user,
filter_enabled=True, mode="mutate",
enabled=True, filter_enabled=True,
) enabled=True,
# chat_history = await history.get_chat_history(session) )
# await utils.update_last_interaction(session) )
if manipulations: if mutate_manips:
manip = manipulations.first() for manip in mutate_manips:
prompt = replies.generate_mutate_reply_prompt( prompt = replies.generate_mutate_reply_prompt(
text, text,
None, None,
manip, manip,
None, None,
) )
log.info("Running Signal mutate prompt")
result = await ai.run_prompt(prompt, manip.ai)
log.info("Running Signal context prompt") log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.")
result = await ai.run_prompt(prompt, manip.ai) await self.ur.xmpp.client.send_from_external(
log.info(f"RESULT {result}") user,
# await history.store_own_message( identifier,
# session=session, result,
# text=result, is_outgoing_message,
# ts=int(now().timestamp() * 1000), attachments=xmpp_attachments,
# ) )
else:
log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.")
await self.ur.xmpp.client.send_from_external(user, identifier, result, is_outgoing_message, attachments=xmpp_attachments) await self.ur.xmpp.client.send_from_external(
user,
if not manipulations.exists(): identifier,
log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.") text,
await self.ur.xmpp.client.send_from_external(user, identifier, text, is_outgoing_message, attachments=xmpp_attachments) is_outgoing_message,
attachments=xmpp_attachments,
#### )
# TODO: Permission checks # TODO: Permission checks
manips = await sync_to_async(list)( manips = await sync_to_async(list)(
Manipulation.objects.filter(enabled=True) Manipulation.objects.filter(enabled=True)
) )
processed_people = set() session_cache = {}
stored_messages = set()
for manip in manips: for manip in manips:
try: try:
person_identifier = await sync_to_async(PersonIdentifier.objects.get)( person_identifier = await sync_to_async(PersonIdentifier.objects.get)(
@@ -220,30 +232,30 @@ class HandleMessage(Command):
service="signal", service="signal",
person__in=manip.group.people.all(), person__in=manip.group.people.all(),
) )
# Check if we've already processed this person
if person_identifier.person.id in processed_people:
log.warning(f"Skipping duplicate message storage for {person_identifier.person.name}")
continue # Skip to next manipulation
if not manip.group.people.filter(id=person_identifier.person.id).exists():
log.error(f"{manip.name}: Identifier {identifier_uuid} found, but person {person_identifier.person} is not in manip group. Skipping.")
continue # Exit early if the person is not in the group
except PersonIdentifier.DoesNotExist: except PersonIdentifier.DoesNotExist:
log.warning(f"{manip.name}: Message from unknown identifier {identifier_uuid} - Not storing.") log.warning(f"{manip.name}: Message from unknown identifier {identifier_uuid}.")
continue # Exit early if no valid identifier is found continue
# Find or create the corresponding ChatSession # Find/create ChatSession once per user/person.
chat_session = await history.get_chat_session(manip.user, person_identifier) session_key = (manip.user.id, person_identifier.person.id)
if session_key in session_cache:
chat_session = session_cache[session_key]
else:
chat_session = await history.get_chat_session(manip.user, person_identifier)
session_cache[session_key] = chat_session
# Store incoming or outgoing messages # Store each incoming/outgoing event once per session.
log.info(f"Processing history store message {text}") message_key = (chat_session.id, ts, source_uuid)
processed_people.add(person_identifier.person.id) if message_key not in stored_messages:
await history.store_message( log.info(f"Processing history store message {text}")
session=chat_session, await history.store_message(
sender=source_uuid, session=chat_session,
text=text, sender=source_uuid,
ts=ts, text=text,
outgoing=is_from_bot, ts=ts,
) outgoing=is_from_bot,
)
stored_messages.add(message_key)
# Get the total history # Get the total history
chat_history = await history.get_chat_history(chat_session) chat_history = await history.get_chat_history(chat_session)
@@ -253,7 +265,9 @@ class HandleMessage(Command):
reply_to_others, reply_to_others,
is_outgoing_message, is_outgoing_message,
): ):
if manip.mode not in ["silent", "mutate"]: if manip.mode in ["silent", "mutate"]:
pass
elif manip.mode in ["active", "notify", "instant"]:
await utils.update_last_interaction(chat_session) await utils.update_last_interaction(chat_session)
prompt = replies.generate_reply_prompt( prompt = replies.generate_reply_prompt(
msg, msg,
@@ -264,81 +278,72 @@ class HandleMessage(Command):
log.info("Running context prompt") log.info("Running context prompt")
result = await ai.run_prompt(prompt, manip.ai) result = await ai.run_prompt(prompt, manip.ai)
# Store bot's AI response with a +1s timestamp if manip.mode == "active": if manip.mode == "active":
await history.store_own_message( await history.store_own_message(
session=chat_session, session=chat_session,
text=result, text=result,
ts=ts + 1, ts=ts + 1,
) )
# await natural.natural_send_message(c, result) await self.ur.xmpp.client.send_from_external(
await self.ur.xmpp.client.send_from_external(manip.user, person_identifier, result, is_outgoing_message=True) manip.user,
tss = await natural.natural_send_message( person_identifier,
result, result,
c.send, is_outgoing_message=True,
c.start_typing, )
c.stop_typing, await natural.natural_send_message(
) result,
elif manip.mode == "notify": c.send,
title = f"[GIA] Suggested message to {person_identifier.person.name}" c.start_typing,
manip.user.sendmsg(result, title=title) c.stop_typing,
elif manip.mode == "instant": )
# Delete all other QueuedMessages elif manip.mode == "notify":
existing_queue = QueuedMessage.objects.filter( title = f"[GIA] Suggested message to {person_identifier.person.name}"
user=chat_session.user, manip.user.sendmsg(result, title=title)
session=chat_session, elif manip.mode == "instant":
manipulation=manip, existing_queue = QueuedMessage.objects.filter(
custom_author="BOT", user=chat_session.user,
) session=chat_session,
manipulation=manip,
custom_author="BOT",
)
await delete_messages(existing_queue) await delete_messages(existing_queue)
qm = await history.store_own_message( qm = await history.store_own_message(
session=chat_session, session=chat_session,
text=result, text=result,
ts=ts + 1, ts=ts + 1,
manip=manip, manip=manip,
queue=True, queue=True,
) )
accept = reverse( accept = reverse(
"message_accept_api", kwargs={"message_id":qm.id} "message_accept_api", kwargs={"message_id": qm.id}
) )
reject = reverse( reject = reverse(
"message_reject_api", kwargs={"message_id":qm.id} "message_reject_api", kwargs={"message_id": qm.id}
) )
url = settings.URL url = settings.URL
content = ( content = (
f"{result}\n\n" f"{result}\n\n"
f"Accept: {url}{accept}\n" f"Accept: {url}{accept}\n"
f"Reject: {url}{reject}" f"Reject: {url}{reject}"
) )
title = f"[GIA] Suggested message to {person_identifier.person.name}" title = f"[GIA] Suggested message to {person_identifier.person.name}"
manip.user.sendmsg(content, title=title) manip.user.sendmsg(content, title=title)
else: else:
log.error(f"Mode {manip.mode} is not implemented") log.error(f"Mode {manip.mode} is not implemented")
# Manage truncation & summarization # Manage truncation & summarization
await truncate_and_summarize(chat_session, manip.ai) await truncate_and_summarize(chat_session, manip.ai)
# END FOR
try: await sync_to_async(Chat.objects.update_or_create)(
existing_chat = Chat.objects.get( source_uuid=source_uuid,
source_uuid=source_uuid defaults={
) "source_number": source_number,
# if existing_chat.ts != ts: "source_name": source_name,
# print("not equal", existing_chat.ts, ts) "account": account,
# existing_chat.ts = ts },
# existing_chat.save() )
existing_chat.source_number = source_number
existing_chat.source_name = source_name
existing_chat.save()
except Chat.DoesNotExist:
existing_chat = Chat.objects.create(
source_number=source_number,
source_uuid=source_uuid,
source_name=source_name,
account=account,
)
#
class SignalClient(ClientBase): class SignalClient(ClientBase):