Implement business plans

This commit is contained in:
2026-03-02 00:00:53 +00:00
parent d22924f6aa
commit b3e183eb0a
26 changed files with 4109 additions and 39 deletions

View File

@@ -26,6 +26,8 @@ from django.utils import timezone as dj_timezone
from django.views import View
from core.clients import transport
from core.commands.base import CommandContext
from core.commands.engine import process_inbound_message
from core.messaging import ai as ai_runner
from core.messaging import media_bridge
from core.messaging.utils import messages_to_string
@@ -33,6 +35,9 @@ from core.models import (
AI,
Chat,
ChatSession,
CommandAction,
CommandChannelBinding,
CommandProfile,
Message,
MessageEvent,
PatternMitigationPlan,
@@ -42,6 +47,7 @@ from core.models import (
WorkspaceConversation,
)
from core.realtime.typing_state import get_person_typing_state
from core.translation.engine import process_inbound_translation
from core.views.workspace import (
INSIGHT_METRICS,
_build_engage_payload,
@@ -466,6 +472,18 @@ def _serialize_message(msg: Message) -> dict:
}
)
reply_preview = ""
try:
reply_obj = getattr(msg, "reply_to", None)
if reply_obj is not None:
reply_preview = str(getattr(reply_obj, "text", "") or "").strip()
except Exception:
reply_preview = ""
if reply_preview:
reply_preview = re.sub(r"\s+", " ", reply_preview).strip()
if len(reply_preview) > 140:
reply_preview = reply_preview[:137].rstrip() + "..."
return {
"id": str(msg.id),
"ts": int(msg.ts or 0),
@@ -491,6 +509,15 @@ def _serialize_message(msg: Message) -> dict:
"read_source_service": read_source_service,
"read_by_identifier": read_by_identifier,
"reactions": reaction_rows,
"source_message_id": str(getattr(msg, "source_message_id", "") or ""),
"reply_to_id": str(getattr(msg, "reply_to_id", "") or ""),
"reply_source_message_id": str(
getattr(msg, "reply_source_message_id", "") or ""
),
"reply_preview": reply_preview,
"message_meta": {
"origin_tag": str((getattr(msg, "message_meta", {}) or {}).get("origin_tag") or "")
},
}
@@ -1498,6 +1525,16 @@ def _context_base(user, service, identifier, person):
service = person_identifier.service
identifier = person_identifier.identifier
person = person_identifier.person
if service == "whatsapp" and identifier and "@" not in str(identifier):
bare_id = str(identifier).split("@", 1)[0].strip()
group_link = PlatformChatLink.objects.filter(
user=user,
service="whatsapp",
chat_identifier=bare_id,
is_group=True,
).first()
if group_link is not None:
identifier = str(group_link.chat_jid or f"{bare_id}@g.us")
if person_identifier is None and identifier:
bare_id = identifier.split("@", 1)[0].strip()
@@ -1527,6 +1564,208 @@ def _context_base(user, service, identifier, person):
}
def _latest_whatsapp_bridge_ref(message: Message | None) -> dict:
if message is None:
return {}
payload = dict(getattr(message, "receipt_payload", {}) or {})
refs = dict(payload.get("bridge_refs") or {})
rows = list(refs.get("whatsapp") or [])
best = {}
best_updated = -1
for row in rows:
if not isinstance(row, dict):
continue
upstream_id = str(row.get("upstream_message_id") or "").strip()
if not upstream_id:
continue
updated_at = int(row.get("updated_at") or 0)
if updated_at >= best_updated:
best = dict(row)
best_updated = updated_at
return best
def _build_whatsapp_reply_metadata(reply_to: Message | None, channel_identifier: str) -> dict:
if reply_to is None:
return {}
target_message_id = ""
participant = ""
source_service = str(getattr(reply_to, "source_service", "") or "").strip().lower()
if source_service == "whatsapp":
target_message_id = str(getattr(reply_to, "source_message_id", "") or "").strip()
participant = str(getattr(reply_to, "sender_uuid", "") or "").strip()
if not target_message_id:
bridge_ref = _latest_whatsapp_bridge_ref(reply_to)
target_message_id = str(bridge_ref.get("upstream_message_id") or "").strip()
participant = participant or str(bridge_ref.get("upstream_author") or "").strip()
if not target_message_id:
return {}
remote_jid = str(channel_identifier or "").strip()
if "@" not in remote_jid and remote_jid:
remote_jid = f"{remote_jid}@g.us"
return {
"reply_to_upstream_message_id": target_message_id,
"reply_to_participant": participant,
"reply_to_remote_jid": remote_jid,
}
def _canonical_command_channel_identifier(service: str, identifier: str) -> str:
value = str(identifier or "").strip()
if not value:
return ""
if service == "whatsapp":
return value.split("@", 1)[0].strip()
return value
def _command_channel_identifier_variants(service: str, identifier: str) -> set[str]:
canonical = _canonical_command_channel_identifier(service, identifier)
if not canonical:
return set()
variants = {canonical}
if service == "whatsapp":
variants.add(f"{canonical}@g.us")
return variants
def _ensure_bp_profile_and_actions(user) -> CommandProfile:
profile, _ = CommandProfile.objects.get_or_create(
user=user,
slug="bp",
defaults={
"name": "Business Plan",
"enabled": True,
"trigger_token": "#bp#",
"reply_required": True,
"exact_match_only": True,
"window_scope": "conversation",
"visibility_mode": "status_in_source",
},
)
if not profile.enabled:
profile.enabled = True
profile.save(update_fields=["enabled", "updated_at"])
for action_type, position in (
("extract_bp", 0),
("save_document", 1),
("post_result", 2),
):
row, created = CommandAction.objects.get_or_create(
profile=profile,
action_type=action_type,
defaults={"enabled": True, "position": position},
)
if (not created) and (not row.enabled):
row.enabled = True
row.save(update_fields=["enabled", "updated_at"])
return profile
def _toggle_command_for_channel(
*,
user,
service: str,
identifier: str,
slug: str,
enabled: bool,
) -> tuple[bool, str]:
service_key = _default_service(service)
canonical_identifier = _canonical_command_channel_identifier(service_key, identifier)
if not canonical_identifier:
return (False, "missing_identifier")
if slug == "bp":
profile = _ensure_bp_profile_and_actions(user)
else:
profile = (
CommandProfile.objects.filter(user=user, slug=slug)
.order_by("id")
.first()
)
if profile is None:
return (False, f"unknown_command:{slug}")
if not profile.enabled and enabled:
profile.enabled = True
profile.save(update_fields=["enabled", "updated_at"])
variants = _command_channel_identifier_variants(service_key, canonical_identifier)
if not variants:
return (False, "missing_identifier")
if enabled:
for direction in ("ingress", "egress"):
row, _ = CommandChannelBinding.objects.get_or_create(
profile=profile,
direction=direction,
service=service_key,
channel_identifier=canonical_identifier,
defaults={"enabled": True},
)
if not row.enabled:
row.enabled = True
row.save(update_fields=["enabled", "updated_at"])
CommandChannelBinding.objects.filter(
profile=profile,
direction=direction,
service=service_key,
channel_identifier__in=list(variants - {canonical_identifier}),
).update(enabled=False)
else:
CommandChannelBinding.objects.filter(
profile=profile,
direction__in=("ingress", "egress"),
service=service_key,
channel_identifier__in=list(variants),
).update(enabled=False)
return (True, "")
def _command_options_for_channel(user, service: str, identifier: str) -> list[dict]:
service_key = _default_service(service)
variants = _command_channel_identifier_variants(service_key, identifier)
profiles = list(
CommandProfile.objects.filter(user=user).order_by("slug", "id")
)
by_slug = {str(row.slug or "").strip(): row for row in profiles}
if "bp" not in by_slug:
by_slug["bp"] = CommandProfile(
user=user,
slug="bp",
name="Business Plan",
trigger_token="#bp#",
enabled=True,
)
slugs = sorted(by_slug.keys())
options = []
for slug in slugs:
profile = by_slug[slug]
enabled_here = False
if variants:
enabled_here = CommandChannelBinding.objects.filter(
profile_id=profile.id if profile.id else None,
direction="ingress",
service=service_key,
channel_identifier__in=list(variants),
enabled=True,
).exists()
options.append(
{
"slug": slug,
"name": str(profile.name or slug).strip() or slug,
"trigger_token": str(profile.trigger_token or "").strip(),
"enabled_here": bool(enabled_here),
"profile_enabled": bool(profile.enabled),
}
)
return options
def _compose_urls(service, identifier, person_id):
query = {"service": service, "identifier": identifier}
if person_id:
@@ -2092,6 +2331,11 @@ def _panel_context(
user_id=request.user.id,
person_id=base["person"].id if base["person"] else None,
)
command_options = _command_options_for_channel(
request.user,
base["service"],
base["identifier"],
)
recent_contacts = _recent_manual_contacts(
request.user,
current_service=base["service"],
@@ -2126,6 +2370,7 @@ def _panel_context(
"compose_engage_send_url": reverse("compose_engage_send"),
"compose_quick_insights_url": reverse("compose_quick_insights"),
"compose_history_sync_url": reverse("compose_history_sync"),
"compose_toggle_command_url": reverse("compose_toggle_command"),
"compose_ws_url": ws_url,
"ai_workspace_url": (
f"{reverse('ai_workspace')}?person={base['person'].id}"
@@ -2143,6 +2388,7 @@ def _panel_context(
"manual_icon_class": "fa-solid fa-paper-plane",
"panel_id": f"compose-panel-{unique}",
"typing_state_json": json.dumps(typing_state),
"command_options": command_options,
"platform_options": platform_options,
"recent_contacts": recent_contacts,
"is_group": base.get("is_group", False),
@@ -2577,6 +2823,7 @@ class ComposeThread(LoginRequiredMixin, View):
"session",
"session__identifier",
"session__identifier__person",
"reply_to",
).order_by("-ts")[:limit]
)
rows_desc.reverse()
@@ -2979,6 +3226,89 @@ class ComposeCommandResult(LoginRequiredMixin, View):
return JsonResponse({"pending": False, "result": result})
class ComposeToggleCommand(LoginRequiredMixin, View):
def post(self, request):
service, identifier, _ = _request_scope(request, "POST")
channel_identifier = _canonical_command_channel_identifier(
service, str(identifier or "")
)
if not channel_identifier:
return JsonResponse({"ok": False, "error": "missing_identifier"}, status=400)
if service not in {"web", "xmpp", "signal", "whatsapp"}:
return JsonResponse(
{"ok": False, "error": f"unsupported_service:{service}"},
status=400,
)
slug = str(request.POST.get("slug") or "bp").strip().lower() or "bp"
enabled = str(request.POST.get("enabled") or "1").strip().lower() in {
"1",
"true",
"yes",
"on",
}
ok, error = _toggle_command_for_channel(
user=request.user,
service=service,
identifier=channel_identifier,
slug=slug,
enabled=enabled,
)
if not ok:
return JsonResponse(
{
"ok": False,
"error": error or "command_toggle_failed",
},
status=400,
)
command_options = _command_options_for_channel(
request.user, service, channel_identifier
)
for row in command_options:
if row.get("slug") == slug:
row["enabled_here"] = bool(enabled)
message = (
f"{slug} enabled for this chat."
if enabled
else f"{slug} disabled for this chat."
)
return JsonResponse(
{
"ok": True,
"message": message,
"slug": slug,
"enabled": bool(enabled),
"command_options": command_options,
"settings_url": reverse("command_routing"),
}
)
class ComposeBindBP(ComposeToggleCommand):
def post(self, request):
service, identifier, _ = _request_scope(request, "POST")
ok, error = _toggle_command_for_channel(
user=request.user,
service=service,
identifier=str(identifier or ""),
slug="bp",
enabled=True,
)
if not ok:
return JsonResponse(
{"ok": False, "error": error or "command_toggle_failed"},
status=400,
)
return JsonResponse(
{
"ok": True,
"message": "bp enabled for this chat.",
"slug": "bp",
"enabled": True,
"settings_url": reverse("command_routing"),
}
)
class ComposeMediaBlob(LoginRequiredMixin, View):
"""
Serve cached media blobs for authenticated compose image previews.
@@ -3477,6 +3807,7 @@ class ComposeSend(LoginRequiredMixin, View):
)
text = str(request.POST.get("text") or "").strip()
reply_to_message_id = str(request.POST.get("reply_to_message_id") or "").strip()
if not text:
return self._response(
request,
@@ -3503,7 +3834,26 @@ class ComposeSend(LoginRequiredMixin, View):
)
ts = None
command_id = None
created_message = None
session = None
reply_to = None
if base["person_identifier"] is not None:
session, _ = ChatSession.objects.get_or_create(
user=request.user,
identifier=base["person_identifier"],
)
if reply_to_message_id:
reply_to = Message.objects.filter(
user=request.user,
session=session,
id=reply_to_message_id,
).first()
if runtime_client is None:
outbound_reply_metadata = {}
if base["service"] == "whatsapp":
outbound_reply_metadata = _build_whatsapp_reply_metadata(
reply_to, str(base["identifier"] or "")
)
if base["service"] == "whatsapp":
runtime_state = transport.get_runtime_state("whatsapp")
last_seen = int(runtime_state.get("runtime_seen_at") or 0)
@@ -3530,24 +3880,62 @@ class ComposeSend(LoginRequiredMixin, View):
level="warning",
panel_id=panel_id,
)
# Persist local message first so runtime can attach upstream bridge refs.
ts = int(time.time() * 1000)
if session is not None:
created_message = Message.objects.create(
user=request.user,
session=session,
sender_uuid="",
text=text,
ts=int(ts),
delivered_ts=None,
custom_author="USER",
source_service="web",
source_message_id=str(ts),
source_chat_id=str(base["identifier"] or ""),
reply_to=reply_to,
reply_source_service="web" if reply_to is not None else None,
reply_source_message_id=(
str(reply_to.id) if reply_to is not None else None
),
message_meta={},
)
command_id = transport.enqueue_runtime_command(
base["service"],
"send_message_raw",
{"recipient": base["identifier"], "text": text, "attachments": []},
{
"recipient": base["identifier"],
"text": text,
"attachments": [],
"metadata": (
{
"legacy_message_id": str(created_message.id),
"local_ts": int(ts),
**outbound_reply_metadata,
}
if created_message is not None
else outbound_reply_metadata
),
},
)
logger.debug(f"{log_prefix} command_id={command_id} enqueued")
# attach command id to request so _response can include it in HX-Trigger
request._compose_command_id = command_id
# Do NOT wait here — return immediately so the UI doesn't block.
# Record a pending message locally so the thread shows the outgoing message.
ts = int(time.time() * 1000)
else:
# In-process runtime can perform the send synchronously and return a timestamp.
outbound_reply_metadata = {}
if base["service"] == "whatsapp":
outbound_reply_metadata = _build_whatsapp_reply_metadata(
reply_to, str(base["identifier"] or "")
)
ts = async_to_sync(transport.send_message_raw)(
base["service"],
base["identifier"],
text=text,
attachments=[],
metadata=outbound_reply_metadata,
)
# For queued sends we set `ts` to a local timestamp; for in-process sends ts may be False.
if not ts:
@@ -3559,17 +3947,13 @@ class ComposeSend(LoginRequiredMixin, View):
panel_id=panel_id,
)
if base["person_identifier"] is not None:
session, _ = ChatSession.objects.get_or_create(
user=request.user,
identifier=base["person_identifier"],
)
if base["person_identifier"] is not None and created_message is None:
# For in-process sends (Signal, etc), ts is a timestamp or True.
# For queued sends (WhatsApp/UR), ts is a local timestamp.
# Set delivered_ts only if we got a real timestamp OR if it's an in-process sync send.
msg_ts = int(ts) if str(ts).isdigit() else int(time.time() * 1000)
delivered_ts = msg_ts if runtime_client is not None else None
Message.objects.create(
created_message = Message.objects.create(
user=request.user,
session=session,
sender_uuid="",
@@ -3577,7 +3961,26 @@ class ComposeSend(LoginRequiredMixin, View):
ts=msg_ts,
delivered_ts=delivered_ts,
custom_author="USER",
source_service="web",
source_message_id=str(msg_ts),
source_chat_id=str(base["identifier"] or ""),
reply_to=reply_to,
reply_source_service="web" if reply_to is not None else None,
reply_source_message_id=str(reply_to.id) if reply_to is not None else None,
message_meta={},
)
if created_message is not None:
async_to_sync(process_inbound_message)(
CommandContext(
service="web",
channel_identifier=str(base["identifier"] or ""),
message_id=str(created_message.id),
user_id=int(request.user.id),
message_text=text,
payload={},
)
)
async_to_sync(process_inbound_translation)(created_message)
# Notify XMPP clients from runtime so cross-platform sends appear there too.
if base["service"] in {"signal", "whatsapp"}:
try: