diff --git a/core/commands/delivery.py b/core/commands/delivery.py new file mode 100644 index 0000000..eb37a88 --- /dev/null +++ b/core/commands/delivery.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +import time + +from asgiref.sync import sync_to_async + +from core.clients import transport +from core.models import ChatSession, Message + +STATUS_VISIBLE_SOURCE_SERVICES = {"web", "xmpp"} + + +def chunk_for_transport(text: str, limit: int = 3000) -> list[str]: + body = str(text or "").strip() + if not body: + return [] + if len(body) <= limit: + return [body] + parts = [] + remaining = body + while len(remaining) > limit: + cut = remaining.rfind("\n\n", 0, limit) + if cut < int(limit * 0.45): + cut = remaining.rfind("\n", 0, limit) + if cut < int(limit * 0.35): + cut = limit + parts.append(remaining[:cut].rstrip()) + remaining = remaining[cut:].lstrip() + if remaining: + parts.append(remaining) + return [part for part in parts if part] + + +async def post_status_in_source(trigger_message: Message, text: str, origin_tag: str) -> bool: + service = str(trigger_message.source_service or "").strip().lower() + if service not in STATUS_VISIBLE_SOURCE_SERVICES: + return False + if service == "web": + await sync_to_async(Message.objects.create)( + user=trigger_message.user, + session=trigger_message.session, + sender_uuid="", + text=text, + ts=int(time.time() * 1000), + custom_author="BOT", + source_service="web", + source_chat_id=trigger_message.source_chat_id or "", + message_meta={"origin_tag": origin_tag}, + ) + return True + # For non-web, route through transport raw API. + if not str(trigger_message.source_chat_id or "").strip(): + return False + try: + await transport.send_message_raw( + service, + str(trigger_message.source_chat_id or "").strip(), + text=text, + attachments=[], + metadata={"origin_tag": origin_tag}, + ) + return True + except Exception: + return False + + +async def post_to_channel_binding( + trigger_message: Message, + binding_service: str, + binding_channel_identifier: str, + text: str, + origin_tag: str, + command_slug: str, +) -> bool: + service = str(binding_service or "").strip().lower() + channel_identifier = str(binding_channel_identifier or "").strip() + if service == "web": + session = None + if channel_identifier and channel_identifier == str( + trigger_message.source_chat_id or "" + ).strip(): + session = trigger_message.session + if session is None and channel_identifier: + session = await sync_to_async( + lambda: ChatSession.objects.filter( + user=trigger_message.user, + identifier__identifier=channel_identifier, + ) + .order_by("-last_interaction") + .first() + )() + if session is None: + session = trigger_message.session + await sync_to_async(Message.objects.create)( + user=trigger_message.user, + session=session, + sender_uuid="", + text=text, + ts=int(time.time() * 1000), + custom_author="BOT", + source_service="web", + source_chat_id=channel_identifier or str(trigger_message.source_chat_id or ""), + message_meta={"origin_tag": origin_tag}, + ) + return True + try: + chunks = chunk_for_transport(text, limit=3000) + if not chunks: + return False + for chunk in chunks: + ts = await transport.send_message_raw( + service, + channel_identifier, + text=chunk, + attachments=[], + metadata={ + "origin_tag": origin_tag, + "command_slug": command_slug, + }, + ) + if not ts: + return False + return True + except Exception: + return False diff --git a/core/commands/handlers/bp.py b/core/commands/handlers/bp.py index 5ded0ac..442db8f 100644 --- a/core/commands/handlers/bp.py +++ b/core/commands/handlers/bp.py @@ -5,15 +5,14 @@ import time from asgiref.sync import sync_to_async from django.conf import settings -from core.clients import transport from core.commands.base import CommandContext, CommandHandler, CommandResult +from core.commands.delivery import post_status_in_source, post_to_channel_binding from core.messaging import ai as ai_runner from core.messaging.utils import messages_to_string from core.models import ( AI, BusinessPlanDocument, BusinessPlanRevision, - ChatSession, CommandAction, CommandChannelBinding, CommandRun, @@ -60,56 +59,9 @@ def _bp_fallback_markdown(template_text: str, transcript: str, error_text: str = ) -def _chunk_for_transport(text: str, limit: int = 3000) -> list[str]: - body = str(text or "").strip() - if not body: - return [] - if len(body) <= limit: - return [body] - parts = [] - remaining = body - while len(remaining) > limit: - cut = remaining.rfind("\n\n", 0, limit) - if cut < int(limit * 0.45): - cut = remaining.rfind("\n", 0, limit) - if cut < int(limit * 0.35): - cut = limit - parts.append(remaining[:cut].rstrip()) - remaining = remaining[cut:].lstrip() - if remaining: - parts.append(remaining) - return [part for part in parts if part] - - class BPCommandHandler(CommandHandler): slug = "bp" - async def _status_message(self, trigger_message: Message, text: str): - service = str(trigger_message.source_service or "").strip().lower() - if service == "web": - await sync_to_async(Message.objects.create)( - user=trigger_message.user, - session=trigger_message.session, - sender_uuid="", - text=text, - ts=int(time.time() * 1000), - custom_author="BOT", - source_service="web", - source_chat_id=trigger_message.source_chat_id or "", - ) - return - if service == "xmpp" and str(trigger_message.source_chat_id or "").strip(): - try: - await transport.send_message_raw( - "xmpp", - str(trigger_message.source_chat_id or "").strip(), - text=text, - attachments=[], - metadata={"origin_tag": f"bp-status:{trigger_message.id}"}, - ) - except Exception: - return - async def _fanout(self, run: CommandRun, text: str) -> dict: profile = run.profile trigger = await sync_to_async( @@ -129,63 +81,17 @@ class BPCommandHandler(CommandHandler): sent_bindings = 0 failed_bindings = 0 for binding in bindings: - if binding.service == "web": - session = None - channel_identifier = str(binding.channel_identifier or "").strip() - if ( - channel_identifier - and channel_identifier == str(trigger.source_chat_id or "").strip() - ): - session = trigger.session - if session is None and channel_identifier: - session = await sync_to_async( - lambda: ChatSession.objects.filter( - user=trigger.user, - identifier__identifier=channel_identifier, - ) - .order_by("-last_interaction") - .first() - )() - if session is None: - session = trigger.session - await sync_to_async(Message.objects.create)( - user=trigger.user, - session=session, - sender_uuid="", - text=text, - ts=int(time.time() * 1000), - custom_author="BOT", - source_service="web", - source_chat_id=channel_identifier or str(trigger.source_chat_id or ""), - message_meta={"origin_tag": f"bp:{run.id}"}, - ) + ok = await post_to_channel_binding( + trigger_message=trigger, + binding_service=binding.service, + binding_channel_identifier=binding.channel_identifier, + text=text, + origin_tag=f"bp:{run.id}", + command_slug=self.slug, + ) + if ok: sent_bindings += 1 - continue - try: - chunks = _chunk_for_transport(text, limit=3000) - if not chunks: - failed_bindings += 1 - continue - ok = True - for chunk in chunks: - ts = await transport.send_message_raw( - binding.service, - binding.channel_identifier, - text=chunk, - attachments=[], - metadata={ - "origin_tag": f"bp:{run.id}", - "command_slug": "bp", - }, - ) - if not ts: - ok = False - break - if ok: - sent_bindings += 1 - else: - failed_bindings += 1 - except Exception: + else: failed_bindings += 1 return {"sent_bindings": sent_bindings, "failed_bindings": failed_bindings} @@ -357,7 +263,11 @@ class BPCommandHandler(CommandHandler): status_text += f" · fanout sent:{sent_count}" if failed_count: status_text += f" failed:{failed_count}" - await self._status_message(trigger, status_text) + await post_status_in_source( + trigger_message=trigger, + text=status_text, + origin_tag=f"bp-status:{trigger.id}", + ) run.status = "ok" run.result_ref = document diff --git a/core/templates/base.html b/core/templates/base.html index bc92840..eec83c5 100644 --- a/core/templates/base.html +++ b/core/templates/base.html @@ -6,7 +6,7 @@
-