From d6bd56dace1d1fe9232a89bbfa4bb023a87e2f64 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Tue, 3 Mar 2026 15:51:58 +0000 Subject: [PATCH] Fix Signal messages and replies --- Makefile | 58 +- app/urls.py | 5 + core/clients/signal.py | 565 ++++++++++++- core/clients/signalapi.py | 54 +- core/clients/transport.py | 57 +- core/commands/handlers/bp.py | 374 +++++++-- core/commands/policies.py | 106 +++ core/messaging/reply_sync.py | 87 +- core/migrations/0031_commandvariantpolicy.py | 107 +++ ...032_commandvariantpolicy_store_document.py | 18 + core/models.py | 39 + core/tasks/engine.py | 28 +- core/templates/base.html | 3 + core/templates/pages/command-routing.html | 304 +++++-- core/templates/pages/tasks-settings.html | 783 +++++++++--------- core/templates/partials/compose-panel.html | 41 +- core/templates/partials/signal-accounts.html | 18 +- .../templates/partials/signal-chats-list.html | 3 - core/tests/test_command_routing_variant_ui.py | 54 ++ core/tests/test_command_variant_policy.py | 225 +++++ core/tests/test_phase1_command_reply.py | 42 + core/tests/test_repeat_answer_and_tasks.py | 61 ++ core/tests/test_signal_relink.py | 45 + core/tests/test_signal_reply_send.py | 223 +++++ core/tests/test_signal_unlink_fallback.py | 43 + core/tests/test_tasks_settings_and_toggle.py | 50 +- core/views/automation.py | 177 +++- core/views/compose.py | 251 +++++- core/views/signal.py | 78 +- core/views/tasks.py | 83 +- core/views/whatsapp.py | 3 + 31 files changed, 3317 insertions(+), 668 deletions(-) create mode 100644 core/commands/policies.py create mode 100644 core/migrations/0031_commandvariantpolicy.py create mode 100644 core/migrations/0032_commandvariantpolicy_store_document.py create mode 100644 core/tests/test_command_routing_variant_ui.py create mode 100644 core/tests/test_command_variant_policy.py create mode 100644 core/tests/test_signal_relink.py create mode 100644 core/tests/test_signal_reply_send.py create mode 100644 core/tests/test_signal_unlink_fallback.py diff --git a/Makefile b/Makefile index 1cec80d..e2b404f 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,15 @@ QUADLET_MGR := ./scripts/quadlet/manage.sh +MODULES ?= core.tests run: bash $(QUADLET_MGR) up build: - OPERATION=uwsgi podman build --build-arg OPERATION=uwsgi -t localhost/xf/gia:prod -f Dockerfile . + @if command -v docker-compose >/dev/null 2>&1; then \ + docker-compose --env-file=stack.env build app; \ + else \ + OPERATION=uwsgi podman build --build-arg OPERATION=uwsgi -t localhost/xf/gia:prod -f Dockerfile .; \ + fi stop: bash $(QUADLET_MGR) down @@ -31,17 +36,58 @@ test: @if command -v docker-compose >/dev/null 2>&1; then \ docker-compose --env-file=stack.env run --rm app sh -c ". /venv/bin/activate && python manage.py test $(MODULES) -v 2"; \ else \ - podman exec gia sh -lc "cd /code && . /venv/bin/activate && python manage.py test $(MODULES) -v 2"; \ + if podman ps --format '{{.Names}}' | grep -qx gia; then \ + podman exec gia sh -lc "cd /code && . /venv/bin/activate && python manage.py test $(MODULES) -v 2"; \ + else \ + echo "Container 'gia' is not running. Start the stack first with 'make run' (or mrl)." >&2; \ + exit 125; \ + fi; \ fi migrate: - docker-compose --env-file=stack.env run --rm app sh -c ". /venv/bin/activate && python manage.py migrate" + @if command -v docker-compose >/dev/null 2>&1; then \ + docker-compose --env-file=stack.env run --rm app sh -c ". /venv/bin/activate && python manage.py migrate"; \ + else \ + if podman ps --format '{{.Names}}' | grep -qx gia; then \ + podman exec gia sh -lc "cd /code && . /venv/bin/activate && python manage.py migrate"; \ + else \ + echo "Container 'gia' is not running. Start the stack first with 'make run' (or mrl)." >&2; \ + exit 125; \ + fi; \ + fi makemigrations: - docker-compose --env-file=stack.env run --rm app sh -c ". /venv/bin/activate && python manage.py makemigrations" + @if command -v docker-compose >/dev/null 2>&1; then \ + docker-compose --env-file=stack.env run --rm app sh -c ". /venv/bin/activate && python manage.py makemigrations"; \ + else \ + if podman ps --format '{{.Names}}' | grep -qx gia; then \ + podman exec gia sh -lc "cd /code && . /venv/bin/activate && python manage.py makemigrations"; \ + else \ + echo "Container 'gia' is not running. Start the stack first with 'make run' (or mrl)." >&2; \ + exit 125; \ + fi; \ + fi auth: - docker-compose --env-file=stack.env run --rm app sh -c ". /venv/bin/activate && python manage.py createsuperuser" + @if command -v docker-compose >/dev/null 2>&1; then \ + docker-compose --env-file=stack.env run --rm app sh -c ". /venv/bin/activate && python manage.py createsuperuser"; \ + else \ + if podman ps --format '{{.Names}}' | grep -qx gia; then \ + podman exec gia sh -lc "cd /code && . /venv/bin/activate && python manage.py createsuperuser"; \ + else \ + echo "Container 'gia' is not running. Start the stack first with 'make run' (or mrl)." >&2; \ + exit 125; \ + fi; \ + fi token: - docker-compose --env-file=stack.env run --rm app sh -c ". /venv/bin/activate && python manage.py addstatictoken m" + @if command -v docker-compose >/dev/null 2>&1; then \ + docker-compose --env-file=stack.env run --rm app sh -c ". /venv/bin/activate && python manage.py addstatictoken m"; \ + else \ + if podman ps --format '{{.Names}}' | grep -qx gia; then \ + podman exec gia sh -lc "cd /code && . /venv/bin/activate && python manage.py addstatictoken m"; \ + else \ + echo "Container 'gia' is not running. Start the stack first with 'make run' (or mrl)." >&2; \ + exit 125; \ + fi; \ + fi diff --git a/app/urls.py b/app/urls.py index dca8f69..369c9e7 100644 --- a/app/urls.py +++ b/app/urls.py @@ -147,6 +147,11 @@ urlpatterns = [ signal.SignalAccountAdd.as_view(), name="signal_account_add", ), + path( + "services/signal//unlink//", + signal.SignalAccountUnlink.as_view(), + name="signal_account_unlink", + ), path( "services/whatsapp//add/", whatsapp.WhatsAppAccountAdd.as_view(), diff --git a/core/clients/signal.py b/core/clients/signal.py index 9ea984c..bd187a1 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -5,14 +5,23 @@ import time from urllib.parse import quote_plus, urlparse import aiohttp +import websockets from asgiref.sync import sync_to_async from django.conf import settings from django.urls import reverse from signalbot import Command, Context, SignalBot -from core.clients import ClientBase, signalapi +from core.clients import ClientBase, signalapi, transport from core.messaging import ai, history, media_bridge, natural, replies, reply_sync, utils -from core.models import Chat, Manipulation, PersonIdentifier, PlatformChatLink, QueuedMessage +from core.models import ( + Chat, + Manipulation, + Message, + Person, + PersonIdentifier, + PlatformChatLink, + QueuedMessage, +) from core.util import logs log = logs.get_logger("signalF") @@ -214,6 +223,10 @@ def _identifier_candidates(*values): return out +def _digits_only(value): + return re.sub(r"[^0-9]", "", str(value or "").strip()) + + class NewSignalBot(SignalBot): def __init__(self, ur, service, config): self.ur = ur @@ -358,6 +371,12 @@ class HandleMessage(Command): ts = c.message.timestamp source_value = c.message.source envelope = raw.get("envelope", {}) + envelope_source_uuid = envelope.get("sourceUuid") + envelope_source_number = envelope.get("sourceNumber") + effective_source_uuid = str(envelope_source_uuid or source_uuid or "").strip() + effective_source_number = str( + envelope_source_number or source_number or "" + ).strip() signal_source_message_id = str( envelope.get("serverGuid") or envelope.get("guid") @@ -369,21 +388,29 @@ class HandleMessage(Command): bot_uuid = str(getattr(c.bot, "bot_uuid", "") or "").strip() bot_phone = str(getattr(c.bot, "phone_number", "") or "").strip() - source_uuid_norm = str(source_uuid or "").strip() - source_number_norm = str(source_number or "").strip() + source_uuid_norm = effective_source_uuid + source_number_norm = effective_source_number dest_norm = str(dest or "").strip() destination_number_norm = str(destination_number or "").strip() bot_phone_digits = re.sub(r"[^0-9]", "", bot_phone) source_phone_digits = re.sub(r"[^0-9]", "", source_number_norm) dest_phone_digits = re.sub(r"[^0-9]", "", destination_number_norm or dest_norm) + is_sync_outbound = bool(dest_norm or destination_number_norm) # Message originating from us - same_recipient = source_uuid == dest + same_recipient = bool( + source_uuid_norm and dest_norm and source_uuid_norm == dest_norm + ) is_from_bot = bool(bot_uuid and source_uuid_norm and source_uuid_norm == bot_uuid) if (not is_from_bot) and bot_phone_digits and source_phone_digits: is_from_bot = source_phone_digits == bot_phone_digits + # Inbound deliveries usually do not have destination fields populated. + # When destination is missing, treat event as inbound even if source + # metadata drifts to our own identifiers. + if not is_sync_outbound: + is_from_bot = False # For non-sync incoming events destination is usually absent and points to us. is_to_bot = bool(bot_uuid and dest_norm and dest_norm == bot_uuid) @@ -396,12 +423,10 @@ class HandleMessage(Command): reply_to_others = is_to_bot and not same_recipient # Reply is_outgoing_message = is_from_bot and not is_to_bot # Do not reply - envelope_source_uuid = envelope.get("sourceUuid") - envelope_source_number = envelope.get("sourceNumber") envelope_source = envelope.get("source") - primary_identifier = dest if is_from_bot else source_uuid - if dest or destination_number: + primary_identifier = dest if is_from_bot else effective_source_uuid + if (dest or destination_number) and is_from_bot: # Sync "sentMessage" events are outbound; route by destination only. # This prevents copying one outbound message into multiple people # when source fields include the bot's own identifier. @@ -415,8 +440,8 @@ class HandleMessage(Command): } incoming_candidates = _identifier_candidates( primary_identifier, - source_uuid, - source_number, + effective_source_uuid, + effective_source_number, source_value, envelope_source_uuid, envelope_source_number, @@ -438,6 +463,104 @@ class HandleMessage(Command): service=self.service, ) ) + if not identifiers: + companion_candidates = [] + for value in identifier_candidates: + if not value: + continue + companions = await sync_to_async(list)( + Chat.objects.filter(source_uuid=value).values_list( + "source_number", flat=True + ) + ) + companions += await sync_to_async(list)( + Chat.objects.filter(source_number=value).values_list( + "source_uuid", flat=True + ) + ) + companion_candidates.extend(companions) + companion_candidates = _identifier_candidates(*companion_candidates) + if companion_candidates: + identifiers = await sync_to_async(list)( + PersonIdentifier.objects.filter( + identifier__in=companion_candidates, + service=self.service, + ) + ) + if not identifiers: + # Final fallback: compare normalized phone digits to handle format drift + # between Signal payload values and stored identifiers. + candidate_digits = {_digits_only(value) for value in identifier_candidates} + candidate_digits = {value for value in candidate_digits if value} + if candidate_digits: + signal_rows = await sync_to_async(list)( + PersonIdentifier.objects.filter(service=self.service).select_related( + "user" + ) + ) + matched = [] + for row in signal_rows: + stored_digits = _digits_only(row.identifier) + if stored_digits and stored_digits in candidate_digits: + matched.append(row) + identifiers = matched + if not identifiers and (not is_from_bot) and (not bool(c.message.group)): + # Single-user fallback: don't drop new private inbound contacts just + # because they are not pre-linked yet. Create a placeholder person + + # identifier so the chat appears and can be re-linked later. + owner_rows = await sync_to_async(list)( + PersonIdentifier.objects.filter(service=self.service) + .select_related("user") + .order_by("user_id", "id") + ) + owner_users = [] + seen_user_ids = set() + for row in owner_rows: + if row.user_id in seen_user_ids: + continue + seen_user_ids.add(row.user_id) + owner_users.append(row.user) + if len(owner_users) == 1: + owner = owner_users[0] + fallback_identifier = ( + effective_source_number + or effective_source_uuid + or (identifier_candidates[0] if identifier_candidates else "") + ) + fallback_identifier = str(fallback_identifier or "").strip() + if fallback_identifier: + person, _ = await sync_to_async(Person.objects.get_or_create)( + user=owner, + name=f"Signal {fallback_identifier}", + ) + pi, _ = await sync_to_async(PersonIdentifier.objects.get_or_create)( + user=owner, + service=self.service, + identifier=fallback_identifier, + defaults={"person": person}, + ) + if pi.person_id != person.id: + pi.person = person + await sync_to_async(pi.save)(update_fields=["person"]) + identifiers = [pi] + log.info( + "Signal inbound auto-linked new private contact identifier=%s user_id=%s", + fallback_identifier, + int(owner.id), + ) + if not identifiers: + log.warning( + "Signal inbound unmatched: candidates=%s source_uuid=%s source_number=%s effective_source_uuid=%s effective_source_number=%s dest=%s destination_number=%s envelope_source_uuid=%s envelope_source_number=%s", + identifier_candidates, + str(source_uuid or ""), + str(source_number or ""), + str(effective_source_uuid or ""), + str(effective_source_number or ""), + str(dest or ""), + str(destination_number or ""), + str(envelope_source_uuid or ""), + str(envelope_source_number or ""), + ) typing_payload = envelope.get("typingMessage") if isinstance(typing_payload, dict): @@ -471,7 +594,7 @@ class HandleMessage(Command): message_timestamps=read_timestamps, read_ts=read_ts, payload=receipt_payload, - read_by=(source_uuid or source_number or ""), + read_by=(effective_source_uuid or effective_source_number or ""), ) return @@ -493,7 +616,9 @@ class HandleMessage(Command): target_ts=int(reaction_payload.get("target_ts") or 0), emoji=str(reaction_payload.get("emoji") or ""), source_service="signal", - actor=(source_uuid or source_number or ""), + actor=( + effective_source_uuid or effective_source_number or "" + ), remove=bool(reaction_payload.get("remove")), payload=reaction_payload.get("raw") or {}, ) @@ -508,7 +633,9 @@ class HandleMessage(Command): remove=bool(reaction_payload.get("remove")), upstream_message_id="", upstream_ts=int(reaction_payload.get("target_ts") or 0), - actor=(source_uuid or source_number or ""), + actor=( + effective_source_uuid or effective_source_number or "" + ), payload=reaction_payload.get("raw") or {}, ) except Exception as exc: @@ -604,7 +731,11 @@ class HandleMessage(Command): attachments=xmpp_attachments, source_ref={ "upstream_message_id": "", - "upstream_author": str(source_uuid or source_number or ""), + "upstream_author": str( + effective_source_uuid + or effective_source_number + or "" + ), "upstream_ts": int(ts or 0), }, ) @@ -626,7 +757,9 @@ class HandleMessage(Command): attachments=xmpp_attachments, source_ref={ "upstream_message_id": "", - "upstream_author": str(source_uuid or source_number or ""), + "upstream_author": str( + effective_source_uuid or effective_source_number or "" + ), "upstream_ts": int(ts or 0), }, ) @@ -656,7 +789,11 @@ class HandleMessage(Command): chat_session, reply_ref, ) - sender_key = source_uuid or source_number or identifier_candidates[0] + sender_key = ( + effective_source_uuid + or effective_source_number + or identifier_candidates[0] + ) message_key = (chat_session.id, ts, sender_key) message_text = identifier_text_overrides.get(session_key, relay_text) if message_key not in stored_messages: @@ -797,18 +934,18 @@ class HandleMessage(Command): log.error(f"Mode {manip.mode} is not implemented") chat_lookup = {"account": account} - if source_uuid: - chat_lookup["source_uuid"] = source_uuid - elif source_number: - chat_lookup["source_number"] = source_number + if effective_source_uuid: + chat_lookup["source_uuid"] = effective_source_uuid + elif effective_source_number: + chat_lookup["source_number"] = effective_source_number else: return await sync_to_async(Chat.objects.update_or_create)( **chat_lookup, defaults={ - "source_uuid": source_uuid, - "source_number": source_number, + "source_uuid": effective_source_uuid, + "source_number": effective_source_number, "source_name": source_name, "account": account, }, @@ -831,6 +968,7 @@ class SignalClient(ClientBase): self.client.register(HandleMessage(self.ur, self.service)) self._command_task = None + self._raw_receive_task = None async def _drain_runtime_commands(self): """Process queued runtime commands (e.g., web UI sends via composite router).""" @@ -857,11 +995,13 @@ class SignalClient(ClientBase): recipient = str(payload.get("recipient") or "").strip() text = payload.get("text") attachments = payload.get("attachments") or [] + metadata = dict(payload.get("metadata") or {}) try: result = await signalapi.send_message_raw( recipient_uuid=recipient, text=text, attachments=attachments, + metadata=metadata, ) if result is False or result is None: raise RuntimeError("signal_send_failed") @@ -947,10 +1087,387 @@ class SignalClient(ClientBase): self.log.warning(f"Command loop error: {exc}") await asyncio.sleep(1) + async def _resolve_signal_identifiers(self, source_uuid: str, source_number: str): + candidates = _identifier_candidates(source_uuid, source_number) + if not candidates: + return [] + identifiers = await sync_to_async(list)( + PersonIdentifier.objects.filter( + identifier__in=candidates, + service=self.service, + ) + ) + if identifiers: + return identifiers + candidate_digits = {_digits_only(value) for value in candidates} + candidate_digits = {value for value in candidate_digits if value} + if not candidate_digits: + return [] + rows = await sync_to_async(list)( + PersonIdentifier.objects.filter(service=self.service).select_related("user") + ) + return [ + row + for row in rows + if _digits_only(getattr(row, "identifier", "")) in candidate_digits + ] + + async def _auto_link_single_user_signal_identifier(self, source_uuid: str, source_number: str): + owner_rows = await sync_to_async(list)( + PersonIdentifier.objects.filter(service=self.service) + .select_related("user") + .order_by("user_id", "id") + ) + users = [] + seen = set() + for row in owner_rows: + if row.user_id in seen: + continue + seen.add(row.user_id) + users.append(row.user) + if len(users) != 1: + return [] + owner = users[0] + fallback_identifier = str(source_number or source_uuid or "").strip() + if not fallback_identifier: + return [] + person, _ = await sync_to_async(Person.objects.get_or_create)( + user=owner, + name=f"Signal {fallback_identifier}", + ) + pi, _ = await sync_to_async(PersonIdentifier.objects.get_or_create)( + user=owner, + service=self.service, + identifier=fallback_identifier, + defaults={"person": person}, + ) + if pi.person_id != person.id: + pi.person = person + await sync_to_async(pi.save)(update_fields=["person"]) + self.log.info( + "signal raw-receive auto-linked identifier=%s user_id=%s", + fallback_identifier, + int(owner.id), + ) + return [pi] + + async def _process_raw_inbound_event(self, raw_message: str): + try: + payload = json.loads(raw_message or "{}") + except Exception: + return + exception_payload = payload.get("exception") if isinstance(payload, dict) else None + if isinstance(exception_payload, dict): + err_type = str(exception_payload.get("type") or "").strip() + err_msg = str(exception_payload.get("message") or "").strip() + envelope = payload.get("envelope") or {} + envelope_source_uuid = "" + envelope_source_number = "" + envelope_ts = 0 + envelope_keys = [] + if isinstance(envelope, dict): + envelope_source_uuid = str(envelope.get("sourceUuid") or "").strip() + envelope_source_number = str(envelope.get("sourceNumber") or "").strip() + try: + envelope_ts = int( + envelope.get("timestamp") + or envelope.get("serverReceivedTimestamp") + or 0 + ) + except Exception: + envelope_ts = 0 + envelope_keys = sorted(list(envelope.keys()))[:20] + payload_excerpt = json.dumps(payload, ensure_ascii=True)[:1200] + transport.update_runtime_state( + self.service, + last_inbound_exception_type=err_type, + last_inbound_exception_message=err_msg, + last_inbound_exception_ts=int( + (envelope.get("timestamp") if isinstance(envelope, dict) else 0) + or int(time.time() * 1000) + ), + last_inbound_exception_account=str(payload.get("account") or "").strip(), + last_inbound_exception_source_uuid=envelope_source_uuid, + last_inbound_exception_source_number=envelope_source_number, + last_inbound_exception_envelope_ts=envelope_ts, + last_inbound_exception_envelope_keys=envelope_keys, + last_inbound_exception_payload_excerpt=payload_excerpt, + ) + self.log.warning( + "signal raw-receive exception type=%s message=%s source_uuid=%s source_number=%s envelope_ts=%s", + err_type or "-", + err_msg or "-", + envelope_source_uuid or "-", + envelope_source_number or "-", + envelope_ts or 0, + ) + return + envelope = payload.get("envelope") or {} + if not isinstance(envelope, dict): + return + sync_sent_message = _get_nested(envelope, ("syncMessage", "sentMessage")) or {} + if isinstance(sync_sent_message, dict) and sync_sent_message: + raw_text = sync_sent_message.get("message") + if isinstance(raw_text, dict): + text = str( + raw_text.get("message") + or raw_text.get("text") + or raw_text.get("body") + or "" + ).strip() + else: + text = str(raw_text or "").strip() + + destination_uuid = str( + sync_sent_message.get("destinationUuid") + or sync_sent_message.get("destination") + or "" + ).strip() + destination_number = str( + sync_sent_message.get("destinationNumber") + or sync_sent_message.get("destinationE164") + or sync_sent_message.get("destination") + or "" + ).strip() + identifiers = await self._resolve_signal_identifiers( + destination_uuid, + destination_number, + ) + if not identifiers: + identifiers = await self._auto_link_single_user_signal_identifier( + destination_uuid, + destination_number, + ) + if identifiers and text: + ts_raw = ( + sync_sent_message.get("timestamp") + or envelope.get("timestamp") + or envelope.get("serverReceivedTimestamp") + or int(time.time() * 1000) + ) + try: + ts = int(ts_raw) + except Exception: + ts = int(time.time() * 1000) + source_message_id = str( + envelope.get("serverGuid") + or envelope.get("guid") + or envelope.get("timestamp") + or ts + ).strip() + sender_key = ( + str(getattr(self.client, "bot_uuid", "") or "").strip() + or str(getattr(self.client, "phone_number", "") or "").strip() + or str(payload.get("account") or "").strip() + or "self" + ) + source_chat_id = destination_number or destination_uuid or sender_key + reply_ref = reply_sync.extract_reply_ref(self.service, payload) + for identifier in identifiers: + session = await history.get_chat_session(identifier.user, identifier) + reply_target = await reply_sync.resolve_reply_target( + identifier.user, + session, + reply_ref, + ) + exists = await sync_to_async( + lambda: Message.objects.filter( + user=identifier.user, + session=session, + source_service=self.service, + source_message_id=source_message_id, + ).exists() + )() + if exists: + continue + await history.store_message( + session=session, + sender=sender_key, + text=text, + ts=ts, + outgoing=True, + source_service=self.service, + source_message_id=source_message_id, + source_chat_id=source_chat_id, + reply_to=reply_target, + reply_source_service=str( + reply_ref.get("reply_source_service") or "" + ), + reply_source_message_id=str( + reply_ref.get("reply_source_message_id") or "" + ), + message_meta={}, + ) + transport.update_runtime_state( + self.service, + last_inbound_ok_ts=int(time.time() * 1000), + last_inbound_exception_type="", + last_inbound_exception_message="", + ) + return + if envelope.get("typingMessage") or envelope.get("receiptMessage"): + return + data_message = envelope.get("dataMessage") or {} + if not isinstance(data_message, dict): + return + + source_uuid = str(envelope.get("sourceUuid") or envelope.get("source") or "").strip() + source_number = str(envelope.get("sourceNumber") or "").strip() + bot_uuid = str(getattr(self.client, "bot_uuid", "") or "").strip() + bot_phone = str(getattr(self.client, "phone_number", "") or "").strip() + if source_uuid and bot_uuid and source_uuid == bot_uuid: + return + if source_number and bot_phone and _digits_only(source_number) == _digits_only(bot_phone): + return + + identifiers = await self._resolve_signal_identifiers(source_uuid, source_number) + if not identifiers: + identifiers = await self._auto_link_single_user_signal_identifier( + source_uuid, source_number + ) + if not identifiers: + self.log.warning( + "signal raw-receive unmatched source_uuid=%s source_number=%s text=%s", + source_uuid, + source_number, + str(data_message.get("message") or "")[:160], + ) + return + + reaction_payload = _extract_signal_reaction(envelope) + if isinstance(reaction_payload, dict): + for identifier in identifiers: + try: + await history.apply_reaction( + identifier.user, + identifier, + target_message_id="", + target_ts=int(reaction_payload.get("target_ts") or 0), + emoji=str(reaction_payload.get("emoji") or ""), + source_service="signal", + actor=(source_uuid or source_number or ""), + remove=bool(reaction_payload.get("remove")), + payload=reaction_payload.get("raw") or {}, + ) + except Exception as exc: + self.log.warning("signal raw reaction history apply failed: %s", exc) + try: + await self.ur.xmpp.client.apply_external_reaction( + identifier.user, + identifier, + source_service="signal", + emoji=str(reaction_payload.get("emoji") or ""), + remove=bool(reaction_payload.get("remove")), + upstream_message_id="", + upstream_ts=int(reaction_payload.get("target_ts") or 0), + actor=(source_uuid or source_number or ""), + payload=reaction_payload.get("raw") or {}, + ) + except Exception as exc: + self.log.warning("signal raw reaction relay to XMPP failed: %s", exc) + transport.update_runtime_state( + self.service, + last_inbound_ok_ts=int(time.time() * 1000), + last_inbound_exception_type="", + last_inbound_exception_message="", + ) + return + + text = str(data_message.get("message") or "").strip() + if not text: + return + + ts_raw = ( + envelope.get("timestamp") + or envelope.get("serverReceivedTimestamp") + or int(time.time() * 1000) + ) + try: + ts = int(ts_raw) + except Exception: + ts = int(time.time() * 1000) + source_message_id = str( + envelope.get("serverGuid") + or envelope.get("guid") + or envelope.get("timestamp") + or ts + ).strip() + sender_key = source_uuid or source_number or (identifiers[0].identifier if identifiers else "") + source_chat_id = source_number or source_uuid or sender_key + reply_ref = reply_sync.extract_reply_ref(self.service, payload) + + for identifier in identifiers: + session = await history.get_chat_session(identifier.user, identifier) + reply_target = await reply_sync.resolve_reply_target( + identifier.user, + session, + reply_ref, + ) + exists = await sync_to_async( + lambda: Message.objects.filter( + user=identifier.user, + session=session, + source_service=self.service, + source_message_id=source_message_id, + ).exists() + )() + if exists: + continue + local_message = await history.store_message( + session=session, + sender=sender_key, + text=text, + ts=ts, + outgoing=False, + source_service=self.service, + source_message_id=source_message_id, + source_chat_id=source_chat_id, + reply_to=reply_target, + reply_source_service=str(reply_ref.get("reply_source_service") or ""), + reply_source_message_id=str( + reply_ref.get("reply_source_message_id") or "" + ), + message_meta={}, + ) + await self.ur.message_received( + self.service, + identifier=identifier, + text=text, + ts=ts, + payload=payload, + local_message=local_message, + ) + transport.update_runtime_state( + self.service, + last_inbound_ok_ts=int(time.time() * 1000), + last_inbound_exception_type="", + last_inbound_exception_message="", + ) + + async def _raw_receive_loop(self): + signal_number = str(getattr(settings, "SIGNAL_NUMBER", "") or "").strip() + if not signal_number: + return + uri = f"ws://{SIGNAL_URL}/v1/receive/{signal_number}" + while not self._stopping: + try: + async with websockets.connect(uri, ping_interval=None) as websocket: + async for raw_message in websocket: + await self._process_raw_inbound_event(raw_message) + except asyncio.CancelledError: + raise + except Exception as exc: + self.log.warning("signal raw-receive loop error: %s", exc) + await asyncio.sleep(2) + def start(self): self.log.info("Signal client starting...") self.client._event_loop = self.loop # Start background command processing loop if not self._command_task or self._command_task.done(): self._command_task = self.loop.create_task(self._command_loop()) - self.client.start() + if not self._raw_receive_task or self._raw_receive_task.done(): + self._raw_receive_task = self.loop.create_task(self._raw_receive_loop()) + # Use direct websocket receive loop as primary ingestion path. + # signalbot's internal receive consumer can compete for the same stream + # and starve inbound events in this deployment, so we keep it disabled. diff --git a/core/clients/signalapi.py b/core/clients/signalapi.py index 98b65d9..155cf46 100644 --- a/core/clients/signalapi.py +++ b/core/clients/signalapi.py @@ -1,5 +1,6 @@ import asyncio import base64 +import logging import aiohttp import orjson @@ -7,6 +8,8 @@ import requests from django.conf import settings from rest_framework import status +log = logging.getLogger(__name__) + async def start_typing(uuid): base = getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080").rstrip("/") @@ -70,7 +73,7 @@ async def download_and_encode_base64(file_url, filename, content_type, session=N return None -async def send_message_raw(recipient_uuid, text=None, attachments=None): +async def send_message_raw(recipient_uuid, text=None, attachments=None, metadata=None): """ Sends a message using the Signal REST API, ensuring attachment links are not included in the text body. @@ -90,6 +93,7 @@ async def send_message_raw(recipient_uuid, text=None, attachments=None): "number": settings.SIGNAL_NUMBER, "base64_attachments": [], } + meta = dict(metadata or {}) async def _attachment_to_base64(attachment, session): row = dict(attachment or {}) @@ -132,15 +136,47 @@ async def send_message_raw(recipient_uuid, text=None, attachments=None): if text: data["message"] = text - async with aiohttp.ClientSession() as session: - async with session.post(url, json=data) as response: - response_text = await response.text() - response_status = response.status + quote_timestamp = int(meta.get("quote_timestamp") or 0) + quote_author = str(meta.get("quote_author") or "").strip() + quote_text = str(meta.get("quote_text") or "").strip() + has_quote = quote_timestamp > 0 and bool(quote_author) - if response_status == status.HTTP_201_CREATED: - ts = orjson.loads(response_text).get("timestamp", None) - return ts if ts else False - return False + payloads = [dict(data)] + if has_quote: + flat_quote_payload = dict(data) + flat_quote_payload["quote_timestamp"] = int(quote_timestamp) + flat_quote_payload["quote_author"] = quote_author + if quote_text: + flat_quote_payload["quote_message"] = quote_text + + nested_quote_payload = dict(data) + nested_quote_payload["quote"] = { + "id": int(quote_timestamp), + "author": quote_author, + } + if quote_text: + nested_quote_payload["quote"]["text"] = quote_text + + payloads = [flat_quote_payload, nested_quote_payload, dict(data)] + + async with aiohttp.ClientSession() as session: + for index, payload in enumerate(payloads): + async with session.post(url, json=payload) as response: + response_text = await response.text() + response_status = response.status + if response_status == status.HTTP_201_CREATED: + ts = orjson.loads(response_text).get("timestamp", None) + return ts if ts else False + if index == len(payloads) - 1: + return False + if response_status not in {status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY}: + return False + log.warning( + "signal send quote payload rejected (%s), trying fallback shape: %s", + response_status, + response_text[:200], + ) + return False async def send_reaction( diff --git a/core/clients/transport.py b/core/clients/transport.py index 462deea..41dc8e1 100644 --- a/core/clients/transport.py +++ b/core/clients/transport.py @@ -1,7 +1,9 @@ import asyncio import base64 import io +import os import secrets +import shutil import time from typing import Any from urllib.parse import quote_plus @@ -479,6 +481,46 @@ def _account_key(value: str) -> str: return raw +def _wipe_signal_cli_local_state() -> bool: + """ + Best-effort local signal-cli state reset for json-rpc deployments where + REST account delete endpoints are unavailable. + """ + config_roots = ( + "/code/signal-cli-config", + "/signal-cli-config", + "/home/.local/share/signal-cli", + ) + removed_any = False + for root in config_roots: + if not os.path.isdir(root): + continue + try: + entries = os.listdir(root) + except Exception: + continue + for entry in entries: + if not entry: + continue + # Keep runtime configuration scaffold; wipe account/pairing state. + if entry in {"jsonrpc2.yml", "jsonrpc.yml"}: + continue + path = os.path.join(root, entry) + if os.path.isdir(path): + try: + shutil.rmtree(path) + removed_any = True + except Exception: + continue + else: + try: + os.remove(path) + removed_any = True + except Exception: + continue + return removed_any + + def unlink_account(service: str, account: str) -> bool: service_key = _service_key(service) account_value = str(account or "").strip() @@ -492,14 +534,18 @@ def unlink_account(service: str, account: str) -> bool: "/" ) target = quote_plus(account_value) + unlinked = False for path in (f"/v1/accounts/{target}", f"/v1/account/{target}"): try: response = requests.delete(f"{base}{path}", timeout=20) if response.ok: - return True + unlinked = True + break except Exception: continue - return False + if unlinked: + return True + return _wipe_signal_cli_local_state() if service_key in {"whatsapp", "instagram"}: state = get_runtime_state(service_key) @@ -715,8 +761,13 @@ async def send_message_raw( prepared_attachments = await prepare_outbound_attachments( service_key, attachments or [] ) - result = await signalapi.send_message_raw(recipient, text, prepared_attachments) meta = dict(metadata or {}) + result = await signalapi.send_message_raw( + recipient, + text, + prepared_attachments, + metadata=meta, + ) xmpp_source_id = str(meta.get("xmpp_source_id") or "").strip() if xmpp_source_id and result: from core.models import PersonIdentifier diff --git a/core/commands/handlers/bp.py b/core/commands/handlers/bp.py index b4d81fb..e0624cd 100644 --- a/core/commands/handlers/bp.py +++ b/core/commands/handlers/bp.py @@ -8,6 +8,7 @@ from django.conf import settings from core.commands.base import CommandContext, CommandHandler, CommandResult from core.commands.delivery import post_status_in_source, post_to_channel_binding +from core.commands.policies import BP_VARIANT_META, load_variant_policy from core.messaging import ai as ai_runner from core.messaging.text_export import plain_text_blob from core.messaging.utils import messages_to_string @@ -18,6 +19,7 @@ from core.models import ( CommandAction, CommandChannelBinding, CommandRun, + CommandVariantPolicy, Message, ) @@ -91,6 +93,45 @@ def _clamp_transcript(transcript: str, max_chars: int) -> str: class BPCommandHandler(CommandHandler): slug = "bp" + def _variant_key_for_text(self, text: str) -> str: + parsed = parse_bp_subcommand(text) + if parsed.command == "set": + return "bp_set" + if parsed.command == "set_range": + return "bp_set_range" + return "bp" + + def _variant_display_name(self, variant_key: str) -> str: + meta = BP_VARIANT_META.get(str(variant_key or "").strip(), {}) + return str(meta.get("name") or variant_key or "bp") + + async def _effective_policy( + self, + *, + profile, + variant_key: str, + action_types: set[str], + ) -> dict: + policy = await sync_to_async(load_variant_policy)(profile, variant_key) + if isinstance(policy, CommandVariantPolicy): + return { + "enabled": bool(policy.enabled), + "generation_mode": str(policy.generation_mode or "verbatim"), + "send_plan_to_egress": bool(policy.send_plan_to_egress) + and ("post_result" in action_types), + "send_status_to_source": bool(policy.send_status_to_source), + "send_status_to_egress": bool(policy.send_status_to_egress), + "store_document": bool(getattr(policy, "store_document", True)), + } + return { + "enabled": True, + "generation_mode": "ai" if variant_key == "bp" else "verbatim", + "send_plan_to_egress": "post_result" in action_types, + "send_status_to_source": str(profile.visibility_mode or "") == "status_in_source", + "send_status_to_egress": False, + "store_document": True, + } + async def _fanout(self, run: CommandRun, text: str) -> dict: profile = run.profile trigger = await sync_to_async( @@ -124,6 +165,39 @@ class BPCommandHandler(CommandHandler): failed_bindings += 1 return {"sent_bindings": sent_bindings, "failed_bindings": failed_bindings} + async def _fanout_status(self, run: CommandRun, text: str) -> dict: + profile = run.profile + trigger = await sync_to_async( + lambda: Message.objects.select_related("session", "user") + .filter(id=run.trigger_message_id) + .first() + )() + if trigger is None: + return {"sent_bindings": 0, "failed_bindings": 0} + bindings = await sync_to_async(list)( + CommandChannelBinding.objects.filter( + profile=profile, + enabled=True, + direction="egress", + ) + ) + sent_bindings = 0 + failed_bindings = 0 + for binding in bindings: + ok = await post_to_channel_binding( + trigger_message=trigger, + binding_service=binding.service, + binding_channel_identifier=binding.channel_identifier, + text=text, + origin_tag=f"bp-status-egress:{run.id}", + command_slug=self.slug, + ) + if ok: + sent_bindings += 1 + else: + failed_bindings += 1 + return {"sent_bindings": sent_bindings, "failed_bindings": failed_bindings} + async def _load_window(self, trigger: Message, anchor: Message) -> list[Message]: return await sync_to_async(list)( Message.objects.filter( @@ -188,7 +262,8 @@ class BPCommandHandler(CommandHandler): trigger: Message, run: CommandRun, profile, - action_types: set[str], + policy: dict, + variant_key: str, parsed: BPParsedCommand, ) -> CommandResult: mode = str(parsed.command or "") @@ -202,23 +277,63 @@ class BPCommandHandler(CommandHandler): await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) return CommandResult(ok=False, status="failed", error=run.error) rows = await self._load_window(trigger, anchor) - content = plain_text_blob(rows) - if not content.strip(): + deterministic_content = plain_text_blob(rows) + if not deterministic_content.strip(): run.status = "failed" run.error = "bp_set_range_empty_content" await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) return CommandResult(ok=False, status="failed", error=run.error) + if str(policy.get("generation_mode") or "verbatim") == "ai": + ai_obj = await sync_to_async(lambda: AI.objects.filter(user=trigger.user).first())() + if ai_obj is None: + run.status = "failed" + run.error = "ai_not_configured" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + return CommandResult(ok=False, status="failed", error=run.error) + prompt = [ + { + "role": "system", + "content": ( + "Transform source chat text into a structured business plan in markdown. " + "Do not reference any user template." + ), + }, + {"role": "user", "content": deterministic_content}, + ] + try: + content = str( + await ai_runner.run_prompt( + prompt, + ai_obj, + operation="command_bp_set_range_extract", + ) + or "" + ).strip() + except Exception as exc: + run.status = "failed" + run.error = f"bp_ai_failed:{exc}" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + return CommandResult(ok=False, status="failed", error=run.error) + if not content: + run.status = "failed" + run.error = "empty_ai_response" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + return CommandResult(ok=False, status="failed", error=run.error) + else: + content = deterministic_content annotation = self._annotation("set_range", len(rows)) - doc = await self._persist_document( - run=run, - trigger=trigger, - profile=profile, - anchor=anchor, - content=content, - mode="set_range", - source_message_ids=[str(row.id) for row in rows], - annotation=annotation, - ) + doc = None + if bool(policy.get("store_document", True)): + doc = await self._persist_document( + run=run, + trigger=trigger, + profile=profile, + anchor=anchor, + content=content, + mode="set_range", + source_message_ids=[str(row.id) for row in rows], + annotation=annotation, + ) elif mode == "set": source_ids: list[str] = [] if anchor is not None and not remainder: @@ -244,17 +359,57 @@ class BPCommandHandler(CommandHandler): await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) return CommandResult(ok=False, status="failed", error=run.error) + if str(policy.get("generation_mode") or "verbatim") == "ai": + ai_obj = await sync_to_async(lambda: AI.objects.filter(user=trigger.user).first())() + if ai_obj is None: + run.status = "failed" + run.error = "ai_not_configured" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + return CommandResult(ok=False, status="failed", error=run.error) + prompt = [ + { + "role": "system", + "content": ( + "Transform source chat text into a structured business plan in markdown. " + "Do not reference any user template." + ), + }, + {"role": "user", "content": content}, + ] + try: + ai_content = str( + await ai_runner.run_prompt( + prompt, + ai_obj, + operation="command_bp_set_extract", + ) + or "" + ).strip() + except Exception as exc: + run.status = "failed" + run.error = f"bp_ai_failed:{exc}" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + return CommandResult(ok=False, status="failed", error=run.error) + if not ai_content: + run.status = "failed" + run.error = "empty_ai_response" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + return CommandResult(ok=False, status="failed", error=run.error) + content = ai_content + annotation = self._annotation("set", 1 if not has_addendum else 2, has_addendum) - doc = await self._persist_document( - run=run, - trigger=trigger, - profile=profile, - anchor=anchor, - content=content, - mode="set", - source_message_ids=source_ids, - annotation=annotation, - ) + doc = None + if bool(policy.get("store_document", True)): + doc = await self._persist_document( + run=run, + trigger=trigger, + profile=profile, + anchor=anchor, + content=content, + mode="set", + source_message_ids=source_ids, + annotation=annotation, + ) else: run.status = "failed" run.error = "bp_unknown_subcommand" @@ -262,31 +417,38 @@ class BPCommandHandler(CommandHandler): return CommandResult(ok=False, status="failed", error=run.error) fanout_stats = {"sent_bindings": 0, "failed_bindings": 0} - if "post_result" in action_types: - fanout_body = f"{doc.content_markdown}\n\n{doc.structured_payload.get('annotation', '')}".strip() + if bool(policy.get("send_plan_to_egress")): + fanout_body = f"{content}\n\n{annotation}".strip() fanout_stats = await self._fanout(run, fanout_body) - if "status_in_source" == profile.visibility_mode: - status_text = ( - f"[bp] {doc.structured_payload.get('annotation', '').strip()} " - f"Saved as {doc.title}." - ).strip() - sent_count = int(fanout_stats.get("sent_bindings") or 0) - failed_count = int(fanout_stats.get("failed_bindings") or 0) - if sent_count or failed_count: - status_text += f" · fanout sent:{sent_count}" - if failed_count: - status_text += f" failed:{failed_count}" + sent_count = int(fanout_stats.get("sent_bindings") or 0) + failed_count = int(fanout_stats.get("failed_bindings") or 0) + status_text = ( + f"[bp:{self._variant_display_name(variant_key)}:{policy.get('generation_mode')}] " + f"{annotation.strip()} " + f"{'Saved as ' + doc.title + ' · ' if doc else 'Not saved (store_document disabled) · '}" + f"fanout sent:{sent_count}" + ).strip() + if failed_count: + status_text += f" failed:{failed_count}" + + if bool(policy.get("send_status_to_source")): await post_status_in_source( trigger_message=trigger, text=status_text, origin_tag=f"bp-status:{trigger.id}", ) + if bool(policy.get("send_status_to_egress")): + await self._fanout_status(run, status_text) run.status = "ok" run.error = "" await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) - return CommandResult(ok=True, status="ok", payload={"document_id": str(doc.id)}) + return CommandResult( + ok=True, + status="ok", + payload={"document_id": str(doc.id) if doc else ""}, + ) async def _execute_legacy_ai( self, @@ -294,8 +456,8 @@ class BPCommandHandler(CommandHandler): trigger: Message, run: CommandRun, profile, - action_types: set[str], - ctx: CommandContext, + policy: dict, + variant_key: str, ) -> CommandResult: if trigger.reply_to_id is None: run.status = "failed" @@ -322,69 +484,90 @@ class BPCommandHandler(CommandHandler): template_text = profile.template_text or default_template max_template_chars = int(getattr(settings, "BP_MAX_TEMPLATE_CHARS", 5000) or 5000) template_text = str(template_text or "")[:max_template_chars] - ai_obj = await sync_to_async(lambda: AI.objects.filter(user=trigger.user).first())() - if ai_obj is None: - run.status = "failed" - run.error = "ai_not_configured" - await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) - return CommandResult(ok=False, status="failed", error=run.error) + generation_mode = str(policy.get("generation_mode") or "ai") + if generation_mode == "verbatim": + summary = plain_text_blob(rows) + if not summary.strip(): + run.status = "failed" + run.error = "bp_verbatim_empty_content" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + return CommandResult(ok=False, status="failed", error=run.error) + else: + ai_obj = await sync_to_async(lambda: AI.objects.filter(user=trigger.user).first())() + if ai_obj is None: + run.status = "failed" + run.error = "ai_not_configured" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + return CommandResult(ok=False, status="failed", error=run.error) - prompt = [ - {"role": "system", "content": _bp_system_prompt()}, - { - "role": "user", - "content": ( - "Template:\n" - f"{template_text}\n\n" - "Messages:\n" - f"{transcript}" - ), - }, - ] - try: - summary = str(await ai_runner.run_prompt(prompt, ai_obj, operation="command_bp_extract") or "").strip() - if not summary: - raise RuntimeError("empty_ai_response") - except Exception as exc: - run.status = "failed" - run.error = f"bp_ai_failed:{exc}" - await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) - return CommandResult(ok=False, status="failed", error=run.error) + prompt = [ + {"role": "system", "content": _bp_system_prompt()}, + { + "role": "user", + "content": ( + "Template:\n" + f"{template_text}\n\n" + "Messages:\n" + f"{transcript}" + ), + }, + ] + try: + summary = str(await ai_runner.run_prompt(prompt, ai_obj, operation="command_bp_extract") or "").strip() + if not summary: + raise RuntimeError("empty_ai_response") + except Exception as exc: + run.status = "failed" + run.error = f"bp_ai_failed:{exc}" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + return CommandResult(ok=False, status="failed", error=run.error) annotation = self._annotation("legacy", len(rows)) - document = await self._persist_document( - run=run, - trigger=trigger, - profile=profile, - anchor=anchor, - content=summary, - mode="legacy_ai", - source_message_ids=[str(row.id) for row in rows], - annotation=annotation, - ) + document = None + if bool(policy.get("store_document", True)): + document = await self._persist_document( + run=run, + trigger=trigger, + profile=profile, + anchor=anchor, + content=summary, + mode="legacy_ai", + source_message_ids=[str(row.id) for row in rows], + annotation=annotation, + ) fanout_stats = {"sent_bindings": 0, "failed_bindings": 0} - if "post_result" in action_types: + if bool(policy.get("send_plan_to_egress")): fanout_stats = await self._fanout(run, summary) - if "status_in_source" == profile.visibility_mode: - status_text = f"[bp] Generated business plan: {document.title}" - sent_count = int(fanout_stats.get("sent_bindings") or 0) - failed_count = int(fanout_stats.get("failed_bindings") or 0) - if sent_count or failed_count: - status_text += f" · fanout sent:{sent_count}" - if failed_count: - status_text += f" failed:{failed_count}" + sent_count = int(fanout_stats.get("sent_bindings") or 0) + failed_count = int(fanout_stats.get("failed_bindings") or 0) + status_text = ( + f"[bp:{self._variant_display_name(variant_key)}:{generation_mode}] " + f"Generated business plan: " + f"{document.title if document else 'not saved (store_document disabled)'} " + f"· fanout sent:{sent_count}" + ) + if failed_count: + status_text += f" failed:{failed_count}" + + if bool(policy.get("send_status_to_source")): await post_status_in_source( trigger_message=trigger, text=status_text, origin_tag=f"bp-status:{trigger.id}", ) + if bool(policy.get("send_status_to_egress")): + await self._fanout_status(run, status_text) run.status = "ok" run.error = "" await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) - return CommandResult(ok=True, status="ok", payload={"document_id": str(document.id)}) + return CommandResult( + ok=True, + status="ok", + payload={"document_id": str(document.id) if document else ""}, + ) async def execute(self, ctx: CommandContext) -> CommandResult: trigger = await sync_to_async( @@ -418,13 +601,26 @@ class BPCommandHandler(CommandHandler): run.error = "" await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + variant_key = self._variant_key_for_text(ctx.message_text) + policy = await self._effective_policy( + profile=profile, + variant_key=variant_key, + action_types=action_types, + ) + if not bool(policy.get("enabled")): + run.status = "skipped" + run.error = f"variant_disabled:{variant_key}" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + return CommandResult(ok=False, status="skipped", error=run.error) + parsed = parse_bp_subcommand(ctx.message_text) if parsed.command and bool(getattr(settings, "BP_SUBCOMMANDS_V1", True)): return await self._execute_set_or_range( trigger=trigger, run=run, profile=profile, - action_types=action_types, + policy=policy, + variant_key=variant_key, parsed=parsed, ) @@ -432,6 +628,6 @@ class BPCommandHandler(CommandHandler): trigger=trigger, run=run, profile=profile, - action_types=action_types, - ctx=ctx, + policy=policy, + variant_key=variant_key, ) diff --git a/core/commands/policies.py b/core/commands/policies.py new file mode 100644 index 0000000..107f05d --- /dev/null +++ b/core/commands/policies.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +from typing import Iterable + +from core.models import CommandAction, CommandProfile, CommandVariantPolicy + +BP_VARIANT_KEYS = ("bp", "bp_set", "bp_set_range") +BP_VARIANT_META = { + "bp": { + "name": "bp", + "trigger_token": "#bp#", + "template_supported": True, + "position": 0, + }, + "bp_set": { + "name": "bp set", + "trigger_token": "#bp set#", + "template_supported": False, + "position": 1, + }, + "bp_set_range": { + "name": "bp set range", + "trigger_token": "#bp set range#", + "template_supported": False, + "position": 2, + }, +} + + +def _legacy_defaults(profile: CommandProfile, post_result_enabled: bool) -> dict: + return { + "enabled": True, + "generation_mode": "ai", + "send_plan_to_egress": bool(post_result_enabled), + "send_status_to_source": str(profile.visibility_mode or "") == "status_in_source", + "send_status_to_egress": False, + "store_document": True, + } + + +def _bp_defaults( + profile: CommandProfile, + variant_key: str, + post_result_enabled: bool, +) -> dict: + defaults = _legacy_defaults(profile, post_result_enabled) + if variant_key in {"bp_set", "bp_set_range"}: + defaults["generation_mode"] = "verbatim" + else: + defaults["generation_mode"] = "ai" + return defaults + + +def ensure_variant_policies_for_profile( + profile: CommandProfile, + *, + action_rows: Iterable[CommandAction] | None = None, +) -> dict[str, CommandVariantPolicy]: + actions = list(action_rows) if action_rows is not None else list(profile.actions.all()) + post_result_enabled = any( + row.action_type == "post_result" and bool(row.enabled) for row in actions + ) + result: dict[str, CommandVariantPolicy] = {} + + if str(profile.slug or "").strip() == "bp": + for key in BP_VARIANT_KEYS: + meta = BP_VARIANT_META.get(key, {}) + defaults = _bp_defaults(profile, key, post_result_enabled) + policy, _ = CommandVariantPolicy.objects.get_or_create( + profile=profile, + variant_key=key, + defaults={ + **defaults, + "position": int(meta.get("position") or 0), + }, + ) + result[key] = policy + else: + defaults = _legacy_defaults(profile, post_result_enabled) + policy, _ = CommandVariantPolicy.objects.get_or_create( + profile=profile, + variant_key="default", + defaults={ + **defaults, + "generation_mode": "verbatim", + "position": 0, + }, + ) + result["default"] = policy + + return result + + +def load_variant_policy(profile: CommandProfile, variant_key: str) -> CommandVariantPolicy | None: + key = str(variant_key or "").strip() + if not key: + return None + policy = ( + profile.variant_policies.filter(variant_key=key) + .order_by("position", "id") + .first() + ) + if policy is not None: + return policy + ensured = ensure_variant_policies_for_profile(profile) + return ensured.get(key) diff --git a/core/messaging/reply_sync.py b/core/messaging/reply_sync.py index 54d204f..99473fe 100644 --- a/core/messaging/reply_sync.py +++ b/core/messaging/reply_sync.py @@ -56,18 +56,83 @@ def _find_origin_tag(value: Any, depth: int = 0) -> str: def _extract_signal_reply(raw_payload: dict[str, Any]) -> dict[str, str]: envelope = _as_dict((raw_payload or {}).get("envelope")) - data_message = _as_dict( - envelope.get("dataMessage") - or envelope.get("syncMessage", {}).get("sentMessage", {}).get("message") + sync_message = _as_dict(envelope.get("syncMessage")) + sent_message = _as_dict(sync_message.get("sentMessage")) + data_candidates = [ + _as_dict(envelope.get("dataMessage")), + _as_dict(sent_message.get("message")), + _as_dict(sent_message), + _as_dict((raw_payload or {}).get("dataMessage")), + _as_dict(raw_payload), + ] + quote_key_candidates = ( + "id", + "targetSentTimestamp", + "targetTimestamp", + "quotedMessageId", + "quoted_message_id", + "quotedMessageID", + "messageId", + "message_id", + "timestamp", ) - quote = _as_dict(data_message.get("quote")) - quote_id = _clean(quote.get("id")) - if quote_id: - return { - "reply_source_message_id": quote_id, - "reply_source_service": "signal", - "reply_source_chat_id": "", - } + quote_author_candidates = ( + "author", + "authorUuid", + "authorAci", + "authorNumber", + "source", + "sourceNumber", + "sourceUuid", + ) + quote_candidates: list[dict[str, Any]] = [] + for data_message in data_candidates: + if not data_message: + continue + direct_quote = _as_dict(data_message.get("quote") or data_message.get("Quote")) + if direct_quote: + quote_candidates.append(direct_quote) + + stack = [data_message] + while stack: + current = stack.pop() + if not isinstance(current, dict): + continue + for key, value in current.items(): + if isinstance(value, dict): + key_text = str(key or "").strip().lower() + if "quote" in key_text or "reply" in key_text: + quote_candidates.append(_as_dict(value)) + stack.append(value) + elif isinstance(value, list): + for item in value: + if isinstance(item, dict): + stack.append(item) + + for quote in quote_candidates: + quote_id = "" + for key in quote_key_candidates: + quote_id = _clean(quote.get(key)) + if quote_id: + break + if not quote_id: + nested = _as_dict(quote.get("id")) + if nested: + for key in quote_key_candidates: + quote_id = _clean(nested.get(key)) + if quote_id: + break + if quote_id: + reply_chat_id = "" + for key in quote_author_candidates: + reply_chat_id = _clean(quote.get(key)) + if reply_chat_id: + break + return { + "reply_source_message_id": quote_id, + "reply_source_service": "signal", + "reply_source_chat_id": reply_chat_id, + } return {} diff --git a/core/migrations/0031_commandvariantpolicy.py b/core/migrations/0031_commandvariantpolicy.py new file mode 100644 index 0000000..74ddbad --- /dev/null +++ b/core/migrations/0031_commandvariantpolicy.py @@ -0,0 +1,107 @@ +# Generated by Django 5.2.11 on 2026-03-02 14:17 + +import django.db.models.deletion +from django.db import migrations, models + + +def _backfill_variant_policies(apps, schema_editor): + CommandProfile = apps.get_model("core", "CommandProfile") + CommandAction = apps.get_model("core", "CommandAction") + CommandVariantPolicy = apps.get_model("core", "CommandVariantPolicy") + + for profile in CommandProfile.objects.all().iterator(): + actions = list(CommandAction.objects.filter(profile=profile)) + post_result_enabled = any( + str(getattr(row, "action_type", "")) == "post_result" + and bool(getattr(row, "enabled", False)) + for row in actions + ) + send_status_to_source = ( + str(getattr(profile, "visibility_mode", "") or "") == "status_in_source" + ) + if str(getattr(profile, "slug", "") or "") == "bp": + rows = ( + ("bp", "ai", 0), + ("bp_set", "verbatim", 1), + ("bp_set_range", "verbatim", 2), + ) + else: + rows = (("default", "verbatim", 0),) + + for key, generation_mode, position in rows: + CommandVariantPolicy.objects.get_or_create( + profile=profile, + variant_key=key, + defaults={ + "enabled": True, + "generation_mode": generation_mode, + "send_plan_to_egress": bool(post_result_enabled), + "send_status_to_source": bool(send_status_to_source), + "send_status_to_egress": False, + "position": int(position), + }, + ) + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0030_chattasksource_settings"), + ] + + operations = [ + migrations.CreateModel( + name="CommandVariantPolicy", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("variant_key", models.CharField(default="default", max_length=64)), + ("enabled", models.BooleanField(default=True)), + ( + "generation_mode", + models.CharField( + choices=[("ai", "AI"), ("verbatim", "Verbatim")], + default="verbatim", + max_length=32, + ), + ), + ("send_plan_to_egress", models.BooleanField(default=True)), + ("send_status_to_source", models.BooleanField(default=True)), + ("send_status_to_egress", models.BooleanField(default=False)), + ("position", models.PositiveIntegerField(default=0)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ( + "profile", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="variant_policies", + to="core.commandprofile", + ), + ), + ], + options={ + "ordering": ["position", "id"], + "indexes": [ + models.Index( + fields=["profile", "enabled", "variant_key"], + name="core_comman_profile_7913f5_idx", + ) + ], + "constraints": [ + models.UniqueConstraint( + fields=("profile", "variant_key"), + name="unique_command_variant_policy_per_profile", + ) + ], + }, + ), + migrations.RunPython(_backfill_variant_policies, migrations.RunPython.noop), + ] diff --git a/core/migrations/0032_commandvariantpolicy_store_document.py b/core/migrations/0032_commandvariantpolicy_store_document.py new file mode 100644 index 0000000..b467bff --- /dev/null +++ b/core/migrations/0032_commandvariantpolicy_store_document.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.11 on 2026-03-02 17:38 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0031_commandvariantpolicy'), + ] + + operations = [ + migrations.AddField( + model_name='commandvariantpolicy', + name='store_document', + field=models.BooleanField(default=True), + ), + ] diff --git a/core/models.py b/core/models.py index d4a98e0..a97c5fa 100644 --- a/core/models.py +++ b/core/models.py @@ -1725,6 +1725,45 @@ class CommandChannelBinding(models.Model): ] +class CommandVariantPolicy(models.Model): + GENERATION_MODE_CHOICES = ( + ("ai", "AI"), + ("verbatim", "Verbatim"), + ) + + profile = models.ForeignKey( + CommandProfile, + on_delete=models.CASCADE, + related_name="variant_policies", + ) + variant_key = models.CharField(max_length=64, default="default") + enabled = models.BooleanField(default=True) + generation_mode = models.CharField( + max_length=32, + choices=GENERATION_MODE_CHOICES, + default="verbatim", + ) + send_plan_to_egress = models.BooleanField(default=True) + send_status_to_source = models.BooleanField(default=True) + send_status_to_egress = models.BooleanField(default=False) + store_document = models.BooleanField(default=True) + position = models.PositiveIntegerField(default=0) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + ordering = ["position", "id"] + constraints = [ + models.UniqueConstraint( + fields=["profile", "variant_key"], + name="unique_command_variant_policy_per_profile", + ) + ] + indexes = [ + models.Index(fields=["profile", "enabled", "variant_key"]), + ] + + class CommandAction(models.Model): ACTION_CHOICES = ( ("extract_bp", "Extract Business Plan"), diff --git a/core/tasks/engine.py b/core/tasks/engine.py index 8b23d9c..a85fc66 100644 --- a/core/tasks/engine.py +++ b/core/tasks/engine.py @@ -9,6 +9,7 @@ from core.clients.transport import send_message_raw from core.messaging import ai as ai_runner from core.models import ( AI, + Chat, ChatTaskSource, DerivedTask, DerivedTaskEvent, @@ -30,18 +31,43 @@ def _channel_variants(service: str, channel: str) -> list[str]: if not value: return [] variants = [value] - if str(service or "").strip().lower() == "whatsapp": + service_key = str(service or "").strip().lower() + if service_key == "whatsapp": bare = value.split("@", 1)[0].strip() if bare and bare not in variants: variants.append(bare) + direct = f"{bare}@s.whatsapp.net" if bare else "" + if direct and direct not in variants: + variants.append(direct) group = f"{bare}@g.us" if bare else "" if group and group not in variants: variants.append(group) + if service_key == "signal": + digits = re.sub(r"[^0-9]", "", value) + if digits and digits not in variants: + variants.append(digits) + if digits: + plus = f"+{digits}" + if plus not in variants: + variants.append(plus) return variants async def _resolve_source_mappings(message: Message) -> list[ChatTaskSource]: variants = _channel_variants(message.source_service or "", message.source_chat_id or "") + if str(message.source_service or "").strip().lower() == "signal": + signal_value = str(message.source_chat_id or "").strip() + if signal_value: + companions = await sync_to_async(list)( + Chat.objects.filter(source_uuid=signal_value).values_list("source_number", flat=True) + ) + companions += await sync_to_async(list)( + Chat.objects.filter(source_number=signal_value).values_list("source_uuid", flat=True) + ) + for candidate in companions: + for expanded in _channel_variants("signal", str(candidate or "").strip()): + if expanded and expanded not in variants: + variants.append(expanded) if not variants: return [] return await sync_to_async(list)( diff --git a/core/templates/base.html b/core/templates/base.html index 9c35a0d..ecaeaae 100644 --- a/core/templates/base.html +++ b/core/templates/base.html @@ -398,6 +398,9 @@ Command Routing + + Task Settings + Translation diff --git a/core/templates/pages/command-routing.html b/core/templates/pages/command-routing.html index cab8958..abe3fe2 100644 --- a/core/templates/pages/command-routing.html +++ b/core/templates/pages/command-routing.html @@ -4,30 +4,44 @@

Command Routing

-

Manage command profiles, channel bindings, business-plan outputs, and translation bridges.

+

Configure commands, channel bindings, and per-command delivery in a predictable way.

+ {% if scope_service and scope_identifier %} +
+ Scoped to this chat only: {{ scope_service }} · {{ scope_identifier }} +
+ {% endif %}

Create Command Profile

-

Create reusable command behavior. Example: #bp# reply command for business-plan extraction.

+

Create reusable command behavior. bp set and bp set range are fixed bp subcommands and will appear automatically.

{% csrf_token %} -
-
- - -

Stable command id, e.g. bp.

+ {% if scope_service and scope_identifier %} + + + {% endif %} +
+
+ +
+ +
-
+
-
- - +
+ +
- + @@ -37,53 +51,33 @@

{{ profile.name }} ({{ profile.slug }})

-

Flag Definitions

+

Help

    -
  • enabled: master on/off switch for this command profile.
  • -
  • reply required: command only runs when the trigger message is sent as a reply to another message.
  • -
  • exact match: message text must be exactly the trigger token (for example #bp#) with no extra text.
  • -
  • visibility = status_in_source: post command status updates back into the source channel.
  • -
  • visibility = silent: do not post status updates in the source channel.
  • -
  • binding direction ingress: channels where trigger messages are accepted.
  • -
  • binding direction egress: channels where command outputs are posted.
  • -
  • binding direction scratchpad_mirror: scratchpad/mirror channel used for relay-only behavior.
  • -
  • action extract_bp: run AI extraction to produce business plan content.
  • -
  • action save_document: save/editable document and revision history.
  • -
  • action post_result: fan out generated result to enabled egress bindings.
  • -
  • position: execution order (lower runs first).
  • +
  • Send plan to egress: posts generated plan to enabled egress bindings.
  • +
  • Send status to source: posts a short confirmation message in the source chat.
  • +
  • Send status to egress: posts a short confirmation to egress channels.
  • +
  • Template support: only bp uses the template, and only in AI mode.
- {% if profile.slug == "bp" %} -

Supported Triggers (BP)

-
    -
  • #bp#: primary BP trigger (uses the standard BP extraction flow).
  • -
  • #bp set#: deterministic no-AI set/update from reply/addendum text.
  • -
  • #bp set range#: deterministic no-AI set/update from reply-anchor to trigger range.
  • -
- {% endif %}
+
{% csrf_token %} + {% if scope_service and scope_identifier %} + + + {% endif %}
-
- - +
+ +
-
- -
- -
-
-
+
Flags @@ -93,26 +87,148 @@
- +
-
+
+
+

Variant Policies

+

Delivery switches control where plan/status are posted. Egress bindings define destinations.

+

Turn off Save Document to run/fanout without storing a business plan artifact.

+ + + + + + + + + + + + + + + + {% for variant in profile.variant_rows %} + + + {% csrf_token %} + + + + {% if scope_service and scope_identifier %} + + + {% endif %} + + + + + + + + + + + + {% if variant.warn_verbatim_plan %} + + + + {% endif %} + {% empty %} + + {% endfor %} + +
VariantTriggerEnabledGenerationSave DocumentPlan -> EgressStatus -> SourceStatus -> Egress
+ {{ variant.variant_label }} + {% if not variant.template_supported %} + no template + {% endif %} + {{ variant.trigger_token }} +
+ +
+
+

+ Warning: {{ variant.variant_label }} is in verbatim mode with plan fanout enabled. + Recipients will get raw transcript-style output. +

+
No variants configured.
+ +
+
+ {% csrf_token %} + + + {% if scope_service and scope_identifier %} + + + {% endif %} + +
+
+ {% csrf_token %} + + + {% if scope_service and scope_identifier %} + + + {% endif %} + +
+
+ +

Effective Destinations

+ {% if profile.enabled_egress_bindings %} +
    + {% for row in profile.enabled_egress_bindings %} +
  • {{ row.service }} · {{ row.channel_identifier }}
  • + {% endfor %} +
+

{{ profile.enabled_egress_bindings|length }} enabled egress destination{{ profile.enabled_egress_bindings|length|pluralize }}.

+ {% else %} +
No enabled egress destinations. Plan fanout will show sent:0.
+ {% endif %} + + {% if preview_profile_id and preview_profile_id == profile.id|stringformat:"s" %} +
+

Dry Run Preview

+
    + {% for variant in profile.variant_rows %} +
  • + {{ variant.variant_label }}: {% if variant.row.enabled %}enabled{% else %}disabled{% endif %}, mode={{ variant.row.generation_mode }}, + save_document={{ variant.row.store_document }}, + plan->egress={{ variant.row.send_plan_to_egress }}, + status->source={{ variant.row.send_status_to_source }}, + status->egress={{ variant.row.send_status_to_egress }} +
  • + {% endfor %} +
+
+ {% endif %} +
+

Channel Bindings

-

A command runs only when the source channel is in ingress. Output is sent to all enabled egress bindings.

+

Ingress accepts triggers. Egress receives plan/status fanout if enabled in variant policy.

- {% for binding in profile.channel_bindings.all %} + {% for binding in profile.visible_bindings %} @@ -123,6 +239,10 @@ {% csrf_token %} + {% if scope_service and scope_identifier %} + + + {% endif %} @@ -136,6 +256,10 @@ {% csrf_token %} + {% if scope_service and scope_identifier %} + + + {% endif %}
@@ -144,7 +268,7 @@ {% for value in directions %} @@ -155,16 +279,19 @@
- {% for value in channel_services %} - + {% endfor %}
+ {% if scope_service %} + + {% endif %}
- +
@@ -172,13 +299,15 @@
+
+

Actions

-

Enable/disable each step and set execution order with position.

+

Enable/disable each step and use the reorder capsule to change execution order.

DirectionServiceChannelActions
{% if binding.direction == "ingress" %}Ingress (Accept Triggers) - {% elif binding.direction == "egress" %}Egress (Post Results) + {% elif binding.direction == "egress" %}Egress (Delivery Destination) {% else %}Scratchpad Mirror {% endif %}
- + {% for action_row in profile.actions.all %} @@ -191,28 +320,41 @@ {% endif %} - +
TypeEnabledOrderActions
TypeEnabledReorderActions
{{ action_row.enabled }}{{ forloop.counter }} -
-
+ + {% csrf_token %} - + {% if scope_service and scope_identifier %} + + + {% endif %} + -
+ {% csrf_token %} - + {% if scope_service and scope_identifier %} + + + {% endif %} +
-
+ +
{% csrf_token %} + {% if scope_service and scope_identifier %} + + + {% endif %}
@@ -230,6 +372,10 @@ {% csrf_token %} + {% if scope_service and scope_identifier %} + + + {% endif %} @@ -261,4 +407,34 @@ + {% endblock %} diff --git a/core/templates/pages/tasks-settings.html b/core/templates/pages/tasks-settings.html index 06a4461..4f46aa9 100644 --- a/core/templates/pages/tasks-settings.html +++ b/core/templates/pages/tasks-settings.html @@ -3,402 +3,345 @@

Task Settings

-

Configure task derivation, chat mapping, completion parsing, and external sync behavior.

+

Project defaults flow into channel overrides. Use Quick Setup for normal operation; open Advanced Setup for full controls.

-
-

Setting Definitions

+
-

Projects: top-level containers for derived tasks. A single group can map to any project.

-

Epics: optional sub-grouping inside a project. Use these for parallel workstreams in the same project.

-

Group Mapping: binds a chat channel (service + channel identifier) to a project and optional epic. Task extraction only runs where mappings exist.

-

Matching Hierarchy: channel mapping flags override project flags. Project flags are defaults; mapping flags are per-chat precision controls.

-

False-Positive Controls: defaults are safe: match_mode=strict, require_prefix=true, and prefixes task:/todo:. Freeform matching is off by default.

-

Task ID Announcements: when enabled, newly derived tasks post an in-chat confirmation containing the new task reference (for example #17). Default is off.

-

Legacy Backfill: opening this page applies safe defaults to older project and mapping rows created before strict prefix-only matching.

-

Completion Phrases: explicit trigger words used to detect completion markers like done #12, completed #12, fixed #12.

-

Provider: external sync adapter toggle. In current setup, mock provider validates append-only sync flow and retry behavior.

-

Sync Event Log: audit of provider sync attempts and outcomes. Retry replays the event without mutating immutable task source records.

-
-
- - {% if prefill_service and prefill_identifier %} -
-

Quick Setup For Current Chat

-

Prefilled from compose for {{ prefill_service }} · {{ prefill_identifier }}. Create/update project + epic + channel mapping in one step.

-
- {% csrf_token %} - - - - - -
-
- - -
-
- - -
-
- -
- -
-
-
- - -
-
- - - - - -
-
- {% endif %} - -
-
-
-

Projects

-

Create project scopes used by group mappings and derived tasks.

-
- {% csrf_token %} - - - -
- - -
-
- -
- -
-
-
- - -
- - - - - -
-
    - {% for row in projects %} -
  • - {{ row.name }} - - mode={{ row.settings_effective.match_mode }}, - prefixes={{ row.allowed_prefixes_csv }}, - require_prefix={{ row.settings_effective.require_prefix }}, - announce_id={{ row.settings_effective.announce_task_id }} - -
  • - {% empty %} -
  • No projects.
  • - {% endfor %} -
-
-
- -
-
-

Epics

-

Create project-local epics to refine routing and reporting.

-
- {% csrf_token %} - - - -
- -
- -
-
-
- - -
- -
-
-
- -
-
-

Group Mapping (Chat -> Project/Epic)

-

Each mapped group becomes eligible for derived task extraction and completion tracking.

-
- {% csrf_token %} - - - -
-
- -
- -
-
-
- - -
-
- -
- -
-
-
- -
- -
-
-
- -
-
-
-
- -
- -
-
-
- - -
-
- - -
-
- - - - - -
- - - - {% for row in sources %} - - - - - - - - {% empty %} - - {% endfor %} - -
ChatProjectEpicMatchAnnounce
{{ row.service }} · {{ row.channel_identifier }}{{ row.project.name }}{{ row.epic.name }}{{ row.settings_effective.match_mode }}{% if row.settings_effective.require_prefix %} +prefix{% endif %}{{ row.settings_effective.announce_task_id }}
No mappings.
-
-
- -
-
-

Project Matching Flags

-

Project defaults apply to all mapped chats unless channel-level override changes them.

-
- {% csrf_token %} - - - -
- -
- -
-
-
- -
- -
-
-
- - -
-
- - -
- - - - - -
-
- -
-

Channel Override Flags

-

These flags override project defaults for one mapped chat only.

-
- {% csrf_token %} - - - -
- -
- -
-
-
- -
- -
-
-
- - -
-
- - -
- - - - - - -
-
-
- -
-
-

Completion Phrases

-

Add parser phrases for completion statements followed by a task reference, e.g. done #12.

-
- {% csrf_token %} - - - -
- - -
- -
-
    - {% for row in patterns %}
  • {{ row.phrase }}
  • {% empty %}
  • No phrases.
  • {% endfor %} -
-
-
- -
-
-

Provider

-

Enable/disable external sync adapter and review recent provider event outcomes.

-
- {% csrf_token %} - - - - - - -
- - - - {% for row in sync_events %} - - - - - - - {% empty %} - - {% endfor %} - -
UpdatedProviderStatus
{{ row.updated_at }}{{ row.provider }}{{ row.status }} -
- {% csrf_token %} - - - - - -
-
No sync events.
-
+

How Matching Works

+

Safe default behavior: strict matching, required prefixes, completion parsing enabled, and task-id announcements disabled.

+

Hierarchy: Project flags are defaults. A mapped channel can override those defaults without changing project-wide behavior.

+

Matching modes: strict (prefix only), balanced (prefix + limited hints), broad (more permissive, higher false-positive risk).

+ +
+

Quick Setup

+

Creates or updates project + optional epic + channel mapping in one submission.

+

After setup, view tasks in Tasks Hub{% if prefill_service and prefill_identifier %} or this group task view{% endif %}.

+
+ {% csrf_token %} + +
+
+ +
+ +
+

Platform to watch for task extraction.

+
+
+ + +

Exact chat/group id where messages are monitored.

+
+
+ + +

Top-level container for derived tasks.

+
+
+ + +

Optional sub-container within a project.

+
+
+ +
+
+ +
+ +
+

strict = safest, balanced = moderate, broad = permissive.

+
+
+ + +

Click to add: + + + +

+
+
+ + +

Minimum length after prefix.

+
+
+ +
+ + + + + +
+

+ Require Prefix: only prefixed messages can create tasks. + Derivation Enabled: master on/off for extraction. + Completion Enabled: parse completion phrases like done #12. + AI Title Enabled: normalize task titles using AI. + Announce Task ID: send bot confirmation on creation. +

+ + +
+
+ +
+ Advanced Setup +

Manual controls for creating hierarchy entities, mapping channels, and overriding behavior.

+ +
+
+
+

Projects

+

Create projects and review their effective defaults.

+
+ {% csrf_token %} + +
+
+
+
+

Project names should describe a long-running stream of work.

+
+ + + + {% for row in projects %} + + + + + {% empty %} + + {% endfor %} + +
ProjectDefaults
{{ row.name }}mode={{ row.settings_effective.match_mode }}, prefixes={{ row.allowed_prefixes_csv }}, announce_id={{ row.settings_effective.announce_task_id }}
No projects.
+
+
+ +
+
+

Epics

+

Epics are optional subdivisions under a project.

+
+ {% csrf_token %} + +
+ +
+
+
+
+ +
+
+
+
+
+

Choose the parent project first, then add the epic name.

+
+
+
+ +
+
+

Group Mapping

+

Map a channel to a project/epic. Channel flags can later override project defaults.

+
+ {% csrf_token %} + +
+
+
+ +
+
+
+

Service/platform for this mapping.

+
+
+
+
+ +
+ +
+

Exact identifier for the chat/group.

+
+
+
+
+ +
+
+
+

Project receiving derived tasks.

+
+
+
+
+ +
+
+
+

Optional epic within that project.

+
+
+
+
+
+ + + + {% for row in sources %} + + + + + + + + + {% empty %} + + {% endfor %} + +
ChatProjectEpicMatchAnnounce
{{ row.service }} · {{ row.channel_identifier }}{{ row.project.name }}{{ row.epic.name }}{{ row.settings_effective.match_mode }}{% if row.settings_effective.require_prefix %} +prefix{% endif %}{{ row.settings_effective.announce_task_id }} +
+ {% csrf_token %} + + + +
+
No mappings.
+
+
+ +
+
+

Project Defaults (All Mapped Chats)

+

Set baseline extraction behavior for a project. Every mapped chat inherits this unless overridden below.

+
+ {% csrf_token %} + +
+
+
+
+
+ + + + +
+

+ + + +

+ +
+

+ Require Prefix: allow task creation only with configured prefixes. + Derivation Enabled: turn extraction on/off for this project. + Completion Enabled: enable completion phrase parser. + Announce Task ID: emit confirmation messages on task creation. +

+
+
+ +
+
+

Channel Override Flags

+

Channel-level override. Use only where this chat should behave differently from the project default.

+
+ {% csrf_token %} + +
+
+
+
+
+ + + + + +
+

+ + + +

+ +
+

+ Require Prefix: enforce prefixes in this channel. + Derivation Enabled: extraction on/off for this channel only. + Completion Enabled: completion phrase parser in this channel. + AI Title Enabled: AI title normalization in this channel. + Announce Task ID: confirmation message in this channel. +

+
+
+ +
+
+

Completion Phrases

+

Add parser phrases for completion statements followed by a task reference, e.g. done #12.

+
+ {% csrf_token %} + +
+
+
+
+
+
    {% for row in patterns %}
  • {{ row.phrase }}
  • {% empty %}
  • No phrases.
  • {% endfor %}
+
+
+ +
+
+

Provider

+

Controls outbound sync to external tracking systems. If disabled, tasks are still derived and visible inside GIA only.

+
+ {% csrf_token %} + + + +

Mock provider logs sync events without writing to a real third-party system.

+
+ +
+
+

Browse all derived tasks in Tasks Hub.

+
+
+
+
+ + {% endblock %} diff --git a/core/templates/partials/compose-panel.html b/core/templates/partials/compose-panel.html index 71427eb..a0e1199 100644 --- a/core/templates/partials/compose-panel.html +++ b/core/templates/partials/compose-panel.html @@ -81,20 +81,33 @@ Commands
{% if show_contact_actions %} @@ -97,9 +99,15 @@ {% endfor %}
+ {% if account_unlink_label == "Relink" %} +

+ Relink flow: click Relink on the current account, then use + Add account below to generate and scan a fresh QR code. +

+ {% endif %}
{% csrf_token %} diff --git a/core/templates/partials/signal-chats-list.html b/core/templates/partials/signal-chats-list.html index d8e4728..d7280e3 100644 --- a/core/templates/partials/signal-chats-list.html +++ b/core/templates/partials/signal-chats-list.html @@ -1,6 +1,4 @@ -{% load cache %} {% include 'mixins/partials/notify.html' %} -{% cache 600 objects_signal_chats request.user.id object_list type %} -{% endcache %} diff --git a/core/tests/test_command_routing_variant_ui.py b/core/tests/test_command_routing_variant_ui.py new file mode 100644 index 0000000..9437487 --- /dev/null +++ b/core/tests/test_command_routing_variant_ui.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from django.test import TestCase +from django.urls import reverse + +from core.commands.policies import ensure_variant_policies_for_profile +from core.models import CommandProfile, User + + +class CommandRoutingVariantUITests(TestCase): + def setUp(self): + self.user = User.objects.create_user( + username="routing-user", + email="routing@example.com", + password="x", + ) + self.client.force_login(self.user) + self.profile = CommandProfile.objects.create( + user=self.user, + slug="bp", + name="Business Plan", + enabled=True, + trigger_token="#bp#", + reply_required=True, + exact_match_only=True, + ) + ensure_variant_policies_for_profile(self.profile) + + def test_command_routing_page_shows_variant_policy_table(self): + response = self.client.get(reverse("command_routing")) + self.assertEqual(200, response.status_code) + self.assertContains(response, "Variant Policies") + self.assertContains(response, "bp set range") + self.assertContains(response, "Send status to egress") + + def test_variant_policy_update_persists(self): + response = self.client.post( + reverse("command_routing"), + { + "action": "variant_policy_update", + "profile_id": str(self.profile.id), + "variant_key": "bp_set", + "enabled": "1", + "generation_mode": "ai", + "send_plan_to_egress": "1", + "send_status_to_source": "1", + "send_status_to_egress": "1", + }, + follow=True, + ) + self.assertEqual(200, response.status_code) + row = self.profile.variant_policies.get(variant_key="bp_set") + self.assertEqual("ai", row.generation_mode) + self.assertTrue(row.send_status_to_egress) diff --git a/core/tests/test_command_variant_policy.py b/core/tests/test_command_variant_policy.py new file mode 100644 index 0000000..2597545 --- /dev/null +++ b/core/tests/test_command_variant_policy.py @@ -0,0 +1,225 @@ +from __future__ import annotations + +from unittest.mock import AsyncMock, patch + +from asgiref.sync import async_to_sync +from django.test import TransactionTestCase + +from core.commands.base import CommandContext +from core.commands.handlers.bp import BPCommandHandler +from core.commands.policies import ensure_variant_policies_for_profile +from core.models import ( + BusinessPlanDocument, + AI, + ChatSession, + CommandAction, + CommandChannelBinding, + CommandProfile, + CommandVariantPolicy, + Message, + Person, + PersonIdentifier, + User, +) + + +class CommandVariantPolicyTests(TransactionTestCase): + def setUp(self): + self.user = User.objects.create_user( + username="variant-user", + email="variant@example.com", + password="x", + ) + self.person = Person.objects.create(user=self.user, name="Variant Person") + self.identifier = PersonIdentifier.objects.create( + user=self.user, + person=self.person, + service="whatsapp", + identifier="120363402761690215", + ) + self.session = ChatSession.objects.create(user=self.user, identifier=self.identifier) + self.profile = CommandProfile.objects.create( + user=self.user, + slug="bp", + name="Business Plan", + enabled=True, + trigger_token="#bp#", + reply_required=True, + exact_match_only=True, + visibility_mode="status_in_source", + template_text="TEMPLATE SHOULD NOT LEAK INTO bp set", + ) + AI.objects.create( + user=self.user, + base_url="https://example.invalid", + api_key="test-key", + model="gpt-4o-mini", + ) + CommandAction.objects.create( + profile=self.profile, + action_type="extract_bp", + enabled=True, + position=0, + ) + CommandAction.objects.create( + profile=self.profile, + action_type="save_document", + enabled=True, + position=1, + ) + CommandAction.objects.create( + profile=self.profile, + action_type="post_result", + enabled=True, + position=2, + ) + CommandChannelBinding.objects.create( + profile=self.profile, + direction="ingress", + service="whatsapp", + channel_identifier="120363402761690215", + enabled=True, + ) + CommandChannelBinding.objects.create( + profile=self.profile, + direction="egress", + service="whatsapp", + channel_identifier="120363402761690215", + enabled=True, + ) + + def _ctx(self, trigger: Message, text: str) -> CommandContext: + return CommandContext( + service="whatsapp", + channel_identifier="120363402761690215", + message_id=str(trigger.id), + user_id=self.user.id, + message_text=text, + payload={}, + ) + + def test_ensure_variant_policies_backfills_bp_defaults(self): + rows = ensure_variant_policies_for_profile(self.profile) + self.assertSetEqual(set(rows.keys()), {"bp", "bp_set", "bp_set_range"}) + self.assertEqual("ai", rows["bp"].generation_mode) + self.assertEqual("verbatim", rows["bp_set"].generation_mode) + self.assertEqual("verbatim", rows["bp_set_range"].generation_mode) + self.assertTrue(rows["bp"].send_plan_to_egress) + self.assertTrue(rows["bp"].send_status_to_source) + + def test_bp_primary_can_run_in_verbatim_mode_without_ai(self): + ensure_variant_policies_for_profile(self.profile) + policy = CommandVariantPolicy.objects.get(profile=self.profile, variant_key="bp") + policy.generation_mode = "verbatim" + policy.send_plan_to_egress = False + policy.send_status_to_source = False + policy.send_status_to_egress = False + policy.save() + + anchor = Message.objects.create( + user=self.user, + session=self.session, + sender_uuid="peer", + text="anchor line", + ts=1000, + source_service="whatsapp", + source_chat_id="120363402761690215", + ) + trigger = Message.objects.create( + user=self.user, + session=self.session, + sender_uuid="me", + text="#bp#", + ts=2000, + source_service="whatsapp", + source_chat_id="120363402761690215", + reply_to=anchor, + ) + + result = async_to_sync(BPCommandHandler().execute)(self._ctx(trigger, "#bp#")) + self.assertTrue(result.ok) + doc = BusinessPlanDocument.objects.get(trigger_message=trigger) + self.assertEqual("anchor line\n#bp#", doc.content_markdown) + + def test_bp_set_ai_mode_ignores_template(self): + ensure_variant_policies_for_profile(self.profile) + policy = CommandVariantPolicy.objects.get(profile=self.profile, variant_key="bp_set") + policy.generation_mode = "ai" + policy.send_plan_to_egress = False + policy.send_status_to_source = False + policy.send_status_to_egress = False + policy.save() + + trigger = Message.objects.create( + user=self.user, + session=self.session, + sender_uuid="me", + text="#bp set# text to transform", + ts=1000, + source_service="whatsapp", + source_chat_id="120363402761690215", + ) + + with patch( + "core.commands.handlers.bp.ai_runner.run_prompt", + new=AsyncMock(return_value="AI RESULT"), + ) as mocked: + result = async_to_sync(BPCommandHandler().execute)( + self._ctx(trigger, trigger.text) + ) + + self.assertTrue(result.ok) + doc = BusinessPlanDocument.objects.get(trigger_message=trigger) + self.assertEqual("AI RESULT", doc.content_markdown) + call_args = mocked.await_args.args + prompt_payload = call_args[0] + self.assertNotIn("TEMPLATE SHOULD NOT LEAK", str(prompt_payload)) + + def test_delivery_flags_control_source_and_egress_status(self): + ensure_variant_policies_for_profile(self.profile) + policy = CommandVariantPolicy.objects.get( + profile=self.profile, + variant_key="bp_set_range", + ) + policy.generation_mode = "verbatim" + policy.store_document = False + policy.send_plan_to_egress = False + policy.send_status_to_source = True + policy.send_status_to_egress = True + policy.save() + + anchor = Message.objects.create( + user=self.user, + session=self.session, + sender_uuid="peer", + text="line one", + ts=1000, + source_service="whatsapp", + source_chat_id="120363402761690215", + ) + trigger = Message.objects.create( + user=self.user, + session=self.session, + sender_uuid="me", + text="#bp set range#", + ts=2000, + source_service="whatsapp", + source_chat_id="120363402761690215", + reply_to=anchor, + ) + + with patch( + "core.commands.handlers.bp.post_status_in_source", + new=AsyncMock(return_value=True), + ) as source_status, patch( + "core.commands.handlers.bp.post_to_channel_binding", + new=AsyncMock(return_value=True), + ) as binding_send: + result = async_to_sync(BPCommandHandler().execute)( + self._ctx(trigger, trigger.text) + ) + + self.assertTrue(result.ok) + source_status.assert_awaited() + self.assertEqual(1, binding_send.await_count) + self.assertFalse(BusinessPlanDocument.objects.filter(trigger_message=trigger).exists()) diff --git a/core/tests/test_phase1_command_reply.py b/core/tests/test_phase1_command_reply.py index 58b2d6f..ec4a088 100644 --- a/core/tests/test_phase1_command_reply.py +++ b/core/tests/test_phase1_command_reply.py @@ -6,6 +6,7 @@ from django.test import TestCase from core.commands.base import CommandContext from core.commands.engine import _matches_trigger, process_inbound_message from core.messaging.reply_sync import extract_reply_ref, resolve_reply_target +from core.views.compose import _command_options_for_channel from core.models import ( ChatSession, CommandChannelBinding, @@ -123,6 +124,27 @@ class Phase1ReplyResolutionTests(TestCase): self.assertEqual("signal-msg-quoted", result.get("reply_source_message_id")) self.assertEqual("signal", result.get("reply_source_service")) + def test_extract_reply_ref_signal_target_sent_timestamp_variant(self): + result = extract_reply_ref( + "signal", + { + "envelope": { + "dataMessage": { + "quote": { + "targetSentTimestamp": 1772545268786, + "authorNumber": "+15550000001", + } + } + } + }, + ) + self.assertEqual( + "1772545268786", + result.get("reply_source_message_id"), + ) + self.assertEqual("signal", result.get("reply_source_service")) + self.assertEqual("+15550000001", result.get("reply_source_chat_id")) + def test_extract_reply_ref_whatsapp(self): result = extract_reply_ref( "whatsapp", @@ -272,3 +294,23 @@ class Phase1CommandEngineTests(TestCase): self.assertEqual(1, len(results)) self.assertEqual("skipped", results[0].status) self.assertEqual("reply_required", results[0].error) + + def test_compose_command_options_show_bp_subcommands(self): + self.profile.channel_bindings.all().delete() + CommandChannelBinding.objects.create( + profile=self.profile, + direction="ingress", + service="whatsapp", + channel_identifier="120363402761690215@g.us", + enabled=True, + ) + options = _command_options_for_channel( + self.user, + "whatsapp", + "120363402761690215@g.us", + ) + names = [str(row.get("name") or "").strip().lower() for row in options] + self.assertIn("bp", names) + self.assertIn("bp set", names) + self.assertIn("bp set range", names) + self.assertNotIn("announce task ids", names) diff --git a/core/tests/test_repeat_answer_and_tasks.py b/core/tests/test_repeat_answer_and_tasks.py index 8304496..d85f503 100644 --- a/core/tests/test_repeat_answer_and_tasks.py +++ b/core/tests/test_repeat_answer_and_tasks.py @@ -16,6 +16,7 @@ from core.models import ( TaskProject, User, Message, + Chat, ) from core.tasks.engine import process_inbound_task_intelligence @@ -136,3 +137,63 @@ class TaskEngineTests(TestCase): self.assertTrue( DerivedTaskEvent.objects.filter(task=task, event_type="completion_marked").exists() ) + + def test_matches_whatsapp_private_channel_variants(self): + ChatTaskSource.objects.create( + user=self.user, + service="whatsapp", + channel_identifier="447700900123@s.whatsapp.net", + project=self.project, + enabled=True, + ) + m = Message.objects.create( + user=self.user, + session=self.session, + sender_uuid="peer", + text="task: update private chat mapping", + ts=1200, + source_service="whatsapp", + source_chat_id="447700900123", + ) + async_to_sync(process_inbound_task_intelligence)(m) + self.assertTrue( + DerivedTask.objects.filter(origin_message=m).exists(), + "Expected private WhatsApp bare identifier to match @s.whatsapp.net mapping.", + ) + + def test_matches_signal_uuid_to_number_companion_mapping(self): + signal_person = Person.objects.create(user=self.user, name="Signal Task Person") + signal_identifier = PersonIdentifier.objects.create( + user=self.user, + person=signal_person, + service="signal", + identifier="+447700900555", + ) + signal_session = ChatSession.objects.create(user=self.user, identifier=signal_identifier) + ChatTaskSource.objects.create( + user=self.user, + service="signal", + channel_identifier="+447700900555", + project=self.project, + enabled=True, + ) + Chat.objects.create( + source_uuid="54cb8dbe-4c5f-4ef9-9f3d-4a9b37fd15d9", + source_number="+447700900555", + source_name="Signal Peer", + account="+447700900000", + ) + m = Message.objects.create( + user=self.user, + session=signal_session, + sender_uuid="peer", + text="task: check signal private mapping", + ts=1300, + source_service="signal", + source_chat_id="54cb8dbe-4c5f-4ef9-9f3d-4a9b37fd15d9", + ) + async_to_sync(process_inbound_task_intelligence)(m) + self.assertTrue( + DerivedTask.objects.filter(origin_message=m).exists(), + "Expected Signal UUID source chat to match source mapping by companion number.", + ) diff --git a/core/tests/test_signal_relink.py b/core/tests/test_signal_relink.py new file mode 100644 index 0000000..0a8d678 --- /dev/null +++ b/core/tests/test_signal_relink.py @@ -0,0 +1,45 @@ +from unittest.mock import patch + +from django.urls import reverse +from django.test import TestCase + +from core.models import User + + +class SignalRelinkTests(TestCase): + def setUp(self): + self.user = User.objects.create_superuser( + username="signal-admin", + email="signal-admin@example.com", + password="x", + ) + self.client.force_login(self.user) + + @patch("core.views.signal.transport.list_accounts") + def test_signal_accounts_view_shows_relink_action(self, mock_list_accounts): + mock_list_accounts.return_value = ["+447000000001"] + response = self.client.get(reverse("signal_accounts", kwargs={"type": "page"})) + self.assertEqual(200, response.status_code) + self.assertContains(response, "Relink") + self.assertContains(response, "/services/signal/") + self.assertContains(response, "/unlink/+447000000001/") + + @patch("core.views.signal.transport.list_accounts") + @patch("core.views.signal.transport.unlink_account") + def test_signal_account_unlink_calls_transport_and_renders_panel( + self, + mock_unlink_account, + mock_list_accounts, + ): + mock_list_accounts.side_effect = [ + ["+447000000001"], + [], + ] + response = self.client.post( + reverse( + "signal_account_unlink", + kwargs={"type": "page", "account": "+447000000001"}, + ) + ) + self.assertEqual(200, response.status_code) + mock_unlink_account.assert_called_once_with("signal", "+447000000001") diff --git a/core/tests/test_signal_reply_send.py b/core/tests/test_signal_reply_send.py new file mode 100644 index 0000000..8851be0 --- /dev/null +++ b/core/tests/test_signal_reply_send.py @@ -0,0 +1,223 @@ +from __future__ import annotations + +import json +from unittest.mock import AsyncMock, patch +from unittest.mock import Mock + +from asgiref.sync import async_to_sync +from django.conf import settings +from django.test import TestCase, TransactionTestCase + +from core.clients import transport +from core.clients.signal import SignalClient +from core.models import ChatSession, Message, Person, PersonIdentifier, User +from core.views.compose import _build_signal_reply_metadata + + +class SignalReplyMetadataTests(TestCase): + def setUp(self): + self.user = User.objects.create_user( + username="signal-reply-meta-user", + email="signal-reply-meta@example.com", + password="x", + ) + self.person = Person.objects.create(user=self.user, name="Signal Reply") + self.identifier = PersonIdentifier.objects.create( + user=self.user, + person=self.person, + service="signal", + identifier="+15550001000", + ) + self.session = ChatSession.objects.create( + user=self.user, + identifier=self.identifier, + ) + + def test_build_signal_reply_metadata_uses_signal_source(self): + incoming = Message.objects.create( + user=self.user, + session=self.session, + sender_uuid="+15550001000", + text="quoted body", + ts=1772538353497, + source_service="signal", + source_message_id="1772538353497", + source_chat_id="+15550001000", + ) + payload = _build_signal_reply_metadata(incoming, "+15550001000") + self.assertEqual(1772538353497, payload.get("quote_timestamp")) + self.assertEqual("+15550001000", payload.get("quote_author")) + self.assertEqual("quoted body", payload.get("quote_text")) + + def test_build_signal_reply_metadata_uses_chat_number_when_sender_is_uuid(self): + incoming = Message.objects.create( + user=self.user, + session=self.session, + sender_uuid="756078fd-d447-426d-a620-581a86d64f51", + text="quoted body", + ts=1772538353497, + source_service="signal", + source_message_id="1772538353497", + source_chat_id="+15550001000", + ) + payload = _build_signal_reply_metadata(incoming, "+15550001000") + self.assertEqual("+15550001000", payload.get("quote_author")) + + def test_build_signal_reply_metadata_uses_local_sender_for_own_messages(self): + outgoing = Message.objects.create( + user=self.user, + session=self.session, + sender_uuid="", + custom_author="USER", + text="my previous message", + ts=1772538353900, + source_service="web", + source_message_id="1772538353900", + source_chat_id="+15550001000", + ) + payload = _build_signal_reply_metadata(outgoing, "+15550001000") + expected_author = str(getattr(settings, "SIGNAL_NUMBER", "") or "").strip() + if expected_author: + self.assertEqual(expected_author, payload.get("quote_author")) + else: + self.assertEqual("+15550001000", payload.get("quote_author")) + + +class SignalTransportSendTests(TestCase): + def test_transport_passes_reply_metadata_to_signal_api(self): + with patch( + "core.clients.transport.prepare_outbound_attachments", + new=AsyncMock(return_value=[]), + ), patch( + "core.clients.transport.signalapi.send_message_raw", + new=AsyncMock(return_value=1772538354000), + ) as mocked_send: + result = async_to_sync(transport.send_message_raw)( + "signal", + "+15550001000", + text="reply payload", + attachments=[], + metadata={ + "quote_timestamp": 1772538353497, + "quote_author": "+15550001000", + "quote_text": "quoted body", + }, + ) + + self.assertEqual(1772538354000, result) + mocked_send.assert_awaited_once_with( + "+15550001000", + "reply payload", + [], + metadata={ + "quote_timestamp": 1772538353497, + "quote_author": "+15550001000", + "quote_text": "quoted body", + }, + ) + + +class SignalInboundReplyLinkTests(TransactionTestCase): + def setUp(self): + self.user = User.objects.create_user( + username="signal-inbound-user", + email="signal-inbound@example.com", + password="x", + ) + self.person = Person.objects.create(user=self.user, name="Signal Inbound") + self.identifier = PersonIdentifier.objects.create( + user=self.user, + person=self.person, + service="signal", + identifier="+15550002000", + ) + self.session = ChatSession.objects.create( + user=self.user, + identifier=self.identifier, + ) + self.anchor = Message.objects.create( + user=self.user, + session=self.session, + sender_uuid="+15550002000", + text="anchor inbound", + ts=1772545458187, + source_service="signal", + source_message_id="1772545458187", + source_chat_id="+15550002000", + ) + + def test_process_raw_inbound_event_links_signal_reply(self): + fake_ur = Mock() + fake_ur.message_received = AsyncMock(return_value=None) + client = SignalClient.__new__(SignalClient) + client.service = "signal" + client.ur = fake_ur + client.log = Mock() + client.client = Mock() + client.client.bot_uuid = "" + client.client.phone_number = "" + client._resolve_signal_identifiers = AsyncMock(return_value=[self.identifier]) + client._auto_link_single_user_signal_identifier = AsyncMock(return_value=[]) + + payload = { + "envelope": { + "sourceNumber": "+15550002000", + "sourceUuid": "756078fd-d447-426d-a620-581a86d64f51", + "timestamp": 1772545462051, + "dataMessage": { + "message": "reply inbound s3", + "quote": { + "targetSentTimestamp": 1772545458187, + "authorNumber": "+15550002000", + }, + }, + } + } + async_to_sync(client._process_raw_inbound_event)(json.dumps(payload)) + + created = Message.objects.filter( + user=self.user, + session=self.session, + text="reply inbound s3", + ).order_by("-ts").first() + self.assertIsNotNone(created) + self.assertEqual(self.anchor.id, created.reply_to_id) + self.assertEqual("1772545458187", created.reply_source_message_id) + + def test_process_raw_inbound_event_applies_reaction(self): + fake_ur = Mock() + fake_ur.message_received = AsyncMock(return_value=None) + fake_ur.xmpp = Mock() + fake_ur.xmpp.client = Mock() + fake_ur.xmpp.client.apply_external_reaction = AsyncMock(return_value=None) + client = SignalClient.__new__(SignalClient) + client.service = "signal" + client.ur = fake_ur + client.log = Mock() + client.client = Mock() + client.client.bot_uuid = "" + client.client.phone_number = "" + client._resolve_signal_identifiers = AsyncMock(return_value=[self.identifier]) + client._auto_link_single_user_signal_identifier = AsyncMock(return_value=[]) + + payload = { + "envelope": { + "sourceNumber": "+15550002000", + "sourceUuid": "756078fd-d447-426d-a620-581a86d64f51", + "timestamp": 1772545463000, + "dataMessage": { + "reaction": { + "emoji": "❤️", + "targetSentTimestamp": 1772545458187, + } + }, + } + } + async_to_sync(client._process_raw_inbound_event)(json.dumps(payload)) + + self.anchor.refresh_from_db() + reactions = list((self.anchor.receipt_payload or {}).get("reactions") or []) + self.assertTrue( + any(str(row.get("emoji") or "") == "❤️" for row in reactions), + "Expected Signal heart reaction to be applied to anchor receipt payload.", + ) diff --git a/core/tests/test_signal_unlink_fallback.py b/core/tests/test_signal_unlink_fallback.py new file mode 100644 index 0000000..404fefb --- /dev/null +++ b/core/tests/test_signal_unlink_fallback.py @@ -0,0 +1,43 @@ +from unittest.mock import Mock, patch + +from django.test import TestCase + +from core.clients import transport + + +class SignalUnlinkFallbackTests(TestCase): + @patch("core.clients.transport._wipe_signal_cli_local_state") + @patch("requests.delete") + def test_signal_unlink_uses_rest_delete_when_available( + self, + mock_delete, + mock_wipe, + ): + ok_response = Mock() + ok_response.ok = True + mock_delete.return_value = ok_response + + result = transport.unlink_account("signal", "+447700900000") + + self.assertTrue(result) + self.assertTrue(mock_delete.called) + mock_wipe.assert_not_called() + + @patch("core.clients.transport._wipe_signal_cli_local_state") + @patch("requests.delete") + def test_signal_unlink_falls_back_to_local_wipe( + self, + mock_delete, + mock_wipe, + ): + bad_response = Mock() + bad_response.ok = False + mock_delete.return_value = bad_response + mock_wipe.return_value = True + + result = transport.unlink_account("signal", "+447700900000") + + self.assertTrue(result) + self.assertEqual(2, mock_delete.call_count) + mock_wipe.assert_called_once() + diff --git a/core/tests/test_tasks_settings_and_toggle.py b/core/tests/test_tasks_settings_and_toggle.py index 6d36d23..41697ba 100644 --- a/core/tests/test_tasks_settings_and_toggle.py +++ b/core/tests/test_tasks_settings_and_toggle.py @@ -3,6 +3,7 @@ from __future__ import annotations from unittest.mock import AsyncMock, patch from asgiref.sync import async_to_sync +from django.urls import reverse from django.test import TestCase, override_settings from core.models import ( @@ -12,12 +13,13 @@ from core.models import ( Message, Person, PersonIdentifier, + TaskCompletionPattern, TaskProject, User, ) from core.tasks.engine import process_inbound_task_intelligence from core.views.compose import _command_options_for_channel, _toggle_task_announce_for_channel -from core.views.tasks import _apply_safe_defaults_for_user +from core.views.tasks import _apply_safe_defaults_for_user, _ensure_default_completion_patterns class TaskSettingsBackfillTests(TestCase): @@ -65,6 +67,13 @@ class TaskSettingsBackfillTests(TestCase): self.assertEqual("strict", self.source.settings.get("match_mode")) self.assertTrue(bool(self.source.settings.get("require_prefix"))) + def test_default_completion_phrases_seeded(self): + _ensure_default_completion_patterns(self.user) + phrases = set( + TaskCompletionPattern.objects.filter(user=self.user).values_list("phrase", flat=True) + ) + self.assertTrue({"done", "completed", "fixed"}.issubset(phrases)) + class TaskAnnounceToggleTests(TestCase): def setUp(self): @@ -98,14 +107,16 @@ class TaskAnnounceToggleTests(TestCase): self.source.refresh_from_db() self.assertTrue(bool(self.source.settings.get("announce_task_id"))) - def test_command_options_include_task_announce_state(self): + def test_command_options_include_bp_subcommands(self): options = _command_options_for_channel( self.user, "whatsapp", "120363402761690215", ) - row = [opt for opt in options if opt.get("slug") == "task_announce"][0] - self.assertFalse(bool(row.get("enabled_here"))) + names = [str(row.get("name") or "").strip().lower() for row in options] + self.assertIn("bp", names) + self.assertIn("bp set", names) + self.assertIn("bp set range", names) @override_settings(TASK_DERIVATION_USE_AI=False) @@ -161,3 +172,34 @@ class TaskAnnounceRuntimeTests(TestCase): async_to_sync(process_inbound_task_intelligence)(self._msg("task: rotate secrets")) self.assertTrue(DerivedTask.objects.exists()) mocked_send.assert_awaited() + + +class TaskSettingsViewActionsTests(TestCase): + def setUp(self): + self.user = User.objects.create_user("task-settings-user", "ts@example.com", "x") + self.client.force_login(self.user) + self.project = TaskProject.objects.create(user=self.user, name="Project A") + self.source = ChatTaskSource.objects.create( + user=self.user, + service="whatsapp", + channel_identifier="120363402761690215@g.us", + project=self.project, + settings={"match_mode": "strict"}, + enabled=True, + ) + + def test_source_delete_removes_mapping(self): + response = self.client.post( + reverse("tasks_settings"), + { + "action": "source_delete", + "source_id": str(self.source.id), + "prefill_service": "whatsapp", + "prefill_identifier": "120363402761690215@g.us", + }, + follow=True, + ) + self.assertEqual(200, response.status_code) + self.assertFalse( + ChatTaskSource.objects.filter(id=self.source.id, user=self.user).exists() + ) diff --git a/core/views/automation.py b/core/views/automation.py index b198855..a2a80b9 100644 --- a/core/views/automation.py +++ b/core/views/automation.py @@ -7,9 +7,11 @@ from django.db import transaction from django.db.models import Avg, Count, Q, Sum from django.http import JsonResponse from django.shortcuts import get_object_or_404, redirect, render +from django.urls import reverse from django.utils import timezone from django.views import View +from core.commands.policies import BP_VARIANT_KEYS, BP_VARIANT_META, ensure_variant_policies_for_profile from core.models import ( AIRunLog, BusinessPlanDocument, @@ -17,12 +19,29 @@ from core.models import ( CommandAction, CommandChannelBinding, CommandProfile, + CommandVariantPolicy, TranslationBridge, TranslationEventLog, ) from core.translation.engine import parse_quick_mode_title +def _channel_variants(service: str, identifier: str) -> list[str]: + value = str(identifier or "").strip() + if not value: + return [] + variants = [value] + svc = str(service or "").strip().lower() + if svc == "whatsapp": + bare = value.split("@", 1)[0].strip() + if bare and bare not in variants: + variants.append(bare) + group = f"{bare}@g.us" if bare else "" + if group and group not in variants: + variants.append(group) + return variants + + class CommandRoutingSettings(LoginRequiredMixin, View): template_name = "pages/command-routing.html" @@ -35,12 +54,71 @@ class CommandRoutingSettings(LoginRequiredMixin, View): row.save(update_fields=["position", "updated_at"]) return rows + @staticmethod + def _redirect_with_scope(request): + service = str(request.GET.get("service") or request.POST.get("service") or "").strip() + identifier = str( + request.GET.get("identifier") or request.POST.get("identifier") or "" + ).strip() + if service and identifier: + return redirect( + f"{reverse('command_routing')}?service={service}&identifier={identifier}" + ) + return redirect("command_routing") + def _context(self, request): - profiles = ( + profiles_qs = ( CommandProfile.objects.filter(user=request.user) - .prefetch_related("channel_bindings", "actions") + .prefetch_related("channel_bindings", "actions", "variant_policies") .order_by("slug") ) + scope_service = str(request.GET.get("service") or "").strip().lower() + scope_identifier = str(request.GET.get("identifier") or "").strip() + scope_variants = _channel_variants(scope_service, scope_identifier) + profiles = list(profiles_qs) + preview_profile_id = str(request.GET.get("preview_profile_id") or "").strip() + for profile in profiles: + policies = ensure_variant_policies_for_profile(profile) + if str(profile.slug or "").strip() == "bp": + keys = BP_VARIANT_KEYS + else: + keys = ("default",) + profile.variant_rows = [] + for key in keys: + row = policies.get(key) + if row is None: + continue + meta = BP_VARIANT_META.get(key, {}) + profile.variant_rows.append( + { + "variant_key": key, + "variant_label": str(meta.get("name") or key), + "trigger_token": str(meta.get("trigger_token") or profile.trigger_token or ""), + "template_supported": bool(meta.get("template_supported")), + "warn_verbatim_plan": bool( + key in {"bp", "bp_set_range"} + and str(getattr(row, "generation_mode", "") or "") == "verbatim" + and bool(getattr(row, "send_plan_to_egress", False)) + ), + "row": row, + } + ) + bindings = list(profile.channel_bindings.all()) + if scope_service and scope_variants: + profile.visible_bindings = [ + row + for row in bindings + if str(row.service or "").strip().lower() == scope_service + and str(row.channel_identifier or "").strip() in scope_variants + ] + else: + profile.visible_bindings = bindings + profile.enabled_egress_bindings = [ + row + for row in bindings + if str(row.direction or "").strip() == "egress" and bool(row.enabled) + ] + profile.preview_mode = preview_profile_id and str(profile.id) == preview_profile_id documents = BusinessPlanDocument.objects.filter(user=request.user).order_by( "-updated_at" )[:30] @@ -50,6 +128,11 @@ class CommandRoutingSettings(LoginRequiredMixin, View): "channel_services": ("web", "xmpp", "signal", "whatsapp"), "directions": ("ingress", "egress", "scratchpad_mirror"), "action_types": ("extract_bp", "post_result", "save_document"), + "command_choices": (("bp", "Business Plan (bp)"),), + "scope_service": scope_service, + "scope_identifier": scope_identifier, + "scope_variants": scope_variants, + "preview_profile_id": preview_profile_id, } def get(self, request): @@ -59,7 +142,12 @@ class CommandRoutingSettings(LoginRequiredMixin, View): action = str(request.POST.get("action") or "").strip() if action == "profile_create": - slug = str(request.POST.get("slug") or "bp").strip().lower() or "bp" + slug = ( + str(request.POST.get("command_slug") or request.POST.get("slug") or "bp") + .strip() + .lower() + or "bp" + ) profile, _ = CommandProfile.objects.get_or_create( user=request.user, slug=slug, @@ -74,6 +162,11 @@ class CommandRoutingSettings(LoginRequiredMixin, View): "template_text": str(request.POST.get("template_text") or ""), }, ) + profile.name = str(request.POST.get("name") or profile.name).strip() or profile.name + if slug == "bp": + profile.trigger_token = "#bp#" + profile.template_text = str(request.POST.get("template_text") or profile.template_text or "") + profile.save(update_fields=["name", "trigger_token", "template_text", "updated_at"]) CommandAction.objects.get_or_create( profile=profile, action_type="extract_bp", @@ -89,7 +182,8 @@ class CommandRoutingSettings(LoginRequiredMixin, View): action_type="post_result", defaults={"enabled": True, "position": 2}, ) - return redirect("command_routing") + ensure_variant_policies_for_profile(profile) + return self._redirect_with_scope(request) if action == "profile_update": profile = get_object_or_404( @@ -106,12 +200,11 @@ class CommandRoutingSettings(LoginRequiredMixin, View): profile.reply_required = bool(request.POST.get("reply_required")) profile.exact_match_only = bool(request.POST.get("exact_match_only")) profile.template_text = str(request.POST.get("template_text") or "") - profile.visibility_mode = ( - str(request.POST.get("visibility_mode") or "status_in_source").strip() - or "status_in_source" - ) + # Legacy field retained for compatibility only. + profile.visibility_mode = profile.visibility_mode or "status_in_source" profile.save() - return redirect("command_routing") + ensure_variant_policies_for_profile(profile) + return self._redirect_with_scope(request) if action == "profile_delete": profile = get_object_or_404( @@ -120,7 +213,7 @@ class CommandRoutingSettings(LoginRequiredMixin, View): user=request.user, ) profile.delete() - return redirect("command_routing") + return self._redirect_with_scope(request) if action == "binding_create": profile = get_object_or_404( @@ -137,7 +230,7 @@ class CommandRoutingSettings(LoginRequiredMixin, View): ).strip(), enabled=bool(request.POST.get("enabled") or "1"), ) - return redirect("command_routing") + return self._redirect_with_scope(request) if action == "binding_delete": binding = get_object_or_404( @@ -146,7 +239,7 @@ class CommandRoutingSettings(LoginRequiredMixin, View): profile__user=request.user, ) binding.delete() - return redirect("command_routing") + return self._redirect_with_scope(request) if action == "action_update": row = get_object_or_404( @@ -160,7 +253,7 @@ class CommandRoutingSettings(LoginRequiredMixin, View): row.save(update_fields=["enabled", "position", "updated_at"]) else: row.save(update_fields=["enabled", "updated_at"]) - return redirect("command_routing") + return self._redirect_with_scope(request) if action == "action_move": row = get_object_or_404( @@ -170,26 +263,74 @@ class CommandRoutingSettings(LoginRequiredMixin, View): ) direction = str(request.POST.get("direction") or "").strip().lower() if direction not in {"up", "down"}: - return redirect("command_routing") + return self._redirect_with_scope(request) with transaction.atomic(): ordered = self._normalize_action_positions(row.profile) action_ids = [entry.id for entry in ordered] try: idx = action_ids.index(row.id) except ValueError: - return redirect("command_routing") + return self._redirect_with_scope(request) target_idx = idx - 1 if direction == "up" else idx + 1 if target_idx < 0 or target_idx >= len(ordered): - return redirect("command_routing") + return self._redirect_with_scope(request) other = ordered[target_idx] current_pos = ordered[idx].position ordered[idx].position = other.position other.position = current_pos ordered[idx].save(update_fields=["position", "updated_at"]) other.save(update_fields=["position", "updated_at"]) - return redirect("command_routing") + return self._redirect_with_scope(request) - return redirect("command_routing") + if action == "variant_policy_update": + profile = get_object_or_404( + CommandProfile, + id=request.POST.get("profile_id"), + user=request.user, + ) + variant_key = str(request.POST.get("variant_key") or "").strip() + policy = get_object_or_404( + CommandVariantPolicy, + profile=profile, + variant_key=variant_key, + ) + policy.enabled = bool(request.POST.get("enabled")) + mode = str(request.POST.get("generation_mode") or "verbatim").strip().lower() + policy.generation_mode = mode if mode in {"ai", "verbatim"} else "verbatim" + policy.send_plan_to_egress = bool(request.POST.get("send_plan_to_egress")) + policy.send_status_to_source = bool(request.POST.get("send_status_to_source")) + policy.send_status_to_egress = bool(request.POST.get("send_status_to_egress")) + policy.store_document = bool(request.POST.get("store_document")) + policy.save() + return self._redirect_with_scope(request) + + if action == "variant_policy_reset_defaults": + profile = get_object_or_404( + CommandProfile, + id=request.POST.get("profile_id"), + user=request.user, + ) + profile.variant_policies.all().delete() + ensure_variant_policies_for_profile(profile) + return self._redirect_with_scope(request) + + if action == "variant_preview": + profile = get_object_or_404( + CommandProfile, + id=request.POST.get("profile_id"), + user=request.user, + ) + ensure_variant_policies_for_profile(profile) + service = str(request.GET.get("service") or request.POST.get("service") or "").strip() + identifier = str( + request.GET.get("identifier") or request.POST.get("identifier") or "" + ).strip() + query = f"?preview_profile_id={profile.id}" + if service and identifier: + query += f"&service={service}&identifier={identifier}" + return redirect(f"{reverse('command_routing')}{query}") + + return self._redirect_with_scope(request) class TranslationSettings(LoginRequiredMixin, View): diff --git a/core/views/compose.py b/core/views/compose.py index b70868f..fc77f41 100644 --- a/core/views/compose.py +++ b/core/views/compose.py @@ -28,6 +28,7 @@ from django.views import View from core.clients import transport from core.commands.base import CommandContext from core.commands.engine import process_inbound_message +from core.commands.policies import ensure_variant_policies_for_profile from core.messaging import ai as ai_runner from core.messaging import media_bridge from core.messaging.utils import messages_to_string @@ -1610,6 +1611,29 @@ def _latest_whatsapp_bridge_ref(message: Message | None) -> dict: return best +def _latest_signal_bridge_ref(message: Message | None) -> dict: + if message is None: + return {} + payload = dict(getattr(message, "receipt_payload", {}) or {}) + refs = dict(payload.get("bridge_refs") or {}) + rows = list(refs.get("signal") or []) + best = {} + best_updated = -1 + for row in rows: + if not isinstance(row, dict): + continue + has_upstream = str(row.get("upstream_message_id") or "").strip() or int( + row.get("upstream_ts") or 0 + ) + if not has_upstream: + continue + updated_at = int(row.get("updated_at") or 0) + if updated_at >= best_updated: + best = dict(row) + best_updated = updated_at + return best + + def _build_whatsapp_reply_metadata(reply_to: Message | None, channel_identifier: str) -> dict: if reply_to is None: return {} @@ -1640,6 +1664,58 @@ def _build_whatsapp_reply_metadata(reply_to: Message | None, channel_identifier: } +def _build_signal_reply_metadata(reply_to: Message | None, channel_identifier: str) -> dict: + if reply_to is None: + return {} + + quote_timestamp = 0 + source_message_id = str(getattr(reply_to, "source_message_id", "") or "").strip() + if source_message_id.isdigit(): + quote_timestamp = int(source_message_id) + if not quote_timestamp: + bridge_ref = _latest_signal_bridge_ref(reply_to) + upstream_id = str(bridge_ref.get("upstream_message_id") or "").strip() + if upstream_id.isdigit(): + quote_timestamp = int(upstream_id) + if not quote_timestamp: + quote_timestamp = int(bridge_ref.get("upstream_ts") or 0) + if not quote_timestamp: + quote_timestamp = int(getattr(reply_to, "ts", 0) or 0) + if quote_timestamp <= 0: + return {} + + quote_author = "" + sender_uuid = str(getattr(reply_to, "sender_uuid", "") or "").strip() + if sender_uuid: + quote_author = sender_uuid + # Signal quote payloads work best with phone-style identifiers. + # Inbound rows may store sender UUID; prefer known chat/number in that case. + source_chat_id = str(getattr(reply_to, "source_chat_id", "") or "").strip() + if quote_author and SIGNAL_UUID_PATTERN.match(quote_author): + if source_chat_id: + quote_author = source_chat_id + if ( + str(getattr(reply_to, "custom_author", "") or "").strip().upper() in {"USER", "BOT"} + or not quote_author + ): + quote_author = str(getattr(settings, "SIGNAL_NUMBER", "") or "").strip() or quote_author + if not quote_author: + quote_author = source_chat_id + if not quote_author: + quote_author = str(channel_identifier or "").strip() + if not quote_author: + return {} + + quote_text = str(getattr(reply_to, "text", "") or "").strip() + payload = { + "quote_timestamp": int(quote_timestamp), + "quote_author": quote_author, + } + if quote_text: + payload["quote_text"] = quote_text[:512] + return payload + + def _canonical_command_channel_identifier(service: str, identifier: str) -> str: value = str(identifier or "").strip() if not value: @@ -1689,6 +1765,7 @@ def _ensure_bp_profile_and_actions(user) -> CommandProfile: if (not created) and (not row.enabled): row.enabled = True row.save(update_fields=["enabled", "updated_at"]) + ensure_variant_policies_for_profile(profile) return profile @@ -1779,46 +1856,102 @@ def _command_options_for_channel(user, service: str, identifier: str) -> list[di channel_identifier__in=list(variants), enabled=True, ).exists() - options.append( - { - "slug": slug, - "name": str(profile.name or slug).strip() or slug, - "trigger_token": str(profile.trigger_token or "").strip(), - "enabled_here": bool(enabled_here), - "profile_enabled": bool(profile.enabled), + if slug == "bp": + policies = ensure_variant_policies_for_profile(profile) if profile.id else {} + label_by_key = { + "bp": "bp", + "bp_set": "bp set", + "bp_set_range": "bp set range", } - ) - task_announce_enabled = False - if variants: - source = ( - ChatTaskSource.objects.filter( - user=user, - service=service_key, - channel_identifier__in=list(variants), - enabled=True, + options.extend( + [ + { + "slug": "bp", + "toggle_slug": "bp", + "name": "bp", + "trigger_token": "#bp#", + "enabled_here": bool(enabled_here), + "profile_enabled": bool(profile.enabled), + "mode_label": str( + (policies.get("bp").generation_mode if policies.get("bp") else "ai") + ).upper(), + }, + { + "slug": "bp_set", + "toggle_slug": "bp", + "name": "bp set", + "trigger_token": "#bp set#", + "enabled_here": bool(enabled_here), + "profile_enabled": bool(profile.enabled), + "mode_label": str( + (policies.get("bp_set").generation_mode if policies.get("bp_set") else "verbatim") + ).upper(), + }, + { + "slug": "bp_set_range", + "toggle_slug": "bp", + "name": "bp set range", + "trigger_token": "#bp set range#", + "enabled_here": bool(enabled_here), + "profile_enabled": bool(profile.enabled), + "mode_label": str( + ( + policies.get("bp_set_range").generation_mode + if policies.get("bp_set_range") + else "verbatim" + ) + ).upper(), + }, + ] + ) + for row in options: + if row.get("slug") in label_by_key: + row["enabled_label"] = "Enabled" if row.get("enabled_here") else "Disabled" + else: + options.append( + { + "slug": slug, + "toggle_slug": slug, + "name": str(profile.name or slug).strip() or slug, + "trigger_token": str(profile.trigger_token or "").strip(), + "enabled_here": bool(enabled_here), + "profile_enabled": bool(profile.enabled), + "mode_label": "", + "enabled_label": "Enabled" if enabled_here else "Disabled", + } ) - .order_by("-updated_at") - .first() - ) - settings_row = dict(getattr(source, "settings", {}) or {}) if source else {} - task_announce_enabled = str(settings_row.get("announce_task_id", "")).strip().lower() in { - "1", - "true", - "yes", - "on", - } - options.append( - { - "slug": "task_announce", - "name": "Announce Task IDs", - "trigger_token": "", - "enabled_here": bool(task_announce_enabled), - "profile_enabled": True, - } - ) return options +def _bp_binding_summary_for_channel(user, service: str, identifier: str) -> dict: + service_key = _default_service(service) + variants = _command_channel_identifier_variants(service_key, identifier) + if not variants: + return {"ingress_count": 0, "egress_count": 0} + profile = ( + CommandProfile.objects.filter(user=user, slug="bp") + .order_by("id") + .first() + ) + if profile is None: + return {"ingress_count": 0, "egress_count": 0} + ingress_count = CommandChannelBinding.objects.filter( + profile=profile, + direction="ingress", + service=service_key, + channel_identifier__in=list(variants), + enabled=True, + ).count() + egress_count = CommandChannelBinding.objects.filter( + profile=profile, + direction="egress", + service=service_key, + channel_identifier__in=list(variants), + enabled=True, + ).count() + return {"ingress_count": int(ingress_count), "egress_count": int(egress_count)} + + def _toggle_task_announce_for_channel( *, user, @@ -2450,6 +2583,11 @@ def _panel_context( base["service"], base["identifier"], ) + bp_binding_summary = _bp_binding_summary_for_channel( + request.user, + base["service"], + base["identifier"], + ) recent_contacts = _recent_manual_contacts( request.user, current_service=base["service"], @@ -2457,6 +2595,27 @@ def _panel_context( current_person=base["person"], limit=12, ) + signal_ingest_warning = "" + if base["service"] == "signal": + signal_state = transport.get_runtime_state("signal") or {} + error_type = str(signal_state.get("last_inbound_exception_type") or "").strip() + error_message = str( + signal_state.get("last_inbound_exception_message") or "" + ).strip() + try: + error_ts = int(signal_state.get("last_inbound_exception_ts") or 0) + except Exception: + error_ts = 0 + try: + ok_ts = int(signal_state.get("last_inbound_ok_ts") or 0) + except Exception: + ok_ts = 0 + if (error_type or error_message) and error_ts >= ok_ts: + signal_ingest_warning = ( + "Signal inbound decrypt/metadata error detected" + + (f" ({error_type})" if error_type else "") + + (f": {error_message[:220]}" if error_message else "") + ) return { "service": base["service"], @@ -2485,6 +2644,9 @@ def _panel_context( "compose_quick_insights_url": reverse("compose_quick_insights"), "compose_history_sync_url": reverse("compose_history_sync"), "compose_toggle_command_url": reverse("compose_toggle_command"), + "command_routing_scoped_url": ( + f"{reverse('command_routing')}?{urlencode({'service': base['service'], 'identifier': base['identifier'] or ''})}" + ), "compose_answer_suggestion_send_url": reverse("compose_answer_suggestion_send"), "compose_ws_url": ws_url, "tasks_hub_url": reverse("tasks_hub"), @@ -2515,8 +2677,10 @@ def _panel_context( "panel_id": f"compose-panel-{unique}", "typing_state_json": json.dumps(typing_state), "command_options": command_options, + "bp_binding_summary": bp_binding_summary, "platform_options": platform_options, "recent_contacts": recent_contacts, + "signal_ingest_warning": signal_ingest_warning, "is_group": base.get("is_group", False), "group_name": base.get("group_name", ""), } @@ -3366,6 +3530,8 @@ class ComposeToggleCommand(LoginRequiredMixin, View): status=400, ) slug = str(request.POST.get("slug") or "bp").strip().lower() or "bp" + if slug in {"bp_set", "bp_set_range"}: + slug = "bp" enabled = str(request.POST.get("enabled") or "1").strip().lower() in { "1", "true", @@ -3406,6 +3572,9 @@ class ComposeToggleCommand(LoginRequiredMixin, View): if enabled else f"{slug} disabled for this chat." ) + scoped_settings_url = ( + f"{reverse('command_routing')}?{urlencode({'service': service, 'identifier': channel_identifier})}" + ) return JsonResponse( { "ok": True, @@ -3416,7 +3585,7 @@ class ComposeToggleCommand(LoginRequiredMixin, View): "settings_url": ( f"{reverse('tasks_settings')}?{urlencode({'service': service, 'identifier': channel_identifier})}" if slug == "task_announce" - else reverse("command_routing") + else scoped_settings_url ), } ) @@ -3442,7 +3611,7 @@ class ComposeBindBP(ComposeToggleCommand): "message": "bp enabled for this chat.", "slug": "bp", "enabled": True, - "settings_url": reverse("command_routing"), + "settings_url": f"{reverse('command_routing')}?{urlencode({'service': service, 'identifier': str(identifier or '')})}", } ) @@ -4026,6 +4195,10 @@ class ComposeSend(LoginRequiredMixin, View): outbound_reply_metadata = _build_whatsapp_reply_metadata( reply_to, str(base["identifier"] or "") ) + elif base["service"] == "signal": + outbound_reply_metadata = _build_signal_reply_metadata( + reply_to, str(base["identifier"] or "") + ) if base["service"] == "whatsapp": runtime_state = transport.get_runtime_state("whatsapp") last_seen = int(runtime_state.get("runtime_seen_at") or 0) @@ -4102,6 +4275,10 @@ class ComposeSend(LoginRequiredMixin, View): outbound_reply_metadata = _build_whatsapp_reply_metadata( reply_to, str(base["identifier"] or "") ) + elif base["service"] == "signal": + outbound_reply_metadata = _build_signal_reply_metadata( + reply_to, str(base["identifier"] or "") + ) ts = async_to_sync(transport.send_message_raw)( base["service"], base["identifier"], diff --git a/core/views/signal.py b/core/views/signal.py index aa8b9ef..72aaa0d 100644 --- a/core/views/signal.py +++ b/core/views/signal.py @@ -3,6 +3,8 @@ from urllib.parse import urlencode import orjson import requests from django.conf import settings +from django.contrib import messages +from django.db.models import Q from django.shortcuts import render from django.urls import reverse from django.views import View @@ -68,6 +70,11 @@ class SignalAccounts(SuperUserRequiredMixin, ObjectList): "service": service, "service_label": label, "account_add_url_name": add_url_name, + "account_add_type": "modal", + "account_add_target": "#modals-here", + "account_add_swap": "innerHTML", + "account_unlink_url_name": "signal_account_unlink", + "account_unlink_label": "Relink", "show_contact_actions": show_contact_actions, "contacts_url_name": f"{service}_contacts", "chats_url_name": f"{service}_chats", @@ -89,6 +96,69 @@ class SignalAccounts(SuperUserRequiredMixin, ObjectList): return self._normalize_accounts(transport.list_accounts("signal")) +class SignalAccountUnlink(SuperUserRequiredMixin, View): + def post(self, request, *args, **kwargs): + return self.delete(request, *args, **kwargs) + + def delete(self, request, *args, **kwargs): + account = str(kwargs.get("account") or "").strip() + if account: + ok = transport.unlink_account("signal", account) + if ok: + messages.success( + request, + ( + "Signal account unlinked. Next step: enter a device name under " + "'Add account', submit, then scan the new QR code." + ), + ) + else: + messages.error( + request, + "Signal relink failed to clear current device state. Try relink again.", + ) + else: + messages.warning(request, "No Signal account selected to relink.") + + rows = [] + for item in transport.list_accounts("signal"): + if isinstance(item, dict): + value = ( + item.get("number") + or item.get("id") + or item.get("jid") + or item.get("account") + ) + if value: + rows.append(str(value)) + elif item: + rows.append(str(item)) + + context = { + "service": "signal", + "service_label": "Signal", + "account_add_url_name": "signal_account_add", + "account_add_type": "modal", + "account_add_target": "#modals-here", + "account_add_swap": "innerHTML", + "account_unlink_url_name": "signal_account_unlink", + "account_unlink_label": "Relink", + "show_contact_actions": True, + "contacts_url_name": "signal_contacts", + "chats_url_name": "signal_chats", + "endpoint_base": str( + getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080") + ).rstrip("/"), + "service_warning": transport.get_service_warning("signal"), + "object_list": rows, + "list_url": reverse("signal_accounts", kwargs={"type": kwargs["type"]}), + "type": kwargs["type"], + "context_object_name_singular": "Signal Account", + "context_object_name": "Signal Accounts", + } + return render(request, "partials/signal-accounts.html", context) + + class SignalContactsList(SuperUserRequiredMixin, ObjectList): list_template = "partials/signal-contacts-list.html" @@ -141,7 +211,13 @@ class SignalChatsList(SuperUserRequiredMixin, ObjectList): def get_queryset(self, *args, **kwargs): pk = self.kwargs.get("pk", "") - chats = list(Chat.objects.filter(account=pk)) + chats = list( + Chat.objects.filter( + Q(account=pk) | Q(account__isnull=True) | Q(account="") + ).order_by("-id")[:1000] + ) + if not chats: + chats = list(Chat.objects.all().order_by("-id")[:1000]) rows = [] for chat in chats: identifier_candidates = [ diff --git a/core/views/tasks.py b/core/views/tasks.py index c096670..0f27542 100644 --- a/core/views/tasks.py +++ b/core/views/tasks.py @@ -23,6 +23,7 @@ from core.models import ( TaskProviderConfig, PersonIdentifier, PlatformChatLink, + Chat, ) from core.tasks.providers.mock import get_provider @@ -138,6 +139,25 @@ def _settings_redirect(request): return redirect("tasks_settings") +def _ensure_default_completion_patterns(user) -> None: + defaults = ("done", "completed", "fixed") + existing = set( + str(row or "").strip().lower() + for row in TaskCompletionPattern.objects.filter(user=user).values_list("phrase", flat=True) + ) + next_pos = TaskCompletionPattern.objects.filter(user=user).count() + for phrase in defaults: + if phrase in existing: + continue + TaskCompletionPattern.objects.create( + user=user, + phrase=phrase, + enabled=True, + position=next_pos, + ) + next_pos += 1 + + def _service_label(service: str) -> str: key = str(service or "").strip().lower() labels = { @@ -158,9 +178,52 @@ def _resolve_channel_display(user, service: str, identifier: str) -> dict: if bare_identifier and bare_identifier not in variants: variants.append(bare_identifier) if service_key == "whatsapp": + direct_identifier = ( + raw_identifier if raw_identifier.endswith("@s.whatsapp.net") else "" + ) + if direct_identifier and direct_identifier not in variants: + variants.append(direct_identifier) + if bare_identifier: + direct_bare = f"{bare_identifier}@s.whatsapp.net" + if direct_bare not in variants: + variants.append(direct_bare) group_identifier = f"{bare_identifier}@g.us" if bare_identifier else "" if group_identifier and group_identifier not in variants: variants.append(group_identifier) + if service_key == "signal": + digits = "".join(ch for ch in raw_identifier if ch.isdigit()) + if digits and digits not in variants: + variants.append(digits) + if digits: + plus = f"+{digits}" + if plus not in variants: + variants.append(plus) + if raw_identifier: + companion_numbers = list( + Chat.objects.filter(source_uuid=raw_identifier) + .exclude(source_number__isnull=True) + .exclude(source_number="") + .values_list("source_number", flat=True)[:200] + ) + companion_uuids = list( + Chat.objects.filter(source_number=raw_identifier) + .exclude(source_uuid__isnull=True) + .exclude(source_uuid="") + .values_list("source_uuid", flat=True)[:200] + ) + for candidate in companion_numbers + companion_uuids: + candidate_str = str(candidate or "").strip() + if not candidate_str: + continue + if candidate_str not in variants: + variants.append(candidate_str) + candidate_digits = "".join(ch for ch in candidate_str if ch.isdigit()) + if candidate_digits and candidate_digits not in variants: + variants.append(candidate_digits) + if candidate_digits: + plus_variant = f"+{candidate_digits}" + if plus_variant not in variants: + variants.append(plus_variant) group_link = None if bare_identifier: @@ -200,9 +263,6 @@ def _resolve_channel_display(user, service: str, identifier: str) -> dict: str(group_link.chat_jid or "").strip() or (f"{bare_identifier}@g.us" if bare_identifier else raw_identifier) ) - elif service_key == "whatsapp" and bare_identifier and not raw_identifier.endswith("@g.us"): - display_identifier = f"{bare_identifier}@g.us" - return { "service_key": service_key, "service_label": _service_label(service_key), @@ -275,15 +335,18 @@ class TaskGroupDetail(LoginRequiredMixin, View): def get(self, request, service, identifier): channel = _resolve_channel_display(request.user, service, identifier) variants = list(channel.get("variants") or [str(identifier or "").strip()]) + service_keys = [channel["service_key"]] + if channel["service_key"] != "web": + service_keys.append("web") mappings = ChatTaskSource.objects.filter( user=request.user, - service=channel["service_key"], + service__in=service_keys, channel_identifier__in=variants, ).select_related("project", "epic") tasks = ( DerivedTask.objects.filter( user=request.user, - source_service=channel["service_key"], + source_service__in=service_keys, source_channel__in=variants, ) .select_related("project", "epic") @@ -330,6 +393,7 @@ class TaskSettings(LoginRequiredMixin, View): def _context(self, request): _apply_safe_defaults_for_user(request.user) + _ensure_default_completion_patterns(request.user) prefill_service = str(request.GET.get("service") or "").strip().lower() prefill_identifier = str(request.GET.get("identifier") or "").strip() projects = list(TaskProject.objects.filter(user=request.user).order_by("name")) @@ -446,6 +510,15 @@ class TaskSettings(LoginRequiredMixin, View): source.save(update_fields=["settings", "updated_at"]) return _settings_redirect(request) + if action == "source_delete": + source = get_object_or_404( + ChatTaskSource, + id=request.POST.get("source_id"), + user=request.user, + ) + source.delete() + return _settings_redirect(request) + if action == "pattern_create": phrase = str(request.POST.get("phrase") or "").strip() if phrase: diff --git a/core/views/whatsapp.py b/core/views/whatsapp.py index 59b5831..8a0984e 100644 --- a/core/views/whatsapp.py +++ b/core/views/whatsapp.py @@ -118,6 +118,9 @@ class WhatsAppAccounts(SuperUserRequiredMixin, ObjectList): class WhatsAppAccountUnlink(SuperUserRequiredMixin, View): + def post(self, request, *args, **kwargs): + return self.delete(request, *args, **kwargs) + def delete(self, request, *args, **kwargs): account = str(kwargs.get("account") or "").strip() _ = transport.unlink_account("whatsapp", account)