Fix Signal messages and replies

This commit is contained in:
2026-03-03 15:51:58 +00:00
parent 56c620473f
commit d6bd56dace
31 changed files with 3317 additions and 668 deletions

View File

@@ -5,14 +5,23 @@ import time
from urllib.parse import quote_plus, urlparse
import aiohttp
import websockets
from asgiref.sync import sync_to_async
from django.conf import settings
from django.urls import reverse
from signalbot import Command, Context, SignalBot
from core.clients import ClientBase, signalapi
from core.clients import ClientBase, signalapi, transport
from core.messaging import ai, history, media_bridge, natural, replies, reply_sync, utils
from core.models import Chat, Manipulation, PersonIdentifier, PlatformChatLink, QueuedMessage
from core.models import (
Chat,
Manipulation,
Message,
Person,
PersonIdentifier,
PlatformChatLink,
QueuedMessage,
)
from core.util import logs
log = logs.get_logger("signalF")
@@ -214,6 +223,10 @@ def _identifier_candidates(*values):
return out
def _digits_only(value):
return re.sub(r"[^0-9]", "", str(value or "").strip())
class NewSignalBot(SignalBot):
def __init__(self, ur, service, config):
self.ur = ur
@@ -358,6 +371,12 @@ class HandleMessage(Command):
ts = c.message.timestamp
source_value = c.message.source
envelope = raw.get("envelope", {})
envelope_source_uuid = envelope.get("sourceUuid")
envelope_source_number = envelope.get("sourceNumber")
effective_source_uuid = str(envelope_source_uuid or source_uuid or "").strip()
effective_source_number = str(
envelope_source_number or source_number or ""
).strip()
signal_source_message_id = str(
envelope.get("serverGuid")
or envelope.get("guid")
@@ -369,21 +388,29 @@ class HandleMessage(Command):
bot_uuid = str(getattr(c.bot, "bot_uuid", "") or "").strip()
bot_phone = str(getattr(c.bot, "phone_number", "") or "").strip()
source_uuid_norm = str(source_uuid or "").strip()
source_number_norm = str(source_number or "").strip()
source_uuid_norm = effective_source_uuid
source_number_norm = effective_source_number
dest_norm = str(dest or "").strip()
destination_number_norm = str(destination_number or "").strip()
bot_phone_digits = re.sub(r"[^0-9]", "", bot_phone)
source_phone_digits = re.sub(r"[^0-9]", "", source_number_norm)
dest_phone_digits = re.sub(r"[^0-9]", "", destination_number_norm or dest_norm)
is_sync_outbound = bool(dest_norm or destination_number_norm)
# Message originating from us
same_recipient = source_uuid == dest
same_recipient = bool(
source_uuid_norm and dest_norm and source_uuid_norm == dest_norm
)
is_from_bot = bool(bot_uuid and source_uuid_norm and source_uuid_norm == bot_uuid)
if (not is_from_bot) and bot_phone_digits and source_phone_digits:
is_from_bot = source_phone_digits == bot_phone_digits
# Inbound deliveries usually do not have destination fields populated.
# When destination is missing, treat event as inbound even if source
# metadata drifts to our own identifiers.
if not is_sync_outbound:
is_from_bot = False
# For non-sync incoming events destination is usually absent and points to us.
is_to_bot = bool(bot_uuid and dest_norm and dest_norm == bot_uuid)
@@ -396,12 +423,10 @@ class HandleMessage(Command):
reply_to_others = is_to_bot and not same_recipient # Reply
is_outgoing_message = is_from_bot and not is_to_bot # Do not reply
envelope_source_uuid = envelope.get("sourceUuid")
envelope_source_number = envelope.get("sourceNumber")
envelope_source = envelope.get("source")
primary_identifier = dest if is_from_bot else source_uuid
if dest or destination_number:
primary_identifier = dest if is_from_bot else effective_source_uuid
if (dest or destination_number) and is_from_bot:
# Sync "sentMessage" events are outbound; route by destination only.
# This prevents copying one outbound message into multiple people
# when source fields include the bot's own identifier.
@@ -415,8 +440,8 @@ class HandleMessage(Command):
}
incoming_candidates = _identifier_candidates(
primary_identifier,
source_uuid,
source_number,
effective_source_uuid,
effective_source_number,
source_value,
envelope_source_uuid,
envelope_source_number,
@@ -438,6 +463,104 @@ class HandleMessage(Command):
service=self.service,
)
)
if not identifiers:
companion_candidates = []
for value in identifier_candidates:
if not value:
continue
companions = await sync_to_async(list)(
Chat.objects.filter(source_uuid=value).values_list(
"source_number", flat=True
)
)
companions += await sync_to_async(list)(
Chat.objects.filter(source_number=value).values_list(
"source_uuid", flat=True
)
)
companion_candidates.extend(companions)
companion_candidates = _identifier_candidates(*companion_candidates)
if companion_candidates:
identifiers = await sync_to_async(list)(
PersonIdentifier.objects.filter(
identifier__in=companion_candidates,
service=self.service,
)
)
if not identifiers:
# Final fallback: compare normalized phone digits to handle format drift
# between Signal payload values and stored identifiers.
candidate_digits = {_digits_only(value) for value in identifier_candidates}
candidate_digits = {value for value in candidate_digits if value}
if candidate_digits:
signal_rows = await sync_to_async(list)(
PersonIdentifier.objects.filter(service=self.service).select_related(
"user"
)
)
matched = []
for row in signal_rows:
stored_digits = _digits_only(row.identifier)
if stored_digits and stored_digits in candidate_digits:
matched.append(row)
identifiers = matched
if not identifiers and (not is_from_bot) and (not bool(c.message.group)):
# Single-user fallback: don't drop new private inbound contacts just
# because they are not pre-linked yet. Create a placeholder person +
# identifier so the chat appears and can be re-linked later.
owner_rows = await sync_to_async(list)(
PersonIdentifier.objects.filter(service=self.service)
.select_related("user")
.order_by("user_id", "id")
)
owner_users = []
seen_user_ids = set()
for row in owner_rows:
if row.user_id in seen_user_ids:
continue
seen_user_ids.add(row.user_id)
owner_users.append(row.user)
if len(owner_users) == 1:
owner = owner_users[0]
fallback_identifier = (
effective_source_number
or effective_source_uuid
or (identifier_candidates[0] if identifier_candidates else "")
)
fallback_identifier = str(fallback_identifier or "").strip()
if fallback_identifier:
person, _ = await sync_to_async(Person.objects.get_or_create)(
user=owner,
name=f"Signal {fallback_identifier}",
)
pi, _ = await sync_to_async(PersonIdentifier.objects.get_or_create)(
user=owner,
service=self.service,
identifier=fallback_identifier,
defaults={"person": person},
)
if pi.person_id != person.id:
pi.person = person
await sync_to_async(pi.save)(update_fields=["person"])
identifiers = [pi]
log.info(
"Signal inbound auto-linked new private contact identifier=%s user_id=%s",
fallback_identifier,
int(owner.id),
)
if not identifiers:
log.warning(
"Signal inbound unmatched: candidates=%s source_uuid=%s source_number=%s effective_source_uuid=%s effective_source_number=%s dest=%s destination_number=%s envelope_source_uuid=%s envelope_source_number=%s",
identifier_candidates,
str(source_uuid or ""),
str(source_number or ""),
str(effective_source_uuid or ""),
str(effective_source_number or ""),
str(dest or ""),
str(destination_number or ""),
str(envelope_source_uuid or ""),
str(envelope_source_number or ""),
)
typing_payload = envelope.get("typingMessage")
if isinstance(typing_payload, dict):
@@ -471,7 +594,7 @@ class HandleMessage(Command):
message_timestamps=read_timestamps,
read_ts=read_ts,
payload=receipt_payload,
read_by=(source_uuid or source_number or ""),
read_by=(effective_source_uuid or effective_source_number or ""),
)
return
@@ -493,7 +616,9 @@ class HandleMessage(Command):
target_ts=int(reaction_payload.get("target_ts") or 0),
emoji=str(reaction_payload.get("emoji") or ""),
source_service="signal",
actor=(source_uuid or source_number or ""),
actor=(
effective_source_uuid or effective_source_number or ""
),
remove=bool(reaction_payload.get("remove")),
payload=reaction_payload.get("raw") or {},
)
@@ -508,7 +633,9 @@ class HandleMessage(Command):
remove=bool(reaction_payload.get("remove")),
upstream_message_id="",
upstream_ts=int(reaction_payload.get("target_ts") or 0),
actor=(source_uuid or source_number or ""),
actor=(
effective_source_uuid or effective_source_number or ""
),
payload=reaction_payload.get("raw") or {},
)
except Exception as exc:
@@ -604,7 +731,11 @@ class HandleMessage(Command):
attachments=xmpp_attachments,
source_ref={
"upstream_message_id": "",
"upstream_author": str(source_uuid or source_number or ""),
"upstream_author": str(
effective_source_uuid
or effective_source_number
or ""
),
"upstream_ts": int(ts or 0),
},
)
@@ -626,7 +757,9 @@ class HandleMessage(Command):
attachments=xmpp_attachments,
source_ref={
"upstream_message_id": "",
"upstream_author": str(source_uuid or source_number or ""),
"upstream_author": str(
effective_source_uuid or effective_source_number or ""
),
"upstream_ts": int(ts or 0),
},
)
@@ -656,7 +789,11 @@ class HandleMessage(Command):
chat_session,
reply_ref,
)
sender_key = source_uuid or source_number or identifier_candidates[0]
sender_key = (
effective_source_uuid
or effective_source_number
or identifier_candidates[0]
)
message_key = (chat_session.id, ts, sender_key)
message_text = identifier_text_overrides.get(session_key, relay_text)
if message_key not in stored_messages:
@@ -797,18 +934,18 @@ class HandleMessage(Command):
log.error(f"Mode {manip.mode} is not implemented")
chat_lookup = {"account": account}
if source_uuid:
chat_lookup["source_uuid"] = source_uuid
elif source_number:
chat_lookup["source_number"] = source_number
if effective_source_uuid:
chat_lookup["source_uuid"] = effective_source_uuid
elif effective_source_number:
chat_lookup["source_number"] = effective_source_number
else:
return
await sync_to_async(Chat.objects.update_or_create)(
**chat_lookup,
defaults={
"source_uuid": source_uuid,
"source_number": source_number,
"source_uuid": effective_source_uuid,
"source_number": effective_source_number,
"source_name": source_name,
"account": account,
},
@@ -831,6 +968,7 @@ class SignalClient(ClientBase):
self.client.register(HandleMessage(self.ur, self.service))
self._command_task = None
self._raw_receive_task = None
async def _drain_runtime_commands(self):
"""Process queued runtime commands (e.g., web UI sends via composite router)."""
@@ -857,11 +995,13 @@ class SignalClient(ClientBase):
recipient = str(payload.get("recipient") or "").strip()
text = payload.get("text")
attachments = payload.get("attachments") or []
metadata = dict(payload.get("metadata") or {})
try:
result = await signalapi.send_message_raw(
recipient_uuid=recipient,
text=text,
attachments=attachments,
metadata=metadata,
)
if result is False or result is None:
raise RuntimeError("signal_send_failed")
@@ -947,10 +1087,387 @@ class SignalClient(ClientBase):
self.log.warning(f"Command loop error: {exc}")
await asyncio.sleep(1)
async def _resolve_signal_identifiers(self, source_uuid: str, source_number: str):
candidates = _identifier_candidates(source_uuid, source_number)
if not candidates:
return []
identifiers = await sync_to_async(list)(
PersonIdentifier.objects.filter(
identifier__in=candidates,
service=self.service,
)
)
if identifiers:
return identifiers
candidate_digits = {_digits_only(value) for value in candidates}
candidate_digits = {value for value in candidate_digits if value}
if not candidate_digits:
return []
rows = await sync_to_async(list)(
PersonIdentifier.objects.filter(service=self.service).select_related("user")
)
return [
row
for row in rows
if _digits_only(getattr(row, "identifier", "")) in candidate_digits
]
async def _auto_link_single_user_signal_identifier(self, source_uuid: str, source_number: str):
owner_rows = await sync_to_async(list)(
PersonIdentifier.objects.filter(service=self.service)
.select_related("user")
.order_by("user_id", "id")
)
users = []
seen = set()
for row in owner_rows:
if row.user_id in seen:
continue
seen.add(row.user_id)
users.append(row.user)
if len(users) != 1:
return []
owner = users[0]
fallback_identifier = str(source_number or source_uuid or "").strip()
if not fallback_identifier:
return []
person, _ = await sync_to_async(Person.objects.get_or_create)(
user=owner,
name=f"Signal {fallback_identifier}",
)
pi, _ = await sync_to_async(PersonIdentifier.objects.get_or_create)(
user=owner,
service=self.service,
identifier=fallback_identifier,
defaults={"person": person},
)
if pi.person_id != person.id:
pi.person = person
await sync_to_async(pi.save)(update_fields=["person"])
self.log.info(
"signal raw-receive auto-linked identifier=%s user_id=%s",
fallback_identifier,
int(owner.id),
)
return [pi]
async def _process_raw_inbound_event(self, raw_message: str):
try:
payload = json.loads(raw_message or "{}")
except Exception:
return
exception_payload = payload.get("exception") if isinstance(payload, dict) else None
if isinstance(exception_payload, dict):
err_type = str(exception_payload.get("type") or "").strip()
err_msg = str(exception_payload.get("message") or "").strip()
envelope = payload.get("envelope") or {}
envelope_source_uuid = ""
envelope_source_number = ""
envelope_ts = 0
envelope_keys = []
if isinstance(envelope, dict):
envelope_source_uuid = str(envelope.get("sourceUuid") or "").strip()
envelope_source_number = str(envelope.get("sourceNumber") or "").strip()
try:
envelope_ts = int(
envelope.get("timestamp")
or envelope.get("serverReceivedTimestamp")
or 0
)
except Exception:
envelope_ts = 0
envelope_keys = sorted(list(envelope.keys()))[:20]
payload_excerpt = json.dumps(payload, ensure_ascii=True)[:1200]
transport.update_runtime_state(
self.service,
last_inbound_exception_type=err_type,
last_inbound_exception_message=err_msg,
last_inbound_exception_ts=int(
(envelope.get("timestamp") if isinstance(envelope, dict) else 0)
or int(time.time() * 1000)
),
last_inbound_exception_account=str(payload.get("account") or "").strip(),
last_inbound_exception_source_uuid=envelope_source_uuid,
last_inbound_exception_source_number=envelope_source_number,
last_inbound_exception_envelope_ts=envelope_ts,
last_inbound_exception_envelope_keys=envelope_keys,
last_inbound_exception_payload_excerpt=payload_excerpt,
)
self.log.warning(
"signal raw-receive exception type=%s message=%s source_uuid=%s source_number=%s envelope_ts=%s",
err_type or "-",
err_msg or "-",
envelope_source_uuid or "-",
envelope_source_number or "-",
envelope_ts or 0,
)
return
envelope = payload.get("envelope") or {}
if not isinstance(envelope, dict):
return
sync_sent_message = _get_nested(envelope, ("syncMessage", "sentMessage")) or {}
if isinstance(sync_sent_message, dict) and sync_sent_message:
raw_text = sync_sent_message.get("message")
if isinstance(raw_text, dict):
text = str(
raw_text.get("message")
or raw_text.get("text")
or raw_text.get("body")
or ""
).strip()
else:
text = str(raw_text or "").strip()
destination_uuid = str(
sync_sent_message.get("destinationUuid")
or sync_sent_message.get("destination")
or ""
).strip()
destination_number = str(
sync_sent_message.get("destinationNumber")
or sync_sent_message.get("destinationE164")
or sync_sent_message.get("destination")
or ""
).strip()
identifiers = await self._resolve_signal_identifiers(
destination_uuid,
destination_number,
)
if not identifiers:
identifiers = await self._auto_link_single_user_signal_identifier(
destination_uuid,
destination_number,
)
if identifiers and text:
ts_raw = (
sync_sent_message.get("timestamp")
or envelope.get("timestamp")
or envelope.get("serverReceivedTimestamp")
or int(time.time() * 1000)
)
try:
ts = int(ts_raw)
except Exception:
ts = int(time.time() * 1000)
source_message_id = str(
envelope.get("serverGuid")
or envelope.get("guid")
or envelope.get("timestamp")
or ts
).strip()
sender_key = (
str(getattr(self.client, "bot_uuid", "") or "").strip()
or str(getattr(self.client, "phone_number", "") or "").strip()
or str(payload.get("account") or "").strip()
or "self"
)
source_chat_id = destination_number or destination_uuid or sender_key
reply_ref = reply_sync.extract_reply_ref(self.service, payload)
for identifier in identifiers:
session = await history.get_chat_session(identifier.user, identifier)
reply_target = await reply_sync.resolve_reply_target(
identifier.user,
session,
reply_ref,
)
exists = await sync_to_async(
lambda: Message.objects.filter(
user=identifier.user,
session=session,
source_service=self.service,
source_message_id=source_message_id,
).exists()
)()
if exists:
continue
await history.store_message(
session=session,
sender=sender_key,
text=text,
ts=ts,
outgoing=True,
source_service=self.service,
source_message_id=source_message_id,
source_chat_id=source_chat_id,
reply_to=reply_target,
reply_source_service=str(
reply_ref.get("reply_source_service") or ""
),
reply_source_message_id=str(
reply_ref.get("reply_source_message_id") or ""
),
message_meta={},
)
transport.update_runtime_state(
self.service,
last_inbound_ok_ts=int(time.time() * 1000),
last_inbound_exception_type="",
last_inbound_exception_message="",
)
return
if envelope.get("typingMessage") or envelope.get("receiptMessage"):
return
data_message = envelope.get("dataMessage") or {}
if not isinstance(data_message, dict):
return
source_uuid = str(envelope.get("sourceUuid") or envelope.get("source") or "").strip()
source_number = str(envelope.get("sourceNumber") or "").strip()
bot_uuid = str(getattr(self.client, "bot_uuid", "") or "").strip()
bot_phone = str(getattr(self.client, "phone_number", "") or "").strip()
if source_uuid and bot_uuid and source_uuid == bot_uuid:
return
if source_number and bot_phone and _digits_only(source_number) == _digits_only(bot_phone):
return
identifiers = await self._resolve_signal_identifiers(source_uuid, source_number)
if not identifiers:
identifiers = await self._auto_link_single_user_signal_identifier(
source_uuid, source_number
)
if not identifiers:
self.log.warning(
"signal raw-receive unmatched source_uuid=%s source_number=%s text=%s",
source_uuid,
source_number,
str(data_message.get("message") or "")[:160],
)
return
reaction_payload = _extract_signal_reaction(envelope)
if isinstance(reaction_payload, dict):
for identifier in identifiers:
try:
await history.apply_reaction(
identifier.user,
identifier,
target_message_id="",
target_ts=int(reaction_payload.get("target_ts") or 0),
emoji=str(reaction_payload.get("emoji") or ""),
source_service="signal",
actor=(source_uuid or source_number or ""),
remove=bool(reaction_payload.get("remove")),
payload=reaction_payload.get("raw") or {},
)
except Exception as exc:
self.log.warning("signal raw reaction history apply failed: %s", exc)
try:
await self.ur.xmpp.client.apply_external_reaction(
identifier.user,
identifier,
source_service="signal",
emoji=str(reaction_payload.get("emoji") or ""),
remove=bool(reaction_payload.get("remove")),
upstream_message_id="",
upstream_ts=int(reaction_payload.get("target_ts") or 0),
actor=(source_uuid or source_number or ""),
payload=reaction_payload.get("raw") or {},
)
except Exception as exc:
self.log.warning("signal raw reaction relay to XMPP failed: %s", exc)
transport.update_runtime_state(
self.service,
last_inbound_ok_ts=int(time.time() * 1000),
last_inbound_exception_type="",
last_inbound_exception_message="",
)
return
text = str(data_message.get("message") or "").strip()
if not text:
return
ts_raw = (
envelope.get("timestamp")
or envelope.get("serverReceivedTimestamp")
or int(time.time() * 1000)
)
try:
ts = int(ts_raw)
except Exception:
ts = int(time.time() * 1000)
source_message_id = str(
envelope.get("serverGuid")
or envelope.get("guid")
or envelope.get("timestamp")
or ts
).strip()
sender_key = source_uuid or source_number or (identifiers[0].identifier if identifiers else "")
source_chat_id = source_number or source_uuid or sender_key
reply_ref = reply_sync.extract_reply_ref(self.service, payload)
for identifier in identifiers:
session = await history.get_chat_session(identifier.user, identifier)
reply_target = await reply_sync.resolve_reply_target(
identifier.user,
session,
reply_ref,
)
exists = await sync_to_async(
lambda: Message.objects.filter(
user=identifier.user,
session=session,
source_service=self.service,
source_message_id=source_message_id,
).exists()
)()
if exists:
continue
local_message = await history.store_message(
session=session,
sender=sender_key,
text=text,
ts=ts,
outgoing=False,
source_service=self.service,
source_message_id=source_message_id,
source_chat_id=source_chat_id,
reply_to=reply_target,
reply_source_service=str(reply_ref.get("reply_source_service") or ""),
reply_source_message_id=str(
reply_ref.get("reply_source_message_id") or ""
),
message_meta={},
)
await self.ur.message_received(
self.service,
identifier=identifier,
text=text,
ts=ts,
payload=payload,
local_message=local_message,
)
transport.update_runtime_state(
self.service,
last_inbound_ok_ts=int(time.time() * 1000),
last_inbound_exception_type="",
last_inbound_exception_message="",
)
async def _raw_receive_loop(self):
signal_number = str(getattr(settings, "SIGNAL_NUMBER", "") or "").strip()
if not signal_number:
return
uri = f"ws://{SIGNAL_URL}/v1/receive/{signal_number}"
while not self._stopping:
try:
async with websockets.connect(uri, ping_interval=None) as websocket:
async for raw_message in websocket:
await self._process_raw_inbound_event(raw_message)
except asyncio.CancelledError:
raise
except Exception as exc:
self.log.warning("signal raw-receive loop error: %s", exc)
await asyncio.sleep(2)
def start(self):
self.log.info("Signal client starting...")
self.client._event_loop = self.loop
# Start background command processing loop
if not self._command_task or self._command_task.done():
self._command_task = self.loop.create_task(self._command_loop())
self.client.start()
if not self._raw_receive_task or self._raw_receive_task.done():
self._raw_receive_task = self.loop.create_task(self._raw_receive_loop())
# Use direct websocket receive loop as primary ingestion path.
# signalbot's internal receive consumer can compete for the same stream
# and starve inbound events in this deployment, so we keep it disabled.