from __future__ import annotations import hashlib import json import re import time from datetime import datetime, timezone as dt_timezone from urllib.parse import urlencode, urlparse from asgiref.sync import async_to_sync from django.contrib.auth.mixins import LoginRequiredMixin from django.core import signing from django.core.cache import cache from django.http import HttpResponseBadRequest, JsonResponse from django.shortcuts import get_object_or_404, render from django.urls import reverse from django.utils import timezone as dj_timezone from django.views import View from core.clients import transport from core.messaging import ai as ai_runner from core.messaging.utils import messages_to_string from core.models import ( AI, ChatSession, Message, PatternMitigationPlan, Person, PersonIdentifier, WorkspaceConversation, ) from core.realtime.typing_state import get_person_typing_state from core.views.workspace import _build_engage_payload, _parse_draft_options COMPOSE_WS_TOKEN_SALT = "compose-ws" COMPOSE_ENGAGE_TOKEN_SALT = "compose-engage" COMPOSE_AI_CACHE_TTL = 60 * 30 URL_PATTERN = re.compile(r"https?://[^\s<>'\"\\]+") IMAGE_EXTENSIONS = ( ".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".avif", ".svg", ) def _uniq_ordered(values): seen = set() output = [] for value in values: cleaned = _clean_url(value) if not cleaned or cleaned in seen: continue seen.add(cleaned) output.append(cleaned) return output def _default_service(service: str | None) -> str: value = str(service or "").strip().lower() if value in {"signal", "whatsapp", "instagram", "xmpp"}: return value return "signal" def _safe_limit(raw) -> int: try: value = int(raw or 40) except (TypeError, ValueError): value = 40 return max(10, min(value, 200)) def _safe_after_ts(raw) -> int: try: value = int(raw or 0) except (TypeError, ValueError): value = 0 return max(0, value) def _format_ts_label(ts_value: int) -> str: try: as_dt = datetime.fromtimestamp(int(ts_value) / 1000, tz=dt_timezone.utc) return dj_timezone.localtime(as_dt).strftime("%H:%M") except Exception: return str(ts_value or "") def _is_outgoing(msg: Message) -> bool: return str(msg.custom_author or "").upper() in {"USER", "BOT"} def _clean_url(candidate: str) -> str: return str(candidate or "").strip().rstrip(".,);:!?\"'") def _extract_urls(text_value: str) -> list[str]: found = [] for match in URL_PATTERN.findall(str(text_value or "")): cleaned = _clean_url(match) if cleaned and cleaned not in found: found.append(cleaned) return found def _is_url_only_text(text_value: str) -> bool: lines = [line.strip() for line in str(text_value or "").splitlines() if line.strip()] if not lines: return False return all(bool(URL_PATTERN.fullmatch(line)) for line in lines) def _looks_like_image_url(url_value: str) -> bool: if not url_value: return False parsed = urlparse(url_value) path = str(parsed.path or "").lower() return path.endswith(IMAGE_EXTENSIONS) def _image_url_from_text(text_value: str) -> str: urls = _image_urls_from_text(text_value) return urls[0] if urls else "" def _image_urls_from_text(text_value: str) -> list[str]: urls = _uniq_ordered(_extract_urls(text_value)) if not urls: return [] confident = [url for url in urls if _looks_like_image_url(url)] if confident: return confident # Fallback: some XMPP upload URLs have no file extension. if _is_url_only_text(text_value): return urls return [] def _serialize_message(msg: Message) -> dict: text_value = str(msg.text or "") image_urls = _image_urls_from_text(text_value) image_url = image_urls[0] if image_urls else "" hide_text = bool( image_urls and _is_url_only_text(text_value) and all(_looks_like_image_url(url) for url in image_urls) ) display_text = text_value if text_value.strip() else ("(no text)" if not image_url else "") author = str(msg.custom_author or "").strip() return { "id": str(msg.id), "ts": int(msg.ts or 0), "display_ts": _format_ts_label(int(msg.ts or 0)), "text": text_value, "display_text": display_text, "image_url": image_url, "image_urls": image_urls, "hide_text": hide_text, "author": author, "outgoing": _is_outgoing(msg), } def _owner_name(user) -> str: return ( user.first_name or user.get_full_name().strip() or user.username or "Me" ) def _compose_ws_token(user_id, service, identifier, person_id): payload = { "u": int(user_id), "s": str(service or ""), "i": str(identifier or ""), "p": str(person_id) if person_id else "", "exp": int(time.time()) + (60 * 60 * 12), } return signing.dumps(payload, salt=COMPOSE_WS_TOKEN_SALT) def _compose_ai_cache_key(kind, user_id, service, identifier, person_id, last_ts, limit): raw = "|".join( [ str(kind or ""), str(user_id), str(service or ""), str(identifier or ""), str(person_id or ""), str(last_ts or 0), str(limit or 0), ] ) digest = hashlib.sha1(raw.encode("utf-8")).hexdigest() return f"compose:{kind}:{digest}" def _plain_text(value): cleaned = re.sub(r"\s+", " ", str(value or "").strip()) cleaned = re.sub(r"^\s*#{1,6}\s*", "", cleaned) cleaned = re.sub(r"\*\*(.*?)\*\*", r"\1", cleaned) cleaned = re.sub(r"`(.*?)`", r"\1", cleaned) return cleaned.strip() def _engage_body_only(value): lines = [line.strip() for line in str(value or "").splitlines() if line.strip()] if lines and lines[0].startswith("**"): lines = lines[1:] if lines and lines[0].lower() == "guidance:": lines = lines[1:] return _plain_text(" ".join(lines)) def _messages_for_ai(user, person_identifier, limit): if person_identifier is None: return [] session, _ = ChatSession.objects.get_or_create(user=user, identifier=person_identifier) rows = list( Message.objects.filter(user=user, session=session) .select_related("session", "session__identifier", "session__identifier__person") .order_by("-ts")[:limit] ) rows.reverse() return rows def _fallback_drafts(): return [ { "label": "Soft", "text": "I want us to stay connected. I am listening and I want to understand your perspective clearly.", }, { "label": "Neutral", "text": "I hear your point. Let us clarify what each of us means so we can move forward constructively.", }, { "label": "Firm", "text": "I want to resolve this respectfully. I will continue when we can keep the conversation constructive.", }, ] def _build_draft_prompt(owner_name, person_name, transcript): return [ { "role": "system", "content": ( "Generate exactly three short reply drafts for a chat. " "Return labels Soft, Neutral, Firm. " "Format:\nSoft: ...\nNeutral: ...\nFirm: ...\n" "Each draft must be one to two sentences, plain text, no markdown." ), }, { "role": "user", "content": ( f"Me: {owner_name}\n" f"Other: {person_name}\n" f"Conversation:\n{transcript}" ), }, ] def _build_summary_prompt(owner_name, person_name, transcript): return [ { "role": "system", "content": ( "Create a concise conversation summary with three sections. " "Use this exact structure:\n" "Headlines:\n- ...\n" "Patterns:\n- ...\n" "Suggested Next Message:\n- ...\n" "Keep each bullet practical and specific." ), }, { "role": "user", "content": ( f"Me: {owner_name}\n" f"Other: {person_name}\n" f"Conversation:\n{transcript}" ), }, ] def _to_float(value): if value is None: return None try: return float(value) except (TypeError, ValueError): return None def _format_number(value, precision=2): number = _to_float(value) if number is None: return "-" rounded = round(number, precision) if float(rounded).is_integer(): return str(int(rounded)) return f"{rounded:.{precision}f}" def _percent_change(current, previous): now_val = _to_float(current) prev_val = _to_float(previous) if now_val is None or prev_val is None: return None if abs(prev_val) < 1e-9: return None return ((now_val - prev_val) / abs(prev_val)) * 100.0 def _trend_meta(current, previous, higher_is_better=True): now_val = _to_float(current) prev_val = _to_float(previous) if now_val is None or prev_val is None: return { "direction": "unknown", "icon": "fa-solid fa-minus", "class_name": "has-text-grey", "meaning": "No comparison yet", } delta = now_val - prev_val if abs(delta) < 1e-9: return { "direction": "flat", "icon": "fa-solid fa-minus", "class_name": "has-text-grey", "meaning": "No meaningful change", } is_up = delta > 0 improves = is_up if higher_is_better else not is_up return { "direction": "up" if is_up else "down", "icon": "fa-solid fa-arrow-trend-up" if is_up else "fa-solid fa-arrow-trend-down", "class_name": "has-text-success" if improves else "has-text-danger", "meaning": "Improving signal" if improves else "Risk signal", } def _emotion_meta(metric_kind, value): score = _to_float(value) if score is None: return { "icon": "fa-regular fa-face-meh-blank", "class_name": "has-text-grey", "label": "Unknown", } if metric_kind == "confidence": score = score * 100.0 if metric_kind == "count": if score >= 80: return { "icon": "fa-solid fa-chart-column", "class_name": "has-text-success", "label": "Rich Data", } if score >= 30: return { "icon": "fa-solid fa-chart-simple", "class_name": "has-text-warning", "label": "Moderate Data", } return { "icon": "fa-solid fa-chart-line", "class_name": "has-text-danger", "label": "Sparse Data", } if score >= 75: return { "icon": "fa-regular fa-face-smile", "class_name": "has-text-success", "label": "Positive", } if score >= 50: return { "icon": "fa-regular fa-face-meh", "class_name": "has-text-warning", "label": "Mixed", } return { "icon": "fa-regular fa-face-frown", "class_name": "has-text-danger", "label": "Strained", } def _quick_insights_rows(conversation): latest = conversation.metric_snapshots.first() previous = ( conversation.metric_snapshots.order_by("-computed_at")[1:2].first() if conversation.metric_snapshots.count() > 1 else None ) metric_specs = [ { "key": "stability_score", "label": "Stability Score", "field": "stability_score", "source": "conversation", "kind": "score", "icon": "fa-solid fa-heart-pulse", "higher_better": True, }, { "key": "stability_confidence", "label": "Stability Confidence", "field": "stability_confidence", "source": "conversation", "kind": "confidence", "icon": "fa-solid fa-shield-check", "higher_better": True, }, { "key": "sample_messages", "label": "Sample Messages", "field": "stability_sample_messages", "source": "conversation", "kind": "count", "icon": "fa-solid fa-message", "higher_better": True, }, { "key": "sample_days", "label": "Sample Days", "field": "stability_sample_days", "source": "conversation", "kind": "count", "icon": "fa-solid fa-calendar-days", "higher_better": True, }, { "key": "commitment_inbound", "label": "Commit In", "field": "commitment_inbound_score", "source": "conversation", "kind": "score", "icon": "fa-solid fa-inbox", "higher_better": True, }, { "key": "commitment_outbound", "label": "Commit Out", "field": "commitment_outbound_score", "source": "conversation", "kind": "score", "icon": "fa-solid fa-paper-plane", "higher_better": True, }, { "key": "commitment_confidence", "label": "Commit Confidence", "field": "commitment_confidence", "source": "conversation", "kind": "confidence", "icon": "fa-solid fa-badge-check", "higher_better": True, }, { "key": "reciprocity", "label": "Reciprocity", "field": "reciprocity_score", "source": "snapshot", "kind": "score", "icon": "fa-solid fa-right-left", "higher_better": True, }, { "key": "continuity", "label": "Continuity", "field": "continuity_score", "source": "snapshot", "kind": "score", "icon": "fa-solid fa-link", "higher_better": True, }, { "key": "response", "label": "Response", "field": "response_score", "source": "snapshot", "kind": "score", "icon": "fa-solid fa-gauge-high", "higher_better": True, }, { "key": "volatility", "label": "Volatility", "field": "volatility_score", "source": "snapshot", "kind": "score", "icon": "fa-solid fa-wave-square", "higher_better": True, }, { "key": "inbound_messages", "label": "Inbound Messages", "field": "inbound_messages", "source": "snapshot", "kind": "count", "icon": "fa-solid fa-arrow-down", "higher_better": True, }, { "key": "outbound_messages", "label": "Outbound Messages", "field": "outbound_messages", "source": "snapshot", "kind": "count", "icon": "fa-solid fa-arrow-up", "higher_better": True, }, ] rows = [] for spec in metric_specs: field_name = spec["field"] if spec["source"] == "conversation": current = getattr(conversation, field_name, None) previous_value = getattr(previous, field_name, None) if previous else None else: current = getattr(latest, field_name, None) if latest else None previous_value = getattr(previous, field_name, None) if previous else None trend = _trend_meta( current, previous_value, higher_is_better=spec.get("higher_better", True), ) delta_pct = _percent_change(current, previous_value) point_count = conversation.metric_snapshots.exclude( **{f"{field_name}__isnull": True} ).count() emotion = _emotion_meta(spec["kind"], current) rows.append( { "key": spec["key"], "label": spec["label"], "icon": spec["icon"], "value": current, "display_value": _format_number( current, 3 if spec["kind"] == "confidence" else 2, ), "delta_pct": delta_pct, "delta_label": f"{delta_pct:+.2f}%" if delta_pct is not None else "n/a", "point_count": point_count, "trend": trend, "emotion": emotion, } ) return { "rows": rows, "snapshot_count": conversation.metric_snapshots.count(), "latest_computed_at": latest.computed_at if latest else None, } def _build_engage_prompt(owner_name, person_name, transcript): return [ { "role": "system", "content": ( "Write one short de-escalating outreach in shared framing. " "Use 'we/us/our' only. No names. One or two sentences." ), }, { "role": "user", "content": ( f"Me: {owner_name}\n" f"Other: {person_name}\n" f"Conversation:\n{transcript}" ), }, ] def _latest_plan_for_person(user, person): if person is None: return None conversation = ( PatternMitigationPlan.objects.filter( user=user, conversation__participants=person, ) .select_related("conversation") .order_by("-updated_at") .first() ) return conversation def _best_engage_source(plan): if plan is None: return (None, "") correction = plan.corrections.order_by("-created_at").first() if correction: return (correction, "correction") rule = plan.rules.order_by("-created_at").first() if rule: return (rule, "rule") game = plan.games.order_by("-created_at").first() if game: return (game, "game") return (None, "") def _engage_source_options(plan): if plan is None: return [] options = [] for rule in plan.rules.order_by("created_at"): options.append( { "value": f"rule:{rule.id}", "label": f"Rule: {rule.title}", } ) for game in plan.games.order_by("created_at"): options.append( { "value": f"game:{game.id}", "label": f"Game: {game.title}", } ) for correction in plan.corrections.order_by("created_at"): options.append( { "value": f"correction:{correction.id}", "label": f"Correction: {correction.title}", } ) return options def _engage_source_from_ref(plan, source_ref): if plan is None: return (None, "", "") ref = str(source_ref or "").strip() if ":" not in ref: return (None, "", "") kind, raw_id = ref.split(":", 1) kind = kind.strip().lower() raw_id = raw_id.strip() model_by_kind = { "rule": plan.rules, "game": plan.games, "correction": plan.corrections, } queryset = model_by_kind.get(kind) if queryset is None: return (None, "", "") obj = queryset.filter(id=raw_id).first() if obj is None: return (None, "", "") return (obj, kind, f"{kind}:{obj.id}") def _context_base(user, service, identifier, person): person_identifier = None if person is not None: person_identifier = ( PersonIdentifier.objects.filter( user=user, person=person, service=service, ).first() or PersonIdentifier.objects.filter(user=user, person=person).first() ) if person_identifier is None and identifier: person_identifier = PersonIdentifier.objects.filter( user=user, service=service, identifier=identifier, ).first() if person_identifier: service = person_identifier.service identifier = person_identifier.identifier person = person_identifier.person return { "person_identifier": person_identifier, "service": service, "identifier": identifier, "person": person, } def _compose_urls(service, identifier, person_id): query = {"service": service, "identifier": identifier} if person_id: query["person"] = str(person_id) payload = urlencode(query) return { "page_url": f"{reverse('compose_page')}?{payload}", "widget_url": f"{reverse('compose_widget')}?{payload}", } def _load_messages(user, person_identifier, limit): if person_identifier is None: return {"session": None, "messages": []} session, _ = ChatSession.objects.get_or_create( user=user, identifier=person_identifier, ) messages = list( Message.objects.filter(user=user, session=session) .select_related("session", "session__identifier", "session__identifier__person") .order_by("-ts")[:limit] ) messages.reverse() return {"session": session, "messages": messages} def _panel_context( request, service: str, identifier: str, person: Person | None, render_mode: str, notice: str = "", level: str = "success", ): base = _context_base(request.user, service, identifier, person) limit = _safe_limit(request.GET.get("limit") or request.POST.get("limit")) session_bundle = _load_messages(request.user, base["person_identifier"], limit) last_ts = 0 if session_bundle["messages"]: last_ts = int(session_bundle["messages"][-1].ts or 0) urls = _compose_urls( base["service"], base["identifier"], base["person"].id if base["person"] else None, ) ws_token = _compose_ws_token( user_id=request.user.id, service=base["service"], identifier=base["identifier"], person_id=base["person"].id if base["person"] else None, ) ws_url = f"/ws/compose/thread/?{urlencode({'token': ws_token})}" unique_raw = f"{base['service']}|{base['identifier']}|{request.user.id}" unique = hashlib.sha1(unique_raw.encode("utf-8")).hexdigest()[:12] typing_state = get_person_typing_state( user_id=request.user.id, person_id=base["person"].id if base["person"] else None, ) return { "service": base["service"], "identifier": base["identifier"], "person": base["person"], "person_identifier": base["person_identifier"], "session": session_bundle["session"], "messages": session_bundle["messages"], "serialized_messages": [ _serialize_message(msg) for msg in session_bundle["messages"] ], "last_ts": last_ts, "limit": limit, "notice_message": notice, "notice_level": level, "render_mode": render_mode, "compose_page_url": urls["page_url"], "compose_widget_url": urls["widget_url"], "compose_drafts_url": reverse("compose_drafts"), "compose_summary_url": reverse("compose_summary"), "compose_engage_preview_url": reverse("compose_engage_preview"), "compose_engage_send_url": reverse("compose_engage_send"), "compose_quick_insights_url": reverse("compose_quick_insights"), "compose_ws_url": ws_url, "ai_workspace_url": ( f"{reverse('ai_workspace')}?person={base['person'].id}" if base["person"] else reverse("ai_workspace") ), "manual_icon_class": "fa-solid fa-paper-plane", "panel_id": f"compose-panel-{unique}", "typing_state_json": json.dumps(typing_state), } class ComposeContactsDropdown(LoginRequiredMixin, View): def get(self, request): rows = list( PersonIdentifier.objects.filter(user=request.user) .select_related("person") .order_by("person__name", "service", "identifier") ) items = [] for row in rows: urls = _compose_urls(row.service, row.identifier, row.person_id) items.append( { "person_name": row.person.name, "service": row.service, "identifier": row.identifier, "compose_url": urls["page_url"], } ) return render( request, "partials/nav-contacts-dropdown.html", { "items": items, "manual_icon_class": "fa-solid fa-paper-plane", }, ) class ComposePage(LoginRequiredMixin, View): template_name = "pages/compose.html" def get(self, request): service = _default_service(request.GET.get("service")) identifier = str(request.GET.get("identifier") or "").strip() person = None person_id = request.GET.get("person") if person_id: person = get_object_or_404(Person, id=person_id, user=request.user) if not identifier and person is None: return HttpResponseBadRequest("Missing contact identifier.") context = _panel_context( request=request, service=service, identifier=identifier, person=person, render_mode="page", ) return render(request, self.template_name, context) class ComposeWidget(LoginRequiredMixin, View): def get(self, request): service = _default_service(request.GET.get("service")) identifier = str(request.GET.get("identifier") or "").strip() person = None person_id = request.GET.get("person") if person_id: person = get_object_or_404(Person, id=person_id, user=request.user) if not identifier and person is None: return HttpResponseBadRequest("Missing contact identifier.") panel_context = _panel_context( request=request, service=service, identifier=identifier, person=person, render_mode="widget", ) title_name = ( panel_context["person"].name if panel_context["person"] is not None else panel_context["identifier"] ) context = { "title": f"Manual Chat: {title_name}", "unique": f"compose-{panel_context['panel_id']}", "window_content": "partials/compose-panel.html", "widget_options": 'gs-w="6" gs-h="12" gs-x="0" gs-y="0" gs-min-w="4"', **panel_context, } return render(request, "mixins/wm/widget.html", context) class ComposeThread(LoginRequiredMixin, View): def get(self, request): service = _default_service(request.GET.get("service")) identifier = str(request.GET.get("identifier") or "").strip() person = None person_id = request.GET.get("person") if person_id: person = get_object_or_404(Person, id=person_id, user=request.user) if not identifier and person is None: return HttpResponseBadRequest("Missing contact identifier.") limit = _safe_limit(request.GET.get("limit") or 60) after_ts = _safe_after_ts(request.GET.get("after_ts")) base = _context_base(request.user, service, identifier, person) latest_ts = after_ts messages = [] if base["person_identifier"] is not None: session, _ = ChatSession.objects.get_or_create( user=request.user, identifier=base["person_identifier"], ) queryset = Message.objects.filter(user=request.user, session=session) if after_ts > 0: queryset = queryset.filter(ts__gt=after_ts) messages = list( queryset.select_related( "session", "session__identifier", "session__identifier__person", ) .order_by("ts")[:limit] ) newest = ( Message.objects.filter(user=request.user, session=session) .order_by("-ts") .values_list("ts", flat=True) .first() ) if newest: latest_ts = max(latest_ts, int(newest)) payload = { "messages": [_serialize_message(msg) for msg in messages], "last_ts": latest_ts, "typing": get_person_typing_state( user_id=request.user.id, person_id=base["person"].id if base["person"] else None, ), } return JsonResponse(payload) class ComposeDrafts(LoginRequiredMixin, View): def get(self, request): service = _default_service(request.GET.get("service")) identifier = str(request.GET.get("identifier") or "").strip() person = None person_id = request.GET.get("person") if person_id: person = get_object_or_404(Person, id=person_id, user=request.user) if not identifier and person is None: return JsonResponse({"ok": False, "error": "Missing contact identifier."}) base = _context_base(request.user, service, identifier, person) limit = _safe_limit(request.GET.get("limit") or 60) messages = _messages_for_ai(request.user, base["person_identifier"], limit) if not messages: return JsonResponse( { "ok": True, "cached": False, "drafts": _fallback_drafts(), } ) last_ts = int(messages[-1].ts or 0) cache_key = _compose_ai_cache_key( "drafts", request.user.id, base["service"], base["identifier"], base["person"].id if base["person"] else "", last_ts, limit, ) cached = cache.get(cache_key) if cached: return JsonResponse({"ok": True, "cached": True, "drafts": cached}) ai_obj = AI.objects.filter(user=request.user).first() transcript = messages_to_string( messages, author_rewrites={ "USER": _owner_name(request.user), "BOT": "Assistant", }, ) drafts = _fallback_drafts() if ai_obj is not None: try: result = async_to_sync(ai_runner.run_prompt)( _build_draft_prompt( owner_name=_owner_name(request.user), person_name=base["person"].name if base["person"] else "Other", transcript=transcript, ), ai_obj, ) parsed = _parse_draft_options(result) if parsed: drafts = parsed except Exception: pass cache.set(cache_key, drafts, timeout=COMPOSE_AI_CACHE_TTL) return JsonResponse({"ok": True, "cached": False, "drafts": drafts}) class ComposeSummary(LoginRequiredMixin, View): def get(self, request): service = _default_service(request.GET.get("service")) identifier = str(request.GET.get("identifier") or "").strip() person = None person_id = request.GET.get("person") if person_id: person = get_object_or_404(Person, id=person_id, user=request.user) if not identifier and person is None: return JsonResponse({"ok": False, "error": "Missing contact identifier."}) base = _context_base(request.user, service, identifier, person) limit = _safe_limit(request.GET.get("limit") or 60) messages = _messages_for_ai(request.user, base["person_identifier"], limit) if not messages: return JsonResponse({"ok": True, "cached": False, "summary": ""}) last_ts = int(messages[-1].ts or 0) cache_key = _compose_ai_cache_key( "summary", request.user.id, base["service"], base["identifier"], base["person"].id if base["person"] else "", last_ts, limit, ) cached = cache.get(cache_key) if cached: return JsonResponse({"ok": True, "cached": True, "summary": cached}) ai_obj = AI.objects.filter(user=request.user).first() transcript = messages_to_string( messages, author_rewrites={ "USER": _owner_name(request.user), "BOT": "Assistant", }, ) if ai_obj is None: fallback = ( "Headlines:\n" "- Conversation loaded.\n" "Patterns:\n" "- Not enough AI context configured yet.\n" "Suggested Next Message:\n" "- I want us to keep this clear and constructive." ) cache.set(cache_key, fallback, timeout=COMPOSE_AI_CACHE_TTL) return JsonResponse({"ok": True, "cached": False, "summary": fallback}) try: summary = async_to_sync(ai_runner.run_prompt)( _build_summary_prompt( owner_name=_owner_name(request.user), person_name=base["person"].name if base["person"] else "Other", transcript=transcript, ), ai_obj, ) except Exception as exc: return JsonResponse({"ok": False, "error": str(exc)}) summary = str(summary or "").strip() cache.set(cache_key, summary, timeout=COMPOSE_AI_CACHE_TTL) return JsonResponse({"ok": True, "cached": False, "summary": summary}) class ComposeQuickInsights(LoginRequiredMixin, View): def get(self, request): service = _default_service(request.GET.get("service")) identifier = str(request.GET.get("identifier") or "").strip() person = None person_id = request.GET.get("person") if person_id: person = get_object_or_404(Person, id=person_id, user=request.user) if not identifier and person is None: return JsonResponse({"ok": False, "error": "Missing contact identifier."}) base = _context_base(request.user, service, identifier, person) person = base["person"] if person is None: return JsonResponse( { "ok": False, "error": "Quick Insights needs a linked person.", } ) conversation = ( WorkspaceConversation.objects.filter( user=request.user, participants=person, ) .order_by("-last_event_ts", "-created_at") .first() ) if conversation is None: return JsonResponse( { "ok": True, "empty": True, "summary": { "person_name": person.name, "platform": "", "state": "Calibrating", "thread": "", "last_event": "", "last_ai_run": "", "workspace_created": "", "snapshot_count": 0, }, "rows": [], "docs": [ "Quick Insights needs at least one workspace conversation snapshot.", "Run AI operations in AI Workspace to generate the first data points.", ], } ) payload = _quick_insights_rows(conversation) return JsonResponse( { "ok": True, "empty": False, "summary": { "person_name": person.name, "platform": conversation.get_platform_type_display(), "state": conversation.get_stability_state_display(), "thread": conversation.platform_thread_id or "", "last_event": _format_ts_label(conversation.last_event_ts or 0) if conversation.last_event_ts else "", "last_ai_run": ( dj_timezone.localtime(conversation.last_ai_run_at).strftime( "%Y-%m-%d %H:%M" ) if conversation.last_ai_run_at else "" ), "workspace_created": dj_timezone.localtime( conversation.created_at ).strftime("%Y-%m-%d %H:%M"), "snapshot_count": payload["snapshot_count"], }, "rows": payload["rows"], "docs": [ "Each row shows current value, percent change vs previous point, and data-point count.", "Arrow color indicates improving or risk direction for that metric.", "Face indicator maps value range to positive, mixed, or strained climate.", "Use this card for fast triage; open AI Workspace for full graphs and details.", ], } ) class ComposeEngagePreview(LoginRequiredMixin, View): def get(self, request): service = _default_service(request.GET.get("service")) identifier = str(request.GET.get("identifier") or "").strip() person = None person_id = request.GET.get("person") if person_id: person = get_object_or_404(Person, id=person_id, user=request.user) if not identifier and person is None: return JsonResponse({"ok": False, "error": "Missing contact identifier."}) base = _context_base(request.user, service, identifier, person) limit = _safe_limit(request.GET.get("limit") or 60) messages = _messages_for_ai(request.user, base["person_identifier"], limit) transcript = messages_to_string( messages, author_rewrites={ "USER": _owner_name(request.user), "BOT": "Assistant", }, ) owner_name = _owner_name(request.user) recipient_name = base["person"].name if base["person"] else "Other" plan = _latest_plan_for_person(request.user, base["person"]) source_options = _engage_source_options(plan) source_options_with_custom = ( [{"value": "auto", "label": "Auto"}] + source_options + [{"value": "custom", "label": "Custom"}] ) source_ref = str(request.GET.get("source_ref") or "auto").strip().lower() custom_text = str(request.GET.get("custom_text") or "").strip() source_obj = None source_kind = "" selected_source = source_ref if source_ref else "auto" if selected_source == "custom": selected_source = "custom" else: if selected_source == "auto": fallback_obj, fallback_kind = _best_engage_source(plan) if fallback_obj is not None: source_obj = fallback_obj source_kind = fallback_kind else: source_obj, source_kind, explicit_ref = _engage_source_from_ref( plan, selected_source, ) if source_obj is None: selected_source = "auto" fallback_obj, fallback_kind = _best_engage_source(plan) if fallback_obj is not None: source_obj = fallback_obj source_kind = fallback_kind else: selected_source = explicit_ref preview = "" outbound = "" artifact_label = "AI-generated" if selected_source == "custom": outbound = _plain_text(custom_text) if outbound: preview = f"**Custom Engage** (Correction)\n\nGuidance:\n{outbound}" artifact_label = "Custom" else: preview = ( "**Custom Engage** (Correction)\n\nGuidance:\n" "Enter your custom engagement text to preview." ) elif source_obj is not None: payload = _build_engage_payload( source_obj=source_obj, source_kind=source_kind, share_target="other", framing="shared", context_note="", owner_name=owner_name, recipient_name=recipient_name, ) preview = str(payload.get("preview") or "").strip() outbound = _engage_body_only(payload.get("outbound") or "") artifact_label = f"{source_kind.title()}: {getattr(source_obj, 'title', '')}" else: ai_obj = AI.objects.filter(user=request.user).first() if ai_obj is not None: try: generated = async_to_sync(ai_runner.run_prompt)( _build_engage_prompt(owner_name, recipient_name, transcript), ai_obj, ) outbound = _plain_text(generated) except Exception: outbound = "" if not outbound: outbound = ( "We should slow down, clarify what we mean, and respond with care." ) preview = f"**Shared Engage** (Correction)\n\nGuidance:\n{outbound}" token = "" if outbound: token = signing.dumps( { "u": request.user.id, "s": base["service"], "i": base["identifier"], "p": str(base["person"].id) if base["person"] else "", "outbound": outbound, "exp": int(time.time()) + (60 * 10), }, salt=COMPOSE_ENGAGE_TOKEN_SALT, ) return JsonResponse( { "ok": True, "preview": preview, "outbound": outbound, "token": token, "artifact": artifact_label, "options": source_options_with_custom, "selected_source": selected_source, "custom_text": custom_text, } ) class ComposeEngageSend(LoginRequiredMixin, View): def post(self, request): service = _default_service(request.POST.get("service")) identifier = str(request.POST.get("identifier") or "").strip() person = None person_id = request.POST.get("person") if person_id: person = get_object_or_404(Person, id=person_id, user=request.user) if not identifier and person is None: return JsonResponse({"ok": False, "error": "Missing contact identifier."}) failsafe_arm = str(request.POST.get("failsafe_arm") or "").strip() failsafe_confirm = str(request.POST.get("failsafe_confirm") or "").strip() if failsafe_arm != "1" or failsafe_confirm != "1": return JsonResponse( {"ok": False, "error": "Enable send confirmation before sending."} ) token = str(request.POST.get("engage_token") or "").strip() if not token: return JsonResponse({"ok": False, "error": "Missing engage token."}) try: payload = signing.loads(token, salt=COMPOSE_ENGAGE_TOKEN_SALT) except Exception: return JsonResponse({"ok": False, "error": "Invalid engage token."}) if int(payload.get("u") or 0) != int(request.user.id): return JsonResponse({"ok": False, "error": "Token does not match user."}) if int(payload.get("exp") or 0) < int(time.time()): return JsonResponse({"ok": False, "error": "Engage token expired."}) outbound = str(payload.get("outbound") or "").strip() if not outbound: return JsonResponse({"ok": False, "error": "Empty engage payload."}) base = _context_base(request.user, service, identifier, person) ts = async_to_sync(transport.send_message_raw)( base["service"], base["identifier"], text=outbound, attachments=[], ) if not ts: return JsonResponse({"ok": False, "error": "Send failed."}) if base["person_identifier"] is not None: session, _ = ChatSession.objects.get_or_create( user=request.user, identifier=base["person_identifier"], ) ts_value = int(ts) if str(ts).isdigit() else int(time.time() * 1000) Message.objects.create( user=request.user, session=session, sender_uuid="", text=outbound, ts=ts_value, delivered_ts=ts_value if str(ts).isdigit() else None, custom_author="USER", ) return JsonResponse({"ok": True, "message": "Shared engage sent."}) class ComposeSend(LoginRequiredMixin, View): @staticmethod def _response(request, *, ok, message="", level="info"): response = render( request, "partials/compose-send-status.html", { "notice_message": message, "notice_level": level, }, ) trigger_payload = { "composeSendResult": { "ok": bool(ok), "message": str(message or ""), "level": str(level or "info"), } } if ok: trigger_payload["composeMessageSent"] = True response["HX-Trigger"] = json.dumps(trigger_payload) return response def post(self, request): service = _default_service(request.POST.get("service")) identifier = str(request.POST.get("identifier") or "").strip() person = None person_id = request.POST.get("person") if person_id: person = get_object_or_404(Person, id=person_id, user=request.user) render_mode = str(request.POST.get("render_mode") or "page").strip().lower() if render_mode not in {"page", "widget"}: render_mode = "page" if not identifier and person is None: return HttpResponseBadRequest("Missing contact identifier.") failsafe_arm = str(request.POST.get("failsafe_arm") or "").strip() failsafe_confirm = str(request.POST.get("failsafe_confirm") or "").strip() if failsafe_arm != "1" or failsafe_confirm != "1": return self._response( request, ok=False, message="Enable send confirmation before sending.", level="warning", ) text = str(request.POST.get("text") or "").strip() if not text: return self._response( request, ok=False, message="Message is empty.", level="danger", ) base = _context_base(request.user, service, identifier, person) ts = async_to_sync(transport.send_message_raw)( base["service"], base["identifier"], text=text, attachments=[], ) if not ts: return self._response( request, ok=False, message="Send failed. Check service account state.", level="danger", ) if base["person_identifier"] is not None: session, _ = ChatSession.objects.get_or_create( user=request.user, identifier=base["person_identifier"], ) Message.objects.create( user=request.user, session=session, sender_uuid="", text=text, ts=int(ts) if str(ts).isdigit() else int(time.time() * 1000), delivered_ts=int(ts) if str(ts).isdigit() else None, custom_author="USER", ) return self._response(request, ok=True, message="", level="success")