from __future__ import annotations import time from asgiref.sync import sync_to_async from django.conf import settings from core.clients import transport from core.commands.base import CommandContext, CommandHandler, CommandResult from core.messaging import ai as ai_runner from core.messaging.utils import messages_to_string from core.models import ( AI, BusinessPlanDocument, BusinessPlanRevision, ChatSession, CommandAction, CommandChannelBinding, CommandRun, Message, ) def _bp_system_prompt(): return ( "Create a structured business plan using the given template. " "Follow the template section order exactly. " "If data is missing, write concise assumptions and risks. " "Return markdown only." ) def _clamp_transcript(transcript: str, max_chars: int) -> str: text = str(transcript or "") if max_chars <= 0 or len(text) <= max_chars: return text head_size = min(2000, max_chars // 3) tail_size = max(0, max_chars - head_size - 140) omitted = len(text) - head_size - tail_size return ( text[:head_size].rstrip() + f"\n\n[... truncated {max(0, omitted)} chars ...]\n\n" + text[-tail_size:].lstrip() ) def _bp_fallback_markdown(template_text: str, transcript: str, error_text: str = "") -> str: header = ( "## Business Plan (Draft)\n\n" "Automatic fallback was used because AI generation failed for this run.\n" ) if error_text: header += f"\nFailure: `{error_text}`\n" return ( f"{header}\n" "### Template\n" f"{template_text}\n\n" "### Transcript Window\n" f"{transcript}" ) def _chunk_for_transport(text: str, limit: int = 3000) -> list[str]: body = str(text or "").strip() if not body: return [] if len(body) <= limit: return [body] parts = [] remaining = body while len(remaining) > limit: cut = remaining.rfind("\n\n", 0, limit) if cut < int(limit * 0.45): cut = remaining.rfind("\n", 0, limit) if cut < int(limit * 0.35): cut = limit parts.append(remaining[:cut].rstrip()) remaining = remaining[cut:].lstrip() if remaining: parts.append(remaining) return [part for part in parts if part] class BPCommandHandler(CommandHandler): slug = "bp" async def _status_message(self, trigger_message: Message, text: str): service = str(trigger_message.source_service or "").strip().lower() if service == "web": await sync_to_async(Message.objects.create)( user=trigger_message.user, session=trigger_message.session, sender_uuid="", text=text, ts=int(time.time() * 1000), custom_author="BOT", source_service="web", source_chat_id=trigger_message.source_chat_id or "", ) return if service == "xmpp" and str(trigger_message.source_chat_id or "").strip(): try: await transport.send_message_raw( "xmpp", str(trigger_message.source_chat_id or "").strip(), text=text, attachments=[], metadata={"origin_tag": f"bp-status:{trigger_message.id}"}, ) except Exception: return async def _fanout(self, run: CommandRun, text: str) -> dict: profile = run.profile trigger = await sync_to_async( lambda: Message.objects.select_related("session", "user") .filter(id=run.trigger_message_id) .first() )() if trigger is None: return {"sent_bindings": 0, "failed_bindings": 0} bindings = await sync_to_async(list)( CommandChannelBinding.objects.filter( profile=profile, enabled=True, direction="egress", ) ) sent_bindings = 0 failed_bindings = 0 for binding in bindings: if binding.service == "web": session = None channel_identifier = str(binding.channel_identifier or "").strip() if ( channel_identifier and channel_identifier == str(trigger.source_chat_id or "").strip() ): session = trigger.session if session is None and channel_identifier: session = await sync_to_async( lambda: ChatSession.objects.filter( user=trigger.user, identifier__identifier=channel_identifier, ) .order_by("-last_interaction") .first() )() if session is None: session = trigger.session await sync_to_async(Message.objects.create)( user=trigger.user, session=session, sender_uuid="", text=text, ts=int(time.time() * 1000), custom_author="BOT", source_service="web", source_chat_id=channel_identifier or str(trigger.source_chat_id or ""), message_meta={"origin_tag": f"bp:{run.id}"}, ) sent_bindings += 1 continue try: chunks = _chunk_for_transport(text, limit=3000) if not chunks: failed_bindings += 1 continue ok = True for chunk in chunks: ts = await transport.send_message_raw( binding.service, binding.channel_identifier, text=chunk, attachments=[], metadata={ "origin_tag": f"bp:{run.id}", "command_slug": "bp", }, ) if not ts: ok = False break if ok: sent_bindings += 1 else: failed_bindings += 1 except Exception: failed_bindings += 1 return {"sent_bindings": sent_bindings, "failed_bindings": failed_bindings} async def execute(self, ctx: CommandContext) -> CommandResult: trigger = await sync_to_async( lambda: Message.objects.select_related("user", "session") .filter(id=ctx.message_id) .first() )() if trigger is None: return CommandResult(ok=False, status="failed", error="trigger_not_found") profile = await sync_to_async( lambda: trigger.user.commandprofile_set.filter(slug=self.slug, enabled=True) .first() )() if profile is None: return CommandResult(ok=False, status="skipped", error="profile_missing") actions = await sync_to_async(list)( CommandAction.objects.filter( profile=profile, enabled=True, ).order_by("position", "id") ) action_types = {row.action_type for row in actions} if "extract_bp" not in action_types: return CommandResult(ok=False, status="skipped", error="extract_bp_disabled") run, created = await sync_to_async(CommandRun.objects.get_or_create)( profile=profile, trigger_message=trigger, defaults={ "user": trigger.user, "status": "running", }, ) if not created and run.status in {"ok", "running"}: return CommandResult( ok=True, status="ok", payload={"document_id": str(run.result_ref_id or "")}, ) run.status = "running" run.error = "" await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) if trigger.reply_to_id is None: run.status = "failed" run.error = "bp_requires_reply_target" await sync_to_async(run.save)( update_fields=["status", "error", "updated_at"] ) return CommandResult(ok=False, status="failed", error=run.error) anchor = trigger.reply_to rows = await sync_to_async(list)( Message.objects.filter( user=trigger.user, session=trigger.session, ts__gte=int(anchor.ts or 0), ts__lte=int(trigger.ts or 0), ) .order_by("ts") .select_related("session", "session__identifier", "session__identifier__person") ) transcript = messages_to_string( rows, author_rewrites={"USER": "Operator", "BOT": "Assistant"}, ) max_transcript_chars = int( getattr(settings, "BP_MAX_TRANSCRIPT_CHARS", 12000) or 12000 ) transcript = _clamp_transcript(transcript, max_transcript_chars) default_template = ( "Business Plan:\n" "- Objective\n" "- Audience\n" "- Offer\n" "- GTM\n" "- Risks" ) template_text = profile.template_text or default_template max_template_chars = int( getattr(settings, "BP_MAX_TEMPLATE_CHARS", 5000) or 5000 ) template_text = str(template_text or "")[:max_template_chars] ai_obj = await sync_to_async( # Match compose draft/engage lookup behavior exactly. lambda: AI.objects.filter(user=trigger.user).first() )() ai_warning = "" if ai_obj is None: summary = _bp_fallback_markdown( template_text, transcript, "ai_not_configured", ) ai_warning = "ai_not_configured" else: prompt = [ {"role": "system", "content": _bp_system_prompt()}, { "role": "user", "content": ( "Template:\n" f"{template_text}\n\n" "Messages:\n" f"{transcript}" ), }, ] try: summary = str( await ai_runner.run_prompt( prompt, ai_obj, operation="command_bp_extract" ) or "" ).strip() if not summary: raise RuntimeError("empty_ai_response") except Exception as exc: ai_warning = f"bp_ai_failed:{exc}" summary = _bp_fallback_markdown( template_text, transcript, str(exc), ) document = await sync_to_async(BusinessPlanDocument.objects.create)( user=trigger.user, command_profile=profile, source_service=trigger.source_service or ctx.service, source_channel_identifier=trigger.source_chat_id or ctx.channel_identifier, trigger_message=trigger, anchor_message=anchor, title=f"Business Plan {time.strftime('%Y-%m-%d %H:%M:%S')}", status="draft", content_markdown=summary, structured_payload={"source_message_ids": [str(row.id) for row in rows]}, ) await sync_to_async(BusinessPlanRevision.objects.create)( document=document, editor_user=trigger.user, content_markdown=summary, structured_payload={"source_message_ids": [str(row.id) for row in rows]}, ) fanout_stats = {"sent_bindings": 0, "failed_bindings": 0} fanout_text = summary if ai_warning: warning_text = str(ai_warning or "").strip() if len(warning_text) > 300: warning_text = warning_text[:297].rstrip() + "..." fanout_text = ( "[bp] AI generation failed. Draft document was saved in fallback mode." + (f"\nReason: {warning_text}" if warning_text else "") ) if "post_result" in action_types: fanout_stats = await self._fanout(run, fanout_text) if "status_in_source" == profile.visibility_mode: status_text = f"[bp] Generated business plan: {document.title}" if ai_warning: status_text += " (fallback mode)" sent_count = int(fanout_stats.get("sent_bindings") or 0) failed_count = int(fanout_stats.get("failed_bindings") or 0) if sent_count or failed_count: status_text += f" ยท fanout sent:{sent_count}" if failed_count: status_text += f" failed:{failed_count}" await self._status_message(trigger, status_text) run.status = "ok" run.result_ref = document run.error = ai_warning await sync_to_async(run.save)( update_fields=["status", "result_ref", "error", "updated_at"] ) return CommandResult( ok=True, status="ok", payload={"document_id": str(document.id)}, )