# Source: GIA/core/clients/signal.py (743 lines, 28 KiB, Python)

import asyncio
import json
import time
from urllib.parse import quote_plus, urlparse
import aiohttp
from asgiref.sync import sync_to_async
from django.conf import settings
from django.urls import reverse
from signalbot import Command, Context, SignalBot
from core.clients import ClientBase, signalapi
from core.messaging import ai, history, media_bridge, natural, replies, utils
from core.models import Chat, Manipulation, PersonIdentifier, QueuedMessage
from core.util import logs
log = logs.get_logger("signalF")

# Resolve the Signal REST endpoint from settings.SIGNAL_HTTP_URL, which may be
# a full URL ("http://host:port") or a bare "host:port". Any part that is
# missing or unusable falls back to the defaults below.
SIGNAL_HOST = "signal"
SIGNAL_PORT = 8080

# Coerce through str() so an explicit ``SIGNAL_HTTP_URL = None`` (or any other
# non-string value) does not crash the module at import time.
_signal_http_url = str(getattr(settings, "SIGNAL_HTTP_URL", "") or "").strip()
if _signal_http_url:
    parsed = urlparse(
        _signal_http_url if "://" in _signal_http_url else f"http://{_signal_http_url}"
    )
    SIGNAL_HOST = parsed.hostname or SIGNAL_HOST
    try:
        SIGNAL_PORT = parsed.port or SIGNAL_PORT
    except ValueError:
        # urlparse raises lazily, on ``.port`` access, when the port is not a
        # valid number; keep the default instead of failing the import.
        log.warning(
            f"Invalid port in SIGNAL_HTTP_URL {_signal_http_url!r}; "
            f"using {SIGNAL_PORT}."
        )

SIGNAL_URL = f"{SIGNAL_HOST}:{SIGNAL_PORT}"
def _get_nested(payload, path):
current = payload
for key in path:
if not isinstance(current, dict):
return None
current = current.get(key)
return current
def _looks_like_signal_attachment(entry):
return isinstance(entry, dict) and (
"id" in entry or "attachmentId" in entry or "contentType" in entry
)
def _normalize_attachment(entry):
attachment_id = entry.get("id") or entry.get("attachmentId")
if attachment_id is None:
return None
return {
"id": attachment_id,
"content_type": entry.get("contentType", "application/octet-stream"),
"filename": entry.get("filename") or str(attachment_id),
"size": entry.get("size") or 0,
"width": entry.get("width"),
"height": entry.get("height"),
}
def _extract_attachments(raw_payload):
    """Collect normalized, de-duplicated attachments from a Signal payload.

    The known attachment locations under ``envelope`` are checked first. Only
    if none of them yields anything is the whole envelope scanned for lists
    whose items all look like attachments.

    Args:
        raw_payload: The decoded Signal event; anything that is not a dict
            yields an empty result.

    Returns:
        A list of dicts as produced by ``_normalize_attachment``, unique by
        the string form of their ``id``, in discovery order.
    """
    if not isinstance(raw_payload, dict):
        return []
    envelope = raw_payload.get("envelope", {})
    results = []
    seen = set()

    def _collect(entries):
        # Normalize each entry and keep the first occurrence of every id.
        for entry in entries:
            if not isinstance(entry, dict):
                # Malformed list item; skip rather than crash the handler.
                continue
            normalized = _normalize_attachment(entry)
            if not normalized:
                continue
            key = str(normalized["id"])
            if key in seen:
                continue
            seen.add(key)
            results.append(normalized)

    candidate_paths = (
        ("dataMessage", "attachments"),
        ("syncMessage", "sentMessage", "attachments"),
        ("syncMessage", "editMessage", "dataMessage", "attachments"),
    )
    for path in candidate_paths:
        found = _get_nested(envelope, path)
        if isinstance(found, list):
            _collect(found)

    # Fallback: scan for attachment-shaped lists under envelope.
    if not results:
        stack = [envelope]
        while stack:
            node = stack.pop()
            if isinstance(node, dict):
                stack.extend(node.values())
            elif isinstance(node, list):
                if node and all(_looks_like_signal_attachment(item) for item in node):
                    _collect(node)
                else:
                    stack.extend(node)
    return results
def _extract_receipt_timestamps(receipt_payload):
raw_ts = receipt_payload.get("timestamp")
if raw_ts is None:
raw_ts = receipt_payload.get("timestamps")
if isinstance(raw_ts, list):
out = []
for item in raw_ts:
try:
out.append(int(item))
except Exception:
continue
return out
if raw_ts is not None:
try:
return [int(raw_ts)]
except Exception:
return []
return []
def _typing_started(typing_payload):
action = str(typing_payload.get("action") or "").strip().lower()
if action in {"started", "start", "typing", "composing"}:
return True
explicit = typing_payload.get("isTyping")
if isinstance(explicit, bool):
return explicit
return True
def _identifier_candidates(*values):
out = []
seen = set()
for value in values:
cleaned = str(value or "").strip()
if not cleaned or cleaned in seen:
continue
seen.add(cleaned)
out.append(cleaned)
return out
class NewSignalBot(SignalBot):
    """SignalBot subclass that also records the bot account's own UUID.

    The UUID is looked up through the REST contacts endpoint during startup
    and stored on ``bot_uuid``.
    """

    def __init__(self, ur, service, config):
        """Store router/service handles and initialise the underlying bot.

        Args:
            ur: Router object; only stored here.
            service: Service name; only stored here.
            config: Mapping with at least ``signal_service`` and
                ``phone_number``; passed on to ``SignalBot.__init__``.
        """
        self.ur = ur
        self.service = service
        self.signal_rest = config["signal_service"]  # keep your own copy
        self.phone_number = config["phone_number"]
        super().__init__(config)
        self.log = logs.get_logger("signalI")
        self.bot_uuid = None

    async def get_own_uuid(self) -> str | None:
        """Look up this account's UUID via ``/v1/contacts/<phone_number>``.

        Returns:
            The ``uuid`` of the contact whose ``number`` equals the bot's
            phone number, or ``None`` on a non-200 response, a request
            error, or when no such contact is listed.
        """
        # config may be "signal:8080" -- ensure a scheme is present. Testing
        # for "://" avoids mistaking a host such as "httpd:8080" for a URL.
        base = self.signal_rest
        if "://" not in base:
            base = f"http://{base}"
        uri = f"{base}/v1/contacts/{self.phone_number}"
        try:
            async with aiohttp.ClientSession() as session:
                # ``async with`` releases the response/connection on exit.
                async with session.get(uri) as resp:
                    if resp.status != 200:
                        self.log.error(
                            f"contacts lookup failed: {resp.status} {await resp.text()}"
                        )
                        return None
                    contacts_data = await resp.json()
        except Exception as e:
            self.log.error(f"Failed to get UUID from contacts: {e}")
            return None

        if isinstance(contacts_data, list):
            for contact in contacts_data:
                if (
                    isinstance(contact, dict)
                    and contact.get("number") == self.phone_number
                ):
                    return contact.get("uuid")
        return None

    async def initialize_bot(self):
        """Fetch bot's UUID and store it in self.bot_uuid."""
        try:
            self.bot_uuid = await self.get_own_uuid()
            if self.bot_uuid:
                self.log.info(f"Own UUID: {self.bot_uuid}")
            else:
                self.log.warning("Unable to fetch bot UUID.")
        except Exception as e:
            self.log.error(f"Failed to initialize bot UUID: {e}")

    async def _async_post_init(self):
        """
        Preserve SignalBot startup flow so protocol auto-detection runs.
        This flips the client to plain HTTP/WS when HTTPS/WSS is unavailable.
        """
        await self._check_signal_service()
        await self.initialize_bot()
        await self._detect_groups()
        await self._resolve_commands()
        await self._produce_consume_messages()

    def start(self):
        """Start bot without blocking the caller's event loop."""
        # Schedules the startup coroutine as a task on ``self._event_loop``
        # rather than running the loop here.
        task = self._event_loop.create_task(
            self._rerun_on_exception(self._async_post_init)
        )
        self._store_reference_to_task(task, self._running_tasks)
        self.scheduler.start()
class HandleMessage(Command):
    """signalbot command that routes every received Signal event into GIA.

    ``handle`` is the entry point; the private ``_...`` methods each cover
    one stage of it (routing, typing/receipt dispatch, attachments, XMPP
    forwarding, history, manipulations, chat bookkeeping).
    """

    def __init__(self, ur, service, *args, **kwargs):
        """Store the router and service name, then initialise the Command."""
        self.ur = ur
        self.service = service
        super().__init__(*args, **kwargs)

    async def handle(self, c: Context):
        """Process one Signal event.

        Stages, in order:
          1. Work out which identifiers the event should be routed to.
          2. Typing and receipt events are dispatched to the router and the
             handler returns.
          3. Attachments are fetched and the message is forwarded to XMPP
             (through "mutate" manipulations where configured).
          4. The message is stored in history and announced to the router.
          5. Enabled manipulations may generate a reply or suggestion.
          6. The ``Chat`` row for the sender is created or updated.
        """
        message = c.message
        msg = {
            "source": message.source,
            "source_number": message.source_number,
            "source_uuid": message.source_uuid,
            "timestamp": message.timestamp,
            "type": message.type.value,
            "text": message.text,
            "group": message.group,
            "reaction": message.reaction,
            "mentions": message.mentions,
            "raw_message": message.raw_message,
        }
        raw = json.loads(message.raw_message)
        # Tolerate an explicit null envelope / syncMessage in the payload.
        envelope = raw.get("envelope") or {}
        sent_message = _get_nested(envelope, ("syncMessage", "sentMessage"))
        if not isinstance(sent_message, dict):
            sent_message = {}

        dest = sent_message.get("destinationUuid")
        source_number = message.source_number
        source_uuid = message.source_uuid
        text = message.text
        ts = message.timestamp

        # Message originating from us
        same_recipient = source_uuid == dest
        is_from_bot = source_uuid == c.bot.bot_uuid
        is_to_bot = dest == c.bot.bot_uuid or dest is None
        reply_to_self = same_recipient and is_from_bot  # Reply
        reply_to_others = is_to_bot and not same_recipient  # Reply
        is_outgoing_message = is_from_bot and not is_to_bot  # Do not reply

        identifier_candidates = self._routing_candidates(
            c, envelope, sent_message, is_from_bot
        )
        if not identifier_candidates:
            log.warning("No Signal identifier available for message routing.")
            return

        # Resolve person identifiers once for this event. select_related
        # loads user/person in the same query so the attribute accesses below
        # do not each trigger a lazy database hit from async code.
        identifiers = await sync_to_async(list)(
            PersonIdentifier.objects.filter(
                identifier__in=identifier_candidates,
                service=self.service,
            ).select_related("user", "person")
        )

        typing_payload = envelope.get("typingMessage")
        if isinstance(typing_payload, dict):
            await self._dispatch_typing(identifiers, typing_payload)
            return

        receipt_payload = envelope.get("receiptMessage")
        if isinstance(receipt_payload, dict):
            await self._dispatch_receipt(
                c, identifiers, envelope, receipt_payload
            )
            return

        xmpp_attachments, compose_media_urls = await self._fetch_attachments(
            raw, envelope
        )
        # An attachment-only message uses the media links as its text.
        if (not text) and compose_media_urls:
            text = "\n".join(compose_media_urls)

        text_overrides = await self._forward_to_xmpp(
            identifiers,
            text,
            xmpp_attachments,
            compose_media_urls,
            is_outgoing_message,
        )

        sender_key = source_uuid or source_number or identifier_candidates[0]
        session_cache = await self._store_history(
            identifiers, text_overrides, text, ts, sender_key, is_from_bot, msg
        )

        await self._run_manipulations(
            c,
            msg,
            ts,
            identifier_candidates,
            session_cache,
            reply_to_self,
            reply_to_others,
            is_outgoing_message,
        )

        await self._upsert_chat(raw, envelope, source_uuid, source_number)

    def _routing_candidates(self, c, envelope, sent_message, is_from_bot):
        """Return the identifier strings this event should be matched on."""
        dest = sent_message.get("destinationUuid")
        destination_number = sent_message.get("destination")
        if dest or destination_number:
            # Sync "sentMessage" events are outbound; route by destination only.
            # This prevents copying one outbound message into multiple people
            # when source fields include the bot's own identifier.
            return _identifier_candidates(dest, destination_number)
        if is_from_bot:
            # From the bot but without a destination: ``dest`` is falsy here,
            # so this yields no candidates.
            return _identifier_candidates(dest)

        bot_identifiers = {
            str(c.bot.bot_uuid or "").strip(),
            str(getattr(c.bot, "phone_number", "") or "").strip(),
        }
        incoming_candidates = _identifier_candidates(
            c.message.source_uuid,
            c.message.source_number,
            c.message.source,
            envelope.get("sourceUuid"),
            envelope.get("sourceNumber"),
            envelope.get("source"),
        )
        return [
            value
            for value in incoming_candidates
            if value and value not in bot_identifiers
        ]

    async def _dispatch_typing(self, identifiers, typing_payload):
        """Report a typing start/stop to the router for each identifier."""
        # The decision depends only on the payload, so make it once.
        if _typing_started(typing_payload):
            notify = self.ur.started_typing
        else:
            notify = self.ur.stopped_typing
        for identifier in identifiers:
            await notify(
                self.service,
                identifier=identifier,
                payload=typing_payload,
            )

    async def _dispatch_receipt(self, c, identifiers, envelope, receipt_payload):
        """Report a receipt to the router for each identifier.

        NOTE(review): every receipt is forwarded via ``message_read``; if the
        payload can also describe delivery-only receipts, confirm that this
        is intended.
        """
        read_timestamps = _extract_receipt_timestamps(receipt_payload)
        read_ts = (
            envelope.get("timestamp")
            or envelope.get("serverReceivedTimestamp")
            or c.message.timestamp
        )
        read_by = c.message.source_uuid or c.message.source_number or ""
        for identifier in identifiers:
            await self.ur.message_read(
                self.service,
                identifier=identifier,
                message_timestamps=read_timestamps,
                read_ts=read_ts,
                payload=receipt_payload,
                read_by=read_by,
            )

    async def _fetch_attachments(self, raw, envelope):
        """Download the event's attachments.

        Returns:
            ``(xmpp_attachments, compose_media_urls)``: the fetched files as
            dicts for the XMPP side, and one compose media link per blob that
            ``media_bridge.put_blob`` returned a key for.
        """
        # Handle attachments across multiple Signal payload variants.
        attachment_list = _extract_attachments(raw)
        xmpp_attachments = []
        compose_media_urls = []
        log.info(f"ATTACHMENT LIST {attachment_list}")
        if not attachment_list:
            log.info(f"No attachments found. Envelope keys: {list(envelope.keys())}")
            return xmpp_attachments, compose_media_urls

        # Fetch concurrently. return_exceptions keeps one failing download
        # from aborting the handling of the whole message.
        fetched_attachments = await asyncio.gather(
            *(signalapi.fetch_signal_attachment(att["id"]) for att in attachment_list),
            return_exceptions=True,
        )
        for fetched, att in zip(fetched_attachments, attachment_list):
            if isinstance(fetched, BaseException) or not fetched:
                log.warning(f"Failed to fetch attachment {att['id']} from Signal.")
                continue
            # Attach fetched file to XMPP
            xmpp_attachments.append(
                {
                    "content": fetched["content"],
                    "content_type": fetched["content_type"],
                    "filename": fetched["filename"],
                    "size": fetched["size"],
                }
            )
            blob_key = media_bridge.put_blob(
                service="signal",
                content=fetched["content"],
                filename=fetched["filename"],
                content_type=fetched["content_type"],
            )
            if blob_key:
                compose_media_urls.append(
                    f"/compose/media/blob/?key={quote_plus(str(blob_key))}"
                )
        return xmpp_attachments, compose_media_urls

    async def _forward_to_xmpp(
        self,
        identifiers,
        text,
        xmpp_attachments,
        compose_media_urls,
        is_outgoing_message,
    ):
        """Forward the message to XMPP for each identifier.

        Where enabled "mutate" manipulations apply, each one's AI output is
        sent in place of the original text; otherwise the text is sent as is.

        Returns:
            A dict mapping ``(user id, person id)`` to the text to record for
            that session (the message text, or upload/media links when the
            text is empty).
        """
        text_overrides = {}
        for identifier in identifiers:
            user = identifier.user
            session_key = (user.id, identifier.person.id)
            mutate_manips = await sync_to_async(list)(
                Manipulation.objects.filter(
                    group__people=identifier.person,
                    user=user,
                    mode="mutate",
                    filter_enabled=True,
                    enabled=True,
                )
            )
            uploaded_urls = []
            if mutate_manips:
                for manip in mutate_manips:
                    prompt = replies.generate_mutate_reply_prompt(
                        text,
                        None,
                        manip,
                        None,
                    )
                    log.info("Running Signal mutate prompt")
                    result = await ai.run_prompt(prompt, manip.ai)
                    uploaded_urls = await self._send_to_xmpp(
                        user, identifier, result, is_outgoing_message, xmpp_attachments
                    )
            else:
                uploaded_urls = await self._send_to_xmpp(
                    user, identifier, text, is_outgoing_message, xmpp_attachments
                )

            resolved_text = text
            if not resolved_text:
                if uploaded_urls:
                    resolved_text = "\n".join(uploaded_urls)
                elif compose_media_urls:
                    resolved_text = "\n".join(compose_media_urls)
            text_overrides[session_key] = resolved_text
        return text_overrides

    async def _send_to_xmpp(
        self, user, identifier, body, is_outgoing_message, xmpp_attachments
    ):
        """Send ``body`` plus attachments to XMPP; return the call's result."""
        log.info(f"Sending {len(xmpp_attachments)} attachments from Signal to XMPP.")
        return await self.ur.xmpp.client.send_from_external(
            user,
            identifier,
            body,
            is_outgoing_message,
            attachments=xmpp_attachments,
        )

    async def _store_history(
        self, identifiers, text_overrides, text, ts, sender_key, outgoing, msg
    ):
        """Store the message once per chat session and notify the router.

        Persist message history for every resolved identifier, even when no
        manipulations are active, so manual chat windows stay complete.

        Returns:
            The ``(user id, person id) -> chat session`` cache built here, so
            later stages can reuse the sessions.
        """
        session_cache = {}
        stored_messages = set()
        for identifier in identifiers:
            session_key = (identifier.user.id, identifier.person.id)
            if session_key not in session_cache:
                session_cache[session_key] = await history.get_chat_session(
                    identifier.user, identifier
                )
            chat_session = session_cache[session_key]

            message_key = (chat_session.id, ts, sender_key)
            message_text = text_overrides.get(session_key, text)
            if message_key not in stored_messages:
                await history.store_message(
                    session=chat_session,
                    sender=sender_key,
                    text=message_text,
                    ts=ts,
                    outgoing=outgoing,
                )
                stored_messages.add(message_key)
            # Notify unified router to ensure service context is preserved
            await self.ur.message_received(
                self.service,
                identifier=identifier,
                text=message_text,
                ts=ts,
                payload=msg,
            )
        return session_cache

    async def _run_manipulations(
        self,
        c,
        msg,
        ts,
        identifier_candidates,
        session_cache,
        reply_to_self,
        reply_to_others,
        is_outgoing_message,
    ):
        """Let every enabled manipulation react to the message.

        Manipulations whose user/group has no matching identifier are skipped
        with a warning. "silent" and "mutate" modes do nothing here; "active",
        "notify" and "instant" generate an AI reply and act on it.
        """
        # TODO: Permission checks
        manips = await sync_to_async(list)(Manipulation.objects.filter(enabled=True))
        for manip in manips:
            # NOTE(review): the service is hard-coded to "signal" here while
            # the identifier lookup in ``handle`` uses self.service -- confirm
            # the two are always equal.
            person_identifier = await sync_to_async(
                lambda manip=manip: PersonIdentifier.objects.filter(
                    identifier__in=identifier_candidates,
                    user=manip.user,
                    service="signal",
                    person__in=manip.group.people.all(),
                ).first()
            )()
            if person_identifier is None:
                log.warning(
                    f"{manip.name}: Message from unknown identifier(s) "
                    f"{', '.join(identifier_candidates)}."
                )
                continue

            # Find/create ChatSession once per user/person.
            session_key = (manip.user.id, person_identifier.person.id)
            if session_key not in session_cache:
                session_cache[session_key] = await history.get_chat_session(
                    manip.user, person_identifier
                )
            chat_session = session_cache[session_key]

            # Get the total history
            chat_history = await history.get_chat_history(chat_session)

            if not replies.should_reply(
                reply_to_self,
                reply_to_others,
                is_outgoing_message,
            ):
                continue
            if manip.mode in ["silent", "mutate"]:
                continue
            if manip.mode not in ["active", "notify", "instant"]:
                log.error(f"Mode {manip.mode} is not implemented")
                continue

            await utils.update_last_interaction(chat_session)
            prompt = replies.generate_reply_prompt(
                msg, person_identifier.person, manip, chat_history
            )
            log.info("Running context prompt")
            result = await ai.run_prompt(prompt, manip.ai)

            if manip.mode == "active":
                await self._reply_active(
                    c, manip, person_identifier, chat_session, result, ts
                )
            elif manip.mode == "notify":
                title = f"[GIA] Suggested message to {person_identifier.person.name}"
                manip.user.sendmsg(result, title=title)
            elif manip.mode == "instant":
                await self._reply_instant(
                    manip, person_identifier, chat_session, result, ts
                )

    async def _reply_active(
        self, c, manip, person_identifier, chat_session, result, ts
    ):
        """Record the AI reply, mirror it to XMPP and send it on Signal."""
        await history.store_own_message(
            session=chat_session,
            text=result,
            ts=ts + 1,
        )
        await self.ur.xmpp.client.send_from_external(
            manip.user,
            person_identifier,
            result,
            is_outgoing_message=True,
        )
        await natural.natural_send_message(
            result,
            c.send,
            c.start_typing,
            c.stop_typing,
        )

    async def _reply_instant(
        self, manip, person_identifier, chat_session, result, ts
    ):
        """Queue the AI reply and send the user accept/reject links for it."""
        # Replace any suggestion still queued for this session/manipulation.
        existing_queue = QueuedMessage.objects.filter(
            user=chat_session.user,
            session=chat_session,
            manipulation=manip,
            custom_author="BOT",
        )
        await history.delete_queryset(existing_queue)
        qm = await history.store_own_message(
            session=chat_session,
            text=result,
            ts=ts + 1,
            manip=manip,
            queue=True,
        )
        accept = reverse("message_accept_api", kwargs={"message_id": qm.id})
        reject = reverse("message_reject_api", kwargs={"message_id": qm.id})
        url = settings.URL
        content = (
            f"{result}\n\n"
            f"Accept: {url}{accept}\n"
            f"Reject: {url}{reject}"
        )
        title = f"[GIA] Suggested message to {person_identifier.person.name}"
        manip.user.sendmsg(content, title=title)

    async def _upsert_chat(self, raw, envelope, source_uuid, source_number):
        """Create or update the ``Chat`` row for the sender.

        The row is looked up by account plus source UUID, or plus source
        number when there is no UUID; with neither, nothing is written.
        """
        account = raw.get("account", "")
        chat_lookup = {"account": account}
        if source_uuid:
            chat_lookup["source_uuid"] = source_uuid
        elif source_number:
            chat_lookup["source_number"] = source_number
        else:
            return
        await sync_to_async(Chat.objects.update_or_create)(
            **chat_lookup,
            defaults={
                "source_uuid": source_uuid,
                "source_number": source_number,
                "source_name": envelope.get("sourceName", ""),
                "account": account,
            },
        )
class SignalClient(ClientBase):
    """Client wrapper that runs the Signal bot and a runtime-command loop.

    Relies on ``self.service``, ``self.ur``, ``self.log`` and ``self.loop``,
    which are presumably provided by ``ClientBase`` -- verify there.
    """

    def __init__(self, ur, *args, **kwargs):
        """Build the bot for ``settings.SIGNAL_NUMBER`` and register the handler."""
        super().__init__(ur, *args, **kwargs)
        self._stopping = False
        signal_number = str(getattr(settings, "SIGNAL_NUMBER", "")).strip()
        self.client = NewSignalBot(
            ur,
            self.service,
            {
                "signal_service": SIGNAL_URL,
                "phone_number": signal_number,
            },
        )
        self.client.register(HandleMessage(self.ur, self.service))
        self._command_task = None

    async def _drain_runtime_commands(self):
        """Process queued runtime commands (e.g., web UI sends via composite router)."""
        from core.clients import transport

        # Process a small burst each loop to keep sends responsive.
        for _ in range(5):
            command = transport.pop_runtime_command(self.service)
            if not command:
                return
            await self._execute_runtime_command(command)

    async def _execute_runtime_command(self, command):
        """Execute a single runtime command like send_message_raw.

        Commands without an ``id`` are ignored. For every other command a
        result dict (``ok`` plus ``timestamp`` or ``error``) is recorded via
        ``transport.set_runtime_command_result``; unknown actions get an
        ``unsupported_action:<action>`` error.
        """
        from core.clients import transport

        command = command or {}
        command_id = str(command.get("id") or "").strip()
        action = str(command.get("action") or "").strip()
        payload = dict(command.get("payload") or {})
        if not command_id:
            return

        if action == "send_message_raw":
            await self._run_send_message_raw(transport, command_id, payload)
            return
        if action == "notify_xmpp_sent":
            await self._run_notify_xmpp_sent(transport, command_id, payload)
            return
        transport.set_runtime_command_result(
            self.service,
            command_id,
            {"ok": False, "error": f"unsupported_action:{action or '-'}"},
        )

    async def _run_send_message_raw(self, transport, command_id, payload):
        """Send a raw Signal message and record the outcome."""
        recipient = str(payload.get("recipient") or "").strip()
        text = payload.get("text")
        attachments = payload.get("attachments") or []
        try:
            result = await signalapi.send_message_raw(
                recipient_uuid=recipient,
                text=text,
                attachments=attachments,
            )
            if result is False or result is None:
                raise RuntimeError("signal_send_failed")
            # bool is a subclass of int: a plain ``True`` must not be reported
            # as timestamp 1, so only genuine ints count as a timestamp.
            if isinstance(result, int) and not isinstance(result, bool):
                timestamp = int(result)
            else:
                timestamp = int(time.time() * 1000)
            transport.set_runtime_command_result(
                self.service,
                command_id,
                {"ok": True, "timestamp": timestamp},
            )
        except Exception as exc:
            self.log.error(f"send_message_raw failed: {exc}", exc_info=True)
            transport.set_runtime_command_result(
                self.service,
                command_id,
                {
                    "ok": False,
                    "error": str(exc),
                },
            )

    async def _run_notify_xmpp_sent(self, transport, command_id, payload):
        """Mirror an already-sent message to XMPP and record the outcome."""
        person_identifier_id = str(payload.get("person_identifier_id") or "").strip()
        text = str(payload.get("text") or "")
        if not person_identifier_id:
            transport.set_runtime_command_result(
                self.service,
                command_id,
                {"ok": False, "error": "missing_person_identifier_id"},
            )
            return
        try:
            identifier = await sync_to_async(
                lambda: PersonIdentifier.objects.filter(id=person_identifier_id)
                .select_related("user", "person")
                .first()
            )()
            if identifier is None:
                transport.set_runtime_command_result(
                    self.service,
                    command_id,
                    {"ok": False, "error": "person_identifier_not_found"},
                )
                return
            await self.ur.xmpp.client.send_from_external(
                identifier.user,
                identifier,
                text,
                True,
                attachments=[],
            )
            transport.set_runtime_command_result(
                self.service,
                command_id,
                {"ok": True, "timestamp": int(time.time() * 1000)},
            )
        except Exception as exc:
            # Log like send_message_raw does, so failures are not only
            # visible in the command result.
            self.log.error(f"notify_xmpp_sent failed: {exc}", exc_info=True)
            transport.set_runtime_command_result(
                self.service,
                command_id,
                {"ok": False, "error": str(exc)},
            )

    async def _command_loop(self):
        """Background task to periodically drain queued commands."""
        while not self._stopping:
            try:
                await self._drain_runtime_commands()
            except Exception as exc:
                # Keep the loop alive; one bad command must not stop it.
                self.log.warning(f"Command loop error: {exc}")
            await asyncio.sleep(1)

    def start(self):
        """Start the command loop (if not already running) and the bot."""
        self.log.info("Signal client starting...")
        self.client._event_loop = self.loop
        # Start background command processing loop
        if not self._command_task or self._command_task.done():
            self._command_task = self.loop.create_task(self._command_loop())
        self.client.start()