diff --git a/app/urls.py b/app/urls.py index da364fb..bd03011 100644 --- a/app/urls.py +++ b/app/urls.py @@ -22,6 +22,7 @@ from two_factor.urls import urlpatterns as tf_urls from core.views import ( ais, + automation, base, compose, groups, @@ -60,6 +61,21 @@ urlpatterns = [ system.SystemSettings.as_view(), name="system_settings", ), + path( + "settings/command-routing/", + automation.CommandRoutingSettings.as_view(), + name="command_routing", + ), + path( + "settings/business-plan//", + automation.BusinessPlanEditor.as_view(), + name="business_plan_editor", + ), + path( + "settings/translation/preview/", + automation.TranslationPreview.as_view(), + name="translation_preview", + ), path( "services/signal/", signal.Signal.as_view(), @@ -205,6 +221,16 @@ urlpatterns = [ compose.ComposeHistorySync.as_view(), name="compose_history_sync", ), + path( + "compose/commands/bp/bind/", + compose.ComposeBindBP.as_view(), + name="compose_bind_bp", + ), + path( + "compose/commands/toggle/", + compose.ComposeToggleCommand.as_view(), + name="compose_toggle_command", + ), path( "compose/media/blob/", compose.ComposeMediaBlob.as_view(), diff --git a/core/clients/signal.py b/core/clients/signal.py index fe44b75..d789fe1 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -11,7 +11,7 @@ from django.urls import reverse from signalbot import Command, Context, SignalBot from core.clients import ClientBase, signalapi -from core.messaging import ai, history, media_bridge, natural, replies, utils +from core.messaging import ai, history, media_bridge, natural, replies, reply_sync, utils from core.models import Chat, Manipulation, PersonIdentifier, PlatformChatLink, QueuedMessage from core.util import logs @@ -358,6 +358,13 @@ class HandleMessage(Command): ts = c.message.timestamp source_value = c.message.source envelope = raw.get("envelope", {}) + signal_source_message_id = str( + envelope.get("serverGuid") + or envelope.get("guid") + or envelope.get("timestamp") + or c.message.timestamp + or "" + ).strip() destination_number = sent_message.get("destination") bot_uuid = str(getattr(c.bot, "bot_uuid", "") or "").strip() @@ -639,16 +646,36 @@ class HandleMessage(Command): identifier.user, identifier ) session_cache[session_key] = chat_session + reply_ref = reply_sync.extract_reply_ref(self.service, raw) + reply_target = await reply_sync.resolve_reply_target( + identifier.user, + chat_session, + reply_ref, + ) sender_key = source_uuid or source_number or identifier_candidates[0] message_key = (chat_session.id, ts, sender_key) message_text = identifier_text_overrides.get(session_key, relay_text) if message_key not in stored_messages: - await history.store_message( + origin_tag = reply_sync.extract_origin_tag(raw) + local_message = await history.store_message( session=chat_session, sender=sender_key, text=message_text, ts=ts, outgoing=is_from_bot, + source_service=self.service, + source_message_id=signal_source_message_id, + source_chat_id=str( + destination_number_norm or dest_norm or sender_key or "" + ), + reply_to=reply_target, + reply_source_service=str( + reply_ref.get("reply_source_service") or "" + ), + reply_source_message_id=str( + reply_ref.get("reply_source_message_id") or "" + ), + message_meta=reply_sync.apply_sync_origin({}, origin_tag), ) stored_messages.add(message_key) # Notify unified router to ensure service context is preserved @@ -658,6 +685,7 @@ class HandleMessage(Command): text=message_text, ts=ts, payload=msg, + local_message=local_message, ) # TODO: Permission checks diff --git a/core/clients/whatsapp.py b/core/clients/whatsapp.py index e84f665..90c4dba 100644 --- a/core/clients/whatsapp.py +++ b/core/clients/whatsapp.py @@ -1,5 +1,6 @@ import asyncio import inspect +import json import logging import mimetypes import os @@ -14,9 +15,14 @@ from django.conf import settings from django.core.cache import cache from core.clients import ClientBase, transport -from core.messaging import history, media_bridge +from core.messaging import history, media_bridge, reply_sync from core.models import Message, PersonIdentifier, PlatformChatLink +try: + from google.protobuf.json_format import MessageToDict +except Exception: # pragma: no cover + MessageToDict = None + class WhatsAppClient(ClientBase): """ @@ -45,6 +51,9 @@ class WhatsAppClient(ClientBase): self._qr_handler_supported = False self._event_hook_callable = False self._last_send_error = "" + self.reply_debug_chat = str( + getattr(settings, "WHATSAPP_REPLY_DEBUG_CHAT", "120363402761690215") + ).strip() self.enabled = bool( str(getattr(settings, "WHATSAPP_ENABLED", "false")).lower() @@ -1464,6 +1473,72 @@ class WhatsAppClient(ClientBase): return sorted(str(key) for key in vars(obj).keys()) return [] + def _proto_to_dict(self, obj): + if obj is None: + return {} + if isinstance(obj, dict): + return obj + # Neonize emits protobuf objects for inbound events. Convert them to a + # plain dict so nested contextInfo reply fields are addressable. + if MessageToDict is not None and hasattr(obj, "DESCRIPTOR"): + try: + return MessageToDict( + obj, + preserving_proto_field_name=True, + use_integers_for_enums=True, + ) + except Exception: + pass + return {} + + def _chat_matches_reply_debug(self, chat: str) -> bool: + target = str(self.reply_debug_chat or "").strip() + value = str(chat or "").strip() + if not target or not value: + return False + value_local = value.split("@", 1)[0] + return value == target or value_local == target + + def _extract_reply_hints(self, obj, max_depth: int = 6): + hints = [] + + def walk(value, path="", depth=0): + if depth > max_depth or value is None: + return + if isinstance(value, dict): + for key, nested in value.items(): + key_str = str(key) + next_path = f"{path}.{key_str}" if path else key_str + lowered = key_str.lower() + if any( + token in lowered + for token in ("stanza", "quoted", "reply", "context") + ): + if isinstance(nested, (str, int, float, bool)): + hints.append( + { + "path": next_path, + "value": str(nested)[:180], + } + ) + walk(nested, next_path, depth + 1) + return + if isinstance(value, list): + for idx, nested in enumerate(value): + walk(nested, f"{path}[{idx}]", depth + 1) + + walk(obj, "", 0) + # Deduplicate by path/value for compact diagnostics. + unique = [] + seen = set() + for row in hints: + key = (str(row.get("path") or ""), str(row.get("value") or "")) + if key in seen: + continue + seen.add(key) + unique.append(row) + return unique[:40] + def _normalize_timestamp(self, raw_value): if raw_value is None: return int(time.time() * 1000) @@ -2361,19 +2436,22 @@ class WhatsAppClient(ClientBase): ] async def _handle_message_event(self, event): - msg_obj = self._pluck(event, "message") or self._pluck(event, "Message") - text = self._message_text(msg_obj, event) + event_obj = self._proto_to_dict(event) or event + msg_obj = self._pluck(event_obj, "message") or self._pluck(event_obj, "Message") + text = self._message_text(msg_obj, event_obj) if not text: self.log.debug( "whatsapp empty-text event shape: msg_keys=%s event_keys=%s type=%s", self._shape_keys(msg_obj), - self._shape_keys(event), + self._shape_keys(event_obj), str(type(event).__name__), ) source = ( - self._pluck(event, "Info", "MessageSource") - or self._pluck(event, "info", "message_source") - or self._pluck(event, "info", "messageSource") + self._pluck(event_obj, "Info", "MessageSource") + or self._pluck(event_obj, "info", "message_source") + or self._pluck(event_obj, "info", "messageSource") + or self._pluck(event_obj, "info", "message_source") + or self._pluck(event_obj, "info", "messageSource") ) is_from_me = bool( self._pluck(source, "IsFromMe") or self._pluck(source, "isFromMe") @@ -2389,17 +2467,17 @@ class WhatsAppClient(ClientBase): self._pluck(source, "Chat") or self._pluck(source, "chat") ) raw_ts = ( - self._pluck(event, "Info", "Timestamp") - or self._pluck(event, "info", "timestamp") - or self._pluck(event, "info", "message_timestamp") - or self._pluck(event, "Timestamp") - or self._pluck(event, "timestamp") + self._pluck(event_obj, "Info", "Timestamp") + or self._pluck(event_obj, "info", "timestamp") + or self._pluck(event_obj, "info", "message_timestamp") + or self._pluck(event_obj, "Timestamp") + or self._pluck(event_obj, "timestamp") ) msg_id = str( - self._pluck(event, "Info", "ID") - or self._pluck(event, "info", "id") - or self._pluck(event, "ID") - or self._pluck(event, "id") + self._pluck(event_obj, "Info", "ID") + or self._pluck(event_obj, "info", "id") + or self._pluck(event_obj, "ID") + or self._pluck(event_obj, "id") or "" ).strip() ts = self._normalize_timestamp(raw_ts) @@ -2529,12 +2607,196 @@ class WhatsAppClient(ClientBase): )() if duplicate_exists: continue - await history.store_message( + reply_ref = reply_sync.extract_reply_ref( + self.service, + { + "contextInfo": self._pluck(msg_obj, "contextInfo") + or self._pluck(msg_obj, "ContextInfo") + or self._pluck(msg_obj, "extendedTextMessage", "contextInfo") + or self._pluck(msg_obj, "ExtendedTextMessage", "ContextInfo") + or self._pluck(msg_obj, "imageMessage", "contextInfo") + or self._pluck(msg_obj, "ImageMessage", "ContextInfo") + or self._pluck(msg_obj, "videoMessage", "contextInfo") + or self._pluck(msg_obj, "VideoMessage", "ContextInfo") + or self._pluck( + msg_obj, + "ephemeralMessage", + "message", + "extendedTextMessage", + "contextInfo", + ) + or self._pluck( + msg_obj, + "EphemeralMessage", + "Message", + "ExtendedTextMessage", + "ContextInfo", + ) + or self._pluck( + msg_obj, + "viewOnceMessage", + "message", + "extendedTextMessage", + "contextInfo", + ) + or self._pluck( + msg_obj, + "ViewOnceMessage", + "Message", + "ExtendedTextMessage", + "ContextInfo", + ) + or self._pluck( + msg_obj, + "viewOnceMessageV2", + "message", + "extendedTextMessage", + "contextInfo", + ) + or self._pluck( + msg_obj, + "ViewOnceMessageV2", + "Message", + "ExtendedTextMessage", + "ContextInfo", + ) + or self._pluck( + msg_obj, + "viewOnceMessageV2Extension", + "message", + "extendedTextMessage", + "contextInfo", + ) + or self._pluck( + msg_obj, + "ViewOnceMessageV2Extension", + "Message", + "ExtendedTextMessage", + "ContextInfo", + ) + or {}, + "messageContextInfo": self._pluck(msg_obj, "messageContextInfo") + or self._pluck(msg_obj, "MessageContextInfo") + or {}, + "message": { + "extendedTextMessage": self._pluck(msg_obj, "extendedTextMessage") + or self._pluck(msg_obj, "ExtendedTextMessage") + or {}, + "imageMessage": self._pluck(msg_obj, "imageMessage") or {}, + "ImageMessage": self._pluck(msg_obj, "ImageMessage") or {}, + "videoMessage": self._pluck(msg_obj, "videoMessage") or {}, + "VideoMessage": self._pluck(msg_obj, "VideoMessage") or {}, + "documentMessage": self._pluck(msg_obj, "documentMessage") + or {}, + "DocumentMessage": self._pluck(msg_obj, "DocumentMessage") + or {}, + "ephemeralMessage": self._pluck(msg_obj, "ephemeralMessage") + or {}, + "EphemeralMessage": self._pluck(msg_obj, "EphemeralMessage") + or {}, + "viewOnceMessage": self._pluck(msg_obj, "viewOnceMessage") + or {}, + "ViewOnceMessage": self._pluck(msg_obj, "ViewOnceMessage") + or {}, + "viewOnceMessageV2": self._pluck(msg_obj, "viewOnceMessageV2") + or {}, + "ViewOnceMessageV2": self._pluck(msg_obj, "ViewOnceMessageV2") + or {}, + "viewOnceMessageV2Extension": self._pluck( + msg_obj, "viewOnceMessageV2Extension" + ) + or {}, + "ViewOnceMessageV2Extension": self._pluck( + msg_obj, "ViewOnceMessageV2Extension" + ) + or {}, + }, + }, + ) + reply_debug = {} + if self._chat_matches_reply_debug(chat): + reply_debug = reply_sync.extract_whatsapp_reply_debug( + { + "contextInfo": self._pluck(msg_obj, "contextInfo") or {}, + "messageContextInfo": self._pluck(msg_obj, "messageContextInfo") + or {}, + "message": { + "extendedTextMessage": self._pluck( + msg_obj, "extendedTextMessage" + ) + or {}, + "imageMessage": self._pluck(msg_obj, "imageMessage") or {}, + "videoMessage": self._pluck(msg_obj, "videoMessage") or {}, + "documentMessage": self._pluck(msg_obj, "documentMessage") + or {}, + "ephemeralMessage": self._pluck(msg_obj, "ephemeralMessage") + or {}, + "viewOnceMessage": self._pluck(msg_obj, "viewOnceMessage") + or {}, + "viewOnceMessageV2": self._pluck(msg_obj, "viewOnceMessageV2") + or {}, + "viewOnceMessageV2Extension": self._pluck( + msg_obj, "viewOnceMessageV2Extension" + ) + or {}, + }, + } + ) + self.log.warning( + "wa-reply-debug chat=%s msg_id=%s reply_ref=%s debug=%s", + str(chat or ""), + str(msg_id or ""), + json.dumps(reply_ref, ensure_ascii=True), + json.dumps(reply_debug, ensure_ascii=True), + ) + reply_target = await reply_sync.resolve_reply_target( + identifier.user, + session, + reply_ref, + ) + message_meta = reply_sync.apply_sync_origin( + {}, + reply_sync.extract_origin_tag(payload), + ) + if self._chat_matches_reply_debug(chat): + info_obj = self._proto_to_dict(self._pluck(event_obj, "Info")) or self._pluck( + event_obj, "Info" + ) + raw_obj = self._proto_to_dict(self._pluck(event_obj, "Raw")) or self._pluck( + event_obj, "Raw" + ) + message_meta["wa_reply_debug"] = { + "reply_ref": reply_ref, + "reply_target_id": str(getattr(reply_target, "id", "") or ""), + "msg_id": str(msg_id or ""), + "chat": str(chat or ""), + "sender": str(sender or ""), + "msg_obj_keys": self._shape_keys(msg_obj), + "event_keys": self._shape_keys(event_obj), + "info_keys": self._shape_keys(info_obj), + "raw_keys": self._shape_keys(raw_obj), + "event_type": str(type(event).__name__), + "reply_hints_event": self._extract_reply_hints(event_obj), + "reply_hints_message": self._extract_reply_hints(msg_obj), + "reply_hints_info": self._extract_reply_hints(info_obj), + "reply_hints_raw": self._extract_reply_hints(raw_obj), + "debug": reply_debug, + } + local_message = await history.store_message( session=session, sender=str(sender or chat or ""), text=display_text, ts=ts, outgoing=is_from_me, + source_service=self.service, + source_message_id=str(msg_id or ""), + source_chat_id=str(chat or sender or ""), + reply_to=reply_target, + reply_source_service=str(reply_ref.get("reply_source_service") or ""), + reply_source_message_id=str( + reply_ref.get("reply_source_message_id") or "" + ), + message_meta=message_meta, ) await self.ur.message_received( self.service, @@ -2542,6 +2804,7 @@ class WhatsAppClient(ClientBase): text=display_text, ts=ts, payload=payload, + local_message=local_message, ) async def _handle_receipt_event(self, event): @@ -2679,6 +2942,11 @@ class WhatsAppClient(ClientBase): return "" if "@" in raw: return raw + # Group chats often arrive as bare numeric ids in compose/runtime + # payloads; prefer known group mappings before defaulting to person JIDs. + group_jid = self._resolve_group_jid(raw) + if group_jid: + return group_jid digits = re.sub(r"[^0-9]", "", raw) if digits: # Prefer direct JID formatting for phone numbers; Neonize build_jid @@ -2691,6 +2959,66 @@ class WhatsAppClient(ClientBase): pass return raw + def _resolve_group_jid(self, value: str) -> str: + local = str(value or "").strip().split("@", 1)[0].strip() + if not local: + return "" + + # Runtime state is the cheapest source of truth for currently joined groups. + state = transport.get_runtime_state(self.service) or {} + for row in list(state.get("groups") or []): + if not isinstance(row, dict): + continue + candidates = ( + row.get("identifier"), + row.get("chat_identifier"), + row.get("chat"), + row.get("jid"), + row.get("chat_jid"), + ) + matched = False + for candidate in candidates: + candidate_local = str(self._jid_to_identifier(candidate) or "").split( + "@", 1 + )[0].strip() + if candidate_local and candidate_local == local: + matched = True + break + if not matched: + continue + jid = str( + self._jid_to_identifier(row.get("jid") or row.get("chat_jid") or "") + ).strip() + if jid and "@g.us" in jid: + return jid + return f"{local}@g.us" + + # DB fallback for compose pages that resolved from PlatformChatLink. + try: + link = ( + PlatformChatLink.objects.filter( + service="whatsapp", + chat_identifier=local, + is_group=True, + ) + .order_by("-updated_at", "-id") + .first() + ) + except Exception: + link = None + if link is not None: + jid = str(self._jid_to_identifier(link.chat_jid or "")).strip() + if jid and "@g.us" in jid: + return jid + return f"{local}@g.us" + + # WhatsApp group ids are numeric and usually very long (commonly start + # with 120...). Treat those as groups when no explicit mapping exists. + digits = re.sub(r"[^0-9]", "", local) + if digits and digits == local and len(digits) >= 15 and digits.startswith("120"): + return f"{digits}@g.us" + return "" + def _blob_key_to_compose_url(self, blob_key): key = str(blob_key or "").strip() if not key: @@ -2806,8 +3134,31 @@ class WhatsAppClient(ClientBase): metadata = dict(metadata or {}) xmpp_source_id = str(metadata.get("xmpp_source_id") or "").strip() legacy_message_id = str(metadata.get("legacy_message_id") or "").strip() + reply_to_upstream_message_id = str( + metadata.get("reply_to_upstream_message_id") or "" + ).strip() + reply_to_participant = str(metadata.get("reply_to_participant") or "").strip() + reply_to_remote_jid = str(metadata.get("reply_to_remote_jid") or "").strip() person_identifier = None - if xmpp_source_id: + if legacy_message_id: + person_identifier = await sync_to_async( + lambda: ( + Message.objects.filter(id=legacy_message_id) + .select_related("session__identifier__user", "session__identifier__person") + .first() + ) + )() + if person_identifier is not None: + person_identifier = getattr( + getattr(person_identifier, "session", None), "identifier", None + ) + if ( + person_identifier is not None + and str(getattr(person_identifier, "service", "") or "").strip().lower() + != "whatsapp" + ): + person_identifier = None + if person_identifier is None and (xmpp_source_id or legacy_message_id): candidates = list(self._normalize_identifier_candidates(recipient, jid_str)) if candidates: person_identifier = await sync_to_async( @@ -2828,8 +3179,25 @@ class WhatsAppClient(ClientBase): or "" ).strip() - def _record_bridge(response, ts_value, body_hint=""): - if not xmpp_source_id or person_identifier is None: + async def _record_bridge(response, ts_value, body_hint=""): + if person_identifier is None: + return + upstream_message_id = _extract_response_message_id(response) + if legacy_message_id: + try: + await history.save_bridge_ref( + person_identifier.user, + person_identifier, + source_service="whatsapp", + local_message_id=legacy_message_id, + local_ts=int(ts_value or int(time.time() * 1000)), + upstream_message_id=upstream_message_id, + upstream_author=str(recipient or ""), + upstream_ts=int(ts_value or 0), + ) + except Exception: + pass + if not xmpp_source_id: return transport.record_bridge_mapping( user_id=person_identifier.user_id, @@ -2837,7 +3205,7 @@ class WhatsAppClient(ClientBase): service="whatsapp", xmpp_message_id=xmpp_source_id, xmpp_ts=int(metadata.get("xmpp_source_ts") or 0), - upstream_message_id=_extract_response_message_id(response), + upstream_message_id=upstream_message_id, upstream_ts=int(ts_value or 0), text_preview=str(body_hint or metadata.get("xmpp_body") or ""), local_message_id=legacy_message_id, @@ -2899,7 +3267,7 @@ class WhatsAppClient(ClientBase): sent_ts, self._normalize_timestamp(self._pluck(response, "Timestamp") or 0), ) - _record_bridge(response, sent_ts, body_hint=filename) + await _record_bridge(response, sent_ts, body_hint=filename) sent_any = True if getattr(settings, "WHATSAPP_DEBUG", False): self.log.debug( @@ -2916,6 +3284,35 @@ class WhatsAppClient(ClientBase): if text: response = None last_error = None + quoted_text_message = text + if reply_to_upstream_message_id: + try: + from neonize.proto.waE2E.WAWebProtobufsE2E_pb2 import ( + ContextInfo, + ExtendedTextMessage, + Message as WAProtoMessage, + ) + + context = ContextInfo( + stanzaID=reply_to_upstream_message_id, + ) + participant_jid = self._to_jid(reply_to_participant) + remote_jid = self._to_jid(reply_to_remote_jid) or jid_str + if participant_jid: + context.participant = participant_jid + if remote_jid: + context.remoteJID = remote_jid + quoted_text_message = WAProtoMessage( + extendedTextMessage=ExtendedTextMessage( + text=str(text or ""), + contextInfo=context, + ) + ) + except Exception as exc: + self.log.warning( + "whatsapp quoted-reply payload build failed: %s", exc + ) + quoted_text_message = text # Prepare cancel key (if caller provided command_id) cancel_key = None try: @@ -2945,7 +3342,7 @@ class WhatsAppClient(ClientBase): response = await self._call_client_method( getattr(self._client, "send_message", None), send_target, - text, + quoted_text_message, timeout=9.0, ) sent_any = True @@ -3030,7 +3427,7 @@ class WhatsAppClient(ClientBase): sent_ts, self._normalize_timestamp(self._pluck(response, "Timestamp") or 0), ) - _record_bridge(response, sent_ts, body_hint=str(text or "")) + await _record_bridge(response, sent_ts, body_hint=str(text or "")) if not sent_any: self._last_send_error = "no_payload_sent" diff --git a/core/clients/xmpp.py b/core/clients/xmpp.py index d8d6132..8b98ce2 100644 --- a/core/clients/xmpp.py +++ b/core/clients/xmpp.py @@ -16,7 +16,7 @@ from slixmpp.xmlstream import register_stanza_plugin from slixmpp.xmlstream.stanzabase import ET from core.clients import ClientBase, transport -from core.messaging import ai, history, replies, utils +from core.messaging import ai, history, replies, reply_sync, utils from core.models import ( ChatSession, Manipulation, @@ -1236,14 +1236,46 @@ class XMPPComponent(ComponentXMPP): user=identifier.user, ) self.log.debug("Storing outbound XMPP message in history") + reply_ref = reply_sync.extract_reply_ref( + "xmpp", + { + "reply_source_message_id": parsed_reply_target, + "reply_source_chat_id": str(sender_jid or ""), + }, + ) + reply_target = await reply_sync.resolve_reply_target( + identifier.user, + session, + reply_ref, + ) local_message = await history.store_message( session=session, sender="XMPP", text=body, ts=int(now().timestamp() * 1000), outgoing=True, + source_service="xmpp", + source_message_id=xmpp_message_id, + source_chat_id=str(sender_jid or ""), + reply_to=reply_target, + reply_source_service=str(reply_ref.get("reply_source_service") or ""), + reply_source_message_id=str( + reply_ref.get("reply_source_message_id") or "" + ), + message_meta={}, ) self.log.debug("Stored outbound XMPP message in history") + await self.ur.message_received( + "xmpp", + identifier=identifier, + text=body, + ts=int(now().timestamp() * 1000), + payload={ + "sender_jid": sender_jid, + "recipient_jid": recipient_jid, + }, + local_message=local_message, + ) manipulations = Manipulation.objects.filter( group__people=identifier.person, diff --git a/core/commands/__init__.py b/core/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/commands/base.py b/core/commands/base.py new file mode 100644 index 0000000..0d9dd42 --- /dev/null +++ b/core/commands/base.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(slots=True) +class CommandContext: + service: str + channel_identifier: str + message_id: str + user_id: int + message_text: str + payload: dict[str, Any] = field(default_factory=dict) + + +@dataclass(slots=True) +class CommandResult: + ok: bool + status: str = "ok" + error: str = "" + payload: dict[str, Any] = field(default_factory=dict) + + +class CommandHandler: + slug = "" + + async def execute(self, ctx: CommandContext) -> CommandResult: + raise NotImplementedError diff --git a/core/commands/engine.py b/core/commands/engine.py new file mode 100644 index 0000000..c68bbf6 --- /dev/null +++ b/core/commands/engine.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +from asgiref.sync import sync_to_async + +from core.commands.base import CommandContext, CommandResult +from core.commands.handlers.bp import BPCommandHandler +from core.commands.registry import get as get_handler +from core.commands.registry import register +from core.messaging.reply_sync import is_mirrored_origin +from core.models import CommandChannelBinding, CommandProfile, Message +from core.util import logs + +log = logs.get_logger("command_engine") + +_REGISTERED = False + + +def ensure_handlers_registered(): + global _REGISTERED + if _REGISTERED: + return + register(BPCommandHandler()) + _REGISTERED = True + + +async def _eligible_profiles(ctx: CommandContext) -> list[CommandProfile]: + def _load(): + direct = list( + CommandProfile.objects.filter( + user_id=ctx.user_id, + enabled=True, + channel_bindings__enabled=True, + channel_bindings__direction="ingress", + channel_bindings__service=ctx.service, + channel_bindings__channel_identifier=ctx.channel_identifier, + ).distinct() + ) + if direct: + return direct + # Compose-originated messages use `web` service even when the + # underlying conversation is mapped to a platform identifier. + if str(ctx.service or "").strip().lower() != "web": + return [] + trigger = ( + Message.objects.select_related("session", "session__identifier") + .filter(id=ctx.message_id, user_id=ctx.user_id) + .first() + ) + identifier = getattr(getattr(trigger, "session", None), "identifier", None) + fallback_service = str(getattr(identifier, "service", "") or "").strip().lower() + fallback_identifier = str(getattr(identifier, "identifier", "") or "").strip() + if not fallback_service or not fallback_identifier: + return [] + return list( + CommandProfile.objects.filter( + user_id=ctx.user_id, + enabled=True, + channel_bindings__enabled=True, + channel_bindings__direction="ingress", + channel_bindings__service=fallback_service, + channel_bindings__channel_identifier=fallback_identifier, + ).distinct() + ) + + return await sync_to_async(_load)() + + +def _matches_trigger(profile: CommandProfile, text: str) -> bool: + body = str(text or "").strip() + trigger = str(profile.trigger_token or "").strip() + if not trigger: + return False + if profile.exact_match_only: + return body == trigger + return trigger in body + + +async def process_inbound_message(ctx: CommandContext) -> list[CommandResult]: + ensure_handlers_registered() + trigger_message = await sync_to_async( + lambda: Message.objects.filter(id=ctx.message_id).first() + )() + if trigger_message is None: + return [] + if is_mirrored_origin(trigger_message.message_meta): + return [] + + profiles = await _eligible_profiles(ctx) + results: list[CommandResult] = [] + for profile in profiles: + if not _matches_trigger(profile, ctx.message_text): + continue + if profile.reply_required and trigger_message.reply_to_id is None: + results.append( + CommandResult( + ok=False, + status="skipped", + error="reply_required", + payload={"profile": profile.slug}, + ) + ) + continue + handler = get_handler(profile.slug) + if handler is None: + results.append( + CommandResult( + ok=False, + status="failed", + error=f"missing_handler:{profile.slug}", + ) + ) + continue + try: + result = await handler.execute(ctx) + results.append(result) + except Exception as exc: + log.exception("command execution failed for profile=%s: %s", profile.slug, exc) + results.append( + CommandResult( + ok=False, + status="failed", + error=f"handler_exception:{exc}", + ) + ) + return results diff --git a/core/commands/handlers/__init__.py b/core/commands/handlers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/commands/handlers/bp.py b/core/commands/handlers/bp.py new file mode 100644 index 0000000..60c6a04 --- /dev/null +++ b/core/commands/handlers/bp.py @@ -0,0 +1,358 @@ +from __future__ import annotations + +import time + +from asgiref.sync import sync_to_async +from django.conf import settings + +from core.clients import transport +from core.commands.base import CommandContext, CommandHandler, CommandResult +from core.messaging import ai as ai_runner +from core.messaging.utils import messages_to_string +from core.models import ( + AI, + BusinessPlanDocument, + BusinessPlanRevision, + ChatSession, + CommandAction, + CommandChannelBinding, + CommandRun, + Message, +) + + +def _bp_system_prompt(): + return ( + "Create a structured business plan using the given template. " + "Follow the template section order exactly. " + "If data is missing, write concise assumptions and risks. " + "Return markdown only." + ) + + +def _clamp_transcript(transcript: str, max_chars: int) -> str: + text = str(transcript or "") + if max_chars <= 0 or len(text) <= max_chars: + return text + head_size = min(2000, max_chars // 3) + tail_size = max(0, max_chars - head_size - 140) + omitted = len(text) - head_size - tail_size + return ( + text[:head_size].rstrip() + + f"\n\n[... truncated {max(0, omitted)} chars ...]\n\n" + + text[-tail_size:].lstrip() + ) + + +def _bp_fallback_markdown(template_text: str, transcript: str, error_text: str = "") -> str: + header = ( + "## Business Plan (Draft)\n\n" + "Automatic fallback was used because AI generation failed for this run.\n" + ) + if error_text: + header += f"\nFailure: `{error_text}`\n" + return ( + f"{header}\n" + "### Template\n" + f"{template_text}\n\n" + "### Transcript Window\n" + f"{transcript}" + ) + + +def _chunk_for_transport(text: str, limit: int = 3000) -> list[str]: + body = str(text or "").strip() + if not body: + return [] + if len(body) <= limit: + return [body] + parts = [] + remaining = body + while len(remaining) > limit: + cut = remaining.rfind("\n\n", 0, limit) + if cut < int(limit * 0.45): + cut = remaining.rfind("\n", 0, limit) + if cut < int(limit * 0.35): + cut = limit + parts.append(remaining[:cut].rstrip()) + remaining = remaining[cut:].lstrip() + if remaining: + parts.append(remaining) + return [part for part in parts if part] + + +class BPCommandHandler(CommandHandler): + slug = "bp" + + async def _status_message(self, trigger_message: Message, text: str): + service = str(trigger_message.source_service or "").strip().lower() + if service == "web": + await sync_to_async(Message.objects.create)( + user=trigger_message.user, + session=trigger_message.session, + sender_uuid="", + text=text, + ts=int(time.time() * 1000), + custom_author="BOT", + source_service="web", + source_chat_id=trigger_message.source_chat_id or "", + ) + return + if service == "xmpp" and str(trigger_message.source_chat_id or "").strip(): + try: + await transport.send_message_raw( + "xmpp", + str(trigger_message.source_chat_id or "").strip(), + text=text, + attachments=[], + metadata={"origin_tag": f"bp-status:{trigger_message.id}"}, + ) + except Exception: + return + + async def _fanout(self, run: CommandRun, text: str) -> dict: + profile = run.profile + trigger = await sync_to_async( + lambda: Message.objects.select_related("session", "user") + .filter(id=run.trigger_message_id) + .first() + )() + if trigger is None: + return {"sent_bindings": 0, "failed_bindings": 0} + bindings = await sync_to_async(list)( + CommandChannelBinding.objects.filter( + profile=profile, + enabled=True, + direction="egress", + ) + ) + sent_bindings = 0 + failed_bindings = 0 + for binding in bindings: + if binding.service == "web": + session = None + channel_identifier = str(binding.channel_identifier or "").strip() + if ( + channel_identifier + and channel_identifier == str(trigger.source_chat_id or "").strip() + ): + session = trigger.session + if session is None and channel_identifier: + session = await sync_to_async( + lambda: ChatSession.objects.filter( + user=trigger.user, + identifier__identifier=channel_identifier, + ) + .order_by("-last_interaction") + .first() + )() + if session is None: + session = trigger.session + await sync_to_async(Message.objects.create)( + user=trigger.user, + session=session, + sender_uuid="", + text=text, + ts=int(time.time() * 1000), + custom_author="BOT", + source_service="web", + source_chat_id=channel_identifier or str(trigger.source_chat_id or ""), + message_meta={"origin_tag": f"bp:{run.id}"}, + ) + sent_bindings += 1 + continue + try: + chunks = _chunk_for_transport(text, limit=3000) + if not chunks: + failed_bindings += 1 + continue + ok = True + for chunk in chunks: + ts = await transport.send_message_raw( + binding.service, + binding.channel_identifier, + text=chunk, + attachments=[], + metadata={ + "origin_tag": f"bp:{run.id}", + "command_slug": "bp", + }, + ) + if not ts: + ok = False + break + if ok: + sent_bindings += 1 + else: + failed_bindings += 1 + except Exception: + failed_bindings += 1 + return {"sent_bindings": sent_bindings, "failed_bindings": failed_bindings} + + async def execute(self, ctx: CommandContext) -> CommandResult: + trigger = await sync_to_async( + lambda: Message.objects.select_related("user", "session") + .filter(id=ctx.message_id) + .first() + )() + if trigger is None: + return CommandResult(ok=False, status="failed", error="trigger_not_found") + + profile = await sync_to_async( + lambda: trigger.user.commandprofile_set.filter(slug=self.slug, enabled=True) + .first() + )() + if profile is None: + return CommandResult(ok=False, status="skipped", error="profile_missing") + + actions = await sync_to_async(list)( + CommandAction.objects.filter( + profile=profile, + enabled=True, + ).order_by("position", "id") + ) + action_types = {row.action_type for row in actions} + if "extract_bp" not in action_types: + return CommandResult(ok=False, status="skipped", error="extract_bp_disabled") + + run, created = await sync_to_async(CommandRun.objects.get_or_create)( + profile=profile, + trigger_message=trigger, + defaults={ + "user": trigger.user, + "status": "running", + }, + ) + if not created and run.status in {"ok", "running"}: + return CommandResult( + ok=True, + status="ok", + payload={"document_id": str(run.result_ref_id or "")}, + ) + run.status = "running" + run.error = "" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + + if trigger.reply_to_id is None: + run.status = "failed" + run.error = "bp_requires_reply_target" + await sync_to_async(run.save)( + update_fields=["status", "error", "updated_at"] + ) + return CommandResult(ok=False, status="failed", error=run.error) + + anchor = trigger.reply_to + rows = await sync_to_async(list)( + Message.objects.filter( + user=trigger.user, + session=trigger.session, + ts__gte=int(anchor.ts or 0), + ts__lte=int(trigger.ts or 0), + ) + .order_by("ts") + .select_related("session", "session__identifier", "session__identifier__person") + ) + transcript = messages_to_string( + rows, + author_rewrites={"USER": "Operator", "BOT": "Assistant"}, + ) + max_transcript_chars = int( + getattr(settings, "BP_MAX_TRANSCRIPT_CHARS", 12000) or 12000 + ) + transcript = _clamp_transcript(transcript, max_transcript_chars) + default_template = ( + "Business Plan:\n" + "- Objective\n" + "- Audience\n" + "- Offer\n" + "- GTM\n" + "- Risks" + ) + template_text = profile.template_text or default_template + max_template_chars = int( + getattr(settings, "BP_MAX_TEMPLATE_CHARS", 5000) or 5000 + ) + template_text = str(template_text or "")[:max_template_chars] + ai_obj = await sync_to_async( + # Match compose draft/engage lookup behavior exactly. + lambda: AI.objects.filter(user=trigger.user).first() + )() + ai_warning = "" + if ai_obj is None: + summary = _bp_fallback_markdown( + template_text, + transcript, + "ai_not_configured", + ) + ai_warning = "ai_not_configured" + else: + prompt = [ + {"role": "system", "content": _bp_system_prompt()}, + { + "role": "user", + "content": ( + "Template:\n" + f"{template_text}\n\n" + "Messages:\n" + f"{transcript}" + ), + }, + ] + try: + summary = str(await ai_runner.run_prompt(prompt, ai_obj) or "").strip() + if not summary: + raise RuntimeError("empty_ai_response") + except Exception as exc: + ai_warning = f"bp_ai_failed:{exc}" + summary = _bp_fallback_markdown( + template_text, + transcript, + str(exc), + ) + + document = await sync_to_async(BusinessPlanDocument.objects.create)( + user=trigger.user, + command_profile=profile, + source_service=trigger.source_service or ctx.service, + source_channel_identifier=trigger.source_chat_id or ctx.channel_identifier, + trigger_message=trigger, + anchor_message=anchor, + title=f"Business Plan {time.strftime('%Y-%m-%d %H:%M:%S')}", + status="draft", + content_markdown=summary, + structured_payload={"source_message_ids": [str(row.id) for row in rows]}, + ) + await sync_to_async(BusinessPlanRevision.objects.create)( + document=document, + editor_user=trigger.user, + content_markdown=summary, + structured_payload={"source_message_ids": [str(row.id) for row in rows]}, + ) + + fanout_stats = {"sent_bindings": 0, "failed_bindings": 0} + if "post_result" in action_types: + fanout_stats = await self._fanout(run, summary) + + if "status_in_source" == profile.visibility_mode: + status_text = f"[bp] Generated business plan: {document.title}" + if ai_warning: + status_text += " (fallback mode)" + sent_count = int(fanout_stats.get("sent_bindings") or 0) + failed_count = int(fanout_stats.get("failed_bindings") or 0) + if sent_count or failed_count: + status_text += f" · fanout sent:{sent_count}" + if failed_count: + status_text += f" failed:{failed_count}" + await self._status_message(trigger, status_text) + + run.status = "ok" + run.result_ref = document + run.error = ai_warning + await sync_to_async(run.save)( + update_fields=["status", "result_ref", "error", "updated_at"] + ) + return CommandResult( + ok=True, + status="ok", + payload={"document_id": str(document.id)}, + ) diff --git a/core/commands/registry.py b/core/commands/registry.py new file mode 100644 index 0000000..43a89f3 --- /dev/null +++ b/core/commands/registry.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from core.commands.base import CommandHandler + +_HANDLERS: dict[str, CommandHandler] = {} + + +def register(handler: CommandHandler): + slug = str(getattr(handler, "slug", "") or "").strip().lower() + if not slug: + raise ValueError("handler slug is required") + _HANDLERS[slug] = handler + + +def get(slug: str) -> CommandHandler | None: + return _HANDLERS.get(str(slug or "").strip().lower()) diff --git a/core/messaging/history.py b/core/messaging/history.py index b60de0e..076acdd 100644 --- a/core/messaging/history.py +++ b/core/messaging/history.py @@ -144,7 +144,20 @@ async def get_chat_session(user, identifier): return chat_session -async def store_message(session, sender, text, ts, outgoing=False): +async def store_message( + session, + sender, + text, + ts, + outgoing=False, + source_service="", + source_message_id="", + source_chat_id="", + reply_to=None, + reply_source_service="", + reply_source_message_id="", + message_meta=None, +): log.debug("Storing message for session=%s outgoing=%s", session.id, outgoing) msg = await sync_to_async(Message.objects.create)( user=session.user, @@ -154,12 +167,32 @@ async def store_message(session, sender, text, ts, outgoing=False): ts=ts, delivered_ts=ts, custom_author="USER" if outgoing else None, + source_service=(source_service or None), + source_message_id=str(source_message_id or "").strip() or None, + source_chat_id=str(source_chat_id or "").strip() or None, + reply_to=reply_to, + reply_source_service=str(reply_source_service or "").strip() or None, + reply_source_message_id=str(reply_source_message_id or "").strip() or None, + message_meta=dict(message_meta or {}), ) return msg -async def store_own_message(session, text, ts, manip=None, queue=False): +async def store_own_message( + session, + text, + ts, + manip=None, + queue=False, + source_service="", + source_message_id="", + source_chat_id="", + reply_to=None, + reply_source_service="", + reply_source_message_id="", + message_meta=None, +): log.debug("Storing own message for session=%s queue=%s", session.id, queue) cast = { "user": session.user, @@ -168,6 +201,13 @@ async def store_own_message(session, text, ts, manip=None, queue=False): "text": text, "ts": ts, "delivered_ts": ts, + "source_service": (source_service or None), + "source_message_id": str(source_message_id or "").strip() or None, + "source_chat_id": str(source_chat_id or "").strip() or None, + "reply_to": reply_to, + "reply_source_service": str(reply_source_service or "").strip() or None, + "reply_source_message_id": str(reply_source_message_id or "").strip() or None, + "message_meta": dict(message_meta or {}), } if queue: msg_object = QueuedMessage diff --git a/core/messaging/reply_sync.py b/core/messaging/reply_sync.py new file mode 100644 index 0000000..54d204f --- /dev/null +++ b/core/messaging/reply_sync.py @@ -0,0 +1,391 @@ +from __future__ import annotations + +import re +from typing import Any + +from asgiref.sync import sync_to_async + +from core.messaging import history +from core.models import Message + + +def _as_dict(value: Any) -> dict[str, Any]: + return dict(value) if isinstance(value, dict) else {} + + +def _pluck(data: Any, *path: str): + cur = data + for key in path: + if isinstance(cur, dict): + cur = cur.get(key) + continue + if hasattr(cur, key): + cur = getattr(cur, key) + continue + return None + return cur + + +def _clean(value: Any) -> str: + return str(value or "").strip() + + +def _find_origin_tag(value: Any, depth: int = 0) -> str: + if depth > 4: + return "" + if isinstance(value, dict): + direct = _clean(value.get("origin_tag")) + if direct: + return direct + for key in ("metadata", "meta", "message_meta", "contextInfo", "context_info"): + nested = _find_origin_tag(value.get(key), depth + 1) + if nested: + return nested + for nested_value in value.values(): + nested = _find_origin_tag(nested_value, depth + 1) + if nested: + return nested + return "" + if isinstance(value, list): + for item in value: + nested = _find_origin_tag(item, depth + 1) + if nested: + return nested + return "" + + +def _extract_signal_reply(raw_payload: dict[str, Any]) -> dict[str, str]: + envelope = _as_dict((raw_payload or {}).get("envelope")) + data_message = _as_dict( + envelope.get("dataMessage") + or envelope.get("syncMessage", {}).get("sentMessage", {}).get("message") + ) + quote = _as_dict(data_message.get("quote")) + quote_id = _clean(quote.get("id")) + if quote_id: + return { + "reply_source_message_id": quote_id, + "reply_source_service": "signal", + "reply_source_chat_id": "", + } + return {} + + +def _extract_whatsapp_reply(raw_payload: dict[str, Any]) -> dict[str, str]: + # Handles common and nested contextInfo/messageContextInfo shapes for + # WhatsApp payloads (extended text, media, ephemeral, view-once wrappers). + candidate_paths = ( + ("contextInfo",), + ("ContextInfo",), + ("messageContextInfo",), + ("MessageContextInfo",), + ("extendedTextMessage", "contextInfo"), + ("ExtendedTextMessage", "ContextInfo"), + ("imageMessage", "contextInfo"), + ("ImageMessage", "ContextInfo"), + ("videoMessage", "contextInfo"), + ("VideoMessage", "ContextInfo"), + ("documentMessage", "contextInfo"), + ("DocumentMessage", "ContextInfo"), + ("ephemeralMessage", "message", "contextInfo"), + ("ephemeralMessage", "message", "extendedTextMessage", "contextInfo"), + ("viewOnceMessage", "message", "contextInfo"), + ("viewOnceMessage", "message", "extendedTextMessage", "contextInfo"), + ("viewOnceMessageV2", "message", "contextInfo"), + ("viewOnceMessageV2", "message", "extendedTextMessage", "contextInfo"), + ("viewOnceMessageV2Extension", "message", "contextInfo"), + ("viewOnceMessageV2Extension", "message", "extendedTextMessage", "contextInfo"), + # snake_case protobuf dict variants + ("context_info",), + ("message_context_info",), + ("extended_text_message", "context_info"), + ("image_message", "context_info"), + ("video_message", "context_info"), + ("document_message", "context_info"), + ("ephemeral_message", "message", "context_info"), + ("ephemeral_message", "message", "extended_text_message", "context_info"), + ("view_once_message", "message", "context_info"), + ("view_once_message", "message", "extended_text_message", "context_info"), + ("view_once_message_v2", "message", "context_info"), + ("view_once_message_v2", "message", "extended_text_message", "context_info"), + ("view_once_message_v2_extension", "message", "context_info"), + ( + "view_once_message_v2_extension", + "message", + "extended_text_message", + "context_info", + ), + ) + contexts = [] + for path in candidate_paths: + row = _as_dict(_pluck(raw_payload, *path)) + if row: + contexts.append(row) + # Recursive fallback for unknown wrapper shapes. + stack = [_as_dict(raw_payload)] + while stack: + current = stack.pop() + if not isinstance(current, dict): + continue + if isinstance(current.get("contextInfo"), dict): + contexts.append(_as_dict(current.get("contextInfo"))) + if isinstance(current.get("ContextInfo"), dict): + contexts.append(_as_dict(current.get("ContextInfo"))) + if isinstance(current.get("messageContextInfo"), dict): + contexts.append(_as_dict(current.get("messageContextInfo"))) + if isinstance(current.get("MessageContextInfo"), dict): + contexts.append(_as_dict(current.get("MessageContextInfo"))) + if isinstance(current.get("context_info"), dict): + contexts.append(_as_dict(current.get("context_info"))) + if isinstance(current.get("message_context_info"), dict): + contexts.append(_as_dict(current.get("message_context_info"))) + for value in current.values(): + if isinstance(value, dict): + stack.append(value) + elif isinstance(value, list): + for item in value: + if isinstance(item, dict): + stack.append(item) + + for context in contexts: + stanza_id = _clean( + context.get("stanzaId") + or context.get("stanzaID") + or context.get("stanza_id") + or context.get("StanzaId") + or context.get("StanzaID") + or context.get("quotedMessageID") + or context.get("quotedMessageId") + or context.get("QuotedMessageID") + or context.get("QuotedMessageId") + or _pluck(context, "quotedMessageKey", "id") + or _pluck(context, "quoted_message_key", "id") + or _pluck(context, "quotedMessage", "key", "id") + or _pluck(context, "quoted_message", "key", "id") + ) + if not stanza_id: + continue + participant = _clean( + context.get("participant") + or context.get("remoteJid") + or context.get("chat") + or context.get("Participant") + or context.get("RemoteJid") + or context.get("RemoteJID") + or context.get("Chat") + ) + return { + "reply_source_message_id": stanza_id, + "reply_source_service": "whatsapp", + "reply_source_chat_id": participant, + } + return {} + + +def extract_whatsapp_reply_debug(raw_payload: dict[str, Any]) -> dict[str, Any]: + payload = _as_dict(raw_payload) + candidate_paths = ( + ("contextInfo",), + ("ContextInfo",), + ("messageContextInfo",), + ("MessageContextInfo",), + ("extendedTextMessage", "contextInfo"), + ("ExtendedTextMessage", "ContextInfo"), + ("imageMessage", "contextInfo"), + ("ImageMessage", "ContextInfo"), + ("videoMessage", "contextInfo"), + ("VideoMessage", "ContextInfo"), + ("documentMessage", "contextInfo"), + ("DocumentMessage", "ContextInfo"), + ("ephemeralMessage", "message", "contextInfo"), + ("ephemeralMessage", "message", "extendedTextMessage", "contextInfo"), + ("viewOnceMessage", "message", "contextInfo"), + ("viewOnceMessage", "message", "extendedTextMessage", "contextInfo"), + ("viewOnceMessageV2", "message", "contextInfo"), + ("viewOnceMessageV2", "message", "extendedTextMessage", "contextInfo"), + ("viewOnceMessageV2Extension", "message", "contextInfo"), + ("viewOnceMessageV2Extension", "message", "extendedTextMessage", "contextInfo"), + ("context_info",), + ("message_context_info",), + ("extended_text_message", "context_info"), + ("image_message", "context_info"), + ("video_message", "context_info"), + ("document_message", "context_info"), + ("ephemeral_message", "message", "context_info"), + ("ephemeral_message", "message", "extended_text_message", "context_info"), + ("view_once_message", "message", "context_info"), + ("view_once_message", "message", "extended_text_message", "context_info"), + ("view_once_message_v2", "message", "context_info"), + ("view_once_message_v2", "message", "extended_text_message", "context_info"), + ("view_once_message_v2_extension", "message", "context_info"), + ( + "view_once_message_v2_extension", + "message", + "extended_text_message", + "context_info", + ), + ) + rows = [] + for path in candidate_paths: + context = _as_dict(_pluck(payload, *path)) + if not context: + continue + rows.append( + { + "path": ".".join(path), + "keys": sorted([str(key) for key in context.keys()])[:40], + "stanzaId": _clean( + context.get("stanzaId") + or context.get("stanzaID") + or context.get("stanza_id") + or context.get("StanzaId") + or context.get("StanzaID") + or context.get("quotedMessageID") + or context.get("quotedMessageId") + or context.get("QuotedMessageID") + or context.get("QuotedMessageId") + or _pluck(context, "quotedMessageKey", "id") + or _pluck(context, "quoted_message_key", "id") + or _pluck(context, "quotedMessage", "key", "id") + or _pluck(context, "quoted_message", "key", "id") + ), + "participant": _clean( + context.get("participant") + or context.get("remoteJid") + or context.get("chat") + or context.get("Participant") + or context.get("RemoteJid") + or context.get("RemoteJID") + or context.get("Chat") + ), + } + ) + return { + "candidate_count": len(rows), + "candidates": rows[:20], + } + + +def extract_reply_ref(service: str, raw_payload: dict[str, Any]) -> dict[str, str]: + svc = _clean(service).lower() + payload = _as_dict(raw_payload) + if svc == "xmpp": + reply_id = _clean(payload.get("reply_source_message_id") or payload.get("reply_id")) + reply_chat = _clean(payload.get("reply_source_chat_id") or payload.get("reply_chat_id")) + if reply_id: + return { + "reply_source_message_id": reply_id, + "reply_source_service": "xmpp", + "reply_source_chat_id": reply_chat, + } + return {} + if svc == "signal": + return _extract_signal_reply(payload) + if svc == "whatsapp": + return _extract_whatsapp_reply(payload) + if svc == "web": + reply_id = _clean(payload.get("reply_to_message_id")) + if reply_id: + return { + "reply_source_message_id": reply_id, + "reply_source_service": "web", + "reply_source_chat_id": _clean(payload.get("reply_source_chat_id")), + } + return {} + + +def extract_origin_tag(raw_payload: dict[str, Any] | None) -> str: + return _find_origin_tag(_as_dict(raw_payload)) + + +async def resolve_reply_target(user, session, reply_ref: dict[str, str]) -> Message | None: + if not reply_ref or session is None: + return None + reply_source_message_id = _clean(reply_ref.get("reply_source_message_id")) + if not reply_source_message_id: + return None + + # Direct local UUID fallback (web compose references local Message IDs). + if re.fullmatch( + r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}", + reply_source_message_id, + ): + direct = await sync_to_async( + lambda: Message.objects.filter( + user=user, + session=session, + id=reply_source_message_id, + ).first() + )() + if direct is not None: + return direct + + source_service = _clean(reply_ref.get("reply_source_service")) + by_source = await sync_to_async( + lambda: Message.objects.filter( + user=user, + session=session, + source_service=source_service or None, + source_message_id=reply_source_message_id, + ) + .order_by("-ts") + .first() + )() + if by_source is not None: + return by_source + + # Bridge ref fallback: resolve replies against bridge mappings persisted in + # message receipt payloads. + identifier = getattr(session, "identifier", None) + if identifier is not None: + service_candidates = [] + if source_service: + service_candidates.append(source_service) + # XMPP replies can target bridged messages from any external service. + if source_service == "xmpp": + service_candidates.extend(["signal", "whatsapp", "instagram"]) + for candidate in service_candidates: + bridge = await history.resolve_bridge_ref( + user=user, + identifier=identifier, + source_service=candidate, + xmpp_message_id=reply_source_message_id, + upstream_message_id=reply_source_message_id, + ) + local_message_id = _clean((bridge or {}).get("local_message_id")) + if not local_message_id: + continue + bridged = await sync_to_async( + lambda: Message.objects.filter( + user=user, + session=session, + id=local_message_id, + ).first() + )() + if bridged is not None: + return bridged + + fallback = await sync_to_async( + lambda: Message.objects.filter( + user=user, + session=session, + reply_source_message_id=reply_source_message_id, + ) + .order_by("-ts") + .first() + )() + return fallback + + +def apply_sync_origin(message_meta: dict | None, origin_tag: str) -> dict: + payload = dict(message_meta or {}) + tag = _clean(origin_tag) + if not tag: + return payload + payload["origin_tag"] = tag + return payload + + +def is_mirrored_origin(message_meta: dict | None) -> bool: + payload = dict(message_meta or {}) + return bool(_clean(payload.get("origin_tag"))) diff --git a/core/migrations/0027_businessplandocument_businessplanrevision_and_more.py b/core/migrations/0027_businessplandocument_businessplanrevision_and_more.py new file mode 100644 index 0000000..de3a2f4 --- /dev/null +++ b/core/migrations/0027_businessplandocument_businessplanrevision_and_more.py @@ -0,0 +1,311 @@ +# Generated by Django 5.2.7 on 2026-03-01 20:03 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0026_platformchatlink_is_group'), + ] + + operations = [ + migrations.CreateModel( + name='BusinessPlanDocument', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('source_service', models.CharField(choices=[('signal', 'Signal'), ('whatsapp', 'WhatsApp'), ('xmpp', 'XMPP'), ('instagram', 'Instagram'), ('web', 'Web')], max_length=255)), + ('source_channel_identifier', models.CharField(blank=True, default='', max_length=255)), + ('title', models.CharField(default='Business Plan', max_length=255)), + ('status', models.CharField(choices=[('draft', 'Draft'), ('final', 'Final')], default='draft', max_length=32)), + ('content_markdown', models.TextField(blank=True, default='')), + ('structured_payload', models.JSONField(blank=True, default=dict)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ], + ), + migrations.CreateModel( + name='BusinessPlanRevision', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('content_markdown', models.TextField(blank=True, default='')), + ('structured_payload', models.JSONField(blank=True, default=dict)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ], + options={ + 'ordering': ['created_at'], + }, + ), + migrations.CreateModel( + name='CommandAction', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('action_type', models.CharField(choices=[('extract_bp', 'Extract Business Plan'), ('post_result', 'Post Result'), ('save_document', 'Save Document')], max_length=64)), + ('enabled', models.BooleanField(default=True)), + ('config', models.JSONField(blank=True, default=dict)), + ('position', models.PositiveIntegerField(default=0)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ], + options={ + 'ordering': ['position', 'id'], + }, + ), + migrations.CreateModel( + name='CommandChannelBinding', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('direction', models.CharField(choices=[('ingress', 'Ingress'), ('egress', 'Egress'), ('scratchpad_mirror', 'Scratchpad Mirror')], max_length=64)), + ('service', models.CharField(choices=[('signal', 'Signal'), ('whatsapp', 'WhatsApp'), ('xmpp', 'XMPP'), ('instagram', 'Instagram'), ('web', 'Web')], max_length=255)), + ('channel_identifier', models.CharField(max_length=255)), + ('enabled', models.BooleanField(default=True)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ], + ), + migrations.CreateModel( + name='CommandProfile', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('slug', models.CharField(default='bp', max_length=64)), + ('name', models.CharField(default='Business Plan', max_length=255)), + ('enabled', models.BooleanField(default=True)), + ('trigger_token', models.CharField(default='#bp#', max_length=64)), + ('reply_required', models.BooleanField(default=True)), + ('exact_match_only', models.BooleanField(default=True)), + ('window_scope', models.CharField(choices=[('conversation', 'Conversation')], default='conversation', max_length=64)), + ('template_text', models.TextField(blank=True, default='')), + ('visibility_mode', models.CharField(choices=[('status_in_source', 'Status In Source'), ('silent', 'Silent')], default='status_in_source', max_length=64)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ], + ), + migrations.CreateModel( + name='CommandRun', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('status', models.CharField(choices=[('pending', 'Pending'), ('running', 'Running'), ('ok', 'OK'), ('failed', 'Failed'), ('skipped', 'Skipped')], default='pending', max_length=32)), + ('error', models.TextField(blank=True, default='')), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ], + ), + migrations.CreateModel( + name='TranslationBridge', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.CharField(default='Translation Bridge', max_length=255)), + ('enabled', models.BooleanField(default=True)), + ('a_service', models.CharField(choices=[('signal', 'Signal'), ('whatsapp', 'WhatsApp'), ('xmpp', 'XMPP'), ('instagram', 'Instagram'), ('web', 'Web')], max_length=255)), + ('a_channel_identifier', models.CharField(max_length=255)), + ('a_language', models.CharField(default='en', max_length=64)), + ('b_service', models.CharField(choices=[('signal', 'Signal'), ('whatsapp', 'WhatsApp'), ('xmpp', 'XMPP'), ('instagram', 'Instagram'), ('web', 'Web')], max_length=255)), + ('b_channel_identifier', models.CharField(max_length=255)), + ('b_language', models.CharField(default='en', max_length=64)), + ('direction', models.CharField(choices=[('a_to_b', 'A To B'), ('b_to_a', 'B To A'), ('bidirectional', 'Bidirectional')], default='bidirectional', max_length=32)), + ('quick_mode_title', models.CharField(blank=True, default='', max_length=255)), + ('settings', models.JSONField(blank=True, default=dict)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ], + ), + migrations.CreateModel( + name='TranslationEventLog', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('target_service', models.CharField(choices=[('signal', 'Signal'), ('whatsapp', 'WhatsApp'), ('xmpp', 'XMPP'), ('instagram', 'Instagram'), ('web', 'Web')], max_length=255)), + ('target_channel', models.CharField(max_length=255)), + ('status', models.CharField(choices=[('pending', 'Pending'), ('ok', 'OK'), ('failed', 'Failed'), ('skipped', 'Skipped')], default='pending', max_length=32)), + ('error', models.TextField(blank=True, default='')), + ('origin_tag', models.CharField(blank=True, default='', max_length=255)), + ('content_hash', models.CharField(blank=True, default='', max_length=255)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ], + ), + migrations.AddField( + model_name='message', + name='message_meta', + field=models.JSONField(blank=True, default=dict, help_text='Normalized message metadata such as origin tags.'), + ), + migrations.AddField( + model_name='message', + name='reply_source_message_id', + field=models.CharField(blank=True, help_text='Source message id for the replied target.', max_length=255, null=True), + ), + migrations.AddField( + model_name='message', + name='reply_source_service', + field=models.CharField(blank=True, choices=[('signal', 'Signal'), ('whatsapp', 'WhatsApp'), ('xmpp', 'XMPP'), ('instagram', 'Instagram'), ('web', 'Web')], help_text='Source service for the replied target.', max_length=255, null=True), + ), + migrations.AddField( + model_name='message', + name='reply_to', + field=models.ForeignKey(blank=True, help_text='Resolved local message this message replies to.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='reply_children', to='core.message'), + ), + migrations.AddField( + model_name='message', + name='source_chat_id', + field=models.CharField(blank=True, help_text='Source service chat or thread identifier when available.', max_length=255, null=True), + ), + migrations.AddField( + model_name='message', + name='source_message_id', + field=models.CharField(blank=True, help_text='Source service message id when available.', max_length=255, null=True), + ), + migrations.AddField( + model_name='message', + name='source_service', + field=models.CharField(blank=True, choices=[('signal', 'Signal'), ('whatsapp', 'WhatsApp'), ('xmpp', 'XMPP'), ('instagram', 'Instagram'), ('web', 'Web')], help_text='Source service where this message originally appeared.', max_length=255, null=True), + ), + migrations.AddIndex( + model_name='message', + index=models.Index(fields=['user', 'source_service', 'source_message_id'], name='core_messag_user_id_252699_idx'), + ), + migrations.AddIndex( + model_name='message', + index=models.Index(fields=['user', 'session', 'ts'], name='core_messag_user_id_ba0e73_idx'), + ), + migrations.AddIndex( + model_name='message', + index=models.Index(fields=['user', 'reply_source_service', 'reply_source_message_id'], name='core_messag_user_id_70ca93_idx'), + ), + migrations.AddField( + model_name='businessplandocument', + name='anchor_message', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='business_plan_anchor_docs', to='core.message'), + ), + migrations.AddField( + model_name='businessplandocument', + name='trigger_message', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='business_plan_trigger_docs', to='core.message'), + ), + migrations.AddField( + model_name='businessplandocument', + name='user', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL), + ), + migrations.AddField( + model_name='businessplanrevision', + name='document', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='revisions', to='core.businessplandocument'), + ), + migrations.AddField( + model_name='businessplanrevision', + name='editor_user', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL), + ), + migrations.AddField( + model_name='commandprofile', + name='user', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL), + ), + migrations.AddField( + model_name='commandchannelbinding', + name='profile', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='channel_bindings', to='core.commandprofile'), + ), + migrations.AddField( + model_name='commandaction', + name='profile', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='actions', to='core.commandprofile'), + ), + migrations.AddField( + model_name='businessplandocument', + name='command_profile', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='business_plan_documents', to='core.commandprofile'), + ), + migrations.AddField( + model_name='commandrun', + name='profile', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='runs', to='core.commandprofile'), + ), + migrations.AddField( + model_name='commandrun', + name='result_ref', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='command_runs', to='core.businessplandocument'), + ), + migrations.AddField( + model_name='commandrun', + name='trigger_message', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='command_runs', to='core.message'), + ), + migrations.AddField( + model_name='commandrun', + name='user', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL), + ), + migrations.AddField( + model_name='translationbridge', + name='user', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL), + ), + migrations.AddField( + model_name='translationeventlog', + name='bridge', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='events', to='core.translationbridge'), + ), + migrations.AddField( + model_name='translationeventlog', + name='source_message', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='translation_events', to='core.message'), + ), + migrations.AddConstraint( + model_name='commandprofile', + constraint=models.UniqueConstraint(fields=('user', 'slug'), name='unique_command_profile_per_user'), + ), + migrations.AddIndex( + model_name='commandchannelbinding', + index=models.Index(fields=['profile', 'direction', 'service'], name='core_comman_profile_6c16d5_idx'), + ), + migrations.AddIndex( + model_name='commandchannelbinding', + index=models.Index(fields=['profile', 'service', 'channel_identifier'], name='core_comman_profile_2c801d_idx'), + ), + migrations.AddIndex( + model_name='commandaction', + index=models.Index(fields=['profile', 'action_type', 'enabled'], name='core_comman_profile_f8e752_idx'), + ), + migrations.AddIndex( + model_name='businessplandocument', + index=models.Index(fields=['user', 'status', 'updated_at'], name='core_busine_user_id_028f36_idx'), + ), + migrations.AddIndex( + model_name='businessplandocument', + index=models.Index(fields=['user', 'source_service', 'source_channel_identifier'], name='core_busine_user_id_54ef14_idx'), + ), + migrations.AddIndex( + model_name='commandrun', + index=models.Index(fields=['user', 'status', 'updated_at'], name='core_comman_user_id_aa2881_idx'), + ), + migrations.AddConstraint( + model_name='commandrun', + constraint=models.UniqueConstraint(fields=('profile', 'trigger_message'), name='unique_command_run_profile_trigger_message'), + ), + migrations.AddIndex( + model_name='translationbridge', + index=models.Index(fields=['user', 'enabled'], name='core_transl_user_id_ce99cd_idx'), + ), + migrations.AddIndex( + model_name='translationbridge', + index=models.Index(fields=['user', 'a_service', 'a_channel_identifier'], name='core_transl_user_id_2f26ee_idx'), + ), + migrations.AddIndex( + model_name='translationbridge', + index=models.Index(fields=['user', 'b_service', 'b_channel_identifier'], name='core_transl_user_id_1f910a_idx'), + ), + migrations.AddIndex( + model_name='translationeventlog', + index=models.Index(fields=['bridge', 'created_at'], name='core_transl_bridge__509ffc_idx'), + ), + migrations.AddIndex( + model_name='translationeventlog', + index=models.Index(fields=['bridge', 'status', 'updated_at'], name='core_transl_bridge__0a7676_idx'), + ), + migrations.AddIndex( + model_name='translationeventlog', + index=models.Index(fields=['origin_tag'], name='core_transl_origin__a5c2f3_idx'), + ), + ] diff --git a/core/models.py b/core/models.py index 143c90b..e1605a1 100644 --- a/core/models.py +++ b/core/models.py @@ -18,6 +18,7 @@ SERVICE_CHOICES = ( ("xmpp", "XMPP"), ("instagram", "Instagram"), ) +CHANNEL_SERVICE_CHOICES = SERVICE_CHOICES + (("web", "Web"),) MBTI_CHOICES = ( ("INTJ", "INTJ - Architect"), ("INTP", "INTP - Logician"), @@ -297,9 +298,61 @@ class Message(models.Model): blank=True, help_text="Raw normalized delivery/read receipt metadata.", ) + source_service = models.CharField( + max_length=255, + choices=CHANNEL_SERVICE_CHOICES, + null=True, + blank=True, + help_text="Source service where this message originally appeared.", + ) + source_message_id = models.CharField( + max_length=255, + null=True, + blank=True, + help_text="Source service message id when available.", + ) + source_chat_id = models.CharField( + max_length=255, + null=True, + blank=True, + help_text="Source service chat or thread identifier when available.", + ) + reply_to = models.ForeignKey( + "self", + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="reply_children", + help_text="Resolved local message this message replies to.", + ) + reply_source_service = models.CharField( + max_length=255, + choices=CHANNEL_SERVICE_CHOICES, + null=True, + blank=True, + help_text="Source service for the replied target.", + ) + reply_source_message_id = models.CharField( + max_length=255, + null=True, + blank=True, + help_text="Source message id for the replied target.", + ) + message_meta = models.JSONField( + default=dict, + blank=True, + help_text="Normalized message metadata such as origin tags.", + ) class Meta: ordering = ["ts"] + indexes = [ + models.Index(fields=["user", "source_service", "source_message_id"]), + models.Index(fields=["user", "session", "ts"]), + models.Index( + fields=["user", "reply_source_service", "reply_source_message_id"] + ), + ] class Group(models.Model): @@ -1568,6 +1621,270 @@ class PatternArtifactExport(models.Model): ) +class CommandProfile(models.Model): + WINDOW_SCOPE_CHOICES = ( + ("conversation", "Conversation"), + ) + VISIBILITY_CHOICES = ( + ("status_in_source", "Status In Source"), + ("silent", "Silent"), + ) + + user = models.ForeignKey(User, on_delete=models.CASCADE) + slug = models.CharField(max_length=64, default="bp") + name = models.CharField(max_length=255, default="Business Plan") + enabled = models.BooleanField(default=True) + trigger_token = models.CharField(max_length=64, default="#bp#") + reply_required = models.BooleanField(default=True) + exact_match_only = models.BooleanField(default=True) + window_scope = models.CharField( + max_length=64, + choices=WINDOW_SCOPE_CHOICES, + default="conversation", + ) + template_text = models.TextField(blank=True, default="") + visibility_mode = models.CharField( + max_length=64, + choices=VISIBILITY_CHOICES, + default="status_in_source", + ) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + constraints = [ + models.UniqueConstraint( + fields=["user", "slug"], + name="unique_command_profile_per_user", + ) + ] + + def __str__(self): + return f"{self.user_id}:{self.slug}" + + +class CommandChannelBinding(models.Model): + DIRECTION_CHOICES = ( + ("ingress", "Ingress"), + ("egress", "Egress"), + ("scratchpad_mirror", "Scratchpad Mirror"), + ) + + profile = models.ForeignKey( + CommandProfile, + on_delete=models.CASCADE, + related_name="channel_bindings", + ) + direction = models.CharField(max_length=64, choices=DIRECTION_CHOICES) + service = models.CharField(max_length=255, choices=CHANNEL_SERVICE_CHOICES) + channel_identifier = models.CharField(max_length=255) + enabled = models.BooleanField(default=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + indexes = [ + models.Index(fields=["profile", "direction", "service"]), + models.Index(fields=["profile", "service", "channel_identifier"]), + ] + + +class CommandAction(models.Model): + ACTION_CHOICES = ( + ("extract_bp", "Extract Business Plan"), + ("post_result", "Post Result"), + ("save_document", "Save Document"), + ) + + profile = models.ForeignKey( + CommandProfile, + on_delete=models.CASCADE, + related_name="actions", + ) + action_type = models.CharField(max_length=64, choices=ACTION_CHOICES) + enabled = models.BooleanField(default=True) + config = models.JSONField(default=dict, blank=True) + position = models.PositiveIntegerField(default=0) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + ordering = ["position", "id"] + indexes = [models.Index(fields=["profile", "action_type", "enabled"])] + + +class BusinessPlanDocument(models.Model): + STATUS_CHOICES = ( + ("draft", "Draft"), + ("final", "Final"), + ) + + user = models.ForeignKey(User, on_delete=models.CASCADE) + command_profile = models.ForeignKey( + CommandProfile, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="business_plan_documents", + ) + source_service = models.CharField(max_length=255, choices=CHANNEL_SERVICE_CHOICES) + source_channel_identifier = models.CharField(max_length=255, blank=True, default="") + trigger_message = models.ForeignKey( + Message, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="business_plan_trigger_docs", + ) + anchor_message = models.ForeignKey( + Message, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="business_plan_anchor_docs", + ) + title = models.CharField(max_length=255, default="Business Plan") + status = models.CharField(max_length=32, choices=STATUS_CHOICES, default="draft") + content_markdown = models.TextField(blank=True, default="") + structured_payload = models.JSONField(default=dict, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + indexes = [ + models.Index(fields=["user", "status", "updated_at"]), + models.Index(fields=["user", "source_service", "source_channel_identifier"]), + ] + + +class BusinessPlanRevision(models.Model): + document = models.ForeignKey( + BusinessPlanDocument, + on_delete=models.CASCADE, + related_name="revisions", + ) + editor_user = models.ForeignKey(User, on_delete=models.CASCADE) + content_markdown = models.TextField(blank=True, default="") + structured_payload = models.JSONField(default=dict, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + ordering = ["created_at"] + + +class CommandRun(models.Model): + STATUS_CHOICES = ( + ("pending", "Pending"), + ("running", "Running"), + ("ok", "OK"), + ("failed", "Failed"), + ("skipped", "Skipped"), + ) + + user = models.ForeignKey(User, on_delete=models.CASCADE) + profile = models.ForeignKey( + CommandProfile, + on_delete=models.CASCADE, + related_name="runs", + ) + trigger_message = models.ForeignKey( + Message, + on_delete=models.CASCADE, + related_name="command_runs", + ) + status = models.CharField(max_length=32, choices=STATUS_CHOICES, default="pending") + error = models.TextField(blank=True, default="") + result_ref = models.ForeignKey( + BusinessPlanDocument, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="command_runs", + ) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + constraints = [ + models.UniqueConstraint( + fields=["profile", "trigger_message"], + name="unique_command_run_profile_trigger_message", + ) + ] + indexes = [models.Index(fields=["user", "status", "updated_at"])] + + +class TranslationBridge(models.Model): + DIRECTION_CHOICES = ( + ("a_to_b", "A To B"), + ("b_to_a", "B To A"), + ("bidirectional", "Bidirectional"), + ) + + user = models.ForeignKey(User, on_delete=models.CASCADE) + name = models.CharField(max_length=255, default="Translation Bridge") + enabled = models.BooleanField(default=True) + a_service = models.CharField(max_length=255, choices=CHANNEL_SERVICE_CHOICES) + a_channel_identifier = models.CharField(max_length=255) + a_language = models.CharField(max_length=64, default="en") + b_service = models.CharField(max_length=255, choices=CHANNEL_SERVICE_CHOICES) + b_channel_identifier = models.CharField(max_length=255) + b_language = models.CharField(max_length=64, default="en") + direction = models.CharField( + max_length=32, + choices=DIRECTION_CHOICES, + default="bidirectional", + ) + quick_mode_title = models.CharField(max_length=255, blank=True, default="") + settings = models.JSONField(default=dict, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + indexes = [ + models.Index(fields=["user", "enabled"]), + models.Index(fields=["user", "a_service", "a_channel_identifier"]), + models.Index(fields=["user", "b_service", "b_channel_identifier"]), + ] + + +class TranslationEventLog(models.Model): + STATUS_CHOICES = ( + ("pending", "Pending"), + ("ok", "OK"), + ("failed", "Failed"), + ("skipped", "Skipped"), + ) + + bridge = models.ForeignKey( + TranslationBridge, + on_delete=models.CASCADE, + related_name="events", + ) + source_message = models.ForeignKey( + Message, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="translation_events", + ) + target_service = models.CharField(max_length=255, choices=CHANNEL_SERVICE_CHOICES) + target_channel = models.CharField(max_length=255) + status = models.CharField(max_length=32, choices=STATUS_CHOICES, default="pending") + error = models.TextField(blank=True, default="") + origin_tag = models.CharField(max_length=255, blank=True, default="") + content_hash = models.CharField(max_length=255, blank=True, default="") + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + indexes = [ + models.Index(fields=["bridge", "created_at"]), + models.Index(fields=["bridge", "status", "updated_at"]), + models.Index(fields=["origin_tag"]), + ] + + # class Perms(models.Model): # class Meta: # permissions = ( diff --git a/core/modules/router.py b/core/modules/router.py index dbe088b..4cb9392 100644 --- a/core/modules/router.py +++ b/core/modules/router.py @@ -8,9 +8,12 @@ from core.clients.instagram import InstagramClient from core.clients.signal import SignalClient from core.clients.whatsapp import WhatsAppClient from core.clients.xmpp import XMPPClient +from core.commands.base import CommandContext +from core.commands.engine import process_inbound_message from core.messaging import history from core.models import PersonIdentifier from core.realtime.typing_state import set_person_typing_state +from core.translation.engine import process_inbound_translation from core.util import logs @@ -91,6 +94,34 @@ class UnifiedRouter(object): async def message_received(self, protocol, *args, **kwargs): self.log.info(f"Message received ({protocol}) {args} {kwargs}") + identifier = kwargs.get("identifier") + local_message = kwargs.get("local_message") + message_text = str(kwargs.get("text") or "").strip() + if local_message is None: + return + channel_identifier = "" + if isinstance(identifier, PersonIdentifier): + channel_identifier = str(identifier.identifier or "").strip() + elif identifier is not None: + channel_identifier = str(identifier or "").strip() + if channel_identifier: + try: + await process_inbound_message( + CommandContext( + service=str(protocol or "").strip().lower(), + channel_identifier=channel_identifier, + message_id=str(local_message.id), + user_id=int(local_message.user_id), + message_text=message_text, + payload=dict(kwargs.get("payload") or {}), + ) + ) + except Exception as exc: + self.log.warning("Command engine processing failed: %s", exc) + try: + await process_inbound_translation(local_message) + except Exception as exc: + self.log.warning("Translation engine processing failed: %s", exc) async def _resolve_identifier_objects(self, protocol, identifier): if isinstance(identifier, PersonIdentifier): diff --git a/core/templates/base.html b/core/templates/base.html index 1599978..cfc59a8 100644 --- a/core/templates/base.html +++ b/core/templates/base.html @@ -392,6 +392,9 @@ AI + + Command Routing + {% if user.is_superuser %} System diff --git a/core/templates/pages/business-plan-editor.html b/core/templates/pages/business-plan-editor.html new file mode 100644 index 0000000..ff60a62 --- /dev/null +++ b/core/templates/pages/business-plan-editor.html @@ -0,0 +1,57 @@ +{% extends "base.html" %} + +{% block content %} +
+
+

Business Plan Editor

+

{{ document.source_service }} · {{ document.source_channel_identifier }}

+ +
+ +
+

Revisions

+ + + + + + {% for row in revisions %} + + + + + + {% empty %} + + {% endfor %} + +
CreatedEditorExcerpt
{{ row.created_at }}{{ row.editor_user.username }}{{ row.content_markdown|truncatechars:180 }}
No revisions yet.
+
+
+
+{% endblock %} diff --git a/core/templates/pages/command-routing.html b/core/templates/pages/command-routing.html new file mode 100644 index 0000000..53873ee --- /dev/null +++ b/core/templates/pages/command-routing.html @@ -0,0 +1,267 @@ +{% extends "base.html" %} + +{% block content %} +
+
+

Command Routing

+

Manage command profiles, channel bindings, business-plan outputs, and translation bridges.

+ +
+

Create Command Profile

+
+ {% csrf_token %} + +
+
+ +
+
+ +
+
+ +
+
+ + +
+
+ + {% for profile in profiles %} +
+

{{ profile.name }} ({{ profile.slug }})

+
+ {% csrf_token %} + + +
+
+ + +
+
+ + +
+
+ +
+ +
+
+
+ + + + +
+
+ + +
+ +
+
+ +
+
+

Channel Bindings

+ + + + + + {% for binding in profile.channel_bindings.all %} + + + + + + + {% empty %} + + {% endfor %} + +
DirectionServiceChannel
{{ binding.direction }}{{ binding.service }}{{ binding.channel_identifier }} +
+ {% csrf_token %} + + + +
+
No bindings yet.
+
+ {% csrf_token %} + + +
+
+
+ +
+
+
+
+ +
+
+
+ +
+
+ +
+
+
+
+ +
+

Actions

+ + + + + + {% for action_row in profile.actions.all %} + + + + + + + {% empty %} + + {% endfor %} + +
TypeEnabledPosition
{{ action_row.action_type }}{{ action_row.enabled }}{{ action_row.position }} +
+ {% csrf_token %} + + + + + +
+
No actions.
+
+
+ +
+ {% csrf_token %} + + + +
+
+ {% empty %} +
No command profiles configured.
+ {% endfor %} + +
+

Business Plan Documents

+ + + + + + {% for doc in documents %} + + + + + + + + {% empty %} + + {% endfor %} + +
TitleStatusSourceUpdated
{{ doc.title }}{{ doc.status }}{{ doc.source_service }} · {{ doc.source_channel_identifier }}{{ doc.updated_at }}Open
No business plan documents yet.
+
+ +
+

Translation Bridges

+
+ {% csrf_token %} + +
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + + + + + {% for bridge in bridges %} + + + + + + + + {% empty %} + + {% endfor %} + +
NameABDirection
{{ bridge.name }}{{ bridge.a_service }} · {{ bridge.a_channel_identifier }} · {{ bridge.a_language }}{{ bridge.b_service }} · {{ bridge.b_channel_identifier }} · {{ bridge.b_language }}{{ bridge.direction }} +
+ {% csrf_token %} + + + +
+
No translation bridges configured.
+
+ +
+

Translation Event Log

+ + + + + + {% for event in events %} + + + + + + + + {% empty %} + + {% endfor %} + +
BridgeStatusTargetErrorAt
{{ event.bridge.name }}{{ event.status }}{{ event.target_service }} · {{ event.target_channel }}{{ event.error|default:"-" }}{{ event.created_at }}
No events yet.
+
+
+
+{% endblock %} diff --git a/core/templates/partials/compose-panel.html b/core/templates/partials/compose-panel.html index b1d1b22..55ddd6f 100644 --- a/core/templates/partials/compose-panel.html +++ b/core/templates/partials/compose-panel.html @@ -72,6 +72,31 @@ Force Sync +
+ + +
+ + {% endif %}
{{ msg.source_label }}
@@ -336,6 +367,10 @@ {% endif %}

+ {% empty %} @@ -365,6 +400,7 @@ +
@@ -372,6 +408,11 @@ Confirm Send
+