import json
import re
import statistics
from datetime import datetime, timezone
from urllib.parse import urlencode

from asgiref.sync import async_to_sync
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import HttpResponseBadRequest
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils import timezone as dj_timezone
from django.views import View

from core.forms import AIWorkspaceWindowForm
from core.lib.notify import raw_sendmsg
from core.messaging import ai as ai_runner
from core.messaging.utils import messages_to_string
from core.models import (
    AI,
    AIRequest,
    AIResult,
    AIResultSignal,
    ChatSession,
    Manipulation,
    Message,
    MessageEvent,
    PatternArtifactExport,
    PatternMitigationAutoSettings,
    PatternMitigationCorrection,
    PatternMitigationGame,
    PatternMitigationMessage,
    PatternMitigationPlan,
    PatternMitigationRule,
    Person,
    PersonIdentifier,
    QueuedMessage,
    WorkspaceConversation,
    WorkspaceMetricSnapshot,
)

# Manipulation modes that allow sending; `_get_send_state` and
# `_get_queue_manipulation` filter on `mode__in=SEND_ENABLED_MODES`.
SEND_ENABLED_MODES = {"active", "instant"}

# Display labels keyed by operation slug.
# NOTE(review): consumers are not visible in this part of the file.
OPERATION_LABELS = {
    "summarise": "Summarise",
    "draft_reply": "Draft Reply",
    "extract_patterns": "Extract Patterns",
    "artifacts": "Plan",
}

# Allow-list of tab slugs accepted by `_sanitize_active_tab`.
MITIGATION_TABS = {
    "plan_board",
    "corrections",
    "engage",
    "fundamentals",
    "ask_ai",
    "auto",
}

# Insight groups keyed by group slug. Each entry in INSIGHT_METRICS and
# INSIGHT_GRAPH_SPECS names one of these slugs in its "group" field.
INSIGHT_GROUPS = {
    "identity": {
        "title": "Conversation Identity",
        "summary": (
            "Describes where this workspace data comes from and which thread is "
            "being analysed."
        ),
    },
    "stability": {
        "title": "Stability Scoring",
        "summary": (
            "Stability combines reciprocity, continuity, response pace, and "
            "message-volatility regularity into a 0-100 score."
        ),
    },
    "confidence": {
        "title": "Sample And Confidence",
        "summary": (
            "Confidence scales with data volume, day coverage, and observed "
            "response pairs so sparse windows do not overclaim certainty."
        ),
    },
    "commitment": {
        "title": "Commitment Directionality",
        "summary": (
            "Commitment estimates effort in each direction by combining response "
            "speed with participation balance."
        ),
    },
    "timeline": {
        "title": "Recency And Cadence",
        "summary": (
            "Tracks when messages and AI analysis most recently occurred to "
            "separate stale from current assessments."
        ),
    },
}

# Per-metric descriptive metadata keyed by metric slug:
#   title         - display title
#   group         - slug into INSIGHT_GROUPS
#   history_field - field name used for history lookups, or None when the
#                   metric has no history (see `_metric_supports_history`)
#   calculation   - prose description of how the value is derived
#   psychology    - prose interpretation guidance
INSIGHT_METRICS = {
    "platform": {
        "title": "Platform",
        "group": "identity",
        "history_field": None,
        "calculation": (
            "Updated to the service of the latest message in the sampled history "
            "window."
        ),
        "psychology": (
            "Platform shifts can indicate context switches in how the relationship "
            "is maintained across channels."
        ),
    },
    "thread": {
        "title": "Thread",
        "group": "identity",
        "history_field": None,
        "calculation": (
            "Primary thread identifier used to anchor this workspace conversation."
        ),
        "psychology": (
            "A stable thread usually implies continuity; frequent thread resets can "
            "reflect fragmentation."
        ),
    },
    "workspace_created": {
        "title": "Workspace Created",
        "group": "identity",
        "history_field": None,
        "calculation": "Creation timestamp of this workspace conversation record.",
        "psychology": (
            "Older workspaces allow stronger trend reading because they include "
            "longer temporal context."
        ),
    },
    "stability_state": {
        "title": "Stability State",
        "group": "stability",
        "history_field": None,
        "calculation": (
            "Derived from stability score bands: Stable >= 70, Watch >= 50, "
            "Fragile < 50, or Calibrating if data is insufficient."
        ),
        "psychology": (
            "Use state as a risk band, not a diagnosis. It indicates likely "
            "interaction friction level."
        ),
    },
    "stability_score": {
        "title": "Stability Score",
        "group": "stability",
        "history_field": "stability_score",
        "calculation": (
            "0.35*reciprocity + 0.25*continuity + 0.20*response + "
            "0.20*volatility."
        ),
        "psychology": (
            "Higher values suggest consistent mutual engagement patterns; falling "
            "values often precede misunderstandings or withdrawal cycles."
        ),
    },
    "reciprocity_score": {
        "title": "Reciprocity Component",
        "group": "stability",
        "history_field": "reciprocity_score",
        "calculation": (
            "100 * (1 - |inbound - outbound| / total_messages). Higher means "
            "more balanced participation."
        ),
        "psychology": (
            "Lower reciprocity can reflect perceived asymmetry and rising pursuit/"
            "withdraw cycles."
        ),
    },
    "continuity_score": {
        "title": "Continuity Component",
        "group": "stability",
        "history_field": "continuity_score",
        "calculation": (
            "100 * min(1, distinct_sample_days / span_days). Higher means steadier "
            "day-to-day continuity."
        ),
        "psychology": ("Drops can signal communication becoming episodic or reactive."),
    },
    "response_score": {
        "title": "Response Component",
        "group": "stability",
        "history_field": "response_score",
        "calculation": (
            "Average of inbound and outbound response-lag scores, each mapped from "
            "median lag to a 0-100 curve."
        ),
        "psychology": (
            "Lower response score can indicate delayed repair loops during tension."
        ),
    },
    "volatility_score": {
        "title": "Volatility Component",
        "group": "stability",
        "history_field": "volatility_score",
        "calculation": (
            "Derived from coefficient of variation of daily message counts and "
            "inverted to a 0-100 stability signal."
        ),
        "psychology": (
            "High volatility can suggest inconsistent rhythm and reduced predictability."
        ),
    },
    "stability_confidence": {
        "title": "Stability Confidence",
        "group": "confidence",
        "history_field": "stability_confidence",
        "calculation": (
            "0.50*message_volume + 0.30*day_coverage + 0.20*response_pair_density."
        ),
        "psychology": (
            "Low confidence means defer strong conclusions; treat outputs as "
            "tentative signals."
        ),
    },
    "sample_messages": {
        "title": "Sample Messages",
        "group": "confidence",
        "history_field": "stability_sample_messages",
        "calculation": "Count of messages in the analysed window.",
        "psychology": (
            "Larger sample size improves reliability and reduces overreaction to "
            "single events."
        ),
    },
    "sample_days": {
        "title": "Sample Days",
        "group": "confidence",
        "history_field": "stability_sample_days",
        "calculation": "Count of distinct calendar days represented in the sample.",
        "psychology": (
            "Coverage across days better captures rhythm, not just intensity "
            "bursts."
        ),
    },
    "stability_computed": {
        "title": "Stability Computed",
        "group": "stability",
        "history_field": None,
        "calculation": "Timestamp of the latest stability computation run.",
        "psychology": (
            "Recent recomputation indicates current context; stale computations may "
            "lag emotional reality."
        ),
    },
    "commitment_inbound": {
        "title": "Commit In",
        "group": "commitment",
        "history_field": "commitment_inbound_score",
        "calculation": ("0.60*inbound_response_score + 0.40*inbound_balance_score."),
        "psychology": (
            "Estimates counterpart follow-through and reciprocity toward the user."
        ),
    },
    "commitment_outbound": {
        "title": "Commit Out",
        "group": "commitment",
        "history_field": "commitment_outbound_score",
        "calculation": ("0.60*outbound_response_score + 0.40*outbound_balance_score."),
        "psychology": (
            "Estimates user follow-through and consistency toward the counterpart."
        ),
    },
    "inbound_response_score": {
        "title": "Inbound Response Score",
        "group": "commitment",
        "history_field": "inbound_response_score",
        "calculation": (
            "Response-speed score built from median lag between user outbound and "
            "counterpart inbound replies."
        ),
        "psychology": (
            "Lower values suggest delayed reciprocity from counterpart direction."
        ),
    },
    "outbound_response_score": {
        "title": "Outbound Response Score",
        "group": "commitment",
        "history_field": "outbound_response_score",
        "calculation": (
            "Response-speed score built from median lag between counterpart inbound "
            "and user outbound replies."
        ),
        "psychology": "Lower values suggest slower follow-through from user direction.",
    },
    "balance_inbound_score": {
        "title": "Inbound Balance Score",
        "group": "commitment",
        "history_field": "balance_inbound_score",
        "calculation": (
            "100 * min(1, inbound_messages / outbound_messages). Captures inbound "
            "participation parity."
        ),
        "psychology": (
            "Lower values can indicate one-sided conversational load from user side."
        ),
    },
    "balance_outbound_score": {
        "title": "Outbound Balance Score",
        "group": "commitment",
        "history_field": "balance_outbound_score",
        "calculation": (
            "100 * min(1, outbound_messages / inbound_messages). Captures outbound "
            "participation parity."
        ),
        "psychology": (
            "Lower values can indicate one-sided conversational load from counterpart side."
        ),
    },
    "commitment_confidence": {
        "title": "Commit Confidence",
        "group": "confidence",
        "history_field": "commitment_confidence",
        "calculation": (
            "Uses the same confidence weighting as stability for directional scores."
        ),
        "psychology": (
            "Low confidence means directionality gaps might be temporary rather "
            "than structural."
        ),
    },
    "commitment_computed": {
        "title": "Commitment Computed",
        "group": "commitment",
        "history_field": None,
        "calculation": "Timestamp of the latest commitment computation run.",
        "psychology": (
            "Recency matters because commitment signals shift quickly under stress."
        ),
    },
    "last_event": {
        "title": "Last Event",
        "group": "timeline",
        "history_field": None,
        "calculation": "Unix ms timestamp of the newest message in this workspace.",
        "psychology": (
            "Long inactivity windows can indicate pause, repair distance, or "
            "channel migration."
        ),
    },
    "last_ai_run": {
        "title": "Last AI Run",
        "group": "timeline",
        "history_field": None,
        "calculation": "Most recent completed AI analysis timestamp.",
        "psychology": (
            "Recency of AI summaries affects relevance of suggested interventions."
        ),
    },
}

# One entry per history graph. `_all_graph_payload` reads "field" as the
# snapshot field to plot and copies slug/title/group/y_min/y_max through.
# NOTE(review): y_max=None presumably means "let the chart auto-scale" -
# confirm against the front-end consumer.
INSIGHT_GRAPH_SPECS = [
    {
        "slug": "stability_score",
        "title": "Stability Score",
        "field": "stability_score",
        "group": "stability",
        "y_min": 0,
        "y_max": 100,
    },
    {
        "slug": "stability_confidence",
        "title": "Stability Confidence",
        "field": "stability_confidence",
        "group": "confidence",
        "y_min": 0,
        "y_max": 1,
    },
    {
        "slug": "sample_messages",
        "title": "Sample Messages",
        "field": "stability_sample_messages",
        "group": "confidence",
        "y_min": 0,
        "y_max": None,
    },
    {
        "slug": "sample_days",
        "title": "Sample Days",
        "field": "stability_sample_days",
        "group": "confidence",
        "y_min": 0,
        "y_max": None,
    },
    {
        "slug": "commitment_inbound",
        "title": "Commit In",
        "field": "commitment_inbound_score",
        "group": "commitment",
        "y_min": 0,
        "y_max": 100,
    },
    {
        "slug": "commitment_outbound",
        "title": "Commit Out",
        "field": "commitment_outbound_score",
        "group": "commitment",
        "y_min": 0,
        "y_max": 100,
    },
    {
        "slug": "commitment_confidence",
        "title": "Commit Confidence",
        "field": "commitment_confidence",
        "group": "confidence",
        "y_min": 0,
        "y_max": 1,
    },
    {
        "slug": "inbound_messages",
        "title": "Inbound Messages",
        "field": "inbound_messages",
        "group": "timeline",
        "y_min": 0,
        "y_max": None,
    },
    {
        "slug": "outbound_messages",
        "title": "Outbound Messages",
        "field": "outbound_messages",
        "group": "timeline",
        "y_min": 0,
        "y_max": None,
    },
    {
        "slug": "reciprocity_score",
        "title": "Reciprocity Component",
        "field": "reciprocity_score",
        "group": "stability",
        "y_min": 0,
        "y_max": 100,
    },
    {
        "slug": "continuity_score",
        "title": "Continuity Component",
        "field": "continuity_score",
        "group": "stability",
        "y_min": 0,
        "y_max": 100,
    },
    {
        "slug": "response_score",
        "title": "Response Component",
        "field": "response_score",
        "group": "stability",
        "y_min": 0,
        "y_max": 100,
    },
    {
        "slug": "volatility_score",
        "title": "Volatility Component",
        "field": "volatility_score",
        "group": "stability",
        "y_min": 0,
        "y_max": 100,
    },
    {
        "slug": "inbound_response_score",
        "title": "Inbound Response Score",
        "field": "inbound_response_score",
        "group": "commitment",
        "y_min": 0,
        "y_max": 100,
    },
    {
        "slug": "outbound_response_score",
        "title": "Outbound Response Score",
        "field": "outbound_response_score",
        "group": "commitment",
        "y_min": 0,
        "y_max": 100,
    },
    {
        "slug": "balance_inbound_score",
        "title": "Inbound Balance Score",
        "field": "balance_inbound_score",
        "group": "commitment",
        "y_min": 0,
        "y_max": 100,
    },
    {
        "slug": "balance_outbound_score",
        "title": "Outbound Balance Score",
        "field": "balance_outbound_score",
        "group": "commitment",
        "y_min": 0,
        "y_max": 100,
    },
]

# Metric slugs listed, in this order, by `_information_overview_rows`.
INFORMATION_OVERVIEW_SLUGS = (
    "platform",
    "thread",
    "workspace_created",
    "stability_state",
    "stability_computed",
    "commitment_computed",
    "last_event",
    "last_ai_run",
)


def _format_unix_ms(ts):
    """
    Format a Unix timestamp in milliseconds as "YYYY-MM-DD HH:MM UTC".

    Returns an empty string when `ts` is falsy (None, 0, "").
    """
    if not ts:
        return ""
    dt = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M UTC")


def _infer_direction(message, person_identifiers):
    """
    Infer message direction relative to workspace owner.

    Returns "in" when the message's `sender_uuid` is non-empty and contained
    in `person_identifiers`; every other message is reported as "out".
    """
    sender = message.sender_uuid or ""
    if sender and sender in person_identifiers:
        return "in"
    return "out"


def _get_send_state(user, person):
    """
    Resolve current send capability from user's enabled manipulations for this person.

    Returns a dict with `can_send`, `level`, `text` and `manipulation_id`.
    Sending is allowed only when at least one matching manipulation has a
    mode in SEND_ENABLED_MODES.
    """
    manipulations = (
        Manipulation.objects.filter(
            user=user,
            enabled=True,
            group__people=person,
        )
        .select_related("group")
        .distinct()
    )
    if not manipulations.exists():
        return {
            "can_send": False,
            "level": "warning",
            "text": "Sending is blocked: no enabled manipulation matched this recipient.",
            "manipulation_id": None,
        }
    send_manip = manipulations.filter(mode__in=SEND_ENABLED_MODES).first()
    if send_manip:
        return {
            "can_send": True,
            "level": "success",
            "text": f"Enabled by manipulation '{send_manip.name}' ({send_manip.mode}).",
            "manipulation_id": str(send_manip.id),
        }
    # Manipulations matched but none is send-enabled: report their modes.
    mode_list = ", ".join(sorted({(m.mode or "unset") for m in manipulations}))
    return {
        "can_send": False,
        "level": "warning",
        "text": f"Sending is blocked by active mode(s): {mode_list}.",
        "manipulation_id": None,
    }


def _get_queue_manipulation(user, person):
    """
    Resolve a manipulation for queue entries:
    prefer send-enabled, otherwise any enabled manipulation in recipient groups.

    Returns None when no enabled manipulation matches the person.
    """
    matched = (
        Manipulation.objects.filter(
            user=user,
            enabled=True,
            group__people=person,
        )
        .select_related("group")
        .distinct()
    )
    return matched.filter(mode__in=SEND_ENABLED_MODES).first() or matched.first()


def _resolve_person_identifier(user, person, preferred_service=None):
    """
    Resolve the best identifier for outbound share/send operations.
    Prefer `preferred_service` when provided, then Signal, then any identifier.

    Returns None when the person has no identifier at all.
    """
    if preferred_service:
        preferred = PersonIdentifier.objects.filter(
            user=user,
            person=person,
            service=preferred_service,
        ).first()
        if preferred:
            return preferred
    signal_row = PersonIdentifier.objects.filter(
        user=user,
        person=person,
        service="signal",
    ).first()
    if signal_row:
        return signal_row
    return PersonIdentifier.objects.filter(user=user, person=person).first()


def _send_target_options_for_person(user, person):
    """
    Build the list of selectable send targets for a person.

    Returns `{"options": [...], "selected_id": str}`. Options are the
    person's identifiers with blank service/identifier rows skipped and
    duplicates (same lower-cased service and stripped identifier) collapsed.
    `selected_id` is the first option, or the first option on the preferred
    service when one is known; it is "" when there are no options.
    """
    rows = list(
        PersonIdentifier.objects.filter(user=user, person=person)
        .exclude(identifier="")
        .order_by("service", "identifier", "id")
    )
    if not rows:
        return {"options": [], "selected_id": ""}
    preferred_service = _preferred_service_for_person(user, person)
    labels = {
        "signal": "Signal",
        "whatsapp": "WhatsApp",
        "instagram": "Instagram",
        "xmpp": "XMPP",
    }
    seen = set()
    options = []
    for row in rows:
        service = str(row.service or "").strip().lower()
        identifier = str(row.identifier or "").strip()
        if not service or not identifier:
            continue
        dedupe_key = (service, identifier)
        if dedupe_key in seen:
            continue
        seen.add(dedupe_key)
        options.append(
            {
                "id": str(row.id),
                "service": service,
                "service_label": labels.get(service, service.title()),
                "identifier": identifier,
            }
        )
    if not options:
        return {"options": [], "selected_id": ""}
    selected_id = options[0]["id"]
    if preferred_service:
        # NOTE(review): option services are lower-cased above but
        # `preferred_service` is compared as-is - confirm platform_type values
        # are always stored lower-case.
        preferred = next(
            (item for item in options if item["service"] == preferred_service),
            None,
        )
        if preferred is not None:
            selected_id = preferred["id"]
    return {"options": options, "selected_id": selected_id}


def _resolve_person_identifier_target(
    user,
    person,
    target_identifier_id="",
    target_service="",
    fallback_service=None,
):
    """
    Resolve the identifier to send to from explicit target hints.

    An identifier with id `target_identifier_id` owned by this user/person
    wins when it exists. Otherwise resolution falls back to
    `_resolve_person_identifier`, preferring the lower-cased `target_service`
    or, when that is blank, `fallback_service`.
    """
    target_id = str(target_identifier_id or "").strip()
    if target_id:
        # NOTE(review): a malformed id is passed straight to the ORM filter -
        # confirm callers validate it first.
        selected = PersonIdentifier.objects.filter(
            user=user,
            person=person,
            id=target_id,
        ).first()
        if selected is not None:
            return selected
    preferred = str(target_service or "").strip().lower() or fallback_service
    return _resolve_person_identifier(
        user=user,
        person=person,
        preferred_service=preferred,
    )
def _preferred_service_for_person(user, person):
    """
    Return the platform type of the person's most recently active workspace
    conversation, or None when no conversation carries a platform.
    """
    latest = (
        WorkspaceConversation.objects.filter(user=user, participants=person)
        .exclude(platform_type="")
        .order_by("-last_event_ts", "-created_at")
        .first()
    )
    if not latest:
        return None
    return latest.platform_type or None


def _compose_page_url_for_person(user, person):
    """
    Build the compose-page URL for a person, or "" when the person has no
    identifier to compose to.
    """
    target = _resolve_person_identifier(
        user=user,
        person=person,
        preferred_service=_preferred_service_for_person(user, person),
    )
    if target is None:
        return ""
    params = {
        "service": target.service,
        "identifier": target.identifier,
        "person": str(person.id),
    }
    query_string = urlencode(params)
    return f"{reverse('compose_page')}?{query_string}"


def _compose_widget_url_for_person(user, person, limit=40):
    """
    Build the compose-widget URL for a person, or "" when the person has no
    identifier. `limit` falls back to 40 when it is not an integer and is
    clamped to the range 10..200.
    """
    target = _resolve_person_identifier(
        user=user,
        person=person,
        preferred_service=_preferred_service_for_person(user, person),
    )
    if target is None:
        return ""

    try:
        requested = int(limit or 40)
    except (TypeError, ValueError):
        requested = 40
    clamped = min(max(requested, 10), 200)

    params = {
        "service": target.service,
        "identifier": target.identifier,
        "person": str(person.id),
        "limit": clamped,
    }
    query_string = urlencode(params)
    return f"{reverse('compose_widget')}?{query_string}"


def _participant_feedback_display(conversation, person):
    """
    Turn the stored participant feedback for `person` into display fields.

    Returns None when the feedback payload, or this person's entry in it, is
    not a dict. A missing entry yields the "unknown" presentation.
    """
    feedback = conversation.participant_feedback or {}
    if not isinstance(feedback, dict):
        return None
    entry = feedback.get(str(person.id)) or {}
    if not isinstance(entry, dict):
        return None

    # state key -> (label, icon classes, text colour class)
    presentation = {
        "withdrawing": (
            "Withdrawing",
            "fa-regular fa-face-frown",
            "has-text-danger",
        ),
        "overextending": (
            "Overextending",
            "fa-regular fa-face-meh",
            "has-text-warning",
        ),
        "balanced": (
            "Balanced",
            "fa-regular fa-face-smile",
            "has-text-success",
        ),
    }
    fallback = ("Unknown", "fa-regular fa-face-meh-blank", "has-text-grey")

    state_key = str(entry.get("state") or "").strip().lower()
    state_label, state_icon, state_class = presentation.get(state_key, fallback)

    updated_raw = entry.get("updated_at")
    updated_label = ""
    if updated_raw:
        try:
            parsed = datetime.fromisoformat(str(updated_raw))
            if parsed.tzinfo is None:
                # Naive timestamps are treated as UTC before localisation.
                parsed = parsed.replace(tzinfo=timezone.utc)
            updated_label = dj_timezone.localtime(parsed).strftime("%Y-%m-%d %H:%M")
        except Exception:
            # Best effort: show the stored value verbatim if it cannot be parsed.
            updated_label = str(updated_raw)

    return {
        "state_key": state_key or "unknown",
        "state_label": state_label,
        "state_icon": state_icon,
        "state_class": state_class,
        "inbound_messages": entry.get("inbound_messages"),
        "outbound_messages": entry.get("outbound_messages"),
        "sample_messages": entry.get("sample_messages"),
        "sample_days": entry.get("sample_days"),
        "updated_at_label": updated_label,
    }


def _message_rows_for_person(user, person, limit):
    """
    Return the person's latest `limit` messages, oldest first, each wrapped
    in a dict with its inferred direction and a formatted timestamp label.
    """
    known_identifiers = set(
        PersonIdentifier.objects.filter(user=user, person=person).values_list(
            "identifier", flat=True
        )
    )
    return [
        {
            "message": item,
            "direction": _infer_direction(item, known_identifiers),
            "ts_label": _format_unix_ms(item.ts),
        }
        for item in _recent_messages_for_person(user, person, limit)
    ]


def _recent_messages_for_person(user, person, limit):
    """
    Return the person's latest `limit` messages in chronological order.
    """
    session_qs = ChatSession.objects.filter(user=user, identifier__person=person)
    newest_first = (
        Message.objects.filter(user=user, session__in=session_qs)
        .select_related("session", "session__identifier")
        .order_by("-ts")[:limit]
    )
    chronological = list(newest_first)
    chronological.reverse()
    return chronological


def _is_truthy(value):
    """
    Interpret a form/query value as a boolean flag ("1", "true", "on", "yes").
    """
    normalized = str(value or "").strip().lower()
    return normalized in ("1", "true", "on", "yes")


def _sanitize_active_tab(value, default="plan_board"):
    """
    Return `value` stripped when it names a known mitigation tab, else `default`.
    """
    candidate = (value or "").strip()
    return candidate if candidate in MITIGATION_TABS else default
def _to_float(value):
    """
    Convert `value` to float, passing None through unchanged.
    """
    return None if value is None else float(value)


# Metric slugs whose value is the latest snapshot attribute of the same name.
_SNAPSHOT_METRIC_SLUGS = frozenset(
    {
        "reciprocity_score",
        "continuity_score",
        "response_score",
        "volatility_score",
        "inbound_response_score",
        "outbound_response_score",
        "balance_inbound_score",
        "balance_outbound_score",
    }
)

# Metric slugs that are read straight from a conversation attribute.
_CONVERSATION_METRIC_ATTRS = {
    "workspace_created": "created_at",
    "stability_score": "stability_score",
    "stability_confidence": "stability_confidence",
    "sample_messages": "stability_sample_messages",
    "sample_days": "stability_sample_days",
    "stability_computed": "stability_last_computed_at",
    "commitment_inbound": "commitment_inbound_score",
    "commitment_outbound": "commitment_outbound_score",
    "commitment_confidence": "commitment_confidence",
    "commitment_computed": "commitment_last_computed_at",
    "last_ai_run": "last_ai_run_at",
}

# Fields compared by `_store_metric_snapshot` to detect an unchanged snapshot.
_SNAPSHOT_COMPARE_KEYS = (
    "source_event_ts",
    "stability_state",
    "stability_score",
    "stability_confidence",
    "stability_sample_messages",
    "stability_sample_days",
    "commitment_inbound_score",
    "commitment_outbound_score",
    "commitment_confidence",
    "inbound_messages",
    "outbound_messages",
    "reciprocity_score",
    "continuity_score",
    "response_score",
    "volatility_score",
    "inbound_response_score",
    "outbound_response_score",
    "balance_inbound_score",
    "balance_outbound_score",
)


def _format_metric_value(conversation, metric_slug, latest_snapshot=None):
    """
    Return the current value of one insight metric for display.

    `latest_snapshot` may be supplied to avoid a lookup; when it is None the
    conversation's first metric snapshot is fetched. Snapshot-backed metrics
    return None when no snapshot exists. Unknown slugs return "-".
    """
    snapshot = latest_snapshot
    if snapshot is None:
        snapshot = conversation.metric_snapshots.first()

    if metric_slug in _SNAPSHOT_METRIC_SLUGS:
        return getattr(snapshot, metric_slug) if snapshot else None

    attr_name = _CONVERSATION_METRIC_ATTRS.get(metric_slug)
    if attr_name is not None:
        return getattr(conversation, attr_name)

    # Remaining metrics need formatting or a placeholder for empty values.
    if metric_slug == "platform":
        return conversation.get_platform_type_display() or "-"
    if metric_slug == "thread":
        return conversation.platform_thread_id or "-"
    if metric_slug == "stability_state":
        return conversation.get_stability_state_display()
    if metric_slug == "last_event":
        return _format_unix_ms(conversation.last_event_ts) or "-"
    return "-"


def _metric_psychological_read(metric_slug, conversation):
    """
    Return a short interpretive sentence for a metric's current value.

    Only a handful of metrics have a value-dependent reading; every other
    slug returns an empty string.
    """
    if metric_slug == "stability_state":
        state = conversation.stability_state
        states = WorkspaceConversation.StabilityState
        readings = (
            (
                states.CALIBRATING,
                "Calibrating means the system does not yet have enough longitudinal "
                "signal to classify friction reliably. Prioritize collecting a few "
                "more days of normal interaction before drawing conclusions.",
            ),
            (
                states.STABLE,
                "Stable indicates low-friction reciprocity and predictable cadence in "
                "the sampled window. Keep routines consistent and focus on maintenance "
                "habits rather than heavy corrective interventions.",
            ),
            (
                states.WATCH,
                "Watch indicates meaningful strain without full collapse. This often "
                "matches early misunderstanding cycles: repair is still easy if you "
                "slow pace, validate first, and reduce escalation triggers.",
            ),
            (
                states.FRAGILE,
                "Fragile indicates high volatility or directional imbalance in recent "
                "interaction. Use short, clear, safety-first communication and avoid "
                "high-load conversations until cadence normalizes.",
            ),
        )
        for candidate, reading in readings:
            if state == candidate:
                return reading
        return (
            "State is an operational risk band, not a diagnosis. Read it alongside "
            "confidence and recent events."
        )

    if metric_slug == "stability_score":
        score = conversation.stability_score
        if score is None:
            return "Calibrating: collect more interaction data before interpreting."
        if score >= 70:
            return (
                "Pattern suggests low relational friction and resilient repair cycles."
            )
        if score >= 50:
            return "Pattern suggests moderate strain; monitor for repeated escalation loops."
        return (
            "Pattern suggests high friction risk; prioritise safety and repair pacing."
        )

    if metric_slug == "stability_confidence":
        conf = conversation.stability_confidence or 0.0
        if conf < 0.25:
            return "Low certainty: treat this as a weak signal, not a conclusion."
        if conf < 0.6:
            return (
                "Moderate certainty: useful directional cue, still context-dependent."
            )
        return "High certainty: trend interpretation is likely reliable."

    if metric_slug in {"commitment_inbound", "commitment_outbound"}:
        inbound = conversation.commitment_inbound_score
        outbound = conversation.commitment_outbound_score
        if inbound is None or outbound is None:
            return "Calibrating: directional commitment cannot be inferred yet."
        if abs(inbound - outbound) < 10:
            return "Directional effort appears balanced."
        if inbound > outbound:
            return "Counterpart appears more responsive than user in this sample."
        return "User appears more responsive than counterpart in this sample."

    if metric_slug == "sample_days":
        days = conversation.stability_sample_days or 0
        if days < 3:
            return "Coverage is narrow; day-to-day rhythm inference is weak."
        return "Coverage spans multiple days, improving cadence interpretation."

    return ""


def _history_points(conversation, field_name):
    """
    Return `{"x": iso timestamp, "y": value}` points for one snapshot field,
    ordered by computation time and skipping snapshots where it is null.
    """
    rows = (
        conversation.metric_snapshots.exclude(**{f"{field_name}__isnull": True})
        .order_by("computed_at")
        .values("computed_at", field_name)
    )
    return [
        {"x": row["computed_at"].isoformat(), "y": row[field_name]} for row in rows
    ]


def _metric_supports_history(metric_slug, metric_spec):
    """
    Tell whether a metric has a history field and a graph spec with its slug.
    """
    if not metric_spec.get("history_field"):
        return False
    return any(graph["slug"] == metric_slug for graph in INSIGHT_GRAPH_SPECS)


def _all_graph_payload(conversation):
    """
    Build the graph payload (metadata plus history points) for every entry
    in INSIGHT_GRAPH_SPECS. One history query is issued per graph.
    """
    graphs = []
    for spec in INSIGHT_GRAPH_SPECS:
        points = _history_points(conversation, spec["field"])
        graphs.append(
            {
                "slug": spec["slug"],
                "title": spec["title"],
                "group": spec["group"],
                "group_title": INSIGHT_GROUPS[spec["group"]]["title"],
                "points": points,
                "count": len(points),
                "y_min": spec["y_min"],
                "y_max": spec["y_max"],
            }
        )
    return graphs


def _information_overview_rows(conversation):
    """
    Build the overview rows for the slugs in INFORMATION_OVERVIEW_SLUGS,
    skipping any slug that has no entry in INSIGHT_METRICS.
    """
    latest_snapshot = conversation.metric_snapshots.first()
    rows = []
    for slug in INFORMATION_OVERVIEW_SLUGS:
        spec = INSIGHT_METRICS.get(slug)
        if not spec:
            continue
        group_slug = spec.get("group")
        rows.append(
            {
                "slug": slug,
                "title": spec.get("title") or slug.replace("_", " ").title(),
                "value": _format_metric_value(conversation, slug, latest_snapshot),
                "group": group_slug,
                "group_title": INSIGHT_GROUPS.get(group_slug, {}).get(
                    "title", "Information"
                ),
            }
        )
    return rows


def _commitment_directionality_payload(conversation):
    """
    Summarise which direction currently carries more commitment effort.

    Returns a dict with `direction_key` ("calibrating", "balanced",
    "outbound" or "inbound"), a label, the signed `delta` (outbound minus
    inbound), its absolute `magnitude`, the confidence, a conclusion
    sentence, the raw `commit_in`/`commit_out` scores, the contributing
    `factors` and the related `graph_refs`. The same keys are present in
    every branch; while calibrating, `magnitude`/`delta` are None and
    `factors`/`graph_refs` are empty.
    """
    inbound = conversation.commitment_inbound_score
    outbound = conversation.commitment_outbound_score
    confidence = conversation.commitment_confidence or 0.0

    if inbound is None or outbound is None:
        # No snapshot lookup here: the calibrating payload has no factors.
        return {
            "direction_key": "calibrating",
            "direction_label": "Calibrating",
            "magnitude": None,
            "delta": None,
            "confidence": confidence,
            "conclusion": (
                "Directionality cannot be inferred yet. Collect more exchanges to "
                "stabilize directional signal."
            ),
            "commit_in": inbound,
            "commit_out": outbound,
            "factors": [],
            "graph_refs": [],
        }

    delta = round(float(outbound) - float(inbound), 2)
    magnitude = round(abs(delta), 2)
    if magnitude < 4:
        direction_key = "balanced"
        direction_label = "Balanced"
        conclusion = (
            "Commitment appears symmetric. Keep current cadence and focus on "
            "maintaining clarity."
        )
    elif delta > 0:
        direction_key = "outbound"
        direction_label = "Outbound-Leaning"
        conclusion = (
            "You are carrying relatively more directional effort right now. "
            "Consider reducing over-functioning and asking for explicit reciprocity."
        )
    else:
        direction_key = "inbound"
        direction_label = "Inbound-Leaning"
        conclusion = (
            "The other party is carrying relatively more directional effort right now. "
            "Acknowledge this and match consistency to reduce asymmetry."
        )

    graph_refs = [
        {"slug": "commitment_inbound", "title": "Commit In"},
        {"slug": "commitment_outbound", "title": "Commit Out"},
        {"slug": "inbound_response_score", "title": "Inbound Response Score"},
        {"slug": "outbound_response_score", "title": "Outbound Response Score"},
        {"slug": "balance_inbound_score", "title": "Inbound Balance Score"},
        {"slug": "balance_outbound_score", "title": "Outbound Balance Score"},
        {"slug": "commitment_confidence", "title": "Commit Confidence"},
    ]

    latest_snapshot = conversation.metric_snapshots.first()
    # (title, icon, weight label, snapshot attribute / slug)
    snapshot_factors = (
        (
            "Inbound Response",
            "fa-solid fa-inbox",
            "60% of Commit In",
            "inbound_response_score",
        ),
        (
            "Inbound Balance",
            "fa-solid fa-scale-balanced",
            "40% of Commit In",
            "balance_inbound_score",
        ),
        (
            "Outbound Response",
            "fa-solid fa-paper-plane",
            "60% of Commit Out",
            "outbound_response_score",
        ),
        (
            "Outbound Balance",
            "fa-solid fa-arrows-left-right",
            "40% of Commit Out",
            "balance_outbound_score",
        ),
    )
    factors = [
        {
            "title": title,
            "icon": icon,
            "weight": weight,
            "value": getattr(latest_snapshot, slug) if latest_snapshot else None,
            "slug": slug,
        }
        for title, icon, weight, slug in snapshot_factors
    ]
    factors.append(
        {
            "title": "Confidence",
            "icon": "fa-solid fa-shield-check",
            "weight": "Applies To Direction",
            "value": confidence,
            "slug": "commitment_confidence",
        }
    )

    return {
        "direction_key": direction_key,
        "direction_label": direction_label,
        "magnitude": magnitude,
        "delta": delta,
        "confidence": confidence,
        "conclusion": conclusion,
        "commit_in": inbound,
        "commit_out": outbound,
        "factors": factors,
        "graph_refs": graph_refs,
    }


def _risk_signal(key, label, severity, explanation):
    """
    Build one risk-signal dict in the shape used by `_metric_pattern_context`.
    """
    return {
        "key": key,
        "label": label,
        "severity": severity,
        "explanation": explanation,
    }


def _snapshot_risk_signals(latest_snapshot):
    """
    Derive risk signals from the component scores of the latest snapshot.

    Returns an empty list when there is no snapshot. All three component
    scores are "higher is healthier", so each signal fires on a LOW score.
    """
    if not latest_snapshot:
        return []
    signals = []
    # INSIGHT_METRICS documents volatility_score as inverted to a 0-100
    # stability signal (it is added positively in the stability formula), so
    # elevated volatility shows up as a low score. The cut-off mirrors the
    # previous 70-point threshold on the inverted scale.
    # NOTE(review): confirm against the score computation, which is not
    # visible from this part of the file.
    if (
        latest_snapshot.volatility_score is not None
        and latest_snapshot.volatility_score <= 30
    ):
        signals.append(
            _risk_signal(
                "volatility_spike",
                "Volatility Spike",
                "medium",
                "Volatility is elevated. Use short, bounded wording to "
                "reduce sudden interaction swings.",
            )
        )
    if (
        latest_snapshot.reciprocity_score is not None
        and latest_snapshot.reciprocity_score <= 35
    ):
        signals.append(
            _risk_signal(
                "reciprocity_drop",
                "Reciprocity Drop",
                "medium",
                "Reciprocity is low. Add corrections that request and "
                "acknowledge balanced turn-taking.",
            )
        )
    if (
        latest_snapshot.response_score is not None
        and latest_snapshot.response_score <= 35
    ):
        signals.append(
            _risk_signal(
                "response_drag",
                "Response Drag",
                "medium",
                "Response pace is slow. Prefer corrections that set timing "
                "expectations and explicit follow-up windows.",
            )
        )
    return signals


def _metric_pattern_context(conversation):
    """
    Assemble the metric context for a conversation: stability and commitment
    summaries, the latest snapshot's component scores and a list of risk
    signals (at most eight) derived from them.
    """
    latest_snapshot = conversation.metric_snapshots.first()
    directionality = _commitment_directionality_payload(conversation)
    confidence = conversation.stability_confidence or 0.0
    risk_signals = []

    state_key = str(conversation.stability_state or "").lower()
    if state_key == WorkspaceConversation.StabilityState.FRAGILE:
        risk_signals.append(
            _risk_signal(
                "stability_fragile",
                "Fragile Stability",
                "high",
                "Stability is in fragile range. Bias corrections toward "
                "de-escalation and explicit repair loops.",
            )
        )
    elif state_key == WorkspaceConversation.StabilityState.WATCH:
        risk_signals.append(
            _risk_signal(
                "stability_watch",
                "Watch Stability",
                "medium",
                "Stability is watch-range. Reinforce concise requests and "
                "misinterpretation checks before escalation.",
            )
        )

    if confidence < 0.25:
        risk_signals.append(
            _risk_signal(
                "low_confidence",
                "Low Confidence Window",
                "low",
                "Confidence is low. Prefer reversible, low-risk corrections "
                "that can be validated quickly.",
            )
        )

    magnitude = directionality.get("magnitude")
    if magnitude is not None:
        if magnitude >= 15:
            severity = "high"
        elif magnitude >= 8:
            severity = "medium"
        else:
            severity = None
        if severity:
            risk_signals.append(
                _risk_signal(
                    "commitment_asymmetry",
                    "Commitment Asymmetry",
                    severity,
                    "Directional commitment is asymmetric. Add corrections "
                    "that restore reciprocity and explicit confirmation.",
                )
            )

    risk_signals.extend(_snapshot_risk_signals(latest_snapshot))

    state_label = (
        conversation.get_stability_state_display()
        if hasattr(conversation, "get_stability_state_display")
        else str(conversation.stability_state or "")
    )

    if latest_snapshot:
        components = {
            "reciprocity": latest_snapshot.reciprocity_score,
            "continuity": latest_snapshot.continuity_score,
            "response": latest_snapshot.response_score,
            "volatility": latest_snapshot.volatility_score,
            "inbound_response": latest_snapshot.inbound_response_score,
            "outbound_response": latest_snapshot.outbound_response_score,
            "balance_inbound": latest_snapshot.balance_inbound_score,
            "balance_outbound": latest_snapshot.balance_outbound_score,
            "source_event_ts": latest_snapshot.source_event_ts,
        }
    else:
        components = {}

    return {
        "stability": {
            "state": state_label,
            "score": conversation.stability_score,
            "confidence": confidence,
            "sample_messages": conversation.stability_sample_messages,
            "sample_days": conversation.stability_sample_days,
            "computed_at": conversation.stability_last_computed_at,
        },
        "commitment": {
            "inbound": conversation.commitment_inbound_score,
            "outbound": conversation.commitment_outbound_score,
            "confidence": conversation.commitment_confidence,
            "computed_at": conversation.commitment_last_computed_at,
            "directionality": directionality,
        },
        "components": components,
        "risk_signals": risk_signals[:8],
    }


def _store_metric_snapshot(conversation, payload):
    """
    Persist `payload` as a new metric snapshot unless every compared field
    equals the conversation's first existing snapshot.
    """
    last = conversation.metric_snapshots.first()
    if last and all(
        getattr(last, key) == payload.get(key) for key in _SNAPSHOT_COMPARE_KEYS
    ):
        return
    WorkspaceMetricSnapshot.objects.create(conversation=conversation, **payload)
def _parse_draft_options(result_text): """ Parse model output into labeled draft options shown simultaneously in UI. """ content = (result_text or "").strip() if not content: return [] def clean_option_text(value): value = (value or "").strip(" \n\r\t*:") # Strip surrounding quotes when the whole option is wrapped. if len(value) >= 2 and ( (value[0] == '"' and value[-1] == '"') or (value[0] == "'" and value[-1] == "'") ): value = value[1:-1].strip() return value def dedupe_by_label(seq): ordered = [] seen = set() for item in seq: label = (item.get("label") or "").strip().title() text = clean_option_text(item.get("text") or "") if not label or not text: continue key = label.lower() if key in seen: continue seen.add(key) ordered.append({"label": label, "text": text}) return ordered # Primary parser: line-based labeled blocks. # Accepts: # - Soft/Neutral/Firm # - optional Tone/Response/Reply suffix # - optional markdown bold markers # - content on same line or subsequent lines block_re = re.compile( r"(?ims)^\s*(?:[-*]\s*)?(?:\*\*)?\s*(Soft|Neutral|Firm)\s*" r"(?:(?:Tone|Response|Reply))?\s*:?\s*(?:\*\*)?\s*" r"(.*?)(?=^\s*(?:[-*]\s*)?(?:\*\*)?\s*(?:Soft|Neutral|Firm)\s*" r"(?:(?:Tone|Response|Reply))?\s*:?\s*(?:\*\*)?\s*|\Z)" ) options = [ {"label": match.group(1).strip().title(), "text": match.group(2)} for match in block_re.finditer(content) ] options = dedupe_by_label(options) if options: return options[:3] # Secondary parser: inline labeled segments in one paragraph. inline_re = re.compile( r"(?is)\b(Soft|Neutral|Firm)\s*(?:(?:Tone|Response|Reply))?\s*:\s*(.*?)" r"(?=\b(?:Soft|Neutral|Firm)\s*(?:(?:Tone|Response|Reply))?\s*:|$)" ) options = [ {"label": match.group(1).strip().title(), "text": match.group(2)} for match in inline_re.finditer(content) ] options = dedupe_by_label(options) if options: return options[:3] # Secondary parser: Option 1/2/3 blocks. 
option_split_re = re.compile(r"(?im)^\s*Option\s+\d+\s*$") chunks = [ chunk.strip() for chunk in option_split_re.split(content) if chunk.strip() ] parsed = [] prefix_re = re.compile( r"(?im)^(?:\*\*)?\s*(Soft|Neutral|Firm)\s*(?:Tone|Response|Reply)?\s*:?\s*(?:\*\*)?\s*" ) for idx, chunk in enumerate(chunks, start=1): label = f"Option {idx}" prefix_match = prefix_re.match(chunk) if prefix_match: label = prefix_match.group(1).strip().title() chunk = prefix_re.sub("", chunk, count=1).strip(" \n\r\t*:") if chunk: parsed.append({"label": label, "text": chunk}) if parsed: return dedupe_by_label(parsed)[:3] # Final fallback: use first non-empty paragraphs. paragraphs = [ para.strip() for para in re.split(r"\n\s*\n", content) if para.strip() ] return dedupe_by_label( [ {"label": f"Option {idx}", "text": para} for idx, para in enumerate(paragraphs[:3], start=1) ] ) def _extract_seed_entities_from_context(raw_context): """ Heuristic extractor for pasted long-form frameworks. Returns candidate fundamentals/rules/games without model dependency. 
""" text = (raw_context or "").strip() if not text: return {"fundamentals": [], "rules": [], "games": []} lines = [line.strip() for line in text.splitlines()] fundamentals = [] rules = [] games = [] current_rule = None current_game = None in_core_principle = False in_quick_cheat = False def flush_rule(): nonlocal current_rule if not current_rule: return title = current_rule.get("title", "").strip() content = " ".join(current_rule.get("body", [])).strip() if title and content: rules.append({"title": title, "content": content}) elif title: rules.append({"title": title, "content": title}) current_rule = None def flush_game(): nonlocal current_game if not current_game: return title = current_game.get("title", "").strip() instructions = " ".join(current_game.get("body", [])).strip() if title and instructions: games.append({"title": title, "instructions": instructions}) elif title: games.append({"title": title, "instructions": title}) current_game = None for line in lines: if not line: flush_rule() flush_game() in_core_principle = False in_quick_cheat = False continue if re.match(r"^SECTION\s+\d+", line, re.IGNORECASE): in_core_principle = False in_quick_cheat = False flush_rule() flush_game() continue if re.match(r"^CORE PRINCIPLE", line, re.IGNORECASE): in_core_principle = True in_quick_cheat = False continue if re.match(r"^QUICK CHEAT SHEET", line, re.IGNORECASE): in_quick_cheat = True in_core_principle = False continue rule_match = re.match(r"^Rule\s+(\d+)\s*:\s*(.+)$", line, re.IGNORECASE) game_match = re.match(r"^Game\s+(\d+)\s*:\s*(.+)$", line, re.IGNORECASE) mantra_match = re.match(r"^Mantra\s*:\s*(.+)$", line, re.IGNORECASE) if rule_match: flush_rule() flush_game() title = rule_match.group(2).strip() current_rule = {"title": title, "body": []} if rule_match.group(1) in {"1", "11"} and title: fundamentals.append(title) continue if game_match: flush_rule() flush_game() title = game_match.group(2).strip() current_game = {"title": title, "body": []} continue if 
mantra_match: fundamentals.append(mantra_match.group(1).strip()) continue if in_core_principle and len(line) <= 120 and ":" not in line: fundamentals.append(line) continue if in_quick_cheat: quick_line = re.sub(r"^\s*(?:[-*]|\d+\.)\s*", "", line).strip() if ( quick_line and len(quick_line) <= 120 and not quick_line.lower().startswith("if you want") ): fundamentals.append(quick_line) continue if "Emotional safety > Accuracy > Analysis" in line: fundamentals.append("Emotional safety > Accuracy > Analysis") continue if current_rule: current_rule["body"].append(line) continue if current_game: current_game["body"].append(line) continue flush_rule() flush_game() # Keep order, remove duplicates. def dedupe_strings(seq): seen = set() out = [] for item in seq: key = item.strip().lower() if not key or key in seen: continue seen.add(key) out.append(item.strip()) return out def dedupe_dicts(seq): seen = set() out = [] for item in seq: title = (item.get("title") or "").strip() key = title.lower() if not key or key in seen: continue seen.add(key) out.append(item) return out return { "fundamentals": dedupe_strings(fundamentals)[:20], "rules": dedupe_dicts(rules)[:40], "games": dedupe_dicts(games)[:40], } def _merge_seed_entities(artifacts, seed): merged = dict(artifacts or {}) seed = seed or {} fundamentals = list(merged.get("fundamental_items") or []) fundamentals = list( dict.fromkeys(fundamentals + list(seed.get("fundamentals") or [])) ) merged["fundamental_items"] = fundamentals def merge_artifact_list(existing, injected, body_key): existing = list(existing or []) injected = list(injected or []) seen = {(item.get("title") or "").strip().lower() for item in existing} for item in injected: title = (item.get("title") or "").strip() body = (item.get(body_key) or "").strip() if not title or not body: continue key = title.lower() if key in seen: continue existing.append({"title": title, body_key: body}) seen.add(key) return existing merged["rules"] = merge_artifact_list( 
merged.get("rules"), seed.get("rules"), "content" ) merged["games"] = merge_artifact_list( merged.get("games"), seed.get("games"), "instructions" ) return merged def _normalize_markdown_titles(text): """ Minimal markdown cleanup: - convert '**Title:**' style lines into markdown headings so Bulma headers can style them. """ out = [] for line in (text or "").splitlines(): match = re.match(r"^\s*\*\*(.+?)\*\*\s*:?\s*$", line) if match: out.append(f"## {match.group(1).strip()}") else: out.append(line) return "\n".join(out) def _clean_inline_markdown(value): value = re.sub(r"\*\*(.*?)\*\*", r"\1", value) value = re.sub(r"\*(.*?)\*", r"\1", value) value = re.sub(r"`(.*?)`", r"\1", value) return value.strip() def _append_block(section, block_type, values): if not values: return section["blocks"].append({"type": block_type, "items": values}) def _parse_result_sections(result_text): """ Minimal markdown-ish parser used by UI: - '#/##/### Title' become section headers - bullet lines become lists - remaining lines are grouped as paragraphs Returned structure is template-safe (no raw HTML). 
""" text = _normalize_markdown_titles(result_text or "") lines = text.splitlines() sections = [] current = {"title": "Output", "level": 3, "blocks": []} paragraph = [] bullets = [] def flush_paragraph(): nonlocal paragraph if paragraph: _append_block(current, "p", [" ".join(paragraph)]) paragraph = [] def flush_bullets(): nonlocal bullets if bullets: _append_block(current, "ul", bullets) bullets = [] def flush_section(force=False): if force or current["blocks"]: sections.append(current.copy()) for raw_line in lines: line = raw_line.rstrip() heading_match = re.match(r"^\s*(#{1,6})\s+(.+?)\s*$", line) if heading_match: flush_paragraph() flush_bullets() flush_section() level = len(heading_match.group(1)) title = _clean_inline_markdown(heading_match.group(2)) current = {"title": title or "Section", "level": level, "blocks": []} continue bullet_match = re.match(r"^\s*(?:[-*]|\d+\.)\s+(.+?)\s*$", line) if bullet_match: flush_paragraph() bullets.append(_clean_inline_markdown(bullet_match.group(1))) continue if not line.strip(): flush_paragraph() flush_bullets() continue flush_bullets() paragraph.append(_clean_inline_markdown(line)) flush_paragraph() flush_bullets() flush_section(force=True) cleaned = [sec for sec in sections if sec.get("blocks")] if cleaned: return cleaned fallback = _clean_inline_markdown(result_text or "") return [ {"title": "Output", "level": 3, "blocks": [{"type": "p", "items": [fallback]}]} ] def _build_interaction_signals(operation, result_text, message_event_ids): def _normalize_signal_key(value): key = re.sub(r"[^a-z0-9]+", "_", str(value or "").strip().lower()).strip("_") if key == "open_loops": return "open_loop" return key def _signal_display_label(label, key): if str(label or "").strip(): return str(label).strip().title() return str(key or "Signal").replace("_", " ").strip().title() meaning_by_key = { "repair": "Repair markers suggest active attempts to restore connection.", "de_escalation": "De-escalation markers suggest pressure reduction 
and safer tone.", "open_loop": "Open loops are unresolved topics likely to reappear later.", "risk": "Risk markers indicate escalating friction or potential rupture points.", "conflict": "Conflict markers indicate direct tension or adversarial framing.", "draft_generated": "Draft generation indicates actionable next-step options are available.", } text = (result_text or "").lower() signals = [] heuristics = [ ("repair", "repair", "positive"), ("de-escalation", "de-escalat", "positive"), ("open loop", "open loop", "neutral"), ("risk", "risk", "risk"), ("conflict", "conflict", "risk"), ] for label, token, valence in heuristics: if token in text: signal_key = _normalize_signal_key(label) signals.append( { "label": label, "display_label": _signal_display_label(label, signal_key), "signal_key": signal_key, "meaning": meaning_by_key.get(signal_key, ""), "valence": valence, "message_event_ids": message_event_ids[:6], } ) if not signals and operation == "draft_reply": signal_key = _normalize_signal_key("draft_generated") signals.append( { "label": "draft_generated", "display_label": _signal_display_label("draft_generated", signal_key), "signal_key": signal_key, "meaning": meaning_by_key.get(signal_key, ""), "valence": "positive", "message_event_ids": message_event_ids[:3], } ) return signals[:8] def _memory_kind_from_title(title): lowered = str(title or "").strip().lower() if "open" in lowered and "loop" in lowered: return "open_loops", "Open Loops" if "emotion" in lowered or "state" in lowered: return "emotional_state", "Emotional State" if "pattern" in lowered: return "patterns", "Patterns" if "friction" in lowered: return "friction_loops", "Friction Loops" if "rule" in lowered or "next-step" in lowered: return "rules", "Rules" if "summary" in lowered or "key" in lowered: return "summary", "Summary" return "insights", "Insights" def _build_memory_proposals(operation, result_text): """ Build structured, grouped proposals from model output sections. 
""" if operation not in {"summarise", "extract_patterns"}: return [] sections = _parse_result_sections(result_text) proposals = [] for section in sections: kind, label = _memory_kind_from_title(section.get("title")) section_title = (section.get("title") or "").strip() for block in section.get("blocks", []): for item in block.get("items", []): content = str(item or "").strip() if not content: continue proposals.append( { "kind": kind, "kind_label": label, "section_title": section_title, "content": content, "status": "proposed", } ) if not proposals: fallback = (result_text or "").strip() if fallback: proposals.append( { "kind": "summary", "kind_label": "Summary", "section_title": "Summary", "content": fallback, "status": "proposed", } ) return proposals[:80] def _group_memory_proposals(memory_proposals): signal_keys_by_kind = { "open_loops": ["open_loop"], "emotional_state": ["de_escalation", "repair", "conflict"], "patterns": ["repair", "conflict"], "friction_loops": ["conflict", "risk"], "summary": ["open_loop", "repair", "de_escalation", "conflict", "risk"], "rules": ["repair", "conflict", "risk"], "insights": ["open_loop", "repair", "de_escalation", "conflict", "risk"], } grouped = {} for item in memory_proposals or []: label = str(item.get("kind_label") or item.get("kind") or "Insights").strip() key = str(item.get("kind") or label).strip().lower() if key not in grouped: grouped[key] = { "title": label, "key": key, "signal_keys": list(signal_keys_by_kind.get(key, [])), "items": [], } grouped[key]["items"].append(item) return list(grouped.values()) def _window_spec_tags(window_spec): if not isinstance(window_spec, dict): return [] out = [] if "limit" in window_spec: out.append(f"Window: last {window_spec.get('limit')} messages") if "since_ts" in window_spec: out.append(f"Since: {_format_unix_ms(window_spec.get('since_ts'))}") if "between_ts" in window_spec and isinstance(window_spec.get("between_ts"), list): values = window_spec.get("between_ts") if len(values) 
== 2: out.append( f"Range: {_format_unix_ms(values[0])} to {_format_unix_ms(values[1])}" ) if not out: for key, value in window_spec.items(): out.append(f"{str(key).replace('_', ' ').title()}: {value}") return out[:6] def _policy_snapshot_tags(policy_snapshot): if not isinstance(policy_snapshot, dict): return [] out = [] send_state = policy_snapshot.get("send_state") if isinstance(send_state, dict): out.append(f"Send: {'enabled' if send_state.get('can_send') else 'blocked'}") text = str(send_state.get("text") or "").strip() if text: out.append(text) for key, value in policy_snapshot.items(): if key == "send_state": continue out.append(f"{str(key).replace('_', ' ').title()}: {value}") return out[:6] def _extract_json_object(raw): text = (raw or "").strip() if not text: return None try: parsed = json.loads(text) if isinstance(parsed, dict): return parsed except Exception: pass start = text.find("{") if start == -1: return None depth = 0 end = None for index, char in enumerate(text[start:], start=start): if char == "{": depth += 1 elif char == "}": depth -= 1 if depth == 0: end = index + 1 break if end is None: return None try: parsed = json.loads(text[start:end]) if isinstance(parsed, dict): return parsed except Exception: return None return None def _section_lines(section): lines = [] for block in section.get("blocks", []): lines.extend([item for item in block.get("items", []) if item]) return lines def _shape_artifacts_for_profile(rules, games, output_profile): """ Apply lightweight profile shaping for generated mitigation artifacts. 
""" profile = (output_profile or "framework").strip().lower() if profile in {"rules", "rule"}: return rules[:12], games[:2] if profile in {"games", "game"}: return rules[:3], games[:12] # framework: balanced return rules[:10], games[:10] def _default_artifacts_from_patterns(result_text, person, output_profile="framework"): sections = _parse_result_sections(result_text) rules = [] games = [] for section in sections: title = (section.get("title") or "").lower() lines = _section_lines(section) if not lines: continue if "rule" in title or "next-step" in title or "mitigation" in title: for idx, line in enumerate(lines, start=1): rules.append({"title": f"Rule {idx}", "content": line}) elif "game" in title or "protocol" in title: for idx, line in enumerate(lines, start=1): games.append({"title": f"Game {idx}", "instructions": line}) if not rules: rules = [ { "title": "Safety Before Analysis", "content": "Prioritize reducing emotional escalation before introducing analysis.", }, { "title": "State Matching", "content": "If either side is flooded, pause first and resume with a time-bound return.", }, ] if not games: games = [ { "title": "Two-Turn Pause", "instructions": "Limit conflict responses to two short turns, then pause with a clear return time.", }, { "title": "Mirror Then Ask", "instructions": "Mirror what you heard, validate emotion, then ask whether comfort or solutions are wanted.", }, ] rules, games = _shape_artifacts_for_profile(rules, games, output_profile) return { "title": f"{person.name} Pattern Mitigation", "objective": "Reduce repeated friction loops while preserving trust and clarity.", "fundamental_items": [], "rules": rules, "games": games, "corrections": [], } def _build_mitigation_artifacts( ai_obj, person, source_text, creation_mode, inspiration, fundamentals, output_profile, metric_context=None, ): fallback = _default_artifacts_from_patterns(source_text, person, output_profile) if not ai_obj: if fundamentals: fallback["fundamental_items"] = 
def _build_mitigation_artifacts(
    ai_obj,
    person,
    source_text,
    creation_mode,
    inspiration,
    fundamentals,
    output_profile,
    metric_context=None,
):
    """
    Build mitigation artifacts (title, objective, rules, games, corrections).

    Without ``ai_obj`` the heuristic fallback derived from ``source_text``
    is returned, carrying ``fundamentals`` when given. Otherwise the model
    is prompted for strict JSON; any field that is missing or malformed in
    the reply is replaced by the fallback's value, so a failed or garbled
    model call still yields a usable result.

    Returns a dict with keys title, objective, fundamental_items, rules,
    games and corrections.
    """
    fallback = _default_artifacts_from_patterns(source_text, person, output_profile)
    if not ai_obj:
        if fundamentals:
            fallback["fundamental_items"] = fundamentals
        return fallback

    prompt = [
        {
            "role": "system",
            "content": (
                "You design practical relationship mitigation protocols. "
                "Return strict JSON only with keys: title, objective, "
                "fundamental_items, rules, games, corrections. "
                "Each rule item must have title and content. "
                "Each game item must have title and instructions. "
                "Each correction item must have title and clarification. "
                "If mode is auto, choose strongest artifacts. If mode is guided, strongly follow inspiration. "
                "Use provided metrics as risk context to tighten corrections. "
                "Output profile controls emphasis: framework (balanced), "
                "rules (rules-first), games (games-first)."
            ),
        },
        {
            "role": "user",
            "content": (
                f"Person: {person.name}\n"
                f"Mode: {creation_mode}\n"
                f"Output profile: {output_profile}\n"
                f"User inspiration: {inspiration or 'None'}\n"
                f"Fundamental items (pre-agreed): {json.dumps(fundamentals)}\n\n"
                "Metric context:\n"
                f"{json.dumps(metric_context or {}, ensure_ascii=False, default=str)}\n\n"
                f"Pattern analysis:\n{source_text}"
            ),
        },
    ]
    try:
        raw = async_to_sync(ai_runner.run_prompt)(prompt, ai_obj)
    except Exception:
        # Deliberate best-effort: a failed model call degrades to the
        # heuristic fallback values below instead of failing the request.
        raw = ""
    parsed = _extract_json_object(raw) or {}

    # Model JSON may carry non-string values (numbers, lists); coerce before
    # stripping so a malformed reply cannot raise AttributeError here.
    title = str(parsed.get("title") or "").strip() or fallback["title"]
    objective = str(parsed.get("objective") or "").strip() or fallback["objective"]

    parsed_fundamentals = parsed.get("fundamental_items")
    if isinstance(parsed_fundamentals, list):
        merged_fundamentals = [
            str(item).strip() for item in parsed_fundamentals if str(item).strip()
        ]
    else:
        merged_fundamentals = []
    if fundamentals:
        # Pre-agreed fundamentals come first; exact duplicates are dropped.
        merged_fundamentals = list(
            dict.fromkeys(list(fundamentals) + merged_fundamentals)
        )

    def titled_items(raw_items, body_key):
        # Keep only dict items that have both a title and a non-empty body.
        if not isinstance(raw_items, list):
            return []
        collected = []
        for item in raw_items:
            if not isinstance(item, dict):
                continue
            title_i = str(item.get("title") or "").strip()
            body_i = str(item.get(body_key) or "").strip()
            if title_i and body_i:
                collected.append({"title": title_i, body_key: body_i})
        return collected

    rules = titled_items(parsed.get("rules"), "content") or fallback["rules"]
    games = titled_items(parsed.get("games"), "instructions") or fallback["games"]

    raw_corrections = parsed.get("corrections")
    corrections = []
    if isinstance(raw_corrections, list):
        for item in raw_corrections:
            if not isinstance(item, dict):
                continue
            title_i = _normalize_correction_title(
                item.get("title") or "", fallback="Correction"
            )
            clarification_i = str(
                item.get("clarification") or item.get("content") or ""
            ).strip()
            source_phrase_i = str(item.get("source_phrase") or "").strip()
            if title_i and clarification_i:
                # Slices cap each stored value's length.
                corrections.append(
                    {
                        "title": title_i[:255],
                        "clarification": clarification_i[:2000],
                        "source_phrase": source_phrase_i[:1000],
                    }
                )

    rules, games = _shape_artifacts_for_profile(rules, games, output_profile)
    return {
        "title": title,
        "objective": objective,
        "fundamental_items": merged_fundamentals,
        "rules": rules,
        "games": games,
        "corrections": _normalize_violation_items(corrections),
    }
= { **body, "rules": [], "games": [], } if export_format == "json": payload = json.dumps(body, indent=2) else: lines = [ f"# {plan.title or 'Pattern Mitigation Artifact'}", "", "Protocol: artifact-v1", f"Artifact Type: {artifact_type}", "", "## Objective", plan.objective or "(none)", "", "## Fundamental Items", ] fundamentals = plan.fundamental_items or [] if fundamentals: lines.extend([f"- {item}" for item in fundamentals]) else: lines.append("- (none)") if artifact_type in {"rulebook", "rules"}: lines.append("") lines.append("## Rules") if rules: for idx, rule in enumerate(rules, start=1): lines.append(f"{idx}. **{rule['title']}** - {rule['content']}") else: lines.append("- (none)") if artifact_type in {"rulebook", "games"}: lines.append("") lines.append("## Games") if games: for idx, game in enumerate(games, start=1): lines.append(f"{idx}. **{game['title']}** - {game['instructions']}") else: lines.append("- (none)") if artifact_type in {"rulebook", "corrections"}: lines.append("") lines.append("## Corrections") if corrections: for idx, correction in enumerate(corrections, start=1): lines.append( f"{idx}. 
**{correction['title']}** - {correction['clarification']}" ) else: lines.append("- (none)") payload = "\n".join(lines) meta = { "rule_count": len(rules), "game_count": len(games), "correction_count": len(corrections), "fundamental_count": len(plan.fundamental_items or []), } return payload, meta def _conversation_for_person(user, person): primary_identifier = ( PersonIdentifier.objects.filter(user=user, person=person) .order_by("service") .first() ) default_platform = primary_identifier.service if primary_identifier else "signal" thread_id = str(person.id) conversation = ( WorkspaceConversation.objects.filter( user=user, platform_thread_id=thread_id, ).first() or WorkspaceConversation.objects.filter( user=user, participants=person, ) .order_by("-created_at") .first() ) if conversation is None: conversation = WorkspaceConversation.objects.create( user=user, platform_type=default_platform, title=f"{person.name} Workspace", platform_thread_id=thread_id, ) else: update_fields = [] if conversation.platform_thread_id != thread_id: conversation.platform_thread_id = thread_id update_fields.append("platform_thread_id") expected_title = f"{person.name} Workspace" if not conversation.title: conversation.title = expected_title update_fields.append("title") if update_fields: conversation.save(update_fields=update_fields) conversation.participants.add(person) _refresh_conversation_stability(conversation, user, person) return conversation def _score_from_lag(lag_ms, target_hours=4): if lag_ms is None: return 50.0 target_ms = max(1, target_hours) * 60 * 60 * 1000 return max(0.0, min(100.0, 100.0 / (1.0 + (lag_ms / target_ms)))) def _median_or_none(values): if not values: return None return float(statistics.median(values)) def _refresh_conversation_stability(conversation, user, person): """ Populate stability/commitment fields from cross-platform history. Uses Person -> PersonIdentifier -> ChatSession -> Message so all linked services contribute to one workspace conversation. 
""" now_ts = dj_timezone.now() identifiers = list( PersonIdentifier.objects.filter( user=user, person=person, ) ) identifier_values = { str(row.identifier or "").strip() for row in identifiers if row.identifier } if not identifiers: conversation.stability_state = WorkspaceConversation.StabilityState.CALIBRATING conversation.stability_score = None conversation.stability_confidence = 0.0 conversation.stability_sample_messages = 0 conversation.stability_sample_days = 0 conversation.commitment_inbound_score = None conversation.commitment_outbound_score = None conversation.commitment_confidence = 0.0 conversation.stability_last_computed_at = now_ts conversation.commitment_last_computed_at = now_ts conversation.save( update_fields=[ "stability_state", "stability_score", "stability_confidence", "stability_sample_messages", "stability_sample_days", "commitment_inbound_score", "commitment_outbound_score", "commitment_confidence", "stability_last_computed_at", "commitment_last_computed_at", ] ) _store_metric_snapshot( conversation, { "source_event_ts": conversation.last_event_ts, "stability_state": conversation.stability_state, "stability_score": None, "stability_confidence": 0.0, "stability_sample_messages": 0, "stability_sample_days": 0, "commitment_inbound_score": None, "commitment_outbound_score": None, "commitment_confidence": 0.0, "inbound_messages": 0, "outbound_messages": 0, "reciprocity_score": None, "continuity_score": None, "response_score": None, "volatility_score": None, "inbound_response_score": None, "outbound_response_score": None, "balance_inbound_score": None, "balance_outbound_score": None, }, ) return latest_ts = ( Message.objects.filter( user=user, session__identifier__in=identifiers, ) .order_by("-ts") .values_list("ts", flat=True) .first() ) if ( conversation.stability_last_computed_at and latest_ts is not None and conversation.last_event_ts == latest_ts and (now_ts - conversation.stability_last_computed_at).total_seconds() < 120 ): return rows = list( 
Message.objects.filter( user=user, session__identifier__in=identifiers, ) .order_by("ts") .values("ts", "sender_uuid", "session__identifier__service") ) if not rows: conversation.stability_state = WorkspaceConversation.StabilityState.CALIBRATING conversation.stability_score = None conversation.stability_confidence = 0.0 conversation.stability_sample_messages = 0 conversation.stability_sample_days = 0 conversation.last_event_ts = None conversation.commitment_inbound_score = None conversation.commitment_outbound_score = None conversation.commitment_confidence = 0.0 conversation.stability_last_computed_at = now_ts conversation.commitment_last_computed_at = now_ts conversation.save( update_fields=[ "stability_state", "stability_score", "stability_confidence", "stability_sample_messages", "stability_sample_days", "last_event_ts", "commitment_inbound_score", "commitment_outbound_score", "commitment_confidence", "stability_last_computed_at", "commitment_last_computed_at", ] ) _store_metric_snapshot( conversation, { "source_event_ts": conversation.last_event_ts, "stability_state": conversation.stability_state, "stability_score": None, "stability_confidence": 0.0, "stability_sample_messages": 0, "stability_sample_days": 0, "commitment_inbound_score": None, "commitment_outbound_score": None, "commitment_confidence": 0.0, "inbound_messages": 0, "outbound_messages": 0, "reciprocity_score": None, "continuity_score": None, "response_score": None, "volatility_score": None, "inbound_response_score": None, "outbound_response_score": None, "balance_inbound_score": None, "balance_outbound_score": None, }, ) return inbound_count = 0 outbound_count = 0 daily_counts = {} inbound_response_lags = [] outbound_response_lags = [] pending_in_ts = None pending_out_ts = None first_ts = rows[0]["ts"] last_ts = rows[-1]["ts"] latest_service = ( rows[-1].get("session__identifier__service") or conversation.platform_type ) for row in rows: ts = int(row["ts"] or 0) sender = str(row.get("sender_uuid") 
or "").strip() is_inbound = sender in identifier_values direction = "in" if is_inbound else "out" day_key = datetime.fromtimestamp(ts / 1000, tz=timezone.utc).date().isoformat() daily_counts[day_key] = daily_counts.get(day_key, 0) + 1 if direction == "in": inbound_count += 1 if pending_out_ts is not None and ts >= pending_out_ts: inbound_response_lags.append(ts - pending_out_ts) pending_out_ts = None pending_in_ts = ts else: outbound_count += 1 if pending_in_ts is not None and ts >= pending_in_ts: outbound_response_lags.append(ts - pending_in_ts) pending_in_ts = None pending_out_ts = ts message_count = len(rows) span_days = max(1, int(((last_ts - first_ts) / (24 * 60 * 60 * 1000)) + 1)) sample_days = len(daily_counts) total_messages = max(1, inbound_count + outbound_count) reciprocity_score = 100.0 * ( 1.0 - abs(inbound_count - outbound_count) / total_messages ) continuity_score = 100.0 * min(1.0, sample_days / max(1, span_days)) out_resp_score = _score_from_lag(_median_or_none(outbound_response_lags)) in_resp_score = _score_from_lag(_median_or_none(inbound_response_lags)) response_score = (out_resp_score + in_resp_score) / 2.0 daily_values = list(daily_counts.values()) if len(daily_values) > 1: mean_daily = statistics.mean(daily_values) stdev_daily = statistics.pstdev(daily_values) cv = (stdev_daily / mean_daily) if mean_daily else 1.0 volatility_score = max(0.0, 100.0 * (1.0 - min(cv, 1.5) / 1.5)) else: volatility_score = 60.0 stability_score = ( (0.35 * reciprocity_score) + (0.25 * continuity_score) + (0.20 * response_score) + (0.20 * volatility_score) ) balance_out = 100.0 * min(1.0, outbound_count / max(1, inbound_count)) balance_in = 100.0 * min(1.0, inbound_count / max(1, outbound_count)) commitment_out = (0.60 * out_resp_score) + (0.40 * balance_out) commitment_in = (0.60 * in_resp_score) + (0.40 * balance_in) msg_conf = min(1.0, message_count / 200.0) day_conf = min(1.0, sample_days / 30.0) pair_conf = min( 1.0, (len(inbound_response_lags) + 
len(outbound_response_lags)) / 40.0 ) confidence = (0.50 * msg_conf) + (0.30 * day_conf) + (0.20 * pair_conf) if message_count < 20 or sample_days < 3 or confidence < 0.25: stability_state = WorkspaceConversation.StabilityState.CALIBRATING stability_score_value = None commitment_in_value = None commitment_out_value = None else: stability_score_value = round(stability_score, 2) commitment_in_value = round(commitment_in, 2) commitment_out_value = round(commitment_out, 2) if stability_score_value >= 70: stability_state = WorkspaceConversation.StabilityState.STABLE elif stability_score_value >= 50: stability_state = WorkspaceConversation.StabilityState.WATCH else: stability_state = WorkspaceConversation.StabilityState.FRAGILE feedback_state = "balanced" if outbound_count > (inbound_count * 1.5): feedback_state = "withdrawing" elif inbound_count > (outbound_count * 1.5): feedback_state = "overextending" feedback = dict(conversation.participant_feedback or {}) feedback[str(person.id)] = { "state": feedback_state, "inbound_messages": inbound_count, "outbound_messages": outbound_count, "sample_messages": message_count, "sample_days": sample_days, "updated_at": now_ts.isoformat(), } conversation.platform_type = latest_service or conversation.platform_type conversation.last_event_ts = last_ts conversation.stability_state = stability_state conversation.stability_score = stability_score_value conversation.stability_confidence = round(confidence, 3) conversation.stability_sample_messages = message_count conversation.stability_sample_days = sample_days conversation.stability_last_computed_at = now_ts conversation.commitment_inbound_score = commitment_in_value conversation.commitment_outbound_score = commitment_out_value conversation.commitment_confidence = round(confidence, 3) conversation.commitment_last_computed_at = now_ts conversation.participant_feedback = feedback conversation.save( update_fields=[ "platform_type", "last_event_ts", "stability_state", "stability_score", 
"stability_confidence", "stability_sample_messages", "stability_sample_days", "stability_last_computed_at", "commitment_inbound_score", "commitment_outbound_score", "commitment_confidence", "commitment_last_computed_at", "participant_feedback", ] ) _store_metric_snapshot( conversation, { "source_event_ts": last_ts, "stability_state": stability_state, "stability_score": _to_float(stability_score_value), "stability_confidence": round(confidence, 3), "stability_sample_messages": message_count, "stability_sample_days": sample_days, "commitment_inbound_score": _to_float(commitment_in_value), "commitment_outbound_score": _to_float(commitment_out_value), "commitment_confidence": round(confidence, 3), "inbound_messages": inbound_count, "outbound_messages": outbound_count, "reciprocity_score": round(reciprocity_score, 3), "continuity_score": round(continuity_score, 3), "response_score": round(response_score, 3), "volatility_score": round(volatility_score, 3), "inbound_response_score": round(in_resp_score, 3), "outbound_response_score": round(out_resp_score, 3), "balance_inbound_score": round(balance_in, 3), "balance_outbound_score": round(balance_out, 3), }, ) def _parse_fundamentals(raw_text): lines = [] for line in (raw_text or "").splitlines(): cleaned = line.strip() if cleaned: lines.append(cleaned) return lines def _engage_source_options(plan): options = [] for rule in plan.rules.order_by("created_at"): options.append( { "value": f"rule:{rule.id}", "label": f"Rule: {rule.title}", } ) for game in plan.games.order_by("created_at"): options.append( { "value": f"game:{game.id}", "label": f"Game: {game.title}", } ) for correction in plan.corrections.order_by("created_at"): options.append( { "value": f"correction:{correction.id}", "label": f"Correction: {correction.title}", } ) return options def _normalize_correction_title(value, fallback="Correction"): cleaned = re.sub(r"\s+", " ", str(value or "").strip()) cleaned = cleaned.strip("\"'` ") if not cleaned: return fallback # 
Capitalize each lexical token for consistent correction naming. words = [] for token in cleaned.split(" "): if not token: continue if token.isupper() and len(token) <= 4: words.append(token) else: words.append(token[:1].upper() + token[1:]) return " ".join(words) def _build_engage_payload( source_obj, source_kind, share_target, framing, context_note, owner_name, recipient_name, ): share_key = (share_target or "self").strip().lower() framing_key = (framing or "dont_change").strip().lower() if share_key not in {"self", "other", "both"}: share_key = "self" if framing_key != "shared": framing_key = "dont_change" artifact_type_label = { "rule": "Rule", "game": "Game", "correction": "Correction", }.get(source_kind, (source_kind or "Artifact").title()) artifact_name_raw = ( getattr(source_obj, "title", None) or f"{artifact_type_label} Item" ).strip() artifact_name = ( _normalize_correction_title( artifact_name_raw, fallback=f"{artifact_type_label} Item" ) if source_kind == "correction" else artifact_name_raw ) if source_kind == "rule": insight_text = source_obj.content.strip() or source_obj.title.strip() elif source_kind == "game": insight_text = source_obj.instructions.strip() or source_obj.title.strip() else: insight_text = source_obj.clarification.strip() or source_obj.title.strip() owner_label = (owner_name or "You").strip() recipient_label = (recipient_name or "Other").strip() def _clean_text(value): cleaned = re.sub(r"\s+", " ", (value or "").strip()) cleaned = cleaned.strip("\"' ") cleaned = re.sub(r"^\s*#{1,6}\s*", "", cleaned) cleaned = re.sub(r"\*\*(.*?)\*\*", r"\1", cleaned) cleaned = re.sub(r"__(.*?)__", r"\1", cleaned) cleaned = re.sub(r"`(.*?)`", r"\1", cleaned) cleaned = re.sub(r"^[\-*•]\s*", "", cleaned) cleaned = re.sub(r"^\d+[.)]\s*", "", cleaned) return cleaned.strip() def _split_sentences(value): parts = [] for line in (value or "").splitlines(): line = _clean_text(line) if not line: continue for piece in re.split(r"(?<=[.!?;])\s+", line.strip()): 
piece = piece.strip() if piece: parts.append(piece) return parts def _expand_shorthand_tokens(value): text = value or "" alias_map = {} for name in [owner_label, recipient_label]: lowered = name.lower() if lowered in {"you", "we", "us", "our", "i", "me", "other"}: continue initial = lowered[:1] if initial and initial not in alias_map: alias_map[initial] = name # Expand any known initial shorthand before modal verbs. for initial, name in alias_map.items(): text = re.sub( rf"(?i)\b{re.escape(initial)}\s+(?=(?:should|will|must|can|need to|needs to|have to|has to|am|are|is|was|were)\b)", f"{name} ", text, ) def replace_quoted_marker(match): marker = match.group(1) or "" lower_marker = marker.strip().lower() replacement = marker if lower_marker in alias_map: replacement = alias_map[lower_marker] elif lower_marker in {"you", "we", "i"}: replacement = "both parties" return f"('{replacement}')" text = re.sub( r"\(\s*['\"]([A-Za-z]{1,8})['\"]\s*\)", replace_quoted_marker, text, ) return text def _fix_shared_grammar(value): text = value replacements = [ (r"(?i)\bwe needs to\b", "we need to"), (r"(?i)\bwe has to\b", "we have to"), (r"(?i)\bwe is\b", "we are"), (r"(?i)\bwe was\b", "we were"), (r"(?i)\bus needs to\b", "we need to"), (r"(?i)\bus need to\b", "we need to"), (r"(?i)\bus is\b", "we are"), (r"(?i)\bus are\b", "we are"), (r"(?i)\bwe does not\b", "we do not"), (r"(?i)\bwe doesn't\b", "we don't"), (r"(?i)\bwe says\b", "we say"), (r"(?i)\bwe responds\b", "we respond"), (r"(?i)\bwe follows\b", "we follow"), ] for pattern, replacement in replacements: text = re.sub(pattern, replacement, text) return re.sub(r"\s+", " ", text).strip() def _rewrite_shared_sentence(sentence): text = _clean_text(sentence) if not text: return "" punctuation = "." if text[-1] in ".!?": punctuation = text[-1] text = text[:-1].strip() text = _clean_text(text) if not text: return "" # Shared-only edge-case logic. 
text = _expand_shorthand_tokens(text) shared_replacements = [ (r"\bi['’]m\b", "we're"), (r"\bi['’]ve\b", "we've"), (r"\bi['’]ll\b", "we'll"), (r"\bi['’]d\b", "we'd"), (r"\byou['’]re\b", "we're"), (r"\byou['’]ve\b", "we've"), (r"\byou['’]ll\b", "we'll"), (r"\byou['’]d\b", "we'd"), (r"\bmy\b", "our"), (r"\bmine\b", "ours"), (r"\bmyself\b", "ourselves"), (r"\byour\b", "our"), (r"\byours\b", "ours"), (r"\byourself\b", "ourselves"), (r"\byourselves\b", "ourselves"), (r"\bi\b", "we"), (r"\bme\b", "us"), (r"\byou\b", "we"), (r"\bhis\b", "our"), (r"\bher\b", "our"), (r"\btheir\b", "our"), (r"\bhim\b", "us"), (r"\bthem\b", "us"), (r"\bhe\b", "we"), (r"\bshe\b", "we"), (r"\bthey\b", "we"), ] for pattern, replacement in shared_replacements: text = re.sub(pattern, replacement, text, flags=re.IGNORECASE) for name in [owner_label, recipient_label]: name = str(name or "").strip() if not name: continue if name.lower() in {"you", "we", "us", "our", "i", "me", "other"}: continue text = re.sub( r"(? 1 else text.lower() text = f"We should {lowered}" text = text[:1].upper() + text[1:] return f"{text}{punctuation}" def _rewrite_shared_text(value): sentences = _split_sentences(value) if not sentences: return "" adapted = [_rewrite_shared_sentence(sentence) for sentence in sentences] adapted = [part for part in adapted if part] return " ".join(adapted).strip() preview_lines = [] outbound_lines = [] def _format_artifact_message(lines): lines = [ f"**{artifact_name}** ({artifact_type_label})", "", "Guidance:", ] + [line.strip() for line in (lines or []) if (line or "").strip()] if lines[-1] == "Guidance:": lines.append("No guidance text available.") return "\n".join(lines).strip() if framing_key == "shared": shared_line = _rewrite_shared_text(insight_text) preview_lines = [shared_line] outbound_lines = [shared_line] else: unchanged = _clean_text(insight_text) or (insight_text or "").strip() preview_lines = [unchanged] outbound_lines = [unchanged] preview = 
def _get_or_create_auto_settings(user, conversation):
    """Return the auto-settings row for ``user``/``conversation``, creating it if absent."""
    lookup = {"user": user, "conversation": conversation}
    # get_or_create returns (object, created); only the object is needed.
    return PatternMitigationAutoSettings.objects.get_or_create(**lookup)[0]
clarification[:2000], "severity": str(signal.get("severity") or "medium"), } ) return _normalize_violation_items(out) def _detect_violation_candidates(plan, recent_rows): candidates = [] for row in recent_rows: text = (row.get("text") or "").strip() if not text: continue upper_ratio = ( ( sum(1 for c in text if c.isupper()) / max(1, sum(1 for c in text if c.isalpha())) ) if any(c.isalpha() for c in text) else 0 ) if upper_ratio > 0.6 and len(text) > 10: candidates.append( { "title": "Escalated tone spike", "source_phrase": text[:500], "clarification": "Rephrase into one direct request and one feeling statement.", "severity": "medium", } ) lowered = text.lower() if "you always" in lowered or "you never" in lowered: candidates.append( { "title": "Absolute framing", "source_phrase": text[:500], "clarification": "Replace absolutes with one concrete example and a bounded request.", "severity": "medium", } ) return candidates def _normalize_violation_items(raw_items): normalized = [] seen = set() for item in raw_items or []: title = _normalize_correction_title( item.get("title") or "", fallback="Correction" ) phrase = str(item.get("source_phrase") or "").strip() clarification = str( item.get("clarification") or item.get("correction") or "" ).strip() severity = str(item.get("severity") or "medium").strip().lower() if severity not in {"low", "medium", "high"}: severity = "medium" if not title or not clarification: continue key = _correction_signature(title, clarification) if key in seen: continue seen.add(key) normalized.append( { "title": title[:255], "source_phrase": phrase[:1000], "clarification": clarification[:2000], "severity": severity, } ) return normalized def _normalize_correction_text(value): cleaned = re.sub(r"\s+", " ", str(value or "").strip()) cleaned = cleaned.strip("\"'` ") cleaned = cleaned.rstrip(" .;:") return cleaned def _correction_signature(title, clarification): """ Normalized key used to deduplicate corrections before persisting. 
def _existing_correction_signatures(plan, exclude_id=None):
    """Return the set of correction signatures already stored on ``plan``.

    When ``exclude_id`` is given, the correction with that id is left out.
    Each signature is built by ``_correction_signature`` from the row's title
    and clarification, with missing values treated as empty strings.
    """
    corrections = plan.corrections.all()
    if exclude_id is not None:
        corrections = corrections.exclude(id=exclude_id)
    return {
        _correction_signature(
            record.get("title") or "",
            record.get("clarification") or "",
        )
        for record in corrections.values("title", "clarification")
    }
def _maybe_send_auto_notification(user, auto_settings, title, body):
    """Send an automation alert via the override topic or the user's default.

    If ``auto_settings.ntfy_topic_override`` is non-blank, the message goes
    through ``raw_sendmsg`` to that topic, with ``ntfy_url_override`` as the
    URL (or ``None`` when unset). Otherwise ``user.sendmsg`` is used.
    """
    topic = (auto_settings.ntfy_topic_override or "").strip()
    if not topic:
        user.sendmsg(body, title=title)
        return
    override_url = auto_settings.ntfy_url_override or None
    raw_sendmsg(body, title=title, url=override_url, topic=topic)
auto_settings.check_cooldown_seconds: return { "ran": False, "summary": "Skipped: cooldown active.", "violations": [], "created_corrections": 0, "notified": False, } limit = max(10, min(int(auto_settings.sample_message_window or 40), 200)) _refresh_conversation_stability(conversation, user, person) metric_context = _metric_pattern_context(conversation) sessions = ChatSession.objects.filter(user=user, identifier__person=person) messages = ( Message.objects.filter(user=user, session__in=sessions) .order_by("-ts") .values("id", "ts", "sender_uuid", "text")[:limit] ) recent_rows = [] for row in reversed(list(messages)): recent_rows.append( { "id": str(row["id"]), "ts": row["ts"], "sender_uuid": row["sender_uuid"] or "", "text": row["text"] or "", } ) if not recent_rows: auto_settings.last_result_summary = ( "No recent messages available for automation." ) auto_settings.last_run_at = now auto_settings.save( update_fields=["last_result_summary", "last_run_at", "updated_at"] ) return { "ran": True, "summary": auto_settings.last_result_summary, "violations": [], "created_corrections": 0, "notified": False, } latest_message_ts = recent_rows[-1]["ts"] if ( trigger == "auto" and auto_settings.last_checked_event_ts and latest_message_ts <= auto_settings.last_checked_event_ts ): return { "ran": False, "summary": "Skipped: no new messages since last check.", "violations": [], "created_corrections": 0, "notified": False, } ai_detection = _ai_detect_violations( user, plan, person, recent_rows, metric_context=metric_context, ) ai_candidates = list(ai_detection.get("violations") or []) artifact_candidates_ai = list(ai_detection.get("artifact_corrections") or []) heuristic_candidates = _detect_violation_candidates(plan, recent_rows) artifact_candidates_metric = _metric_guided_artifact_candidates( plan, metric_context ) violations = _normalize_violation_items( ai_candidates + heuristic_candidates + artifact_candidates_ai + artifact_candidates_metric ) created_corrections = 0 if 
auto_settings.auto_create_corrections and violations: existing_signatures = _existing_correction_signatures(plan) for item in violations[:8]: signature = _correction_signature(item["title"], item["clarification"]) if signature in existing_signatures: continue PatternMitigationCorrection.objects.create( user=user, plan=plan, title=item["title"], source_phrase=item["source_phrase"], clarification=item["clarification"], perspective="second_person", share_target="both", language_style="adapted", enabled=True, ) existing_signatures.add(signature) created_corrections += 1 notified = False if auto_settings.auto_notify_enabled and violations: title = f"[GIA] Auto pattern alerts for {person.name}" preview = "\n".join( [f"- {item['title']}: {item['clarification']}" for item in violations[:3]] ) body = ( f"Detected {len(violations)} potential mitigation violations.\n" f"Created corrections: {created_corrections}\n\n" f"{preview}" ) _maybe_send_auto_notification(user, auto_settings, title, body) notified = True summary = ( f"Auto analysis ran on {len(recent_rows)} messages. " f"Detected {len(violations)} candidates. " f"Created {created_corrections} corrections." 
def _create_baseline_mitigation_plan(user, person, conversation, source_text=""):
    """Create and return a draft, auto-mode mitigation plan with default artifacts.

    Artifacts come from ``_default_artifacts_from_patterns`` using the
    ``"framework"`` output profile, seeded with ``source_text`` or, when that
    is empty, ``"<person.name> baseline mitigation"``. One rule row and one
    game row are created per artifact entry, then a system message records
    that the plan was auto-created.
    """
    seed_text = source_text or f"{person.name} baseline mitigation"
    artifacts = _default_artifacts_from_patterns(
        seed_text,
        person,
        output_profile="framework",
    )
    plan = PatternMitigationPlan.objects.create(
        user=user,
        conversation=conversation,
        source_ai_result=None,
        title=artifacts.get("title") or f"{person.name} Pattern Mitigation",
        objective=artifacts.get("objective") or "",
        fundamental_items=artifacts.get("fundamental_items") or [],
        creation_mode="auto",
        status="draft",
    )
    # ``or []`` (not a ``.get`` default) also covers a key that is present but
    # null, so the loops cannot fail after the plan row has been written.
    for rule in artifacts.get("rules") or []:
        PatternMitigationRule.objects.create(
            user=user,
            plan=plan,
            title=str(rule.get("title") or "Rule").strip()[:255],
            content=str(rule.get("content") or "").strip(),
        )
    for game in artifacts.get("games") or []:
        PatternMitigationGame.objects.create(
            user=user,
            plan=plan,
            title=str(game.get("title") or "Game").strip()[:255],
            instructions=str(game.get("instructions") or "").strip(),
        )
    PatternMitigationMessage.objects.create(
        user=user,
        plan=plan,
        role="system",
        text="Baseline plan auto-created by automation settings.",
    )
    return plan
def _latest_plan_bundle(conversation):
    """Collect template context for the conversation's most recently updated plan.

    With no plan, the rule/game/correction/message entries are empty lists and
    the export is ``None``. The auto-settings row is always fetched (and
    created if missing) for ``conversation.user``.
    """
    plan = conversation.mitigation_plans.order_by("-updated_at").first()
    if plan:
        rules = plan.rules.order_by("created_at")
        games = plan.games.order_by("created_at")
        corrections = plan.corrections.order_by("created_at")
        # Only the first 40 plan messages are exposed.
        messages = plan.messages.order_by("created_at")[:40]
        export = plan.exports.order_by("-created_at").first()
    else:
        rules, games, corrections, messages = [], [], [], []
        export = None
    auto_settings = _get_or_create_auto_settings(conversation.user, conversation)
    return {
        "latest_plan": plan,
        "latest_plan_rules": rules,
        "latest_plan_games": games,
        "latest_plan_corrections": corrections,
        "latest_plan_messages": messages,
        "latest_plan_export": export,
        "latest_auto_settings": auto_settings,
    }
class AIWorkspaceContactsWidget(LoginRequiredMixin, View):
    """Widget listing the user's contacts, ordered by latest message timestamp."""

    allowed_types = {"widget"}

    def _contact_rows(self, user):
        """Build one summary row per ``Person`` owned by ``user``.

        Each row carries the person, their message count, and the text
        (first 120 chars), timestamp, and formatted timestamp of their latest
        message. Rows are returned newest first.
        """
        summaries = []
        for contact in Person.objects.filter(user=user).order_by("name"):
            contact_sessions = ChatSession.objects.filter(
                user=user, identifier__person=contact
            )
            contact_messages = Message.objects.filter(
                user=user, session__in=contact_sessions
            )
            newest = contact_messages.order_by("-ts").first()
            total = contact_messages.count()
            if newest:
                snippet = (newest.text or "")[:120]
                newest_ts = newest.ts
                newest_label = _format_unix_ms(newest.ts)
            else:
                snippet, newest_ts, newest_label = "", None, ""
            summaries.append(
                {
                    "person": contact,
                    "message_count": total,
                    "last_text": snippet,
                    "last_ts": newest_ts,
                    "last_ts_label": newest_label,
                }
            )
        # Contacts without messages are ordered as if their timestamp were 0.
        return sorted(
            summaries,
            key=lambda summary: summary["last_ts"] or 0,
            reverse=True,
        )

    def get(self, request, type):
        """Render the contacts widget; any ``type`` other than "widget" is a 400."""
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")
        return render(
            request,
            "mixins/wm/widget.html",
            {
                "title": "AI Workspace",
                "unique": "ai-workspace-contacts",
                "window_content": "partials/ai-workspace-widget.html",
                "widget_options": 'gs-w="4" gs-h="14" gs-x="0" gs-y="0" gs-min-w="3"',
                "contact_rows": self._contact_rows(request.user),
                "window_form": AIWorkspaceWindowForm(request.GET or None),
            },
        )
class AIWorkspacePersonTimelineWidget(LoginRequiredMixin, View):
    """Widget showing a person's recent message rows."""

    allowed_types = {"widget"}

    def get(self, request, type, person_id):
        """Render the timeline widget for one of the requesting user's persons.

        ``?limit=`` defaults to 20 when missing or unparsable and is clamped
        to the range 5-200. A person not owned by the user produces a 404.
        """
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")
        person = get_object_or_404(Person, pk=person_id, user=request.user)

        try:
            requested = int(request.GET.get("limit", 20))
        except (TypeError, ValueError):
            requested = 20
        limit = min(max(requested, 5), 200)

        message_rows = _message_rows_for_person(request.user, person, limit)
        return render(
            request,
            "mixins/wm/widget.html",
            {
                "title": f"{person.name} Timeline",
                "unique": f"ai-timeline-{person.id}",
                "window_content": "partials/ai-workspace-person-timeline-widget.html",
                "widget_options": 'gs-w="8" gs-h="10" gs-x="4" gs-y="11" gs-min-w="4"',
                "person": person,
                "limit": limit,
                "message_rows": message_rows,
            },
        )
class AIWorkspaceInsightGraphs(LoginRequiredMixin, View):
    """Page/widget view rendering every insight graph card for a person."""

    allowed_types = {"page", "widget"}

    def get(self, request, type, person_id):
        """Render the graphs template for a person owned by the requesting user."""
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")
        person = get_object_or_404(Person, pk=person_id, user=request.user)
        conversation = _conversation_for_person(request.user, person)
        cards = _all_graph_payload(conversation)
        nav_urls = _workspace_nav_urls(person)
        return render(
            request,
            "pages/ai-workspace-insight-graphs.html",
            {
                "person": person,
                "workspace_conversation": conversation,
                "graph_cards": cards,
                **nav_urls,
            },
        )
class AIWorkspaceInsightHelp(LoginRequiredMixin, View):
    """Page/widget view explaining every insight metric and its current value."""

    allowed_types = {"page", "widget"}

    def get(self, request, type, person_id):
        """Render the help template with one entry per ``INSIGHT_METRICS`` item.

        Each entry pairs the metric's static title, group, calculation, and
        psychology text with its formatted value for this conversation.
        """
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")
        person = get_object_or_404(Person, pk=person_id, user=request.user)
        conversation = _conversation_for_person(request.user, person)
        snapshot = conversation.metric_snapshots.first()
        metric_entries = [
            {
                "slug": metric_slug,
                "title": metric_spec["title"],
                "group": metric_spec["group"],
                "group_title": INSIGHT_GROUPS[metric_spec["group"]]["title"],
                "calculation": metric_spec["calculation"],
                "psychology": metric_spec["psychology"],
                "value": _format_metric_value(conversation, metric_slug, snapshot),
            }
            for metric_slug, metric_spec in INSIGHT_METRICS.items()
        ]
        nav_urls = _workspace_nav_urls(person)
        return render(
            request,
            "pages/ai-workspace-insight-help.html",
            {
                "person": person,
                "workspace_conversation": conversation,
                "groups": INSIGHT_GROUPS,
                "metrics": metric_entries,
                **nav_urls,
            },
        )
render( request, "partials/ai-workspace-ai-result.html", { "operation_label": OPERATION_LABELS.get( operation, operation.replace("_", " ").title() ), "operation": operation, "person": person, "send_state": send_state, "send_target_bundle": send_target_bundle, **extra, }, ) def _ensure_message_events(self, user, conversation, person_identifiers, messages): """ Materialize workspace MessageEvent rows from legacy Message rows and return ordered event IDs for the selected window. """ event_ids = [] for message in messages: legacy_id = str(message.id) event = MessageEvent.objects.filter( user=user, conversation=conversation, raw_payload_ref__legacy_message_id=legacy_id, ).first() if event is None: event = MessageEvent.objects.create( user=user, conversation=conversation, source_system=message.session.identifier.service or "signal", ts=message.ts, direction=_infer_direction(message, person_identifiers), sender_uuid=message.sender_uuid or "", text=message.text or "", attachments=[], raw_payload_ref={"legacy_message_id": legacy_id}, ) else: # Keep event fields in sync if upstream message rows changed. 
update_fields = [] new_direction = _infer_direction(message, person_identifiers) if event.ts != message.ts: event.ts = message.ts update_fields.append("ts") new_source_system = message.session.identifier.service or "signal" if event.source_system != new_source_system: event.source_system = new_source_system update_fields.append("source_system") if event.direction != new_direction: event.direction = new_direction update_fields.append("direction") if event.sender_uuid != (message.sender_uuid or ""): event.sender_uuid = message.sender_uuid or "" update_fields.append("sender_uuid") if event.text != (message.text or ""): event.text = message.text or "" update_fields.append("text") if update_fields: event.save(update_fields=update_fields) event_ids.append(str(event.id)) return event_ids def _citation_rows(self, user, citation_ids): ids = [str(item) for item in (citation_ids or []) if item] if not ids: return [] events = MessageEvent.objects.filter(user=user, id__in=ids) by_id = {str(event.id): event for event in events} rows = [] for cid in ids: event = by_id.get(cid) if not event: rows.append( { "id": cid, "ts_label": "", "source_system": "", "direction": "", "text": "", } ) continue rows.append( { "id": cid, "ts_label": _format_unix_ms(event.ts), "source_system": event.source_system, "direction": event.direction, "text": (event.text or "").strip(), } ) return rows def _build_prompt(self, operation, owner_name, person, transcript, user_notes): notes = (user_notes or "").strip() if operation == "draft_reply": instruction = ( "Generate 3 concise reply options in different tones: soft, neutral, firm. " "Return plain text with clear section labels." ) elif operation == "extract_patterns": instruction = ( "Extract recurring interaction patterns, friction loops, and practical next-step rules. " "Keep it actionable and concise." ) else: instruction = "Summarize this conversation window with key points, emotional state shifts, and open loops." 
prompt = [ { "role": "system", "content": ( f"{instruction} " "Use participant names directly. " "Do not refer to either side as 'the user'." ), }, { "role": "user", "content": ( f"Owner: {owner_name}\n" f"Person: {person.name}\n" f"Notes: {notes or 'None'}\n\n" f"Conversation:\n{transcript}" ), }, ] return prompt def get(self, request, type, person_id, operation): if type not in self.allowed_types: return HttpResponseBadRequest("Invalid type specified") if operation not in self.allowed_operations: return HttpResponseBadRequest("Invalid operation specified") person = get_object_or_404(Person, pk=person_id, user=request.user) send_state = _get_send_state(request.user, person) send_target_bundle = _send_target_options_for_person(request.user, person) conversation = _conversation_for_person(request.user, person) if operation == "artifacts": auto_settings = _get_or_create_auto_settings(request.user, conversation) plan_bundle = _latest_plan_bundle(conversation) mitigation_notice_message = "" mitigation_notice_level = "info" if ( plan_bundle["latest_plan"] is None and auto_settings.enabled and auto_settings.auto_create_mitigation ): recent_messages = _recent_messages_for_person( request.user, person, max(20, min(auto_settings.sample_message_window, 200)), ) source_text = ( messages_to_string(recent_messages) if recent_messages else "" ) _create_baseline_mitigation_plan( user=request.user, person=person, conversation=conversation, source_text=source_text, ) plan_bundle = _latest_plan_bundle(conversation) mitigation_notice_message = "Baseline plan auto-created." 
mitigation_notice_level = "success" if ( plan_bundle["latest_plan"] is not None and auto_settings.enabled and auto_settings.auto_pattern_recognition ): auto_result = _run_auto_analysis_for_plan( user=request.user, person=person, conversation=conversation, plan=plan_bundle["latest_plan"], auto_settings=auto_settings, trigger="auto", ) if auto_result.get("ran"): mitigation_notice_message = auto_result["summary"] mitigation_notice_level = "info" if auto_result.get("created_corrections"): plan_bundle = _latest_plan_bundle(conversation) return self._render_result( request, operation, person, send_state, send_target_bundle, result_text="", result_sections=[], error=False, ai_result_id="", mitigation_notice_message=mitigation_notice_message, mitigation_notice_level=mitigation_notice_level, **plan_bundle, ) ai_obj = AI.objects.filter(user=request.user).first() if ai_obj is None: message = "No AI configured for this user yet." return self._render_result( request, operation, person, send_state, send_target_bundle, result_text=message, result_sections=_parse_result_sections(message), error=True, **self._empty_plan_bundle(), ) try: limit = int(request.GET.get("limit", 20)) except (TypeError, ValueError): limit = 20 limit = max(5, min(limit, 200)) user_notes = request.GET.get("user_notes", "") messages = _recent_messages_for_person(request.user, person, limit) owner_name = ( request.user.first_name or request.user.get_full_name().strip() or request.user.username or "Me" ) transcript = messages_to_string( messages, author_rewrites={ "USER": owner_name, "BOT": "Assistant", }, ) person_identifiers = set( PersonIdentifier.objects.filter( user=request.user, person=person, ).values_list("identifier", flat=True) ) if messages: conversation.last_event_ts = messages[-1].ts conversation.save(update_fields=["last_event_ts"]) message_event_ids = self._ensure_message_events( request.user, conversation, person_identifiers, messages, ) ai_request = AIRequest.objects.create( user=request.user, 
conversation=conversation, window_spec={"limit": limit}, message_ids=message_event_ids, user_notes=user_notes, operation=operation, policy_snapshot={"send_state": send_state}, status="running", started_at=dj_timezone.now(), ) try: prompt = self._build_prompt( operation=operation, owner_name=owner_name, person=person, transcript=transcript, user_notes=user_notes, ) result_text = async_to_sync(ai_runner.run_prompt)(prompt, ai_obj) draft_options = ( _parse_draft_options(result_text) if operation == "draft_reply" else [] ) interaction_signals = _build_interaction_signals( operation, result_text, message_event_ids, ) memory_proposals = _build_memory_proposals(operation, result_text) ai_result = AIResult.objects.create( user=request.user, ai_request=ai_request, working_summary=result_text if operation != "draft_reply" else "", draft_replies=draft_options, interaction_signals=interaction_signals, memory_proposals=memory_proposals, citations=message_event_ids, ) first_event = None if message_event_ids: first_event = MessageEvent.objects.filter( id=message_event_ids[0], user=request.user, ).first() for signal in interaction_signals: AIResultSignal.objects.create( user=request.user, ai_result=ai_result, message_event=first_event, label=signal["label"][:128], valence=signal["valence"], score=None, rationale="Auto-tagged from operation output.", ) ai_request.status = "done" ai_request.finished_at = dj_timezone.now() ai_request.save(update_fields=["status", "finished_at"]) conversation.last_ai_run_at = dj_timezone.now() conversation.save(update_fields=["last_ai_run_at"]) plan_bundle = _latest_plan_bundle(conversation) return self._render_result( request, operation, person, send_state, send_target_bundle, result_text=result_text, result_sections=_parse_result_sections(result_text), draft_replies=ai_result.draft_replies, interaction_signals=ai_result.interaction_signals, memory_proposals=ai_result.memory_proposals, memory_proposal_groups=_group_memory_proposals( 
ai_result.memory_proposals ), citations=ai_result.citations, citation_rows=self._citation_rows(request.user, ai_result.citations), error=False, ai_result_id=str(ai_result.id), ai_result_created_at=ai_result.created_at, ai_request_status=ai_request.status, ai_request_started_at=ai_request.started_at, ai_request_finished_at=ai_request.finished_at, ai_request_window_spec=ai_request.window_spec, ai_request_window_tags=_window_spec_tags(ai_request.window_spec), ai_request_message_count=len(ai_request.message_ids or []), ai_request_policy_snapshot=ai_request.policy_snapshot, ai_request_policy_tags=_policy_snapshot_tags( ai_request.policy_snapshot ), **plan_bundle, ) except Exception as exc: ai_request.status = "failed" ai_request.error = str(exc) ai_request.finished_at = dj_timezone.now() ai_request.save(update_fields=["status", "error", "finished_at"]) error_text = str(exc) return self._render_result( request, operation, person, send_state, send_target_bundle, result_text=error_text, result_sections=_parse_result_sections(error_text), error=True, **self._empty_plan_bundle(), ) class AIWorkspaceSendDraft(LoginRequiredMixin, View): allowed_types = {"widget"} def post(self, request, type, person_id): if type not in self.allowed_types: return HttpResponseBadRequest("Invalid type specified") person = get_object_or_404(Person, pk=person_id, user=request.user) send_state = _get_send_state(request.user, person) text = (request.POST.get("draft_text") or "").strip() force_send = _is_truthy(request.POST.get("force_send")) if not text: return _render_send_status(request, False, "Draft is empty.", "danger") if not send_state["can_send"] and not force_send: return _render_send_status( request, False, f"Send blocked. 
{send_state['text']}", "warning", ) identifier = _resolve_person_identifier_target( request.user, person, target_identifier_id=request.POST.get("target_identifier_id"), target_service=request.POST.get("target_service"), fallback_service=_preferred_service_for_person(request.user, person), ) if identifier is None: return _render_send_status( request, False, "No recipient identifier found.", "danger", ) try: ts = async_to_sync(identifier.send)(text) except Exception as exc: return _render_send_status(request, False, f"Send failed: {exc}", "danger") if ts is False or ts is None: return _render_send_status(request, False, "Send failed.", "danger") session, _ = ChatSession.objects.get_or_create( user=request.user, identifier=identifier, ) sent_ts = ( int(ts) if (ts is not None and not isinstance(ts, bool)) else int(dj_timezone.now().timestamp() * 1000) ) Message.objects.create( user=request.user, session=session, custom_author="BOT", sender_uuid="", text=text, ts=sent_ts, delivered_ts=sent_ts, read_source_service=identifier.service, ) success_message = "Draft sent." if force_send and not send_state["can_send"]: success_message = "Draft sent with override." 
class AIWorkspaceQueueDraft(LoginRequiredMixin, View):
    """Store a submitted draft as a QueuedMessage for the resolved identifier."""

    allowed_types = {"widget"}

    def post(self, request, type, person_id):
        """
        Queue the posted ``draft_text`` for ``person_id``.

        Renders a send-status partial in every case: a warning for an empty
        draft or a missing manipulation, a danger notice when no recipient
        identifier resolves, and a success notice once the row is created.
        Returns 400 for a disallowed render type.
        """
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")

        owner = request.user
        form = request.POST
        person = get_object_or_404(Person, pk=person_id, user=owner)

        draft = (form.get("draft_text") or "").strip()
        if not draft:
            return _render_send_status(
                request,
                False,
                "Select a draft before queueing.",
                "warning",
            )

        target = _resolve_person_identifier_target(
            owner,
            person,
            target_identifier_id=form.get("target_identifier_id"),
            target_service=form.get("target_service"),
            fallback_service=_preferred_service_for_person(owner, person),
        )
        if target is None:
            return _render_send_status(
                request,
                False,
                "No recipient identifier found.",
                "danger",
            )

        queue_rule = _get_queue_manipulation(owner, person)
        if queue_rule is None:
            return _render_send_status(
                request,
                False,
                "No enabled manipulation found for this recipient. Queue entry not created.",
                "warning",
            )

        # get_or_create returns (object, created); only the session is needed.
        chat_session = ChatSession.objects.get_or_create(
            user=owner,
            identifier=target,
        )[0]
        queued_at_ms = int(dj_timezone.now().timestamp() * 1000)
        QueuedMessage.objects.create(
            user=owner,
            session=chat_session,
            manipulation=queue_rule,
            ts=queued_at_ms,
            sender_uuid="",
            text=draft,
            custom_author="BOT",
        )
        return _render_send_status(request, True, "Draft added to queue.", "success")
artifacts = _build_mitigation_artifacts( ai_obj=ai_obj, person=person, source_text=source_text, creation_mode=creation_mode, inspiration=user_context, fundamentals=fundamentals, output_profile=output_profile, metric_context=metric_context, ) # Deterministically seed from pasted context so long-form frameworks can # create fundamentals/rules/games in one pass, even when AI output is sparse. artifacts = _merge_seed_entities(artifacts, seed_from_context) plan = PatternMitigationPlan.objects.create( user=request.user, conversation=conversation, source_ai_result=source_result, title=artifacts.get("title") or f"{person.name} Pattern Mitigation", objective=artifacts.get("objective") or "", fundamental_items=artifacts.get("fundamental_items") or fundamentals, creation_mode=creation_mode, status="draft", ) for rule in artifacts.get("rules", []): PatternMitigationRule.objects.create( user=request.user, plan=plan, title=str(rule.get("title") or "Rule").strip()[:255], content=str(rule.get("content") or "").strip(), ) for game in artifacts.get("games", []): PatternMitigationGame.objects.create( user=request.user, plan=plan, title=str(game.get("title") or "Game").strip()[:255], instructions=str(game.get("instructions") or "").strip(), ) existing_signatures = set() for correction in artifacts.get("corrections", []): title = _normalize_correction_title( correction.get("title") or "", fallback="Correction" ) clarification = str(correction.get("clarification") or "").strip() source_phrase = str(correction.get("source_phrase") or "").strip() if not clarification: continue signature = _correction_signature(title, clarification) if signature in existing_signatures: continue PatternMitigationCorrection.objects.create( user=request.user, plan=plan, title=title[:255], clarification=clarification[:2000], source_phrase=source_phrase[:1000], perspective="second_person", share_target="both", language_style="adapted", enabled=True, ) existing_signatures.add(signature) 
class AIWorkspaceMitigationChat(LoginRequiredMixin, View):
    """Append a user message to a plan's chat and record the assistant reply."""

    allowed_types = {"widget"}

    @staticmethod
    def _bullet_block(rows, body_attr):
        """Format rows as ``- title: body`` lines joined by newlines."""
        return "\n".join(
            f"- {row.title}: {getattr(row, body_attr)}" for row in rows
        )

    def _refinement_prompt(self, plan):
        """Build the system/user prompt pair describing ``plan`` and its chat."""
        rules_text = self._bullet_block(plan.rules.order_by("created_at"), "content")
        games_text = self._bullet_block(
            plan.games.order_by("created_at"), "instructions"
        )
        corrections_text = self._bullet_block(
            plan.corrections.order_by("created_at"), "clarification"
        )
        # Take the ten newest messages, then flip them back to oldest-first.
        newest_first = list(plan.messages.order_by("-created_at")[:10])
        transcript = "\n".join(
            f"{entry.role.upper()}: {entry.text}" for entry in newest_first[::-1]
        )
        system_message = {
            "role": "system",
            "content": (
                "You are refining a mitigation protocol. "
                "Give concise practical updates to rules/games/corrections and explain tradeoffs."
            ),
        }
        user_message = {
            "role": "user",
            "content": (
                f"Plan objective: {plan.objective}\n"
                f"Fundamentals: {json.dumps(plan.fundamental_items or [])}\n"
                f"Rules:\n{rules_text or '(none)'}\n\n"
                f"Games:\n{games_text or '(none)'}\n\n"
                f"Corrections:\n{corrections_text or '(none)'}\n\n"
                f"Conversation:\n{transcript}"
            ),
        }
        return [system_message, user_message]

    def post(self, request, type, person_id, plan_id):
        """
        Store the posted chat message, obtain a reply, and re-render the panel.

        An empty message re-renders the panel with a warning and stores
        nothing. When no AI is configured, or the AI call raises, the
        explanatory text is stored as the assistant message instead.
        """
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")

        owner = request.user
        panel_template = "partials/ai-workspace-mitigation-panel.html"
        person = get_object_or_404(Person, pk=person_id, user=owner)
        plan = get_object_or_404(
            PatternMitigationPlan,
            id=plan_id,
            user=owner,
        )
        text = (request.POST.get("message") or "").strip()
        active_tab = _sanitize_active_tab(
            request.POST.get("active_tab"), default="ask_ai"
        )

        if not text:
            empty_context = _mitigation_panel_context(
                person=person,
                plan=plan,
                notice_message="Message is empty.",
                notice_level="warning",
                active_tab=active_tab,
            )
            return render(request, panel_template, empty_context)

        PatternMitigationMessage.objects.create(
            user=owner,
            plan=plan,
            role="user",
            text=text,
        )

        ai_obj = AI.objects.filter(user=owner).first()
        if not ai_obj:
            assistant_text = (
                "No AI configured. Add an AI config to use mitigation chat."
            )
        else:
            prompt = self._refinement_prompt(plan)
            try:
                assistant_text = async_to_sync(ai_runner.run_prompt)(prompt, ai_obj)
            except Exception as exc:
                assistant_text = f"Failed to run AI refinement: {exc}"

        PatternMitigationMessage.objects.create(
            user=owner,
            plan=plan,
            role="assistant",
            text=assistant_text,
        )
        panel_context = _mitigation_panel_context(
            person=person,
            plan=plan,
            active_tab=active_tab,
        )
        return render(request, panel_template, panel_context)
class AIWorkspaceCreateArtifact(LoginRequiredMixin, View):
    """Create a blank rule, game, or correction on a mitigation plan."""

    allowed_types = {"widget"}
    # kind slug -> (model, name of the body field, display label)
    kind_map = {
        "rule": (PatternMitigationRule, "content", "Rule"),
        "game": (PatternMitigationGame, "instructions", "Game"),
        "correction": (PatternMitigationCorrection, "clarification", "Correction"),
    }

    def post(self, request, type, person_id, plan_id, kind):
        """
        Add a placeholder artifact of ``kind`` titled "New <Label>".

        Returns 400 for a disallowed render type or unknown kind. A
        correction whose placeholder signature already exists on the plan is
        skipped with a warning instead of being created.
        """
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")

        owner = request.user
        person = get_object_or_404(Person, pk=person_id, user=owner)
        plan = get_object_or_404(PatternMitigationPlan, id=plan_id, user=owner)

        kind_key = (kind or "").strip().lower()
        entry = self.kind_map.get(kind_key)
        if entry is None:
            return HttpResponseBadRequest("Invalid artifact kind")
        model, body_field, label = entry

        is_correction = kind_key == "correction"
        default_tab = "corrections" if is_correction else "plan_board"
        placeholder_title = f"New {label}"

        def panel(message, level):
            # The tab is resolved at render time, right before the panel call.
            tab = _sanitize_active_tab(
                request.POST.get("active_tab"),
                default=default_tab,
            )
            return _render_mitigation_panel(
                request,
                person,
                plan,
                notice_message=message,
                notice_level=level,
                active_tab=tab,
            )

        if is_correction:
            signature = _correction_signature(placeholder_title, "")
            if signature in _existing_correction_signatures(plan):
                return panel("Duplicate correction skipped.", "warning")

        model.objects.create(
            user=owner,
            plan=plan,
            title=placeholder_title,
            enabled=True,
            **{body_field: ""},
        )
        return panel(f"{label} created.", "success")
"clarification", "Correction"), } def post(self, request, type, person_id, plan_id, kind, artifact_id): if type not in self.allowed_types: return HttpResponseBadRequest("Invalid type specified") person = get_object_or_404(Person, pk=person_id, user=request.user) plan = get_object_or_404(PatternMitigationPlan, id=plan_id, user=request.user) kind_key = (kind or "").strip().lower() if kind_key not in self.kind_map: return HttpResponseBadRequest("Invalid artifact kind") model, body_field, label = self.kind_map[kind_key] artifact = get_object_or_404( model, id=artifact_id, user=request.user, plan=plan, ) title = (request.POST.get("title") or "").strip() or artifact.title body = (request.POST.get("body") or "").strip() enabled = _is_truthy(request.POST.get("enabled")) tab = _sanitize_active_tab( request.POST.get("active_tab"), default=("corrections" if kind_key == "correction" else "plan_board"), ) if kind_key == "correction": title = _normalize_correction_title( title, fallback=artifact.title or "Correction" ) candidate_signature = _correction_signature(title, body) if candidate_signature in _existing_correction_signatures( plan, exclude_id=artifact.id ): return _render_mitigation_panel( request, person, plan, notice_message="Duplicate correction not saved.", notice_level="warning", active_tab=tab, ) artifact.title = title[:255] setattr(artifact, body_field, body) artifact.enabled = enabled if kind_key == "correction": artifact.source_phrase = (request.POST.get("source_phrase") or "").strip() perspective = (request.POST.get("perspective") or "third_person").strip() if perspective in {"first_person", "second_person", "third_person"}: artifact.perspective = perspective share_target = (request.POST.get("share_target") or "both").strip() if share_target in {"self", "other", "both"}: artifact.share_target = share_target language_style = (request.POST.get("language_style") or "adapted").strip() if language_style in {"same", "adapted"}: artifact.language_style = language_style 
class AIWorkspaceDeleteArtifactList(LoginRequiredMixin, View):
    """Delete every artifact of one kind from a mitigation plan."""

    allowed_types = {"widget"}
    # kind slug -> (model, plural label used in the notice text)
    kind_map = {
        "rule": (PatternMitigationRule, "rules"),
        "game": (PatternMitigationGame, "games"),
        "correction": (PatternMitigationCorrection, "corrections"),
    }

    def post(self, request, type, person_id, plan_id, kind):
        """
        Remove all of the user's ``kind`` artifacts on ``plan_id``.

        Returns 400 for a disallowed render type or unknown kind; otherwise
        re-renders the mitigation panel with a notice giving the number of
        rows counted before deletion, or an info notice when there were none.
        """
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")

        owner = request.user
        person = get_object_or_404(Person, pk=person_id, user=owner)
        plan = get_object_or_404(PatternMitigationPlan, id=plan_id, user=owner)

        kind_key = (kind or "").strip().lower()
        entry = self.kind_map.get(kind_key)
        if entry is None:
            return HttpResponseBadRequest("Invalid artifact kind")
        model, label = entry

        matching = model.objects.filter(user=owner, plan=plan)
        removed = matching.count()
        if removed:
            matching.delete()

        notice_message, notice_level = (
            (f"Deleted {removed} {label}.", "success")
            if removed
            else (f"No {label} to delete.", "info")
        )

        default_tab = "corrections" if kind_key == "correction" else "plan_board"
        tab = _sanitize_active_tab(
            request.POST.get("active_tab"),
            default=default_tab,
        )
        return _render_mitigation_panel(
            request,
            person,
            plan,
            notice_message=notice_message,
            notice_level=notice_level,
            active_tab=tab,
        )
in model_map: return _render_mitigation_panel( request, person, plan, notice_message="Invalid source type for engage.", notice_level="danger", engage_form=engage_form, active_tab=active_tab, ) source_obj = get_object_or_404( model_map[source_kind], id=source_id, user=request.user, plan=plan, ) payload = _build_engage_payload( source_obj=source_obj, source_kind=source_kind, share_target=share_target, framing=framing, context_note=context_note, owner_name=( request.user.first_name or request.user.get_full_name().strip() or request.user.username or "You" ), recipient_name=person.name or "Other", ) engage_preview = payload["preview"] outbound_text = payload["outbound"] share_target = payload["share_target"] if action == "preview": return _render_mitigation_panel( request, person, plan, engage_preview=engage_preview, engage_preview_flash=True, engage_form=engage_form, active_tab=active_tab, ) if action == "send": send_state = _get_send_state(request.user, person) if not send_state["can_send"] and not force_send: return _render_mitigation_panel( request, person, plan, notice_message=f"Send blocked. 
{send_state['text']}", notice_level="warning", engage_preview=engage_preview, engage_form=engage_form, active_tab=active_tab, ) identifier = _resolve_person_identifier_target( request.user, person, target_identifier_id=request.POST.get("target_identifier_id"), target_service=request.POST.get("target_service"), fallback_service=plan.conversation.platform_type, ) if identifier is None: return _render_mitigation_panel( request, person, plan, notice_message="No recipient identifier found.", notice_level="danger", engage_preview=engage_preview, engage_form=engage_form, active_tab=active_tab, ) try: ts = async_to_sync(identifier.send)(outbound_text) except Exception as exc: return _render_mitigation_panel( request, person, plan, notice_message=f"Send failed: {exc}", notice_level="danger", engage_preview=engage_preview, engage_form=engage_form, active_tab=active_tab, ) if ts is False or ts is None: return _render_mitigation_panel( request, person, plan, notice_message="Send failed.", notice_level="danger", engage_preview=engage_preview, engage_form=engage_form, active_tab=active_tab, ) session, _ = ChatSession.objects.get_or_create( user=request.user, identifier=identifier, ) sent_ts = ( int(ts) if (ts is not None and not isinstance(ts, bool)) else int(dj_timezone.now().timestamp() * 1000) ) Message.objects.create( user=request.user, session=session, custom_author="BOT", sender_uuid="", text=outbound_text, ts=sent_ts, delivered_ts=sent_ts, read_source_service=identifier.service, ) notice = "Shared via engage." if force_send and not send_state["can_send"]: notice = "Shared via engage with override." 
response = _render_mitigation_panel( request, person, plan, notice_message=notice, notice_level="success", engage_preview=engage_preview, engage_form=engage_form, active_tab=active_tab, ) response["HX-Trigger"] = json.dumps( { "gia-message-sent": { "person_id": str(person.id), "ts": sent_ts, "text": outbound_text, "author": "BOT", } } ) return response if action == "queue": identifier = _resolve_person_identifier_target( request.user, person, target_identifier_id=request.POST.get("target_identifier_id"), target_service=request.POST.get("target_service"), fallback_service=plan.conversation.platform_type, ) if identifier is None: return _render_mitigation_panel( request, person, plan, notice_message="No recipient identifier found.", notice_level="danger", engage_preview=engage_preview, engage_form=engage_form, active_tab=active_tab, ) manipulation = _get_queue_manipulation(request.user, person) if manipulation is None: return _render_mitigation_panel( request, person, plan, notice_message="No enabled manipulation found for this recipient. 
class AIWorkspaceAutoSettings(LoginRequiredMixin, View):
    """Persist automation settings for a plan's conversation.

    Optionally triggers an immediate analysis run when the form is posted
    with ``action=run_now``.
    """

    allowed_types = {"widget"}

    @staticmethod
    def _bounded_int(raw, *, default, lower, upper):
        """Parse ``raw`` as an int clamped to ``[lower, upper]``.

        A missing/empty ``raw`` uses ``default``. Unparseable input returns
        ``default`` as-is (not clamped).
        """
        try:
            value = int(raw or default)
        except (TypeError, ValueError):
            # Only parse failures are expected here; anything else is a real
            # bug and should surface rather than be masked by the default.
            return default
        return max(lower, min(value, upper))

    # NOTE: ``type`` shadows the builtin, but it is part of the view's keyword
    # interface and is kept for compatibility with existing callers.
    def post(self, request, type, person_id, plan_id):
        """Save the posted automation settings and re-render the panel.

        Returns a 400 response when ``type`` is not in ``allowed_types``.
        ``Person`` and ``PatternMitigationPlan`` are looked up scoped to
        ``request.user`` via ``get_object_or_404``.
        """
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")

        person = get_object_or_404(Person, pk=person_id, user=request.user)
        plan = get_object_or_404(PatternMitigationPlan, id=plan_id, user=request.user)
        auto_settings = _get_or_create_auto_settings(request.user, plan.conversation)
        post_data = request.POST

        # Flag fields: the POST key and the settings attribute share a name.
        for flag_name in (
            "enabled",
            "auto_pattern_recognition",
            "auto_create_mitigation",
            "auto_create_corrections",
            "auto_notify_enabled",
        ):
            setattr(auto_settings, flag_name, _is_truthy(post_data.get(flag_name)))

        # Override fields: whitespace-only or missing input is stored as None.
        for override_name in ("ntfy_topic_override", "ntfy_url_override"):
            cleaned = (post_data.get(override_name) or "").strip()
            setattr(auto_settings, override_name, cleaned or None)

        auto_settings.sample_message_window = self._bounded_int(
            post_data.get("sample_message_window"),
            default=40,
            lower=10,
            upper=200,
        )
        auto_settings.check_cooldown_seconds = self._bounded_int(
            post_data.get("check_cooldown_seconds"),
            default=300,
            lower=0,
            upper=86400,
        )
        auto_settings.save()

        action = (post_data.get("action") or "save").strip().lower()
        if action == "run_now":
            result = _run_auto_analysis_for_plan(
                user=request.user,
                person=person,
                conversation=plan.conversation,
                plan=plan,
                auto_settings=auto_settings,
                trigger="manual",
            )
            notice_message = result["summary"]
            notice_level = "success" if result.get("ran") else "info"
        else:
            notice_message = "Automation settings saved."
            notice_level = "success"

        return _render_mitigation_panel(
            request,
            person,
            plan,
            notice_message=notice_message,
            notice_level=notice_level,
            active_tab="auto",
            auto_settings=auto_settings,
        )
class AIWorkspaceUpdateFundamentals(LoginRequiredMixin, View):
    """Replace a plan's fundamental items from submitted free text."""

    allowed_types = {"widget"}

    def post(self, request, type, person_id, plan_id):
        """Parse ``fundamentals_text``, store it on the plan, and re-render.

        Returns a 400 response when ``type`` is not in ``allowed_types``.
        Both the person and the plan are looked up scoped to the requesting
        user via ``get_object_or_404``.
        """
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")

        owner = request.user
        submitted = request.POST
        person = get_object_or_404(Person, pk=person_id, user=owner)
        plan = get_object_or_404(PatternMitigationPlan, id=plan_id, user=owner)

        # A missing field is treated the same as an empty submission.
        raw_text = submitted.get("fundamentals_text") or ""
        tab = _sanitize_active_tab(
            submitted.get("active_tab"),
            default="fundamentals",
        )

        plan.fundamental_items = _parse_fundamentals(raw_text)
        plan.save(update_fields=["fundamental_items", "updated_at"])

        panel_options = {
            "notice_message": "Fundamentals saved.",
            "notice_level": "success",
            "active_tab": tab,
        }
        return _render_mitigation_panel(request, person, plan, **panel_options)
class AIWorkspaceUpdatePlanMeta(LoginRequiredMixin, View):
    """Update a mitigation plan's status, creation mode, title and objective."""

    allowed_types = {"widget"}

    # NOTE: ``type`` shadows the builtin, but it is part of the view's keyword
    # interface and is kept for compatibility with existing callers.
    def post(self, request, type, person_id, plan_id):
        """Apply posted plan metadata and re-render the mitigation panel.

        Returns a 400 response when ``type`` is not in ``allowed_types``.
        ``status`` and ``creation_mode`` are applied only when they match one
        of the model's declared choices. ``title`` and ``objective`` are
        always applied (a missing field is stored as an empty string). The
        plan is saved only when at least one field actually changed.
        """
        if type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")

        person = get_object_or_404(Person, pk=person_id, user=request.user)
        plan = get_object_or_404(PatternMitigationPlan, id=plan_id, user=request.user)
        post_data = request.POST
        active_tab = _sanitize_active_tab(
            post_data.get("active_tab"), default="plan_board"
        )

        status_value = (post_data.get("status") or "").strip()
        creation_mode_value = (post_data.get("creation_mode") or "").strip()
        # Truncate before the change check so the comparison is made against
        # the value that would actually be stored. Comparing the untruncated
        # input made every over-long title look "changed" and forced a
        # redundant save. (255 presumably mirrors the title column's
        # max_length -- confirm against the model.)
        title_value = (post_data.get("title") or "").strip()[:255]
        objective_value = (post_data.get("objective") or "").strip()

        valid_statuses = {key for key, _label in PatternMitigationPlan.STATUS_CHOICES}
        valid_modes = {
            key for key, _label in PatternMitigationPlan.CREATION_MODE_CHOICES
        }

        changed_fields = []
        if status_value in valid_statuses and plan.status != status_value:
            plan.status = status_value
            changed_fields.append("status")
        if creation_mode_value in valid_modes and plan.creation_mode != creation_mode_value:
            plan.creation_mode = creation_mode_value
            changed_fields.append("creation_mode")
        if plan.title != title_value:
            plan.title = title_value
            changed_fields.append("title")
        if plan.objective != objective_value:
            plan.objective = objective_value
            changed_fields.append("objective")

        if changed_fields:
            # "updated_at" is listed explicitly because save(update_fields=...)
            # writes only the named columns.
            plan.save(update_fields=["updated_at", *changed_fields])

        return _render_mitigation_panel(
            request,
            person,
            plan,
            notice_message="Plan metadata saved.",
            notice_level="success",
            active_tab=active_tab,
        )