Files
GIA/core/views/workspace.py

5191 lines
180 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import re
import statistics
from datetime import datetime, timezone
from urllib.parse import urlencode
from asgiref.sync import async_to_sync
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import HttpResponseBadRequest
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils import timezone as dj_timezone
from django.views import View
from core.forms import AIWorkspaceWindowForm
from core.lib.notify import raw_sendmsg
from core.messaging import ai as ai_runner
from core.messaging.utils import messages_to_string
from core.models import (
AI,
AIRequest,
AIResult,
AIResultSignal,
ChatSession,
Manipulation,
Message,
MessageEvent,
PatternArtifactExport,
PatternMitigationAutoSettings,
PatternMitigationCorrection,
PatternMitigationGame,
PatternMitigationMessage,
PatternMitigationPlan,
PatternMitigationRule,
Person,
PersonIdentifier,
QueuedMessage,
WorkspaceConversation,
WorkspaceMetricSnapshot,
)
SEND_ENABLED_MODES = {"active", "instant"}
OPERATION_LABELS = {
"summarise": "Summarise",
"draft_reply": "Draft Reply",
"extract_patterns": "Extract Patterns",
"artifacts": "Plan",
}
MITIGATION_TABS = {
"plan_board",
"corrections",
"engage",
"fundamentals",
"ask_ai",
"auto",
}
INSIGHT_GROUPS = {
"identity": {
"title": "Conversation Identity",
"summary": (
"Describes where this workspace data comes from and which thread is "
"being analysed."
),
},
"stability": {
"title": "Stability Scoring",
"summary": (
"Stability combines reciprocity, continuity, response pace, and "
"message-volatility regularity into a 0-100 score."
),
},
"confidence": {
"title": "Sample And Confidence",
"summary": (
"Confidence scales with data volume, day coverage, and observed "
"response pairs so sparse windows do not overclaim certainty."
),
},
"commitment": {
"title": "Commitment Directionality",
"summary": (
"Commitment estimates effort in each direction by combining response "
"speed with participation balance."
),
},
"timeline": {
"title": "Recency And Cadence",
"summary": (
"Tracks when messages and AI analysis most recently occurred to "
"separate stale from current assessments."
),
},
}
INSIGHT_METRICS = {
"platform": {
"title": "Platform",
"group": "identity",
"history_field": None,
"calculation": (
"Updated to the service of the latest message in the sampled history "
"window."
),
"psychology": (
"Platform shifts can indicate context switches in how the relationship "
"is maintained across channels."
),
},
"thread": {
"title": "Thread",
"group": "identity",
"history_field": None,
"calculation": (
"Primary thread identifier used to anchor this workspace conversation."
),
"psychology": (
"A stable thread usually implies continuity; frequent thread resets can "
"reflect fragmentation."
),
},
"workspace_created": {
"title": "Workspace Created",
"group": "identity",
"history_field": None,
"calculation": "Creation timestamp of this workspace conversation record.",
"psychology": (
"Older workspaces allow stronger trend reading because they include "
"longer temporal context."
),
},
"stability_state": {
"title": "Stability State",
"group": "stability",
"history_field": None,
"calculation": (
"Derived from stability score bands: Stable >= 70, Watch >= 50, "
"Fragile < 50, or Calibrating if data is insufficient."
),
"psychology": (
"Use state as a risk band, not a diagnosis. It indicates likely "
"interaction friction level."
),
},
"stability_score": {
"title": "Stability Score",
"group": "stability",
"history_field": "stability_score",
"calculation": (
"0.35*reciprocity + 0.25*continuity + 0.20*response + " "0.20*volatility."
),
"psychology": (
"Higher values suggest consistent mutual engagement patterns; falling "
"values often precede misunderstandings or withdrawal cycles."
),
},
"reciprocity_score": {
"title": "Reciprocity Component",
"group": "stability",
"history_field": "reciprocity_score",
"calculation": (
"100 * (1 - |inbound - outbound| / total_messages). Higher means "
"more balanced participation."
),
"psychology": (
"Lower reciprocity can reflect perceived asymmetry and rising pursuit/"
"withdraw cycles."
),
},
"continuity_score": {
"title": "Continuity Component",
"group": "stability",
"history_field": "continuity_score",
"calculation": (
"100 * min(1, distinct_sample_days / span_days). Higher means steadier "
"day-to-day continuity."
),
"psychology": ("Drops can signal communication becoming episodic or reactive."),
},
"response_score": {
"title": "Response Component",
"group": "stability",
"history_field": "response_score",
"calculation": (
"Average of inbound and outbound response-lag scores, each mapped from "
"median lag to a 0-100 curve."
),
"psychology": (
"Lower response score can indicate delayed repair loops during tension."
),
},
"volatility_score": {
"title": "Volatility Component",
"group": "stability",
"history_field": "volatility_score",
"calculation": (
"Derived from coefficient of variation of daily message counts and "
"inverted to a 0-100 stability signal."
),
"psychology": (
"High volatility can suggest inconsistent rhythm and reduced predictability."
),
},
"stability_confidence": {
"title": "Stability Confidence",
"group": "confidence",
"history_field": "stability_confidence",
"calculation": (
"0.50*message_volume + 0.30*day_coverage + 0.20*response_pair_density."
),
"psychology": (
"Low confidence means defer strong conclusions; treat outputs as "
"tentative signals."
),
},
"sample_messages": {
"title": "Sample Messages",
"group": "confidence",
"history_field": "stability_sample_messages",
"calculation": "Count of messages in the analysed window.",
"psychology": (
"Larger sample size improves reliability and reduces overreaction to "
"single events."
),
},
"sample_days": {
"title": "Sample Days",
"group": "confidence",
"history_field": "stability_sample_days",
"calculation": "Count of distinct calendar days represented in the sample.",
"psychology": (
"Coverage across days better captures rhythm, not just intensity " "bursts."
),
},
"stability_computed": {
"title": "Stability Computed",
"group": "stability",
"history_field": None,
"calculation": "Timestamp of the latest stability computation run.",
"psychology": (
"Recent recomputation indicates current context; stale computations may "
"lag emotional reality."
),
},
"commitment_inbound": {
"title": "Commit In",
"group": "commitment",
"history_field": "commitment_inbound_score",
"calculation": ("0.60*inbound_response_score + 0.40*inbound_balance_score."),
"psychology": (
"Estimates counterpart follow-through and reciprocity toward the user."
),
},
"commitment_outbound": {
"title": "Commit Out",
"group": "commitment",
"history_field": "commitment_outbound_score",
"calculation": ("0.60*outbound_response_score + 0.40*outbound_balance_score."),
"psychology": (
"Estimates user follow-through and consistency toward the counterpart."
),
},
"inbound_response_score": {
"title": "Inbound Response Score",
"group": "commitment",
"history_field": "inbound_response_score",
"calculation": (
"Response-speed score built from median lag between user outbound and "
"counterpart inbound replies."
),
"psychology": (
"Lower values suggest delayed reciprocity from counterpart direction."
),
},
"outbound_response_score": {
"title": "Outbound Response Score",
"group": "commitment",
"history_field": "outbound_response_score",
"calculation": (
"Response-speed score built from median lag between counterpart inbound "
"and user outbound replies."
),
"psychology": "Lower values suggest slower follow-through from user direction.",
},
"balance_inbound_score": {
"title": "Inbound Balance Score",
"group": "commitment",
"history_field": "balance_inbound_score",
"calculation": (
"100 * min(1, inbound_messages / outbound_messages). Captures inbound "
"participation parity."
),
"psychology": (
"Lower values can indicate one-sided conversational load from user side."
),
},
"balance_outbound_score": {
"title": "Outbound Balance Score",
"group": "commitment",
"history_field": "balance_outbound_score",
"calculation": (
"100 * min(1, outbound_messages / inbound_messages). Captures outbound "
"participation parity."
),
"psychology": (
"Lower values can indicate one-sided conversational load from counterpart side."
),
},
"commitment_confidence": {
"title": "Commit Confidence",
"group": "confidence",
"history_field": "commitment_confidence",
"calculation": (
"Uses the same confidence weighting as stability for directional scores."
),
"psychology": (
"Low confidence means directionality gaps might be temporary rather "
"than structural."
),
},
"commitment_computed": {
"title": "Commitment Computed",
"group": "commitment",
"history_field": None,
"calculation": "Timestamp of the latest commitment computation run.",
"psychology": (
"Recency matters because commitment signals shift quickly under stress."
),
},
"last_event": {
"title": "Last Event",
"group": "timeline",
"history_field": None,
"calculation": "Unix ms timestamp of the newest message in this workspace.",
"psychology": (
"Long inactivity windows can indicate pause, repair distance, or "
"channel migration."
),
},
"last_ai_run": {
"title": "Last AI Run",
"group": "timeline",
"history_field": None,
"calculation": "Most recent completed AI analysis timestamp.",
"psychology": (
"Recency of AI summaries affects relevance of suggested interventions."
),
},
}
INSIGHT_GRAPH_SPECS = [
{
"slug": "stability_score",
"title": "Stability Score",
"field": "stability_score",
"group": "stability",
"y_min": 0,
"y_max": 100,
},
{
"slug": "stability_confidence",
"title": "Stability Confidence",
"field": "stability_confidence",
"group": "confidence",
"y_min": 0,
"y_max": 1,
},
{
"slug": "sample_messages",
"title": "Sample Messages",
"field": "stability_sample_messages",
"group": "confidence",
"y_min": 0,
"y_max": None,
},
{
"slug": "sample_days",
"title": "Sample Days",
"field": "stability_sample_days",
"group": "confidence",
"y_min": 0,
"y_max": None,
},
{
"slug": "commitment_inbound",
"title": "Commit In",
"field": "commitment_inbound_score",
"group": "commitment",
"y_min": 0,
"y_max": 100,
},
{
"slug": "commitment_outbound",
"title": "Commit Out",
"field": "commitment_outbound_score",
"group": "commitment",
"y_min": 0,
"y_max": 100,
},
{
"slug": "commitment_confidence",
"title": "Commit Confidence",
"field": "commitment_confidence",
"group": "confidence",
"y_min": 0,
"y_max": 1,
},
{
"slug": "inbound_messages",
"title": "Inbound Messages",
"field": "inbound_messages",
"group": "timeline",
"y_min": 0,
"y_max": None,
},
{
"slug": "outbound_messages",
"title": "Outbound Messages",
"field": "outbound_messages",
"group": "timeline",
"y_min": 0,
"y_max": None,
},
{
"slug": "reciprocity_score",
"title": "Reciprocity Component",
"field": "reciprocity_score",
"group": "stability",
"y_min": 0,
"y_max": 100,
},
{
"slug": "continuity_score",
"title": "Continuity Component",
"field": "continuity_score",
"group": "stability",
"y_min": 0,
"y_max": 100,
},
{
"slug": "response_score",
"title": "Response Component",
"field": "response_score",
"group": "stability",
"y_min": 0,
"y_max": 100,
},
{
"slug": "volatility_score",
"title": "Volatility Component",
"field": "volatility_score",
"group": "stability",
"y_min": 0,
"y_max": 100,
},
{
"slug": "inbound_response_score",
"title": "Inbound Response Score",
"field": "inbound_response_score",
"group": "commitment",
"y_min": 0,
"y_max": 100,
},
{
"slug": "outbound_response_score",
"title": "Outbound Response Score",
"field": "outbound_response_score",
"group": "commitment",
"y_min": 0,
"y_max": 100,
},
{
"slug": "balance_inbound_score",
"title": "Inbound Balance Score",
"field": "balance_inbound_score",
"group": "commitment",
"y_min": 0,
"y_max": 100,
},
{
"slug": "balance_outbound_score",
"title": "Outbound Balance Score",
"field": "balance_outbound_score",
"group": "commitment",
"y_min": 0,
"y_max": 100,
},
]
INFORMATION_OVERVIEW_SLUGS = (
"platform",
"thread",
"workspace_created",
"stability_state",
"stability_computed",
"commitment_computed",
"last_event",
"last_ai_run",
)
def _format_unix_ms(ts):
if not ts:
return ""
dt = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
return dt.strftime("%Y-%m-%d %H:%M UTC")
def _infer_direction(message, person_identifiers):
"""
Infer message direction relative to workspace owner.
"""
sender = message.sender_uuid or ""
if sender and sender in person_identifiers:
return "in"
return "out"
def _get_send_state(user, person):
"""
Resolve current send capability from user's enabled manipulations for this person.
"""
manipulations = (
Manipulation.objects.filter(
user=user,
enabled=True,
group__people=person,
)
.select_related("group")
.distinct()
)
if not manipulations.exists():
return {
"can_send": False,
"level": "warning",
"text": "Sending is blocked: no enabled manipulation matched this recipient.",
"manipulation_id": None,
}
send_manip = manipulations.filter(mode__in=SEND_ENABLED_MODES).first()
if send_manip:
return {
"can_send": True,
"level": "success",
"text": f"Enabled by manipulation '{send_manip.name}' ({send_manip.mode}).",
"manipulation_id": str(send_manip.id),
}
mode_list = ", ".join(sorted({(m.mode or "unset") for m in manipulations}))
return {
"can_send": False,
"level": "warning",
"text": f"Sending is blocked by active mode(s): {mode_list}.",
"manipulation_id": None,
}
def _get_queue_manipulation(user, person):
"""
Resolve a manipulation for queue entries:
prefer send-enabled, otherwise any enabled manipulation in recipient groups.
"""
matched = (
Manipulation.objects.filter(
user=user,
enabled=True,
group__people=person,
)
.select_related("group")
.distinct()
)
return matched.filter(mode__in=SEND_ENABLED_MODES).first() or matched.first()
def _resolve_person_identifier(user, person, preferred_service=None):
"""
Resolve the best identifier for outbound share/send operations.
Prefer `preferred_service` when provided, then Signal, then any identifier.
"""
if preferred_service:
preferred = PersonIdentifier.objects.filter(
user=user,
person=person,
service=preferred_service,
).first()
if preferred:
return preferred
signal_row = PersonIdentifier.objects.filter(
user=user,
person=person,
service="signal",
).first()
if signal_row:
return signal_row
return PersonIdentifier.objects.filter(user=user, person=person).first()
def _send_target_options_for_person(user, person):
rows = list(
PersonIdentifier.objects.filter(user=user, person=person)
.exclude(identifier="")
.order_by("service", "identifier", "id")
)
if not rows:
return {"options": [], "selected_id": ""}
preferred_service = _preferred_service_for_person(user, person)
labels = {
"signal": "Signal",
"whatsapp": "WhatsApp",
"instagram": "Instagram",
"xmpp": "XMPP",
}
seen = set()
options = []
for row in rows:
service = str(row.service or "").strip().lower()
identifier = str(row.identifier or "").strip()
if not service or not identifier:
continue
dedupe_key = (service, identifier)
if dedupe_key in seen:
continue
seen.add(dedupe_key)
options.append(
{
"id": str(row.id),
"service": service,
"service_label": labels.get(service, service.title()),
"identifier": identifier,
}
)
if not options:
return {"options": [], "selected_id": ""}
selected_id = options[0]["id"]
if preferred_service:
preferred = next(
(item for item in options if item["service"] == preferred_service),
None,
)
if preferred is not None:
selected_id = preferred["id"]
return {"options": options, "selected_id": selected_id}
def _resolve_person_identifier_target(
user,
person,
target_identifier_id="",
target_service="",
fallback_service=None,
):
target_id = str(target_identifier_id or "").strip()
if target_id:
selected = PersonIdentifier.objects.filter(
user=user,
person=person,
id=target_id,
).first()
if selected is not None:
return selected
preferred = str(target_service or "").strip().lower() or fallback_service
return _resolve_person_identifier(
user=user,
person=person,
preferred_service=preferred,
)
def _preferred_service_for_person(user, person):
"""
Best-effort service hint from the most recent workspace conversation.
"""
conversation = (
WorkspaceConversation.objects.filter(
user=user,
participants=person,
)
.exclude(platform_type="")
.order_by("-last_event_ts", "-created_at")
.first()
)
if conversation and conversation.platform_type:
return conversation.platform_type
return None
def _compose_page_url_for_person(user, person):
preferred_service = _preferred_service_for_person(user, person)
identifier_row = _resolve_person_identifier(
user=user,
person=person,
preferred_service=preferred_service,
)
if identifier_row is None:
return ""
query = urlencode(
{
"service": identifier_row.service,
"identifier": identifier_row.identifier,
"person": str(person.id),
}
)
return f"{reverse('compose_page')}?{query}"
def _compose_widget_url_for_person(user, person, limit=40):
preferred_service = _preferred_service_for_person(user, person)
identifier_row = _resolve_person_identifier(
user=user,
person=person,
preferred_service=preferred_service,
)
if identifier_row is None:
return ""
try:
safe_limit = int(limit or 40)
except (TypeError, ValueError):
safe_limit = 40
safe_limit = max(10, min(safe_limit, 200))
query = urlencode(
{
"service": identifier_row.service,
"identifier": identifier_row.identifier,
"person": str(person.id),
"limit": safe_limit,
}
)
return f"{reverse('compose_widget')}?{query}"
def _participant_feedback_display(conversation, person):
payload = conversation.participant_feedback or {}
if not isinstance(payload, dict):
return None
raw = payload.get(str(person.id)) or {}
if not isinstance(raw, dict):
return None
state_key = str(raw.get("state") or "").strip().lower()
state_label = {
"withdrawing": "Withdrawing",
"overextending": "Overextending",
"balanced": "Balanced",
}.get(state_key, "Unknown")
state_icon = {
"withdrawing": "fa-regular fa-face-frown",
"overextending": "fa-regular fa-face-meh",
"balanced": "fa-regular fa-face-smile",
}.get(state_key, "fa-regular fa-face-meh-blank")
state_class = {
"withdrawing": "has-text-danger",
"overextending": "has-text-warning",
"balanced": "has-text-success",
}.get(state_key, "has-text-grey")
updated_label = ""
updated_raw = raw.get("updated_at")
if updated_raw:
try:
dt_value = datetime.fromisoformat(str(updated_raw))
if dt_value.tzinfo is None:
dt_value = dt_value.replace(tzinfo=timezone.utc)
updated_label = dj_timezone.localtime(dt_value).strftime("%Y-%m-%d %H:%M")
except Exception:
updated_label = str(updated_raw)
return {
"state_key": state_key or "unknown",
"state_label": state_label,
"state_icon": state_icon,
"state_class": state_class,
"inbound_messages": raw.get("inbound_messages"),
"outbound_messages": raw.get("outbound_messages"),
"sample_messages": raw.get("sample_messages"),
"sample_days": raw.get("sample_days"),
"updated_at_label": updated_label,
}
def _message_rows_for_person(user, person, limit):
sessions = ChatSession.objects.filter(user=user, identifier__person=person)
identifiers = set(
PersonIdentifier.objects.filter(user=user, person=person).values_list(
"identifier", flat=True
)
)
messages = (
Message.objects.filter(user=user, session__in=sessions)
.select_related("session", "session__identifier")
.order_by("-ts")[:limit]
)
rows = []
for message in reversed(list(messages)):
rows.append(
{
"message": message,
"direction": _infer_direction(message, identifiers),
"ts_label": _format_unix_ms(message.ts),
}
)
return rows
def _recent_messages_for_person(user, person, limit):
sessions = ChatSession.objects.filter(user=user, identifier__person=person)
messages = (
Message.objects.filter(user=user, session__in=sessions)
.select_related("session", "session__identifier")
.order_by("-ts")[:limit]
)
return list(reversed(list(messages)))
def _is_truthy(value):
return str(value or "").strip().lower() in {"1", "true", "on", "yes"}
def _sanitize_active_tab(value, default="plan_board"):
tab = (value or "").strip()
if tab in MITIGATION_TABS:
return tab
return default
def _to_float(value):
if value is None:
return None
return float(value)
def _format_metric_value(conversation, metric_slug, latest_snapshot=None):
snapshot = latest_snapshot
if snapshot is None:
snapshot = conversation.metric_snapshots.first()
if metric_slug == "platform":
return conversation.get_platform_type_display() or "-"
if metric_slug == "thread":
return conversation.platform_thread_id or "-"
if metric_slug == "workspace_created":
return conversation.created_at
if metric_slug == "stability_state":
return conversation.get_stability_state_display()
if metric_slug == "stability_score":
return conversation.stability_score
if metric_slug == "reciprocity_score":
return snapshot.reciprocity_score if snapshot else None
if metric_slug == "continuity_score":
return snapshot.continuity_score if snapshot else None
if metric_slug == "response_score":
return snapshot.response_score if snapshot else None
if metric_slug == "volatility_score":
return snapshot.volatility_score if snapshot else None
if metric_slug == "stability_confidence":
return conversation.stability_confidence
if metric_slug == "sample_messages":
return conversation.stability_sample_messages
if metric_slug == "sample_days":
return conversation.stability_sample_days
if metric_slug == "stability_computed":
return conversation.stability_last_computed_at
if metric_slug == "commitment_inbound":
return conversation.commitment_inbound_score
if metric_slug == "commitment_outbound":
return conversation.commitment_outbound_score
if metric_slug == "inbound_response_score":
return snapshot.inbound_response_score if snapshot else None
if metric_slug == "outbound_response_score":
return snapshot.outbound_response_score if snapshot else None
if metric_slug == "balance_inbound_score":
return snapshot.balance_inbound_score if snapshot else None
if metric_slug == "balance_outbound_score":
return snapshot.balance_outbound_score if snapshot else None
if metric_slug == "commitment_confidence":
return conversation.commitment_confidence
if metric_slug == "commitment_computed":
return conversation.commitment_last_computed_at
if metric_slug == "last_event":
return _format_unix_ms(conversation.last_event_ts) or "-"
if metric_slug == "last_ai_run":
return conversation.last_ai_run_at
return "-"
def _metric_psychological_read(metric_slug, conversation):
if metric_slug == "stability_state":
state = conversation.stability_state
if state == WorkspaceConversation.StabilityState.CALIBRATING:
return (
"Calibrating means the system does not yet have enough longitudinal "
"signal to classify friction reliably. Prioritize collecting a few "
"more days of normal interaction before drawing conclusions."
)
if state == WorkspaceConversation.StabilityState.STABLE:
return (
"Stable indicates low-friction reciprocity and predictable cadence in "
"the sampled window. Keep routines consistent and focus on maintenance "
"habits rather than heavy corrective interventions."
)
if state == WorkspaceConversation.StabilityState.WATCH:
return (
"Watch indicates meaningful strain without full collapse. This often "
"matches early misunderstanding cycles: repair is still easy if you "
"slow pace, validate first, and reduce escalation triggers."
)
if state == WorkspaceConversation.StabilityState.FRAGILE:
return (
"Fragile indicates high volatility or directional imbalance in recent "
"interaction. Use short, clear, safety-first communication and avoid "
"high-load conversations until cadence normalizes."
)
return (
"State is an operational risk band, not a diagnosis. Read it alongside "
"confidence and recent events."
)
if metric_slug == "stability_score":
score = conversation.stability_score
if score is None:
return "Calibrating: collect more interaction data before interpreting."
if score >= 70:
return (
"Pattern suggests low relational friction and resilient repair cycles."
)
if score >= 50:
return "Pattern suggests moderate strain; monitor for repeated escalation loops."
return (
"Pattern suggests high friction risk; prioritise safety and repair pacing."
)
if metric_slug == "stability_confidence":
conf = conversation.stability_confidence or 0.0
if conf < 0.25:
return "Low certainty: treat this as a weak signal, not a conclusion."
if conf < 0.6:
return (
"Moderate certainty: useful directional cue, still context-dependent."
)
return "High certainty: trend interpretation is likely reliable."
if metric_slug in {"commitment_inbound", "commitment_outbound"}:
inbound = conversation.commitment_inbound_score
outbound = conversation.commitment_outbound_score
if inbound is None or outbound is None:
return "Calibrating: directional commitment cannot be inferred yet."
gap = abs(inbound - outbound)
if gap < 10:
return "Directional effort appears balanced."
if inbound > outbound:
return "Counterpart appears more responsive than user in this sample."
return "User appears more responsive than counterpart in this sample."
if metric_slug == "sample_days":
days = conversation.stability_sample_days or 0
if days < 3:
return "Coverage is narrow; day-to-day rhythm inference is weak."
return "Coverage spans multiple days, improving cadence interpretation."
return ""
def _history_points(conversation, field_name):
rows = (
conversation.metric_snapshots.exclude(**{f"{field_name}__isnull": True})
.order_by("computed_at")
.values("computed_at", field_name)
)
points = []
for row in rows:
points.append(
{
"x": row["computed_at"].isoformat(),
"y": row[field_name],
}
)
return points
def _metric_supports_history(metric_slug, metric_spec):
if not metric_spec.get("history_field"):
return False
return any(graph["slug"] == metric_slug for graph in INSIGHT_GRAPH_SPECS)
def _all_graph_payload(conversation):
graphs = []
for spec in INSIGHT_GRAPH_SPECS:
points = _history_points(conversation, spec["field"])
graphs.append(
{
"slug": spec["slug"],
"title": spec["title"],
"group": spec["group"],
"group_title": INSIGHT_GROUPS[spec["group"]]["title"],
"points": points,
"count": len(points),
"y_min": spec["y_min"],
"y_max": spec["y_max"],
}
)
return graphs
def _information_overview_rows(conversation):
latest_snapshot = conversation.metric_snapshots.first()
rows = []
for slug in INFORMATION_OVERVIEW_SLUGS:
spec = INSIGHT_METRICS.get(slug)
if not spec:
continue
rows.append(
{
"slug": slug,
"title": spec.get("title") or slug.replace("_", " ").title(),
"value": _format_metric_value(conversation, slug, latest_snapshot),
"group": spec.get("group"),
"group_title": INSIGHT_GROUPS.get(spec.get("group"), {}).get(
"title", "Information"
),
}
)
return rows
def _commitment_directionality_payload(conversation):
latest_snapshot = conversation.metric_snapshots.first()
inbound = conversation.commitment_inbound_score
outbound = conversation.commitment_outbound_score
confidence = conversation.commitment_confidence or 0.0
if inbound is None or outbound is None:
return {
"direction_key": "calibrating",
"direction_label": "Calibrating",
"magnitude": None,
"delta": None,
"confidence": confidence,
"conclusion": (
"Directionality cannot be inferred yet. Collect more exchanges to "
"stabilize directional signal."
),
"factors": [],
"graph_refs": [],
}
delta = round(float(outbound) - float(inbound), 2)
magnitude = round(abs(delta), 2)
if magnitude < 4:
direction_key = "balanced"
direction_label = "Balanced"
conclusion = (
"Commitment appears symmetric. Keep current cadence and focus on "
"maintaining clarity."
)
elif delta > 0:
direction_key = "outbound"
direction_label = "Outbound-Leaning"
conclusion = (
"You are carrying relatively more directional effort right now. "
"Consider reducing over-functioning and asking for explicit reciprocity."
)
else:
direction_key = "inbound"
direction_label = "Inbound-Leaning"
conclusion = (
"The other party is carrying relatively more directional effort right now. "
"Acknowledge this and match consistency to reduce asymmetry."
)
graph_refs = [
{"slug": "commitment_inbound", "title": "Commit In"},
{"slug": "commitment_outbound", "title": "Commit Out"},
{"slug": "inbound_response_score", "title": "Inbound Response Score"},
{"slug": "outbound_response_score", "title": "Outbound Response Score"},
{"slug": "balance_inbound_score", "title": "Inbound Balance Score"},
{"slug": "balance_outbound_score", "title": "Outbound Balance Score"},
{"slug": "commitment_confidence", "title": "Commit Confidence"},
]
factor_lookup = {
"inbound_response_score": (
latest_snapshot.inbound_response_score if latest_snapshot else None
),
"outbound_response_score": (
latest_snapshot.outbound_response_score if latest_snapshot else None
),
"balance_inbound_score": (
latest_snapshot.balance_inbound_score if latest_snapshot else None
),
"balance_outbound_score": (
latest_snapshot.balance_outbound_score if latest_snapshot else None
),
"commitment_confidence": confidence,
}
factors = [
{
"title": "Inbound Response",
"icon": "fa-solid fa-inbox",
"weight": "60% of Commit In",
"value": factor_lookup["inbound_response_score"],
"slug": "inbound_response_score",
},
{
"title": "Inbound Balance",
"icon": "fa-solid fa-scale-balanced",
"weight": "40% of Commit In",
"value": factor_lookup["balance_inbound_score"],
"slug": "balance_inbound_score",
},
{
"title": "Outbound Response",
"icon": "fa-solid fa-paper-plane",
"weight": "60% of Commit Out",
"value": factor_lookup["outbound_response_score"],
"slug": "outbound_response_score",
},
{
"title": "Outbound Balance",
"icon": "fa-solid fa-arrows-left-right",
"weight": "40% of Commit Out",
"value": factor_lookup["balance_outbound_score"],
"slug": "balance_outbound_score",
},
{
"title": "Confidence",
"icon": "fa-solid fa-shield-check",
"weight": "Applies To Direction",
"value": confidence,
"slug": "commitment_confidence",
},
]
return {
"direction_key": direction_key,
"direction_label": direction_label,
"magnitude": magnitude,
"delta": delta,
"confidence": confidence,
"conclusion": conclusion,
"commit_in": inbound,
"commit_out": outbound,
"factors": factors,
"graph_refs": graph_refs,
}
def _metric_pattern_context(conversation):
latest_snapshot = conversation.metric_snapshots.first()
directionality = _commitment_directionality_payload(conversation)
confidence = conversation.stability_confidence or 0.0
risk_signals = []
state_key = str(conversation.stability_state or "").lower()
if state_key == WorkspaceConversation.StabilityState.FRAGILE:
risk_signals.append(
{
"key": "stability_fragile",
"label": "Fragile Stability",
"severity": "high",
"explanation": (
"Stability is in fragile range. Bias corrections toward "
"de-escalation and explicit repair loops."
),
}
)
elif state_key == WorkspaceConversation.StabilityState.WATCH:
risk_signals.append(
{
"key": "stability_watch",
"label": "Watch Stability",
"severity": "medium",
"explanation": (
"Stability is watch-range. Reinforce concise requests and "
"misinterpretation checks before escalation."
),
}
)
if confidence < 0.25:
risk_signals.append(
{
"key": "low_confidence",
"label": "Low Confidence Window",
"severity": "low",
"explanation": (
"Confidence is low. Prefer reversible, low-risk corrections "
"that can be validated quickly."
),
}
)
magnitude = directionality.get("magnitude")
if magnitude is not None:
severity = "high" if magnitude >= 15 else "medium" if magnitude >= 8 else None
if severity:
risk_signals.append(
{
"key": "commitment_asymmetry",
"label": "Commitment Asymmetry",
"severity": severity,
"explanation": (
"Directional commitment is asymmetric. Add corrections "
"that restore reciprocity and explicit confirmation."
),
}
)
if latest_snapshot:
if (
latest_snapshot.volatility_score is not None
and latest_snapshot.volatility_score >= 70
):
risk_signals.append(
{
"key": "volatility_spike",
"label": "Volatility Spike",
"severity": "medium",
"explanation": (
"Volatility is elevated. Use short, bounded wording to "
"reduce sudden interaction swings."
),
}
)
if (
latest_snapshot.reciprocity_score is not None
and latest_snapshot.reciprocity_score <= 35
):
risk_signals.append(
{
"key": "reciprocity_drop",
"label": "Reciprocity Drop",
"severity": "medium",
"explanation": (
"Reciprocity is low. Add corrections that request and "
"acknowledge balanced turn-taking."
),
}
)
if (
latest_snapshot.response_score is not None
and latest_snapshot.response_score <= 35
):
risk_signals.append(
{
"key": "response_drag",
"label": "Response Drag",
"severity": "medium",
"explanation": (
"Response pace is slow. Prefer corrections that set timing "
"expectations and explicit follow-up windows."
),
}
)
state_label = (
conversation.get_stability_state_display()
if hasattr(conversation, "get_stability_state_display")
else str(conversation.stability_state or "")
)
return {
"stability": {
"state": state_label,
"score": conversation.stability_score,
"confidence": confidence,
"sample_messages": conversation.stability_sample_messages,
"sample_days": conversation.stability_sample_days,
"computed_at": conversation.stability_last_computed_at,
},
"commitment": {
"inbound": conversation.commitment_inbound_score,
"outbound": conversation.commitment_outbound_score,
"confidence": conversation.commitment_confidence,
"computed_at": conversation.commitment_last_computed_at,
"directionality": directionality,
},
"components": (
{
"reciprocity": latest_snapshot.reciprocity_score,
"continuity": latest_snapshot.continuity_score,
"response": latest_snapshot.response_score,
"volatility": latest_snapshot.volatility_score,
"inbound_response": latest_snapshot.inbound_response_score,
"outbound_response": latest_snapshot.outbound_response_score,
"balance_inbound": latest_snapshot.balance_inbound_score,
"balance_outbound": latest_snapshot.balance_outbound_score,
"source_event_ts": latest_snapshot.source_event_ts,
}
if latest_snapshot
else {}
),
"risk_signals": risk_signals[:8],
}
def _store_metric_snapshot(conversation, payload):
compare_keys = [
"source_event_ts",
"stability_state",
"stability_score",
"stability_confidence",
"stability_sample_messages",
"stability_sample_days",
"commitment_inbound_score",
"commitment_outbound_score",
"commitment_confidence",
"inbound_messages",
"outbound_messages",
"reciprocity_score",
"continuity_score",
"response_score",
"volatility_score",
"inbound_response_score",
"outbound_response_score",
"balance_inbound_score",
"balance_outbound_score",
]
last = conversation.metric_snapshots.first()
if last and all(getattr(last, key) == payload.get(key) for key in compare_keys):
return
WorkspaceMetricSnapshot.objects.create(conversation=conversation, **payload)
def _parse_draft_options(result_text):
"""
Parse model output into labeled draft options shown simultaneously in UI.
"""
content = (result_text or "").strip()
if not content:
return []
def clean_option_text(value):
value = (value or "").strip(" \n\r\t*:")
# Strip surrounding quotes when the whole option is wrapped.
if len(value) >= 2 and (
(value[0] == '"' and value[-1] == '"')
or (value[0] == "'" and value[-1] == "'")
):
value = value[1:-1].strip()
return value
def dedupe_by_label(seq):
ordered = []
seen = set()
for item in seq:
label = (item.get("label") or "").strip().title()
text = clean_option_text(item.get("text") or "")
if not label or not text:
continue
key = label.lower()
if key in seen:
continue
seen.add(key)
ordered.append({"label": label, "text": text})
return ordered
# Primary parser: line-based labeled blocks.
# Accepts:
# - Soft/Neutral/Firm
# - optional Tone/Response/Reply suffix
# - optional markdown bold markers
# - content on same line or subsequent lines
block_re = re.compile(
r"(?ims)^\s*(?:[-*]\s*)?(?:\*\*)?\s*(Soft|Neutral|Firm)\s*"
r"(?:(?:Tone|Response|Reply))?\s*:?\s*(?:\*\*)?\s*"
r"(.*?)(?=^\s*(?:[-*]\s*)?(?:\*\*)?\s*(?:Soft|Neutral|Firm)\s*"
r"(?:(?:Tone|Response|Reply))?\s*:?\s*(?:\*\*)?\s*|\Z)"
)
options = [
{"label": match.group(1).strip().title(), "text": match.group(2)}
for match in block_re.finditer(content)
]
options = dedupe_by_label(options)
if options:
return options[:3]
# Secondary parser: inline labeled segments in one paragraph.
inline_re = re.compile(
r"(?is)\b(Soft|Neutral|Firm)\s*(?:(?:Tone|Response|Reply))?\s*:\s*(.*?)"
r"(?=\b(?:Soft|Neutral|Firm)\s*(?:(?:Tone|Response|Reply))?\s*:|$)"
)
options = [
{"label": match.group(1).strip().title(), "text": match.group(2)}
for match in inline_re.finditer(content)
]
options = dedupe_by_label(options)
if options:
return options[:3]
# Secondary parser: Option 1/2/3 blocks.
option_split_re = re.compile(r"(?im)^\s*Option\s+\d+\s*$")
chunks = [
chunk.strip() for chunk in option_split_re.split(content) if chunk.strip()
]
parsed = []
prefix_re = re.compile(
r"(?im)^(?:\*\*)?\s*(Soft|Neutral|Firm)\s*(?:Tone|Response|Reply)?\s*:?\s*(?:\*\*)?\s*"
)
for idx, chunk in enumerate(chunks, start=1):
label = f"Option {idx}"
prefix_match = prefix_re.match(chunk)
if prefix_match:
label = prefix_match.group(1).strip().title()
chunk = prefix_re.sub("", chunk, count=1).strip(" \n\r\t*:")
if chunk:
parsed.append({"label": label, "text": chunk})
if parsed:
return dedupe_by_label(parsed)[:3]
# Final fallback: use first non-empty paragraphs.
paragraphs = [
para.strip() for para in re.split(r"\n\s*\n", content) if para.strip()
]
return dedupe_by_label(
[
{"label": f"Option {idx}", "text": para}
for idx, para in enumerate(paragraphs[:3], start=1)
]
)
def _extract_seed_entities_from_context(raw_context):
"""
Heuristic extractor for pasted long-form frameworks.
Returns candidate fundamentals/rules/games without model dependency.
"""
text = (raw_context or "").strip()
if not text:
return {"fundamentals": [], "rules": [], "games": []}
lines = [line.strip() for line in text.splitlines()]
fundamentals = []
rules = []
games = []
current_rule = None
current_game = None
in_core_principle = False
in_quick_cheat = False
def flush_rule():
nonlocal current_rule
if not current_rule:
return
title = current_rule.get("title", "").strip()
content = " ".join(current_rule.get("body", [])).strip()
if title and content:
rules.append({"title": title, "content": content})
elif title:
rules.append({"title": title, "content": title})
current_rule = None
def flush_game():
nonlocal current_game
if not current_game:
return
title = current_game.get("title", "").strip()
instructions = " ".join(current_game.get("body", [])).strip()
if title and instructions:
games.append({"title": title, "instructions": instructions})
elif title:
games.append({"title": title, "instructions": title})
current_game = None
for line in lines:
if not line:
flush_rule()
flush_game()
in_core_principle = False
in_quick_cheat = False
continue
if re.match(r"^SECTION\s+\d+", line, re.IGNORECASE):
in_core_principle = False
in_quick_cheat = False
flush_rule()
flush_game()
continue
if re.match(r"^CORE PRINCIPLE", line, re.IGNORECASE):
in_core_principle = True
in_quick_cheat = False
continue
if re.match(r"^QUICK CHEAT SHEET", line, re.IGNORECASE):
in_quick_cheat = True
in_core_principle = False
continue
rule_match = re.match(r"^Rule\s+(\d+)\s*:\s*(.+)$", line, re.IGNORECASE)
game_match = re.match(r"^Game\s+(\d+)\s*:\s*(.+)$", line, re.IGNORECASE)
mantra_match = re.match(r"^Mantra\s*:\s*(.+)$", line, re.IGNORECASE)
if rule_match:
flush_rule()
flush_game()
title = rule_match.group(2).strip()
current_rule = {"title": title, "body": []}
if rule_match.group(1) in {"1", "11"} and title:
fundamentals.append(title)
continue
if game_match:
flush_rule()
flush_game()
title = game_match.group(2).strip()
current_game = {"title": title, "body": []}
continue
if mantra_match:
fundamentals.append(mantra_match.group(1).strip())
continue
if in_core_principle and len(line) <= 120 and ":" not in line:
fundamentals.append(line)
continue
if in_quick_cheat:
quick_line = re.sub(r"^\s*(?:[-*]|\d+\.)\s*", "", line).strip()
if (
quick_line
and len(quick_line) <= 120
and not quick_line.lower().startswith("if you want")
):
fundamentals.append(quick_line)
continue
if "Emotional safety > Accuracy > Analysis" in line:
fundamentals.append("Emotional safety > Accuracy > Analysis")
continue
if current_rule:
current_rule["body"].append(line)
continue
if current_game:
current_game["body"].append(line)
continue
flush_rule()
flush_game()
# Keep order, remove duplicates.
def dedupe_strings(seq):
seen = set()
out = []
for item in seq:
key = item.strip().lower()
if not key or key in seen:
continue
seen.add(key)
out.append(item.strip())
return out
def dedupe_dicts(seq):
seen = set()
out = []
for item in seq:
title = (item.get("title") or "").strip()
key = title.lower()
if not key or key in seen:
continue
seen.add(key)
out.append(item)
return out
return {
"fundamentals": dedupe_strings(fundamentals)[:20],
"rules": dedupe_dicts(rules)[:40],
"games": dedupe_dicts(games)[:40],
}
def _merge_seed_entities(artifacts, seed):
merged = dict(artifacts or {})
seed = seed or {}
fundamentals = list(merged.get("fundamental_items") or [])
fundamentals = list(
dict.fromkeys(fundamentals + list(seed.get("fundamentals") or []))
)
merged["fundamental_items"] = fundamentals
def merge_artifact_list(existing, injected, body_key):
existing = list(existing or [])
injected = list(injected or [])
seen = {(item.get("title") or "").strip().lower() for item in existing}
for item in injected:
title = (item.get("title") or "").strip()
body = (item.get(body_key) or "").strip()
if not title or not body:
continue
key = title.lower()
if key in seen:
continue
existing.append({"title": title, body_key: body})
seen.add(key)
return existing
merged["rules"] = merge_artifact_list(
merged.get("rules"), seed.get("rules"), "content"
)
merged["games"] = merge_artifact_list(
merged.get("games"), seed.get("games"), "instructions"
)
return merged
def _normalize_markdown_titles(text):
"""
Minimal markdown cleanup:
- convert '**Title:**' style lines into markdown headings so Bulma headers can style them.
"""
out = []
for line in (text or "").splitlines():
match = re.match(r"^\s*\*\*(.+?)\*\*\s*:?\s*$", line)
if match:
out.append(f"## {match.group(1).strip()}")
else:
out.append(line)
return "\n".join(out)
def _clean_inline_markdown(value):
value = re.sub(r"\*\*(.*?)\*\*", r"\1", value)
value = re.sub(r"\*(.*?)\*", r"\1", value)
value = re.sub(r"`(.*?)`", r"\1", value)
return value.strip()
def _append_block(section, block_type, values):
if not values:
return
section["blocks"].append({"type": block_type, "items": values})
def _parse_result_sections(result_text):
"""
Minimal markdown-ish parser used by UI:
- '#/##/### Title' become section headers
- bullet lines become lists
- remaining lines are grouped as paragraphs
Returned structure is template-safe (no raw HTML).
"""
text = _normalize_markdown_titles(result_text or "")
lines = text.splitlines()
sections = []
current = {"title": "Output", "level": 3, "blocks": []}
paragraph = []
bullets = []
def flush_paragraph():
nonlocal paragraph
if paragraph:
_append_block(current, "p", [" ".join(paragraph)])
paragraph = []
def flush_bullets():
nonlocal bullets
if bullets:
_append_block(current, "ul", bullets)
bullets = []
def flush_section(force=False):
if force or current["blocks"]:
sections.append(current.copy())
for raw_line in lines:
line = raw_line.rstrip()
heading_match = re.match(r"^\s*(#{1,6})\s+(.+?)\s*$", line)
if heading_match:
flush_paragraph()
flush_bullets()
flush_section()
level = len(heading_match.group(1))
title = _clean_inline_markdown(heading_match.group(2))
current = {"title": title or "Section", "level": level, "blocks": []}
continue
bullet_match = re.match(r"^\s*(?:[-*]|\d+\.)\s+(.+?)\s*$", line)
if bullet_match:
flush_paragraph()
bullets.append(_clean_inline_markdown(bullet_match.group(1)))
continue
if not line.strip():
flush_paragraph()
flush_bullets()
continue
flush_bullets()
paragraph.append(_clean_inline_markdown(line))
flush_paragraph()
flush_bullets()
flush_section(force=True)
cleaned = [sec for sec in sections if sec.get("blocks")]
if cleaned:
return cleaned
fallback = _clean_inline_markdown(result_text or "")
return [
{"title": "Output", "level": 3, "blocks": [{"type": "p", "items": [fallback]}]}
]
def _build_interaction_signals(operation, result_text, message_event_ids):
def _normalize_signal_key(value):
key = re.sub(r"[^a-z0-9]+", "_", str(value or "").strip().lower()).strip("_")
if key == "open_loops":
return "open_loop"
return key
def _signal_display_label(label, key):
if str(label or "").strip():
return str(label).strip().title()
return str(key or "Signal").replace("_", " ").strip().title()
meaning_by_key = {
"repair": "Repair markers suggest active attempts to restore connection.",
"de_escalation": "De-escalation markers suggest pressure reduction and safer tone.",
"open_loop": "Open loops are unresolved topics likely to reappear later.",
"risk": "Risk markers indicate escalating friction or potential rupture points.",
"conflict": "Conflict markers indicate direct tension or adversarial framing.",
"draft_generated": "Draft generation indicates actionable next-step options are available.",
}
text = (result_text or "").lower()
signals = []
heuristics = [
("repair", "repair", "positive"),
("de-escalation", "de-escalat", "positive"),
("open loop", "open loop", "neutral"),
("risk", "risk", "risk"),
("conflict", "conflict", "risk"),
]
for label, token, valence in heuristics:
if token in text:
signal_key = _normalize_signal_key(label)
signals.append(
{
"label": label,
"display_label": _signal_display_label(label, signal_key),
"signal_key": signal_key,
"meaning": meaning_by_key.get(signal_key, ""),
"valence": valence,
"message_event_ids": message_event_ids[:6],
}
)
if not signals and operation == "draft_reply":
signal_key = _normalize_signal_key("draft_generated")
signals.append(
{
"label": "draft_generated",
"display_label": _signal_display_label("draft_generated", signal_key),
"signal_key": signal_key,
"meaning": meaning_by_key.get(signal_key, ""),
"valence": "positive",
"message_event_ids": message_event_ids[:3],
}
)
return signals[:8]
def _memory_kind_from_title(title):
lowered = str(title or "").strip().lower()
if "open" in lowered and "loop" in lowered:
return "open_loops", "Open Loops"
if "emotion" in lowered or "state" in lowered:
return "emotional_state", "Emotional State"
if "pattern" in lowered:
return "patterns", "Patterns"
if "friction" in lowered:
return "friction_loops", "Friction Loops"
if "rule" in lowered or "next-step" in lowered:
return "rules", "Rules"
if "summary" in lowered or "key" in lowered:
return "summary", "Summary"
return "insights", "Insights"
def _build_memory_proposals(operation, result_text):
"""
Build structured, grouped proposals from model output sections.
"""
if operation not in {"summarise", "extract_patterns"}:
return []
sections = _parse_result_sections(result_text)
proposals = []
for section in sections:
kind, label = _memory_kind_from_title(section.get("title"))
section_title = (section.get("title") or "").strip()
for block in section.get("blocks", []):
for item in block.get("items", []):
content = str(item or "").strip()
if not content:
continue
proposals.append(
{
"kind": kind,
"kind_label": label,
"section_title": section_title,
"content": content,
"status": "proposed",
}
)
if not proposals:
fallback = (result_text or "").strip()
if fallback:
proposals.append(
{
"kind": "summary",
"kind_label": "Summary",
"section_title": "Summary",
"content": fallback,
"status": "proposed",
}
)
return proposals[:80]
def _group_memory_proposals(memory_proposals):
signal_keys_by_kind = {
"open_loops": ["open_loop"],
"emotional_state": ["de_escalation", "repair", "conflict"],
"patterns": ["repair", "conflict"],
"friction_loops": ["conflict", "risk"],
"summary": ["open_loop", "repair", "de_escalation", "conflict", "risk"],
"rules": ["repair", "conflict", "risk"],
"insights": ["open_loop", "repair", "de_escalation", "conflict", "risk"],
}
grouped = {}
for item in memory_proposals or []:
label = str(item.get("kind_label") or item.get("kind") or "Insights").strip()
key = str(item.get("kind") or label).strip().lower()
if key not in grouped:
grouped[key] = {
"title": label,
"key": key,
"signal_keys": list(signal_keys_by_kind.get(key, [])),
"items": [],
}
grouped[key]["items"].append(item)
return list(grouped.values())
def _window_spec_tags(window_spec):
if not isinstance(window_spec, dict):
return []
out = []
if "limit" in window_spec:
out.append(f"Window: last {window_spec.get('limit')} messages")
if "since_ts" in window_spec:
out.append(f"Since: {_format_unix_ms(window_spec.get('since_ts'))}")
if "between_ts" in window_spec and isinstance(window_spec.get("between_ts"), list):
values = window_spec.get("between_ts")
if len(values) == 2:
out.append(
f"Range: {_format_unix_ms(values[0])} to {_format_unix_ms(values[1])}"
)
if not out:
for key, value in window_spec.items():
out.append(f"{str(key).replace('_', ' ').title()}: {value}")
return out[:6]
def _policy_snapshot_tags(policy_snapshot):
if not isinstance(policy_snapshot, dict):
return []
out = []
send_state = policy_snapshot.get("send_state")
if isinstance(send_state, dict):
out.append(f"Send: {'enabled' if send_state.get('can_send') else 'blocked'}")
text = str(send_state.get("text") or "").strip()
if text:
out.append(text)
for key, value in policy_snapshot.items():
if key == "send_state":
continue
out.append(f"{str(key).replace('_', ' ').title()}: {value}")
return out[:6]
def _extract_json_object(raw):
text = (raw or "").strip()
if not text:
return None
try:
parsed = json.loads(text)
if isinstance(parsed, dict):
return parsed
except Exception:
pass
start = text.find("{")
if start == -1:
return None
depth = 0
end = None
for index, char in enumerate(text[start:], start=start):
if char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
end = index + 1
break
if end is None:
return None
try:
parsed = json.loads(text[start:end])
if isinstance(parsed, dict):
return parsed
except Exception:
return None
return None
def _section_lines(section):
lines = []
for block in section.get("blocks", []):
lines.extend([item for item in block.get("items", []) if item])
return lines
def _shape_artifacts_for_profile(rules, games, output_profile):
"""
Apply lightweight profile shaping for generated mitigation artifacts.
"""
profile = (output_profile or "framework").strip().lower()
if profile in {"rules", "rule"}:
return rules[:12], games[:2]
if profile in {"games", "game"}:
return rules[:3], games[:12]
# framework: balanced
return rules[:10], games[:10]
def _default_artifacts_from_patterns(result_text, person, output_profile="framework"):
sections = _parse_result_sections(result_text)
rules = []
games = []
for section in sections:
title = (section.get("title") or "").lower()
lines = _section_lines(section)
if not lines:
continue
if "rule" in title or "next-step" in title or "mitigation" in title:
for idx, line in enumerate(lines, start=1):
rules.append({"title": f"Rule {idx}", "content": line})
elif "game" in title or "protocol" in title:
for idx, line in enumerate(lines, start=1):
games.append({"title": f"Game {idx}", "instructions": line})
if not rules:
rules = [
{
"title": "Safety Before Analysis",
"content": "Prioritize reducing emotional escalation before introducing analysis.",
},
{
"title": "State Matching",
"content": "If either side is flooded, pause first and resume with a time-bound return.",
},
]
if not games:
games = [
{
"title": "Two-Turn Pause",
"instructions": "Limit conflict responses to two short turns, then pause with a clear return time.",
},
{
"title": "Mirror Then Ask",
"instructions": "Mirror what you heard, validate emotion, then ask whether comfort or solutions are wanted.",
},
]
rules, games = _shape_artifacts_for_profile(rules, games, output_profile)
return {
"title": f"{person.name} Pattern Mitigation",
"objective": "Reduce repeated friction loops while preserving trust and clarity.",
"fundamental_items": [],
"rules": rules,
"games": games,
"corrections": [],
}
def _build_mitigation_artifacts(
ai_obj,
person,
source_text,
creation_mode,
inspiration,
fundamentals,
output_profile,
metric_context=None,
):
fallback = _default_artifacts_from_patterns(source_text, person, output_profile)
if not ai_obj:
if fundamentals:
fallback["fundamental_items"] = fundamentals
return fallback
prompt = [
{
"role": "system",
"content": (
"You design practical relationship mitigation protocols. "
"Return strict JSON only with keys: title, objective, "
"fundamental_items, rules, games, corrections. "
"Each rule item must have title and content. "
"Each game item must have title and instructions. "
"Each correction item must have title and clarification. "
"If mode is auto, choose strongest artifacts. If mode is guided, strongly follow inspiration. "
"Use provided metrics as risk context to tighten corrections. "
"Output profile controls emphasis: framework (balanced), "
"rules (rules-first), games (games-first)."
),
},
{
"role": "user",
"content": (
f"Person: {person.name}\n"
f"Mode: {creation_mode}\n"
f"Output profile: {output_profile}\n"
f"User inspiration: {inspiration or 'None'}\n"
f"Fundamental items (pre-agreed): {json.dumps(fundamentals)}\n\n"
"Metric context:\n"
f"{json.dumps(metric_context or {}, ensure_ascii=False, default=str)}\n\n"
f"Pattern analysis:\n{source_text}"
),
},
]
try:
raw = async_to_sync(ai_runner.run_prompt)(prompt, ai_obj)
except Exception:
raw = ""
parsed = _extract_json_object(raw) or {}
title = (parsed.get("title") or "").strip() or fallback["title"]
objective = (parsed.get("objective") or "").strip() or fallback["objective"]
parsed_fundamentals = parsed.get("fundamental_items")
if isinstance(parsed_fundamentals, list):
merged_fundamentals = [
str(item).strip() for item in parsed_fundamentals if str(item).strip()
]
else:
merged_fundamentals = []
if fundamentals:
merged_fundamentals = list(dict.fromkeys(fundamentals + merged_fundamentals))
raw_rules = parsed.get("rules")
rules = []
if isinstance(raw_rules, list):
for item in raw_rules:
if not isinstance(item, dict):
continue
title_i = str(item.get("title") or "").strip()
content_i = str(item.get("content") or "").strip()
if title_i and content_i:
rules.append({"title": title_i, "content": content_i})
raw_games = parsed.get("games")
games = []
if isinstance(raw_games, list):
for item in raw_games:
if not isinstance(item, dict):
continue
title_i = str(item.get("title") or "").strip()
instructions_i = str(item.get("instructions") or "").strip()
if title_i and instructions_i:
games.append({"title": title_i, "instructions": instructions_i})
raw_corrections = parsed.get("corrections")
corrections = []
if isinstance(raw_corrections, list):
for item in raw_corrections:
if not isinstance(item, dict):
continue
title_i = _normalize_correction_title(
item.get("title") or "", fallback="Correction"
)
clarification_i = str(
item.get("clarification") or item.get("content") or ""
).strip()
source_phrase_i = str(item.get("source_phrase") or "").strip()
if title_i and clarification_i:
corrections.append(
{
"title": title_i[:255],
"clarification": clarification_i[:2000],
"source_phrase": source_phrase_i[:1000],
}
)
if not rules:
rules = fallback["rules"]
if not games:
games = fallback["games"]
rules, games = _shape_artifacts_for_profile(rules, games, output_profile)
return {
"title": title,
"objective": objective,
"fundamental_items": merged_fundamentals,
"rules": rules,
"games": games,
"corrections": _normalize_violation_items(corrections),
}
def _serialize_export_payload(plan, artifact_type, export_format):
rules = list(
plan.rules.order_by("created_at").values("title", "content", "enabled")
)
games = list(
plan.games.order_by("created_at").values("title", "instructions", "enabled")
)
corrections = list(
plan.corrections.order_by("created_at").values(
"title", "clarification", "enabled"
)
)
body = {
"protocol_version": "artifact-v1",
"plan_id": str(plan.id),
"plan_title": plan.title,
"objective": plan.objective,
"fundamental_items": plan.fundamental_items or [],
"rules": rules,
"games": games,
"corrections": corrections,
}
if artifact_type == "rules":
body = {
**body,
"games": [],
"corrections": [],
}
elif artifact_type == "games":
body = {
**body,
"rules": [],
"corrections": [],
}
elif artifact_type == "corrections":
body = {
**body,
"rules": [],
"games": [],
}
if export_format == "json":
payload = json.dumps(body, indent=2)
else:
lines = [
f"# {plan.title or 'Pattern Mitigation Artifact'}",
"",
"Protocol: artifact-v1",
f"Artifact Type: {artifact_type}",
"",
"## Objective",
plan.objective or "(none)",
"",
"## Fundamental Items",
]
fundamentals = plan.fundamental_items or []
if fundamentals:
lines.extend([f"- {item}" for item in fundamentals])
else:
lines.append("- (none)")
if artifact_type in {"rulebook", "rules"}:
lines.append("")
lines.append("## Rules")
if rules:
for idx, rule in enumerate(rules, start=1):
lines.append(f"{idx}. **{rule['title']}** - {rule['content']}")
else:
lines.append("- (none)")
if artifact_type in {"rulebook", "games"}:
lines.append("")
lines.append("## Games")
if games:
for idx, game in enumerate(games, start=1):
lines.append(f"{idx}. **{game['title']}** - {game['instructions']}")
else:
lines.append("- (none)")
if artifact_type in {"rulebook", "corrections"}:
lines.append("")
lines.append("## Corrections")
if corrections:
for idx, correction in enumerate(corrections, start=1):
lines.append(
f"{idx}. **{correction['title']}** - {correction['clarification']}"
)
else:
lines.append("- (none)")
payload = "\n".join(lines)
meta = {
"rule_count": len(rules),
"game_count": len(games),
"correction_count": len(corrections),
"fundamental_count": len(plan.fundamental_items or []),
}
return payload, meta
def _conversation_for_person(user, person):
primary_identifier = (
PersonIdentifier.objects.filter(user=user, person=person)
.order_by("service")
.first()
)
default_platform = primary_identifier.service if primary_identifier else "signal"
thread_id = str(person.id)
conversation = (
WorkspaceConversation.objects.filter(
user=user,
platform_thread_id=thread_id,
).first()
or WorkspaceConversation.objects.filter(
user=user,
participants=person,
)
.order_by("-created_at")
.first()
)
if conversation is None:
conversation = WorkspaceConversation.objects.create(
user=user,
platform_type=default_platform,
title=f"{person.name} Workspace",
platform_thread_id=thread_id,
)
else:
update_fields = []
if conversation.platform_thread_id != thread_id:
conversation.platform_thread_id = thread_id
update_fields.append("platform_thread_id")
expected_title = f"{person.name} Workspace"
if not conversation.title:
conversation.title = expected_title
update_fields.append("title")
if update_fields:
conversation.save(update_fields=update_fields)
conversation.participants.add(person)
_refresh_conversation_stability(conversation, user, person)
return conversation
def _score_from_lag(lag_ms, target_hours=4):
if lag_ms is None:
return 50.0
target_ms = max(1, target_hours) * 60 * 60 * 1000
return max(0.0, min(100.0, 100.0 / (1.0 + (lag_ms / target_ms))))
def _median_or_none(values):
if not values:
return None
return float(statistics.median(values))
def _refresh_conversation_stability(conversation, user, person):
"""
Populate stability/commitment fields from cross-platform history.
Uses Person -> PersonIdentifier -> ChatSession -> Message so all linked
services contribute to one workspace conversation.
"""
now_ts = dj_timezone.now()
identifiers = list(
PersonIdentifier.objects.filter(
user=user,
person=person,
)
)
identifier_values = {
str(row.identifier or "").strip() for row in identifiers if row.identifier
}
if not identifiers:
conversation.stability_state = WorkspaceConversation.StabilityState.CALIBRATING
conversation.stability_score = None
conversation.stability_confidence = 0.0
conversation.stability_sample_messages = 0
conversation.stability_sample_days = 0
conversation.commitment_inbound_score = None
conversation.commitment_outbound_score = None
conversation.commitment_confidence = 0.0
conversation.stability_last_computed_at = now_ts
conversation.commitment_last_computed_at = now_ts
conversation.save(
update_fields=[
"stability_state",
"stability_score",
"stability_confidence",
"stability_sample_messages",
"stability_sample_days",
"commitment_inbound_score",
"commitment_outbound_score",
"commitment_confidence",
"stability_last_computed_at",
"commitment_last_computed_at",
]
)
_store_metric_snapshot(
conversation,
{
"source_event_ts": conversation.last_event_ts,
"stability_state": conversation.stability_state,
"stability_score": None,
"stability_confidence": 0.0,
"stability_sample_messages": 0,
"stability_sample_days": 0,
"commitment_inbound_score": None,
"commitment_outbound_score": None,
"commitment_confidence": 0.0,
"inbound_messages": 0,
"outbound_messages": 0,
"reciprocity_score": None,
"continuity_score": None,
"response_score": None,
"volatility_score": None,
"inbound_response_score": None,
"outbound_response_score": None,
"balance_inbound_score": None,
"balance_outbound_score": None,
},
)
return
latest_ts = (
Message.objects.filter(
user=user,
session__identifier__in=identifiers,
)
.order_by("-ts")
.values_list("ts", flat=True)
.first()
)
if (
conversation.stability_last_computed_at
and latest_ts is not None
and conversation.last_event_ts == latest_ts
and (now_ts - conversation.stability_last_computed_at).total_seconds() < 120
):
return
rows = list(
Message.objects.filter(
user=user,
session__identifier__in=identifiers,
)
.order_by("ts")
.values("ts", "sender_uuid", "session__identifier__service")
)
if not rows:
conversation.stability_state = WorkspaceConversation.StabilityState.CALIBRATING
conversation.stability_score = None
conversation.stability_confidence = 0.0
conversation.stability_sample_messages = 0
conversation.stability_sample_days = 0
conversation.last_event_ts = None
conversation.commitment_inbound_score = None
conversation.commitment_outbound_score = None
conversation.commitment_confidence = 0.0
conversation.stability_last_computed_at = now_ts
conversation.commitment_last_computed_at = now_ts
conversation.save(
update_fields=[
"stability_state",
"stability_score",
"stability_confidence",
"stability_sample_messages",
"stability_sample_days",
"last_event_ts",
"commitment_inbound_score",
"commitment_outbound_score",
"commitment_confidence",
"stability_last_computed_at",
"commitment_last_computed_at",
]
)
_store_metric_snapshot(
conversation,
{
"source_event_ts": conversation.last_event_ts,
"stability_state": conversation.stability_state,
"stability_score": None,
"stability_confidence": 0.0,
"stability_sample_messages": 0,
"stability_sample_days": 0,
"commitment_inbound_score": None,
"commitment_outbound_score": None,
"commitment_confidence": 0.0,
"inbound_messages": 0,
"outbound_messages": 0,
"reciprocity_score": None,
"continuity_score": None,
"response_score": None,
"volatility_score": None,
"inbound_response_score": None,
"outbound_response_score": None,
"balance_inbound_score": None,
"balance_outbound_score": None,
},
)
return
inbound_count = 0
outbound_count = 0
daily_counts = {}
inbound_response_lags = []
outbound_response_lags = []
pending_in_ts = None
pending_out_ts = None
first_ts = rows[0]["ts"]
last_ts = rows[-1]["ts"]
latest_service = (
rows[-1].get("session__identifier__service") or conversation.platform_type
)
for row in rows:
ts = int(row["ts"] or 0)
sender = str(row.get("sender_uuid") or "").strip()
is_inbound = sender in identifier_values
direction = "in" if is_inbound else "out"
day_key = datetime.fromtimestamp(ts / 1000, tz=timezone.utc).date().isoformat()
daily_counts[day_key] = daily_counts.get(day_key, 0) + 1
if direction == "in":
inbound_count += 1
if pending_out_ts is not None and ts >= pending_out_ts:
inbound_response_lags.append(ts - pending_out_ts)
pending_out_ts = None
pending_in_ts = ts
else:
outbound_count += 1
if pending_in_ts is not None and ts >= pending_in_ts:
outbound_response_lags.append(ts - pending_in_ts)
pending_in_ts = None
pending_out_ts = ts
message_count = len(rows)
span_days = max(1, int(((last_ts - first_ts) / (24 * 60 * 60 * 1000)) + 1))
sample_days = len(daily_counts)
total_messages = max(1, inbound_count + outbound_count)
reciprocity_score = 100.0 * (
1.0 - abs(inbound_count - outbound_count) / total_messages
)
continuity_score = 100.0 * min(1.0, sample_days / max(1, span_days))
out_resp_score = _score_from_lag(_median_or_none(outbound_response_lags))
in_resp_score = _score_from_lag(_median_or_none(inbound_response_lags))
response_score = (out_resp_score + in_resp_score) / 2.0
daily_values = list(daily_counts.values())
if len(daily_values) > 1:
mean_daily = statistics.mean(daily_values)
stdev_daily = statistics.pstdev(daily_values)
cv = (stdev_daily / mean_daily) if mean_daily else 1.0
volatility_score = max(0.0, 100.0 * (1.0 - min(cv, 1.5) / 1.5))
else:
volatility_score = 60.0
stability_score = (
(0.35 * reciprocity_score)
+ (0.25 * continuity_score)
+ (0.20 * response_score)
+ (0.20 * volatility_score)
)
balance_out = 100.0 * min(1.0, outbound_count / max(1, inbound_count))
balance_in = 100.0 * min(1.0, inbound_count / max(1, outbound_count))
commitment_out = (0.60 * out_resp_score) + (0.40 * balance_out)
commitment_in = (0.60 * in_resp_score) + (0.40 * balance_in)
msg_conf = min(1.0, message_count / 200.0)
day_conf = min(1.0, sample_days / 30.0)
pair_conf = min(
1.0, (len(inbound_response_lags) + len(outbound_response_lags)) / 40.0
)
confidence = (0.50 * msg_conf) + (0.30 * day_conf) + (0.20 * pair_conf)
if message_count < 20 or sample_days < 3 or confidence < 0.25:
stability_state = WorkspaceConversation.StabilityState.CALIBRATING
stability_score_value = None
commitment_in_value = None
commitment_out_value = None
else:
stability_score_value = round(stability_score, 2)
commitment_in_value = round(commitment_in, 2)
commitment_out_value = round(commitment_out, 2)
if stability_score_value >= 70:
stability_state = WorkspaceConversation.StabilityState.STABLE
elif stability_score_value >= 50:
stability_state = WorkspaceConversation.StabilityState.WATCH
else:
stability_state = WorkspaceConversation.StabilityState.FRAGILE
feedback_state = "balanced"
if outbound_count > (inbound_count * 1.5):
feedback_state = "withdrawing"
elif inbound_count > (outbound_count * 1.5):
feedback_state = "overextending"
feedback = dict(conversation.participant_feedback or {})
feedback[str(person.id)] = {
"state": feedback_state,
"inbound_messages": inbound_count,
"outbound_messages": outbound_count,
"sample_messages": message_count,
"sample_days": sample_days,
"updated_at": now_ts.isoformat(),
}
conversation.platform_type = latest_service or conversation.platform_type
conversation.last_event_ts = last_ts
conversation.stability_state = stability_state
conversation.stability_score = stability_score_value
conversation.stability_confidence = round(confidence, 3)
conversation.stability_sample_messages = message_count
conversation.stability_sample_days = sample_days
conversation.stability_last_computed_at = now_ts
conversation.commitment_inbound_score = commitment_in_value
conversation.commitment_outbound_score = commitment_out_value
conversation.commitment_confidence = round(confidence, 3)
conversation.commitment_last_computed_at = now_ts
conversation.participant_feedback = feedback
conversation.save(
update_fields=[
"platform_type",
"last_event_ts",
"stability_state",
"stability_score",
"stability_confidence",
"stability_sample_messages",
"stability_sample_days",
"stability_last_computed_at",
"commitment_inbound_score",
"commitment_outbound_score",
"commitment_confidence",
"commitment_last_computed_at",
"participant_feedback",
]
)
_store_metric_snapshot(
conversation,
{
"source_event_ts": last_ts,
"stability_state": stability_state,
"stability_score": _to_float(stability_score_value),
"stability_confidence": round(confidence, 3),
"stability_sample_messages": message_count,
"stability_sample_days": sample_days,
"commitment_inbound_score": _to_float(commitment_in_value),
"commitment_outbound_score": _to_float(commitment_out_value),
"commitment_confidence": round(confidence, 3),
"inbound_messages": inbound_count,
"outbound_messages": outbound_count,
"reciprocity_score": round(reciprocity_score, 3),
"continuity_score": round(continuity_score, 3),
"response_score": round(response_score, 3),
"volatility_score": round(volatility_score, 3),
"inbound_response_score": round(in_resp_score, 3),
"outbound_response_score": round(out_resp_score, 3),
"balance_inbound_score": round(balance_in, 3),
"balance_outbound_score": round(balance_out, 3),
},
)
def _parse_fundamentals(raw_text):
lines = []
for line in (raw_text or "").splitlines():
cleaned = line.strip()
if cleaned:
lines.append(cleaned)
return lines
def _engage_source_options(plan):
options = []
for rule in plan.rules.order_by("created_at"):
options.append(
{
"value": f"rule:{rule.id}",
"label": f"Rule: {rule.title}",
}
)
for game in plan.games.order_by("created_at"):
options.append(
{
"value": f"game:{game.id}",
"label": f"Game: {game.title}",
}
)
for correction in plan.corrections.order_by("created_at"):
options.append(
{
"value": f"correction:{correction.id}",
"label": f"Correction: {correction.title}",
}
)
return options
def _normalize_correction_title(value, fallback="Correction"):
cleaned = re.sub(r"\s+", " ", str(value or "").strip())
cleaned = cleaned.strip("\"'` ")
if not cleaned:
return fallback
# Capitalize each lexical token for consistent correction naming.
words = []
for token in cleaned.split(" "):
if not token:
continue
if token.isupper() and len(token) <= 4:
words.append(token)
else:
words.append(token[:1].upper() + token[1:])
return " ".join(words)
def _build_engage_payload(
source_obj,
source_kind,
share_target,
framing,
context_note,
owner_name,
recipient_name,
):
share_key = (share_target or "self").strip().lower()
framing_key = (framing or "dont_change").strip().lower()
if share_key not in {"self", "other", "both"}:
share_key = "self"
if framing_key != "shared":
framing_key = "dont_change"
artifact_type_label = {
"rule": "Rule",
"game": "Game",
"correction": "Correction",
}.get(source_kind, (source_kind or "Artifact").title())
artifact_name_raw = (
getattr(source_obj, "title", None) or f"{artifact_type_label} Item"
).strip()
artifact_name = (
_normalize_correction_title(
artifact_name_raw, fallback=f"{artifact_type_label} Item"
)
if source_kind == "correction"
else artifact_name_raw
)
if source_kind == "rule":
insight_text = source_obj.content.strip() or source_obj.title.strip()
elif source_kind == "game":
insight_text = source_obj.instructions.strip() or source_obj.title.strip()
else:
insight_text = source_obj.clarification.strip() or source_obj.title.strip()
owner_label = (owner_name or "You").strip()
recipient_label = (recipient_name or "Other").strip()
def _clean_text(value):
cleaned = re.sub(r"\s+", " ", (value or "").strip())
cleaned = cleaned.strip("\"' ")
cleaned = re.sub(r"^\s*#{1,6}\s*", "", cleaned)
cleaned = re.sub(r"\*\*(.*?)\*\*", r"\1", cleaned)
cleaned = re.sub(r"__(.*?)__", r"\1", cleaned)
cleaned = re.sub(r"`(.*?)`", r"\1", cleaned)
cleaned = re.sub(r"^[\-*•]\s*", "", cleaned)
cleaned = re.sub(r"^\d+[.)]\s*", "", cleaned)
return cleaned.strip()
def _split_sentences(value):
parts = []
for line in (value or "").splitlines():
line = _clean_text(line)
if not line:
continue
for piece in re.split(r"(?<=[.!?;])\s+", line.strip()):
piece = piece.strip()
if piece:
parts.append(piece)
return parts
def _expand_shorthand_tokens(value):
text = value or ""
alias_map = {}
for name in [owner_label, recipient_label]:
lowered = name.lower()
if lowered in {"you", "we", "us", "our", "i", "me", "other"}:
continue
initial = lowered[:1]
if initial and initial not in alias_map:
alias_map[initial] = name
# Expand any known initial shorthand before modal verbs.
for initial, name in alias_map.items():
text = re.sub(
rf"(?i)\b{re.escape(initial)}\s+(?=(?:should|will|must|can|need to|needs to|have to|has to|am|are|is|was|were)\b)",
f"{name} ",
text,
)
def replace_quoted_marker(match):
marker = match.group(1) or ""
lower_marker = marker.strip().lower()
replacement = marker
if lower_marker in alias_map:
replacement = alias_map[lower_marker]
elif lower_marker in {"you", "we", "i"}:
replacement = "both parties"
return f"('{replacement}')"
text = re.sub(
r"\(\s*['\"]([A-Za-z]{1,8})['\"]\s*\)",
replace_quoted_marker,
text,
)
return text
def _fix_shared_grammar(value):
text = value
replacements = [
(r"(?i)\bwe needs to\b", "we need to"),
(r"(?i)\bwe has to\b", "we have to"),
(r"(?i)\bwe is\b", "we are"),
(r"(?i)\bwe was\b", "we were"),
(r"(?i)\bus needs to\b", "we need to"),
(r"(?i)\bus need to\b", "we need to"),
(r"(?i)\bus is\b", "we are"),
(r"(?i)\bus are\b", "we are"),
(r"(?i)\bwe does not\b", "we do not"),
(r"(?i)\bwe doesn't\b", "we don't"),
(r"(?i)\bwe says\b", "we say"),
(r"(?i)\bwe responds\b", "we respond"),
(r"(?i)\bwe follows\b", "we follow"),
]
for pattern, replacement in replacements:
text = re.sub(pattern, replacement, text)
return re.sub(r"\s+", " ", text).strip()
def _rewrite_shared_sentence(sentence):
text = _clean_text(sentence)
if not text:
return ""
punctuation = "."
if text[-1] in ".!?":
punctuation = text[-1]
text = text[:-1].strip()
text = _clean_text(text)
if not text:
return ""
# Shared-only edge-case logic.
text = _expand_shorthand_tokens(text)
shared_replacements = [
(r"\bi[']m\b", "we're"),
(r"\bi[']ve\b", "we've"),
(r"\bi[']ll\b", "we'll"),
(r"\bi[']d\b", "we'd"),
(r"\byou[']re\b", "we're"),
(r"\byou[']ve\b", "we've"),
(r"\byou[']ll\b", "we'll"),
(r"\byou[']d\b", "we'd"),
(r"\bmy\b", "our"),
(r"\bmine\b", "ours"),
(r"\bmyself\b", "ourselves"),
(r"\byour\b", "our"),
(r"\byours\b", "ours"),
(r"\byourself\b", "ourselves"),
(r"\byourselves\b", "ourselves"),
(r"\bi\b", "we"),
(r"\bme\b", "us"),
(r"\byou\b", "we"),
(r"\bhis\b", "our"),
(r"\bher\b", "our"),
(r"\btheir\b", "our"),
(r"\bhim\b", "us"),
(r"\bthem\b", "us"),
(r"\bhe\b", "we"),
(r"\bshe\b", "we"),
(r"\bthey\b", "we"),
]
for pattern, replacement in shared_replacements:
text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
for name in [owner_label, recipient_label]:
name = str(name or "").strip()
if not name:
continue
if name.lower() in {"you", "we", "us", "our", "i", "me", "other"}:
continue
text = re.sub(
r"(?<!\w)" + re.escape(name) + r"'s(?!\w)",
"our",
text,
flags=re.IGNORECASE,
)
text = re.sub(
r"(?<!\w)" + re.escape(name) + r"(?!\w)",
"we",
text,
flags=re.IGNORECASE,
)
text = _fix_shared_grammar(text).rstrip(".!?").strip()
if not text:
return ""
if not re.match(
r"(?i)^we\s+(?:should|will|must|can|need to|have to|do not|don't|are|were|[a-z]+)\b",
text,
):
lowered = text[:1].lower() + text[1:] if len(text) > 1 else text.lower()
text = f"We should {lowered}"
text = text[:1].upper() + text[1:]
return f"{text}{punctuation}"
def _rewrite_shared_text(value):
sentences = _split_sentences(value)
if not sentences:
return ""
adapted = [_rewrite_shared_sentence(sentence) for sentence in sentences]
adapted = [part for part in adapted if part]
return " ".join(adapted).strip()
preview_lines = []
outbound_lines = []
def _format_artifact_message(lines):
lines = [
f"**{artifact_name}** ({artifact_type_label})",
"",
"Guidance:",
] + [line.strip() for line in (lines or []) if (line or "").strip()]
if lines[-1] == "Guidance:":
lines.append("No guidance text available.")
return "\n".join(lines).strip()
if framing_key == "shared":
shared_line = _rewrite_shared_text(insight_text)
preview_lines = [shared_line]
outbound_lines = [shared_line]
else:
unchanged = _clean_text(insight_text) or (insight_text or "").strip()
preview_lines = [unchanged]
outbound_lines = [unchanged]
preview = _format_artifact_message(preview_lines)
outbound = _format_artifact_message(outbound_lines)
# Context note is metadata for the operator and should not alter shared outbound text.
_ = (context_note or "").strip()
return {
"preview": preview,
"outbound": outbound,
"share_target": share_key,
"framing": framing_key,
}
def _get_or_create_auto_settings(user, conversation):
settings_obj, _ = PatternMitigationAutoSettings.objects.get_or_create(
user=user,
conversation=conversation,
)
return settings_obj
def _metric_guided_artifact_candidates(plan, metric_context):
signals = list((metric_context or {}).get("risk_signals") or [])
if not signals:
return []
artifacts = []
for rule in plan.rules.filter(enabled=True).order_by("created_at")[:10]:
artifacts.append(
{
"kind": "rule",
"title": str(rule.title or "").strip(),
"body": str(rule.content or "").strip(),
}
)
for game in plan.games.filter(enabled=True).order_by("created_at")[:10]:
artifacts.append(
{
"kind": "game",
"title": str(game.title or "").strip(),
"body": str(game.instructions or "").strip(),
}
)
if not artifacts:
for item in (plan.fundamental_items or [])[:10]:
text = str(item or "").strip()
if not text:
continue
artifacts.append(
{
"kind": "fundamental",
"title": text[:100],
"body": text,
}
)
if not artifacts:
return []
out = []
for idx, signal in enumerate(signals[:8]):
artifact = artifacts[idx % len(artifacts)]
kind_label = {
"rule": "Rule",
"game": "Game",
"fundamental": "Fundamental",
}.get(artifact["kind"], "Artifact")
title = _normalize_correction_title(
f"{signal.get('label') or 'Metric Signal'} Safeguard"
)
clarification = (
f"{str(signal.get('explanation') or '').strip()} "
f"Apply {kind_label.lower()} '{artifact['title']}' in the next exchange: "
f"{artifact['body']}"
).strip()
source_phrase = (
f"Metric signal: {signal.get('label') or 'Metric Signal'}; "
f"Artifact: {kind_label} '{artifact['title']}'"
)
out.append(
{
"title": title,
"source_phrase": source_phrase[:1000],
"clarification": clarification[:2000],
"severity": str(signal.get("severity") or "medium"),
}
)
return _normalize_violation_items(out)
def _detect_violation_candidates(plan, recent_rows):
candidates = []
for row in recent_rows:
text = (row.get("text") or "").strip()
if not text:
continue
upper_ratio = (
(
sum(1 for c in text if c.isupper())
/ max(1, sum(1 for c in text if c.isalpha()))
)
if any(c.isalpha() for c in text)
else 0
)
if upper_ratio > 0.6 and len(text) > 10:
candidates.append(
{
"title": "Escalated tone spike",
"source_phrase": text[:500],
"clarification": "Rephrase into one direct request and one feeling statement.",
"severity": "medium",
}
)
lowered = text.lower()
if "you always" in lowered or "you never" in lowered:
candidates.append(
{
"title": "Absolute framing",
"source_phrase": text[:500],
"clarification": "Replace absolutes with one concrete example and a bounded request.",
"severity": "medium",
}
)
return candidates
def _normalize_violation_items(raw_items):
normalized = []
seen = set()
for item in raw_items or []:
title = _normalize_correction_title(
item.get("title") or "", fallback="Correction"
)
phrase = str(item.get("source_phrase") or "").strip()
clarification = str(
item.get("clarification") or item.get("correction") or ""
).strip()
severity = str(item.get("severity") or "medium").strip().lower()
if severity not in {"low", "medium", "high"}:
severity = "medium"
if not title or not clarification:
continue
key = _correction_signature(title, clarification)
if key in seen:
continue
seen.add(key)
normalized.append(
{
"title": title[:255],
"source_phrase": phrase[:1000],
"clarification": clarification[:2000],
"severity": severity,
}
)
return normalized
def _normalize_correction_text(value):
cleaned = re.sub(r"\s+", " ", str(value or "").strip())
cleaned = cleaned.strip("\"'` ")
cleaned = cleaned.rstrip(" .;:")
return cleaned
def _correction_signature(title, clarification):
"""
Normalized key used to deduplicate corrections before persisting.
Source phrase is intentionally excluded so the same correction guidance
cannot be stored repeatedly with minor phrase variations.
"""
normalized_title = _normalize_correction_text(title).lower()
normalized_clarification = _normalize_correction_text(clarification).lower()
return (normalized_title, normalized_clarification)
def _existing_correction_signatures(plan, exclude_id=None):
query = plan.corrections.all()
if exclude_id is not None:
query = query.exclude(id=exclude_id)
signatures = set()
for row in query.values("title", "clarification"):
signatures.add(
_correction_signature(
row.get("title") or "",
row.get("clarification") or "",
)
)
return signatures
def _ai_detect_violations(user, plan, person, recent_rows, metric_context=None):
ai_obj = AI.objects.filter(user=user).first()
if ai_obj is None:
return {"violations": [], "artifact_corrections": []}
rules_payload = [
{"id": str(rule.id), "title": rule.title, "content": rule.content}
for rule in plan.rules.filter(enabled=True).order_by("created_at")[:30]
]
games_payload = [
{"id": str(game.id), "title": game.title, "instructions": game.instructions}
for game in plan.games.filter(enabled=True).order_by("created_at")[:30]
]
corrections_payload = [
{
"id": str(correction.id),
"title": correction.title,
"source_phrase": correction.source_phrase,
"clarification": correction.clarification,
}
for correction in plan.corrections.filter(enabled=True).order_by("created_at")[
:30
]
]
source_payload = {
"person": person.name,
"plan": {
"id": str(plan.id),
"title": plan.title,
"objective": plan.objective,
"fundamentals": plan.fundamental_items or [],
"rules": rules_payload,
"games": games_payload,
"corrections": corrections_payload,
},
"metrics": metric_context or {},
"recent_messages": recent_rows,
"output_schema": {
"violations": [
{
"title": "short string",
"source_phrase": "exact snippet from recent_messages",
"clarification": "correction-style guidance",
"severity": "low|medium|high",
}
],
"artifact_corrections": [
{
"title": "short string",
"source_phrase": "artifact reference + metric rationale",
"clarification": "proactive correction mapped to an artifact",
"severity": "low|medium|high",
}
],
},
}
prompt = [
{
"role": "system",
"content": (
"You detect violations of mitigation patterns in a conversation. "
"Use recent_messages for direct violations. "
"Use plan artifacts plus metrics for proactive artifact_corrections. "
"Return strict JSON only. No markdown. No prose wrapper. "
"Use only schema keys requested."
),
},
{
"role": "user",
"content": json.dumps(source_payload, ensure_ascii=False, default=str),
},
]
try:
raw = async_to_sync(ai_runner.run_prompt)(prompt, ai_obj)
except Exception:
return {"violations": [], "artifact_corrections": []}
parsed = _extract_json_object(raw) or {}
return {
"violations": _normalize_violation_items(parsed.get("violations") or []),
"artifact_corrections": _normalize_violation_items(
parsed.get("artifact_corrections")
or parsed.get("artifact_based_corrections")
or []
),
}
def _maybe_send_auto_notification(user, auto_settings, title, body):
topic_override = (auto_settings.ntfy_topic_override or "").strip()
if topic_override:
raw_sendmsg(
body,
title=title,
url=(auto_settings.ntfy_url_override or None),
topic=topic_override,
)
return
user.sendmsg(body, title=title)
def _run_auto_analysis_for_plan(
user, person, conversation, plan, auto_settings, trigger="manual"
):
if not auto_settings.enabled:
return {
"ran": False,
"summary": "Automation is disabled.",
"violations": [],
"created_corrections": 0,
"notified": False,
}
if trigger == "auto" and not auto_settings.auto_pattern_recognition:
return {
"ran": False,
"summary": "Automatic pattern recognition is disabled.",
"violations": [],
"created_corrections": 0,
"notified": False,
}
now = dj_timezone.now()
if (
trigger == "auto"
and auto_settings.last_run_at
and auto_settings.check_cooldown_seconds
):
elapsed = (now - auto_settings.last_run_at).total_seconds()
if elapsed < auto_settings.check_cooldown_seconds:
return {
"ran": False,
"summary": "Skipped: cooldown active.",
"violations": [],
"created_corrections": 0,
"notified": False,
}
limit = max(10, min(int(auto_settings.sample_message_window or 40), 200))
_refresh_conversation_stability(conversation, user, person)
metric_context = _metric_pattern_context(conversation)
sessions = ChatSession.objects.filter(user=user, identifier__person=person)
messages = (
Message.objects.filter(user=user, session__in=sessions)
.order_by("-ts")
.values("id", "ts", "sender_uuid", "text")[:limit]
)
recent_rows = []
for row in reversed(list(messages)):
recent_rows.append(
{
"id": str(row["id"]),
"ts": row["ts"],
"sender_uuid": row["sender_uuid"] or "",
"text": row["text"] or "",
}
)
if not recent_rows:
auto_settings.last_result_summary = (
"No recent messages available for automation."
)
auto_settings.last_run_at = now
auto_settings.save(
update_fields=["last_result_summary", "last_run_at", "updated_at"]
)
return {
"ran": True,
"summary": auto_settings.last_result_summary,
"violations": [],
"created_corrections": 0,
"notified": False,
}
latest_message_ts = recent_rows[-1]["ts"]
if (
trigger == "auto"
and auto_settings.last_checked_event_ts
and latest_message_ts <= auto_settings.last_checked_event_ts
):
return {
"ran": False,
"summary": "Skipped: no new messages since last check.",
"violations": [],
"created_corrections": 0,
"notified": False,
}
ai_detection = _ai_detect_violations(
user,
plan,
person,
recent_rows,
metric_context=metric_context,
)
ai_candidates = list(ai_detection.get("violations") or [])
artifact_candidates_ai = list(ai_detection.get("artifact_corrections") or [])
heuristic_candidates = _detect_violation_candidates(plan, recent_rows)
artifact_candidates_metric = _metric_guided_artifact_candidates(
plan, metric_context
)
violations = _normalize_violation_items(
ai_candidates
+ heuristic_candidates
+ artifact_candidates_ai
+ artifact_candidates_metric
)
created_corrections = 0
if auto_settings.auto_create_corrections and violations:
existing_signatures = _existing_correction_signatures(plan)
for item in violations[:8]:
signature = _correction_signature(item["title"], item["clarification"])
if signature in existing_signatures:
continue
PatternMitigationCorrection.objects.create(
user=user,
plan=plan,
title=item["title"],
source_phrase=item["source_phrase"],
clarification=item["clarification"],
perspective="second_person",
share_target="both",
language_style="adapted",
enabled=True,
)
existing_signatures.add(signature)
created_corrections += 1
notified = False
if auto_settings.auto_notify_enabled and violations:
title = f"[GIA] Auto pattern alerts for {person.name}"
preview = "\n".join(
[f"- {item['title']}: {item['clarification']}" for item in violations[:3]]
)
body = (
f"Detected {len(violations)} potential mitigation violations.\n"
f"Created corrections: {created_corrections}\n\n"
f"{preview}"
)
_maybe_send_auto_notification(user, auto_settings, title, body)
notified = True
summary = (
f"Auto analysis ran on {len(recent_rows)} messages. "
f"Detected {len(violations)} candidates. "
f"Created {created_corrections} corrections."
)
auto_settings.last_result_summary = summary
auto_settings.last_run_at = now
auto_settings.last_checked_event_ts = latest_message_ts
auto_settings.save(
update_fields=[
"last_result_summary",
"last_run_at",
"last_checked_event_ts",
"updated_at",
]
)
return {
"ran": True,
"summary": summary,
"violations": violations,
"created_corrections": created_corrections,
"notified": notified,
}
def _create_baseline_mitigation_plan(user, person, conversation, source_text=""):
artifacts = _default_artifacts_from_patterns(
source_text or f"{person.name} baseline mitigation",
person,
output_profile="framework",
)
plan = PatternMitigationPlan.objects.create(
user=user,
conversation=conversation,
source_ai_result=None,
title=artifacts.get("title") or f"{person.name} Pattern Mitigation",
objective=artifacts.get("objective") or "",
fundamental_items=artifacts.get("fundamental_items") or [],
creation_mode="auto",
status="draft",
)
for rule in artifacts.get("rules", []):
PatternMitigationRule.objects.create(
user=user,
plan=plan,
title=str(rule.get("title") or "Rule").strip()[:255],
content=str(rule.get("content") or "").strip(),
)
for game in artifacts.get("games", []):
PatternMitigationGame.objects.create(
user=user,
plan=plan,
title=str(game.get("title") or "Game").strip()[:255],
instructions=str(game.get("instructions") or "").strip(),
)
PatternMitigationMessage.objects.create(
user=user,
plan=plan,
role="system",
text="Baseline plan auto-created by automation settings.",
)
return plan
def _mitigation_panel_context(
person,
plan,
notice_message="",
notice_level="info",
export_record=None,
engage_preview="",
engage_preview_flash=False,
engage_form=None,
active_tab="plan_board",
auto_settings=None,
):
engage_form = engage_form or {}
engage_options = _engage_source_options(plan)
selected_ref = engage_form.get("source_ref") or (
engage_options[0]["value"] if engage_options else ""
)
send_target_bundle = _send_target_options_for_person(plan.user, person)
selected_target_id = str(engage_form.get("target_identifier_id") or "").strip()
if selected_target_id and not any(
item["id"] == selected_target_id for item in send_target_bundle["options"]
):
selected_target_id = ""
if not selected_target_id:
selected_target_id = send_target_bundle["selected_id"]
auto_settings = auto_settings or _get_or_create_auto_settings(
plan.user, plan.conversation
)
return {
"person": person,
"plan": plan,
"plan_status_choices": PatternMitigationPlan.STATUS_CHOICES,
"plan_creation_mode_choices": PatternMitigationPlan.CREATION_MODE_CHOICES,
"rules": plan.rules.order_by("created_at"),
"games": plan.games.order_by("created_at"),
"corrections": plan.corrections.order_by("created_at"),
"fundamentals_text": "\n".join(plan.fundamental_items or []),
"mitigation_messages": plan.messages.order_by("created_at")[:40],
"latest_export": export_record,
"artifact_type_choices": PatternArtifactExport.ARTIFACT_TYPE_CHOICES,
"artifact_format_choices": PatternArtifactExport.FORMAT_CHOICES,
"notice_message": notice_message,
"notice_level": notice_level,
"engage_preview": engage_preview,
"engage_preview_flash": engage_preview_flash,
"engage_options": engage_options,
"engage_form": {
"source_ref": selected_ref,
"share_target": engage_form.get("share_target") or "self",
"framing": engage_form.get("framing") or "dont_change",
"context_note": engage_form.get("context_note") or "",
"target_identifier_id": selected_target_id,
},
"send_target_bundle": send_target_bundle,
"send_state": _get_send_state(plan.user, person),
"active_tab": _sanitize_active_tab(active_tab),
"auto_settings": auto_settings,
}
def _latest_plan_bundle(conversation):
latest_plan = conversation.mitigation_plans.order_by("-updated_at").first()
latest_plan_rules = latest_plan.rules.order_by("created_at") if latest_plan else []
latest_plan_games = latest_plan.games.order_by("created_at") if latest_plan else []
latest_plan_corrections = (
latest_plan.corrections.order_by("created_at") if latest_plan else []
)
latest_plan_messages = (
latest_plan.messages.order_by("created_at")[:40] if latest_plan else []
)
latest_plan_export = (
latest_plan.exports.order_by("-created_at").first() if latest_plan else None
)
latest_auto_settings = _get_or_create_auto_settings(conversation.user, conversation)
return {
"latest_plan": latest_plan,
"latest_plan_rules": latest_plan_rules,
"latest_plan_games": latest_plan_games,
"latest_plan_corrections": latest_plan_corrections,
"latest_plan_messages": latest_plan_messages,
"latest_plan_export": latest_plan_export,
"latest_auto_settings": latest_auto_settings,
}
def _render_send_status(request, ok, message, level):
return render(
request,
"partials/ai-workspace-send-status.html",
{"ok": ok, "message": message, "level": level},
)
def _render_mitigation_status(request, person, message, level="info"):
return render(
request,
"partials/ai-workspace-mitigation-status.html",
{"person": person, "level": level, "message": message},
)
def _render_mitigation_panel(request, person, plan, **kwargs):
return render(
request,
"partials/ai-workspace-mitigation-panel.html",
_mitigation_panel_context(person=person, plan=plan, **kwargs),
)
def _workspace_nav_urls(person):
return {
"graphs_url": reverse(
"ai_workspace_insight_graphs",
kwargs={"type": "page", "person_id": person.id},
),
"information_url": reverse(
"ai_workspace_information",
kwargs={"type": "page", "person_id": person.id},
),
"help_url": reverse(
"ai_workspace_insight_help",
kwargs={"type": "page", "person_id": person.id},
),
"workspace_url": f"{reverse('ai_workspace')}?person={person.id}",
}
def _person_plan_or_404(request, person_id, plan_id):
person = get_object_or_404(Person, pk=person_id, user=request.user)
plan = get_object_or_404(PatternMitigationPlan, id=plan_id, user=request.user)
return person, plan
class AIWorkspace(LoginRequiredMixin, View):
template_name = "pages/ai-workspace.html"
def get(self, request):
selected_person_id = ""
raw_person = str(request.GET.get("person") or "").strip()
if raw_person:
person = Person.objects.filter(id=raw_person, user=request.user).first()
if person:
selected_person_id = str(person.id)
return render(
request,
self.template_name,
{"selected_person_id": selected_person_id},
)
class AIWorkspaceContactsWidget(LoginRequiredMixin, View):
allowed_types = {"widget"}
def _contact_rows(self, user):
rows = []
people = Person.objects.filter(user=user).order_by("name")
for person in people:
sessions = ChatSession.objects.filter(user=user, identifier__person=person)
message_qs = Message.objects.filter(user=user, session__in=sessions)
last_message = message_qs.order_by("-ts").first()
rows.append(
{
"person": person,
"message_count": message_qs.count(),
"last_text": (last_message.text or "")[:120]
if last_message
else "",
"last_ts": last_message.ts if last_message else None,
"last_ts_label": _format_unix_ms(last_message.ts)
if last_message
else "",
}
)
rows.sort(key=lambda row: row["last_ts"] or 0, reverse=True)
return rows
def get(self, request, type):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
context = {
"title": "AI Workspace",
"unique": "ai-workspace-contacts",
"window_content": "partials/ai-workspace-widget.html",
"widget_options": 'gs-w="4" gs-h="14" gs-x="0" gs-y="0" gs-min-w="3"',
"contact_rows": self._contact_rows(request.user),
"window_form": AIWorkspaceWindowForm(request.GET or None),
}
return render(request, "mixins/wm/widget.html", context)
class AIWorkspacePersonWidget(LoginRequiredMixin, View):
allowed_types = {"widget"}
def get(self, request, type, person_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person = get_object_or_404(Person, pk=person_id, user=request.user)
conversation = _conversation_for_person(request.user, person)
try:
limit = int(request.GET.get("limit", 20))
except (TypeError, ValueError):
limit = 20
limit = max(5, min(limit, 200))
context = {
"title": f"{person.name} AI",
"unique": f"ai-person-{person.id}",
"window_content": "partials/ai-workspace-person-widget.html",
"widget_options": 'gs-w="8" gs-h="11" gs-x="4" gs-y="0" gs-min-w="4"',
"person": person,
"workspace_conversation": conversation,
"participant_feedback_display": _participant_feedback_display(
conversation, person
),
"limit": limit,
"ai_operations": [
("artifacts", "Plan"),
("summarise", "Summary"),
("draft_reply", "Draft"),
("extract_patterns", "Patterns"),
],
"send_state": _get_send_state(request.user, person),
"compose_page_url": _compose_page_url_for_person(request.user, person),
"compose_page_base_url": reverse("compose_page"),
"compose_widget_url": _compose_widget_url_for_person(
request.user,
person,
limit=limit,
),
"compose_widget_base_url": reverse("compose_widget"),
"manual_icon_class": "fa-solid fa-paper-plane",
"send_target_bundle": _send_target_options_for_person(request.user, person),
}
return render(request, "mixins/wm/widget.html", context)
class AIWorkspacePersonTimelineWidget(LoginRequiredMixin, View):
allowed_types = {"widget"}
def get(self, request, type, person_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person = get_object_or_404(Person, pk=person_id, user=request.user)
try:
limit = int(request.GET.get("limit", 20))
except (TypeError, ValueError):
limit = 20
limit = max(5, min(limit, 200))
context = {
"title": f"{person.name} Timeline",
"unique": f"ai-timeline-{person.id}",
"window_content": "partials/ai-workspace-person-timeline-widget.html",
"widget_options": 'gs-w="8" gs-h="10" gs-x="4" gs-y="11" gs-min-w="4"',
"person": person,
"limit": limit,
"message_rows": _message_rows_for_person(request.user, person, limit),
}
return render(request, "mixins/wm/widget.html", context)
class AIWorkspaceInsightDetail(LoginRequiredMixin, View):
allowed_types = {"page", "widget"}
def get(self, request, type, person_id, metric):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
spec = INSIGHT_METRICS.get(metric)
if spec is None:
return HttpResponseBadRequest("Unknown insight metric")
person = get_object_or_404(Person, pk=person_id, user=request.user)
conversation = _conversation_for_person(request.user, person)
latest_snapshot = conversation.metric_snapshots.first()
value = _format_metric_value(conversation, metric, latest_snapshot)
group = INSIGHT_GROUPS[spec["group"]]
graph_applicable = _metric_supports_history(metric, spec)
points = []
if graph_applicable:
points = _history_points(conversation, spec["history_field"])
context = {
"person": person,
"workspace_conversation": conversation,
"metric_slug": metric,
"metric": spec,
"metric_value": value,
"metric_psychology_hint": _metric_psychological_read(metric, conversation),
"metric_group": group,
"graph_points": points,
"graph_applicable": graph_applicable,
**_workspace_nav_urls(person),
}
return render(request, "pages/ai-workspace-insight-detail.html", context)
class AIWorkspaceInsightGraphs(LoginRequiredMixin, View):
allowed_types = {"page", "widget"}
def get(self, request, type, person_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person = get_object_or_404(Person, pk=person_id, user=request.user)
conversation = _conversation_for_person(request.user, person)
graph_cards = _all_graph_payload(conversation)
context = {
"person": person,
"workspace_conversation": conversation,
"graph_cards": graph_cards,
**_workspace_nav_urls(person),
}
return render(request, "pages/ai-workspace-insight-graphs.html", context)
class AIWorkspaceInformation(LoginRequiredMixin, View):
allowed_types = {"page", "widget"}
def get(self, request, type, person_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person = get_object_or_404(Person, pk=person_id, user=request.user)
conversation = _conversation_for_person(request.user, person)
latest_snapshot = conversation.metric_snapshots.first()
directionality = _commitment_directionality_payload(conversation)
commitment_graph_cards = [
card
for card in _all_graph_payload(conversation)
if card["group"] == "commitment"
]
graph_refs = []
for ref in directionality.get("graph_refs", []):
slug = ref.get("slug")
if not slug:
continue
graph_refs.append(
{
**ref,
"slug": slug,
"value": _format_metric_value(conversation, slug, latest_snapshot),
}
)
directionality["graph_refs"] = graph_refs
context = {
"person": person,
"workspace_conversation": conversation,
"directionality": directionality,
"overview_rows": _information_overview_rows(conversation),
"commitment_graph_cards": commitment_graph_cards,
**_workspace_nav_urls(person),
}
return render(request, "pages/ai-workspace-information.html", context)
class AIWorkspaceInsightHelp(LoginRequiredMixin, View):
allowed_types = {"page", "widget"}
def get(self, request, type, person_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person = get_object_or_404(Person, pk=person_id, user=request.user)
conversation = _conversation_for_person(request.user, person)
latest_snapshot = conversation.metric_snapshots.first()
metrics = []
for slug, spec in INSIGHT_METRICS.items():
metrics.append(
{
"slug": slug,
"title": spec["title"],
"group": spec["group"],
"group_title": INSIGHT_GROUPS[spec["group"]]["title"],
"calculation": spec["calculation"],
"psychology": spec["psychology"],
"value": _format_metric_value(
conversation,
slug,
latest_snapshot,
),
}
)
context = {
"person": person,
"workspace_conversation": conversation,
"groups": INSIGHT_GROUPS,
"metrics": metrics,
**_workspace_nav_urls(person),
}
return render(request, "pages/ai-workspace-insight-help.html", context)
class AIWorkspaceRunOperation(LoginRequiredMixin, View):
allowed_types = {"widget"}
allowed_operations = {"artifacts", "summarise", "draft_reply", "extract_patterns"}
def _empty_plan_bundle(self):
return {
"latest_plan": None,
"latest_plan_rules": [],
"latest_plan_games": [],
"latest_plan_corrections": [],
"latest_plan_messages": [],
"latest_plan_export": None,
}
def _render_result(
self,
request,
operation,
person,
send_state,
send_target_bundle,
**extra,
):
return render(
request,
"partials/ai-workspace-ai-result.html",
{
"operation_label": OPERATION_LABELS.get(
operation, operation.replace("_", " ").title()
),
"operation": operation,
"person": person,
"send_state": send_state,
"send_target_bundle": send_target_bundle,
**extra,
},
)
def _ensure_message_events(self, user, conversation, person_identifiers, messages):
"""
Materialize workspace MessageEvent rows from legacy Message rows and
return ordered event IDs for the selected window.
"""
event_ids = []
for message in messages:
legacy_id = str(message.id)
event = MessageEvent.objects.filter(
user=user,
conversation=conversation,
raw_payload_ref__legacy_message_id=legacy_id,
).first()
if event is None:
event = MessageEvent.objects.create(
user=user,
conversation=conversation,
source_system=message.session.identifier.service or "signal",
ts=message.ts,
direction=_infer_direction(message, person_identifiers),
sender_uuid=message.sender_uuid or "",
text=message.text or "",
attachments=[],
raw_payload_ref={"legacy_message_id": legacy_id},
)
else:
# Keep event fields in sync if upstream message rows changed.
update_fields = []
new_direction = _infer_direction(message, person_identifiers)
if event.ts != message.ts:
event.ts = message.ts
update_fields.append("ts")
new_source_system = message.session.identifier.service or "signal"
if event.source_system != new_source_system:
event.source_system = new_source_system
update_fields.append("source_system")
if event.direction != new_direction:
event.direction = new_direction
update_fields.append("direction")
if event.sender_uuid != (message.sender_uuid or ""):
event.sender_uuid = message.sender_uuid or ""
update_fields.append("sender_uuid")
if event.text != (message.text or ""):
event.text = message.text or ""
update_fields.append("text")
if update_fields:
event.save(update_fields=update_fields)
event_ids.append(str(event.id))
return event_ids
def _citation_rows(self, user, citation_ids):
ids = [str(item) for item in (citation_ids or []) if item]
if not ids:
return []
events = MessageEvent.objects.filter(user=user, id__in=ids)
by_id = {str(event.id): event for event in events}
rows = []
for cid in ids:
event = by_id.get(cid)
if not event:
rows.append(
{
"id": cid,
"ts_label": "",
"source_system": "",
"direction": "",
"text": "",
}
)
continue
rows.append(
{
"id": cid,
"ts_label": _format_unix_ms(event.ts),
"source_system": event.source_system,
"direction": event.direction,
"text": (event.text or "").strip(),
}
)
return rows
def _build_prompt(self, operation, owner_name, person, transcript, user_notes):
notes = (user_notes or "").strip()
if operation == "draft_reply":
instruction = (
"Generate 3 concise reply options in different tones: soft, neutral, firm. "
"Return plain text with clear section labels."
)
elif operation == "extract_patterns":
instruction = (
"Extract recurring interaction patterns, friction loops, and practical next-step rules. "
"Keep it actionable and concise."
)
else:
instruction = "Summarize this conversation window with key points, emotional state shifts, and open loops."
prompt = [
{
"role": "system",
"content": (
f"{instruction} "
"Use participant names directly. "
"Do not refer to either side as 'the user'."
),
},
{
"role": "user",
"content": (
f"Owner: {owner_name}\n"
f"Person: {person.name}\n"
f"Notes: {notes or 'None'}\n\n"
f"Conversation:\n{transcript}"
),
},
]
return prompt
def get(self, request, type, person_id, operation):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
if operation not in self.allowed_operations:
return HttpResponseBadRequest("Invalid operation specified")
person = get_object_or_404(Person, pk=person_id, user=request.user)
send_state = _get_send_state(request.user, person)
send_target_bundle = _send_target_options_for_person(request.user, person)
conversation = _conversation_for_person(request.user, person)
if operation == "artifacts":
auto_settings = _get_or_create_auto_settings(request.user, conversation)
plan_bundle = _latest_plan_bundle(conversation)
mitigation_notice_message = ""
mitigation_notice_level = "info"
if (
plan_bundle["latest_plan"] is None
and auto_settings.enabled
and auto_settings.auto_create_mitigation
):
recent_messages = _recent_messages_for_person(
request.user,
person,
max(20, min(auto_settings.sample_message_window, 200)),
)
source_text = (
messages_to_string(recent_messages) if recent_messages else ""
)
_create_baseline_mitigation_plan(
user=request.user,
person=person,
conversation=conversation,
source_text=source_text,
)
plan_bundle = _latest_plan_bundle(conversation)
mitigation_notice_message = "Baseline plan auto-created."
mitigation_notice_level = "success"
if (
plan_bundle["latest_plan"] is not None
and auto_settings.enabled
and auto_settings.auto_pattern_recognition
):
auto_result = _run_auto_analysis_for_plan(
user=request.user,
person=person,
conversation=conversation,
plan=plan_bundle["latest_plan"],
auto_settings=auto_settings,
trigger="auto",
)
if auto_result.get("ran"):
mitigation_notice_message = auto_result["summary"]
mitigation_notice_level = "info"
if auto_result.get("created_corrections"):
plan_bundle = _latest_plan_bundle(conversation)
return self._render_result(
request,
operation,
person,
send_state,
send_target_bundle,
result_text="",
result_sections=[],
error=False,
ai_result_id="",
mitigation_notice_message=mitigation_notice_message,
mitigation_notice_level=mitigation_notice_level,
**plan_bundle,
)
ai_obj = AI.objects.filter(user=request.user).first()
if ai_obj is None:
message = "No AI configured for this user yet."
return self._render_result(
request,
operation,
person,
send_state,
send_target_bundle,
result_text=message,
result_sections=_parse_result_sections(message),
error=True,
**self._empty_plan_bundle(),
)
try:
limit = int(request.GET.get("limit", 20))
except (TypeError, ValueError):
limit = 20
limit = max(5, min(limit, 200))
user_notes = request.GET.get("user_notes", "")
messages = _recent_messages_for_person(request.user, person, limit)
owner_name = (
request.user.first_name
or request.user.get_full_name().strip()
or request.user.username
or "Me"
)
transcript = messages_to_string(
messages,
author_rewrites={
"USER": owner_name,
"BOT": "Assistant",
},
)
person_identifiers = set(
PersonIdentifier.objects.filter(
user=request.user,
person=person,
).values_list("identifier", flat=True)
)
if messages:
conversation.last_event_ts = messages[-1].ts
conversation.save(update_fields=["last_event_ts"])
message_event_ids = self._ensure_message_events(
request.user,
conversation,
person_identifiers,
messages,
)
ai_request = AIRequest.objects.create(
user=request.user,
conversation=conversation,
window_spec={"limit": limit},
message_ids=message_event_ids,
user_notes=user_notes,
operation=operation,
policy_snapshot={"send_state": send_state},
status="running",
started_at=dj_timezone.now(),
)
try:
prompt = self._build_prompt(
operation=operation,
owner_name=owner_name,
person=person,
transcript=transcript,
user_notes=user_notes,
)
result_text = async_to_sync(ai_runner.run_prompt)(prompt, ai_obj)
draft_options = (
_parse_draft_options(result_text) if operation == "draft_reply" else []
)
interaction_signals = _build_interaction_signals(
operation,
result_text,
message_event_ids,
)
memory_proposals = _build_memory_proposals(operation, result_text)
ai_result = AIResult.objects.create(
user=request.user,
ai_request=ai_request,
working_summary=result_text if operation != "draft_reply" else "",
draft_replies=draft_options,
interaction_signals=interaction_signals,
memory_proposals=memory_proposals,
citations=message_event_ids,
)
first_event = None
if message_event_ids:
first_event = MessageEvent.objects.filter(
id=message_event_ids[0],
user=request.user,
).first()
for signal in interaction_signals:
AIResultSignal.objects.create(
user=request.user,
ai_result=ai_result,
message_event=first_event,
label=signal["label"][:128],
valence=signal["valence"],
score=None,
rationale="Auto-tagged from operation output.",
)
ai_request.status = "done"
ai_request.finished_at = dj_timezone.now()
ai_request.save(update_fields=["status", "finished_at"])
conversation.last_ai_run_at = dj_timezone.now()
conversation.save(update_fields=["last_ai_run_at"])
plan_bundle = _latest_plan_bundle(conversation)
return self._render_result(
request,
operation,
person,
send_state,
send_target_bundle,
result_text=result_text,
result_sections=_parse_result_sections(result_text),
draft_replies=ai_result.draft_replies,
interaction_signals=ai_result.interaction_signals,
memory_proposals=ai_result.memory_proposals,
memory_proposal_groups=_group_memory_proposals(
ai_result.memory_proposals
),
citations=ai_result.citations,
citation_rows=self._citation_rows(request.user, ai_result.citations),
error=False,
ai_result_id=str(ai_result.id),
ai_result_created_at=ai_result.created_at,
ai_request_status=ai_request.status,
ai_request_started_at=ai_request.started_at,
ai_request_finished_at=ai_request.finished_at,
ai_request_window_spec=ai_request.window_spec,
ai_request_window_tags=_window_spec_tags(ai_request.window_spec),
ai_request_message_count=len(ai_request.message_ids or []),
ai_request_policy_snapshot=ai_request.policy_snapshot,
ai_request_policy_tags=_policy_snapshot_tags(
ai_request.policy_snapshot
),
**plan_bundle,
)
except Exception as exc:
ai_request.status = "failed"
ai_request.error = str(exc)
ai_request.finished_at = dj_timezone.now()
ai_request.save(update_fields=["status", "error", "finished_at"])
error_text = str(exc)
return self._render_result(
request,
operation,
person,
send_state,
send_target_bundle,
result_text=error_text,
result_sections=_parse_result_sections(error_text),
error=True,
**self._empty_plan_bundle(),
)
class AIWorkspaceSendDraft(LoginRequiredMixin, View):
allowed_types = {"widget"}
def post(self, request, type, person_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person = get_object_or_404(Person, pk=person_id, user=request.user)
send_state = _get_send_state(request.user, person)
text = (request.POST.get("draft_text") or "").strip()
force_send = _is_truthy(request.POST.get("force_send"))
if not text:
return _render_send_status(request, False, "Draft is empty.", "danger")
if not send_state["can_send"] and not force_send:
return _render_send_status(
request,
False,
f"Send blocked. {send_state['text']}",
"warning",
)
identifier = _resolve_person_identifier_target(
request.user,
person,
target_identifier_id=request.POST.get("target_identifier_id"),
target_service=request.POST.get("target_service"),
fallback_service=_preferred_service_for_person(request.user, person),
)
if identifier is None:
return _render_send_status(
request,
False,
"No recipient identifier found.",
"danger",
)
try:
ts = async_to_sync(identifier.send)(text)
except Exception as exc:
return _render_send_status(request, False, f"Send failed: {exc}", "danger")
if ts is False or ts is None:
return _render_send_status(request, False, "Send failed.", "danger")
session, _ = ChatSession.objects.get_or_create(
user=request.user,
identifier=identifier,
)
sent_ts = (
int(ts)
if (ts is not None and not isinstance(ts, bool))
else int(dj_timezone.now().timestamp() * 1000)
)
Message.objects.create(
user=request.user,
session=session,
custom_author="BOT",
sender_uuid="",
text=text,
ts=sent_ts,
delivered_ts=sent_ts,
read_source_service=identifier.service,
)
success_message = "Draft sent."
if force_send and not send_state["can_send"]:
success_message = "Draft sent with override."
response = _render_send_status(request, True, success_message, "success")
response["HX-Trigger"] = json.dumps(
{
"gia-message-sent": {
"person_id": str(person.id),
"ts": sent_ts,
"text": text,
"author": "BOT",
}
}
)
return response
class AIWorkspaceQueueDraft(LoginRequiredMixin, View):
allowed_types = {"widget"}
def post(self, request, type, person_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person = get_object_or_404(Person, pk=person_id, user=request.user)
text = (request.POST.get("draft_text") or "").strip()
if not text:
return _render_send_status(
request,
False,
"Select a draft before queueing.",
"warning",
)
identifier = _resolve_person_identifier_target(
request.user,
person,
target_identifier_id=request.POST.get("target_identifier_id"),
target_service=request.POST.get("target_service"),
fallback_service=_preferred_service_for_person(request.user, person),
)
if identifier is None:
return _render_send_status(
request,
False,
"No recipient identifier found.",
"danger",
)
manipulation = _get_queue_manipulation(request.user, person)
if manipulation is None:
return _render_send_status(
request,
False,
"No enabled manipulation found for this recipient. Queue entry not created.",
"warning",
)
session, _ = ChatSession.objects.get_or_create(
user=request.user,
identifier=identifier,
)
QueuedMessage.objects.create(
user=request.user,
session=session,
manipulation=manipulation,
ts=int(dj_timezone.now().timestamp() * 1000),
sender_uuid="",
text=text,
custom_author="BOT",
)
return _render_send_status(request, True, "Draft added to queue.", "success")
class AIWorkspaceCreateMitigation(LoginRequiredMixin, View):
allowed_types = {"widget"}
def post(self, request, type, person_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person = get_object_or_404(Person, pk=person_id, user=request.user)
ai_result_id = (request.POST.get("ai_result_id") or "").strip()
output_profile = (request.POST.get("output_profile") or "").strip()
if output_profile not in {"framework", "rule", "rules", "game", "games"}:
return _render_mitigation_status(
request,
person,
"Choose one mitigation output type: framework, rules, or games.",
level="warning",
)
user_context = (request.POST.get("user_context") or "").strip()
creation_mode = "guided" if user_context else "auto"
seed_from_context = _extract_seed_entities_from_context(user_context)
fundamentals = seed_from_context.get("fundamentals", [])
source_result = None
if ai_result_id:
source_result = (
AIResult.objects.filter(
id=ai_result_id,
user=request.user,
)
.select_related("ai_request", "ai_request__conversation")
.first()
)
conversation = (
source_result.ai_request.conversation
if source_result is not None
else _conversation_for_person(request.user, person)
)
conversation.participants.add(person)
_refresh_conversation_stability(conversation, request.user, person)
metric_context = _metric_pattern_context(conversation)
source_text = ""
if source_result is not None:
source_text = source_result.working_summary or ""
if not source_text:
source_text = (request.POST.get("source_text") or "").strip()
ai_obj = AI.objects.filter(user=request.user).first()
artifacts = _build_mitigation_artifacts(
ai_obj=ai_obj,
person=person,
source_text=source_text,
creation_mode=creation_mode,
inspiration=user_context,
fundamentals=fundamentals,
output_profile=output_profile,
metric_context=metric_context,
)
# Deterministically seed from pasted context so long-form frameworks can
# create fundamentals/rules/games in one pass, even when AI output is sparse.
artifacts = _merge_seed_entities(artifacts, seed_from_context)
plan = PatternMitigationPlan.objects.create(
user=request.user,
conversation=conversation,
source_ai_result=source_result,
title=artifacts.get("title") or f"{person.name} Pattern Mitigation",
objective=artifacts.get("objective") or "",
fundamental_items=artifacts.get("fundamental_items") or fundamentals,
creation_mode=creation_mode,
status="draft",
)
for rule in artifacts.get("rules", []):
PatternMitigationRule.objects.create(
user=request.user,
plan=plan,
title=str(rule.get("title") or "Rule").strip()[:255],
content=str(rule.get("content") or "").strip(),
)
for game in artifacts.get("games", []):
PatternMitigationGame.objects.create(
user=request.user,
plan=plan,
title=str(game.get("title") or "Game").strip()[:255],
instructions=str(game.get("instructions") or "").strip(),
)
existing_signatures = set()
for correction in artifacts.get("corrections", []):
title = _normalize_correction_title(
correction.get("title") or "", fallback="Correction"
)
clarification = str(correction.get("clarification") or "").strip()
source_phrase = str(correction.get("source_phrase") or "").strip()
if not clarification:
continue
signature = _correction_signature(title, clarification)
if signature in existing_signatures:
continue
PatternMitigationCorrection.objects.create(
user=request.user,
plan=plan,
title=title[:255],
clarification=clarification[:2000],
source_phrase=source_phrase[:1000],
perspective="second_person",
share_target="both",
language_style="adapted",
enabled=True,
)
existing_signatures.add(signature)
PatternMitigationMessage.objects.create(
user=request.user,
plan=plan,
role="system",
text="Plan created. Use the tabs below to refine rules, games, fundamentals, corrections, and AI guidance.",
)
return render(
request,
"partials/ai-workspace-mitigation-panel.html",
_mitigation_panel_context(
person=person,
plan=plan,
notice_message="Mitigation plan created.",
notice_level="success",
active_tab="plan_board",
),
)
class AIWorkspaceMitigationChat(LoginRequiredMixin, View):
allowed_types = {"widget"}
def post(self, request, type, person_id, plan_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person, plan = _person_plan_or_404(request, person_id, plan_id)
text = (request.POST.get("message") or "").strip()
active_tab = _sanitize_active_tab(
request.POST.get("active_tab"), default="ask_ai"
)
if not text:
return render(
request,
"partials/ai-workspace-mitigation-panel.html",
_mitigation_panel_context(
person=person,
plan=plan,
notice_message="Message is empty.",
notice_level="warning",
active_tab=active_tab,
),
)
PatternMitigationMessage.objects.create(
user=request.user,
plan=plan,
role="user",
text=text,
)
ai_obj = AI.objects.filter(user=request.user).first()
assistant_text = ""
if ai_obj:
rules_text = "\n".join(
[f"- {r.title}: {r.content}" for r in plan.rules.order_by("created_at")]
)
games_text = "\n".join(
[
f"- {g.title}: {g.instructions}"
for g in plan.games.order_by("created_at")
]
)
corrections_text = "\n".join(
[
f"- {c.title}: {c.clarification}"
for c in plan.corrections.order_by("created_at")
]
)
recent_msgs = plan.messages.order_by("-created_at")[:10]
recent_msgs = list(reversed(list(recent_msgs)))
transcript = "\n".join([f"{m.role.upper()}: {m.text}" for m in recent_msgs])
prompt = [
{
"role": "system",
"content": (
"You are refining a mitigation protocol. "
"Give concise practical updates to rules/games/corrections and explain tradeoffs."
),
},
{
"role": "user",
"content": (
f"Plan objective: {plan.objective}\n"
f"Fundamentals: {json.dumps(plan.fundamental_items or [])}\n"
f"Rules:\n{rules_text or '(none)'}\n\n"
f"Games:\n{games_text or '(none)'}\n\n"
f"Corrections:\n{corrections_text or '(none)'}\n\n"
f"Conversation:\n{transcript}"
),
},
]
try:
assistant_text = async_to_sync(ai_runner.run_prompt)(prompt, ai_obj)
except Exception as exc:
assistant_text = f"Failed to run AI refinement: {exc}"
else:
assistant_text = (
"No AI configured. Add an AI config to use mitigation chat."
)
PatternMitigationMessage.objects.create(
user=request.user,
plan=plan,
role="assistant",
text=assistant_text,
)
return _render_mitigation_panel(
request,
person,
plan,
active_tab=active_tab,
)
class AIWorkspaceExportArtifact(LoginRequiredMixin, View):
allowed_types = {"widget"}
def post(self, request, type, person_id, plan_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person, plan = _person_plan_or_404(request, person_id, plan_id)
artifact_type = (request.POST.get("artifact_type") or "rulebook").strip()
if artifact_type not in {"rulebook", "rules", "games", "corrections"}:
artifact_type = "rulebook"
export_format = (request.POST.get("export_format") or "markdown").strip()
active_tab = _sanitize_active_tab(
request.POST.get("active_tab"), default="ask_ai"
)
if export_format not in {"markdown", "json", "text"}:
export_format = "markdown"
payload, meta = _serialize_export_payload(plan, artifact_type, export_format)
export_record = PatternArtifactExport.objects.create(
user=request.user,
plan=plan,
artifact_type=artifact_type,
export_format=export_format,
protocol_version="artifact-v1",
payload=payload,
meta=meta,
)
return _render_mitigation_panel(
request,
person,
plan,
notice_message=f"Exported {artifact_type} ({export_format}).",
notice_level="success",
export_record=export_record,
active_tab=active_tab,
)
class AIWorkspaceCreateArtifact(LoginRequiredMixin, View):
allowed_types = {"widget"}
kind_map = {
"rule": (PatternMitigationRule, "content", "Rule"),
"game": (PatternMitigationGame, "instructions", "Game"),
"correction": (PatternMitigationCorrection, "clarification", "Correction"),
}
def post(self, request, type, person_id, plan_id, kind):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person, plan = _person_plan_or_404(request, person_id, plan_id)
kind_key = (kind or "").strip().lower()
if kind_key not in self.kind_map:
return HttpResponseBadRequest("Invalid artifact kind")
model, body_field, label = self.kind_map[kind_key]
if kind_key == "correction":
candidate_signature = _correction_signature(f"New {label}", "")
if candidate_signature in _existing_correction_signatures(plan):
tab = _sanitize_active_tab(
request.POST.get("active_tab"), default="corrections"
)
return _render_mitigation_panel(
request,
person,
plan,
notice_message="Duplicate correction skipped.",
notice_level="warning",
active_tab=tab,
)
payload = {
"user": request.user,
"plan": plan,
"title": f"New {label}",
body_field: "",
"enabled": True,
}
model.objects.create(**payload)
tab = _sanitize_active_tab(
request.POST.get("active_tab"),
default=("corrections" if kind_key == "correction" else "plan_board"),
)
return _render_mitigation_panel(
request,
person,
plan,
notice_message=f"{label} created.",
notice_level="success",
active_tab=tab,
)
class AIWorkspaceUpdateArtifact(LoginRequiredMixin, View):
allowed_types = {"widget"}
kind_map = {
"rule": (PatternMitigationRule, "content", "Rule"),
"game": (PatternMitigationGame, "instructions", "Game"),
"correction": (PatternMitigationCorrection, "clarification", "Correction"),
}
def post(self, request, type, person_id, plan_id, kind, artifact_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person, plan = _person_plan_or_404(request, person_id, plan_id)
kind_key = (kind or "").strip().lower()
if kind_key not in self.kind_map:
return HttpResponseBadRequest("Invalid artifact kind")
model, body_field, label = self.kind_map[kind_key]
artifact = get_object_or_404(
model,
id=artifact_id,
user=request.user,
plan=plan,
)
title = (request.POST.get("title") or "").strip() or artifact.title
body = (request.POST.get("body") or "").strip()
enabled = _is_truthy(request.POST.get("enabled"))
tab = _sanitize_active_tab(
request.POST.get("active_tab"),
default=("corrections" if kind_key == "correction" else "plan_board"),
)
if kind_key == "correction":
title = _normalize_correction_title(
title, fallback=artifact.title or "Correction"
)
candidate_signature = _correction_signature(title, body)
if candidate_signature in _existing_correction_signatures(
plan, exclude_id=artifact.id
):
return _render_mitigation_panel(
request,
person,
plan,
notice_message="Duplicate correction not saved.",
notice_level="warning",
active_tab=tab,
)
artifact.title = title[:255]
setattr(artifact, body_field, body)
artifact.enabled = enabled
if kind_key == "correction":
artifact.source_phrase = (request.POST.get("source_phrase") or "").strip()
perspective = (request.POST.get("perspective") or "third_person").strip()
if perspective in {"first_person", "second_person", "third_person"}:
artifact.perspective = perspective
share_target = (request.POST.get("share_target") or "both").strip()
if share_target in {"self", "other", "both"}:
artifact.share_target = share_target
language_style = (request.POST.get("language_style") or "adapted").strip()
if language_style in {"same", "adapted"}:
artifact.language_style = language_style
artifact.save()
return _render_mitigation_panel(
request,
person,
plan,
notice_message=f"{label} saved.",
notice_level="success",
active_tab=tab,
)
class AIWorkspaceDeleteArtifact(LoginRequiredMixin, View):
allowed_types = {"widget"}
kind_map = {
"rule": (PatternMitigationRule, "Rule"),
"game": (PatternMitigationGame, "Game"),
"correction": (PatternMitigationCorrection, "Correction"),
}
def post(self, request, type, person_id, plan_id, kind, artifact_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person, plan = _person_plan_or_404(request, person_id, plan_id)
kind_key = (kind or "").strip().lower()
if kind_key not in self.kind_map:
return HttpResponseBadRequest("Invalid artifact kind")
model, label = self.kind_map[kind_key]
artifact = get_object_or_404(
model,
id=artifact_id,
user=request.user,
plan=plan,
)
artifact.delete()
tab = _sanitize_active_tab(
request.POST.get("active_tab"),
default=("corrections" if kind_key == "correction" else "plan_board"),
)
return _render_mitigation_panel(
request,
person,
plan,
notice_message=f"{label} deleted.",
notice_level="success",
active_tab=tab,
)
class AIWorkspaceDeleteArtifactList(LoginRequiredMixin, View):
allowed_types = {"widget"}
kind_map = {
"rule": (PatternMitigationRule, "rules"),
"game": (PatternMitigationGame, "games"),
"correction": (PatternMitigationCorrection, "corrections"),
}
def post(self, request, type, person_id, plan_id, kind):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person, plan = _person_plan_or_404(request, person_id, plan_id)
kind_key = (kind or "").strip().lower()
if kind_key not in self.kind_map:
return HttpResponseBadRequest("Invalid artifact kind")
model, label = self.kind_map[kind_key]
rows = model.objects.filter(user=request.user, plan=plan)
delete_count = rows.count()
if delete_count:
rows.delete()
notice_message = f"Deleted {delete_count} {label}."
notice_level = "success"
else:
notice_message = f"No {label} to delete."
notice_level = "info"
tab = _sanitize_active_tab(
request.POST.get("active_tab"),
default=("corrections" if kind_key == "correction" else "plan_board"),
)
return _render_mitigation_panel(
request,
person,
plan,
notice_message=notice_message,
notice_level=notice_level,
active_tab=tab,
)
class AIWorkspaceEngageShare(LoginRequiredMixin, View):
allowed_types = {"widget"}
def post(self, request, type, person_id, plan_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person, plan = _person_plan_or_404(request, person_id, plan_id)
source_ref = (request.POST.get("source_ref") or "").strip()
share_target = (request.POST.get("share_target") or "self").strip()
framing = (request.POST.get("framing") or "dont_change").strip()
context_note = (request.POST.get("context_note") or "").strip()
action = (request.POST.get("action") or "preview").strip().lower()
force_send = _is_truthy(request.POST.get("force_send"))
engage_form = {
"source_ref": source_ref,
"share_target": share_target,
"framing": framing,
"context_note": context_note,
"target_identifier_id": str(
request.POST.get("target_identifier_id") or ""
).strip(),
}
active_tab = _sanitize_active_tab(
request.POST.get("active_tab"), default="engage"
)
if ":" not in source_ref:
return _render_mitigation_panel(
request,
person,
plan,
notice_message="Select a source item to engage.",
notice_level="warning",
engage_form=engage_form,
active_tab=active_tab,
)
source_kind, source_id = source_ref.split(":", 1)
source_kind = source_kind.strip().lower()
source_id = source_id.strip()
model_map = {
"rule": PatternMitigationRule,
"game": PatternMitigationGame,
"correction": PatternMitigationCorrection,
}
if source_kind not in model_map:
return _render_mitigation_panel(
request,
person,
plan,
notice_message="Invalid source type for engage.",
notice_level="danger",
engage_form=engage_form,
active_tab=active_tab,
)
source_obj = get_object_or_404(
model_map[source_kind],
id=source_id,
user=request.user,
plan=plan,
)
payload = _build_engage_payload(
source_obj=source_obj,
source_kind=source_kind,
share_target=share_target,
framing=framing,
context_note=context_note,
owner_name=(
request.user.first_name
or request.user.get_full_name().strip()
or request.user.username
or "You"
),
recipient_name=person.name or "Other",
)
engage_preview = payload["preview"]
outbound_text = payload["outbound"]
share_target = payload["share_target"]
if action == "preview":
return _render_mitigation_panel(
request,
person,
plan,
engage_preview=engage_preview,
engage_preview_flash=True,
engage_form=engage_form,
active_tab=active_tab,
)
if action == "send":
send_state = _get_send_state(request.user, person)
if not send_state["can_send"] and not force_send:
return _render_mitigation_panel(
request,
person,
plan,
notice_message=f"Send blocked. {send_state['text']}",
notice_level="warning",
engage_preview=engage_preview,
engage_form=engage_form,
active_tab=active_tab,
)
identifier = _resolve_person_identifier_target(
request.user,
person,
target_identifier_id=request.POST.get("target_identifier_id"),
target_service=request.POST.get("target_service"),
fallback_service=plan.conversation.platform_type,
)
if identifier is None:
return _render_mitigation_panel(
request,
person,
plan,
notice_message="No recipient identifier found.",
notice_level="danger",
engage_preview=engage_preview,
engage_form=engage_form,
active_tab=active_tab,
)
try:
ts = async_to_sync(identifier.send)(outbound_text)
except Exception as exc:
return _render_mitigation_panel(
request,
person,
plan,
notice_message=f"Send failed: {exc}",
notice_level="danger",
engage_preview=engage_preview,
engage_form=engage_form,
active_tab=active_tab,
)
if ts is False or ts is None:
return _render_mitigation_panel(
request,
person,
plan,
notice_message="Send failed.",
notice_level="danger",
engage_preview=engage_preview,
engage_form=engage_form,
active_tab=active_tab,
)
session, _ = ChatSession.objects.get_or_create(
user=request.user,
identifier=identifier,
)
sent_ts = (
int(ts)
if (ts is not None and not isinstance(ts, bool))
else int(dj_timezone.now().timestamp() * 1000)
)
Message.objects.create(
user=request.user,
session=session,
custom_author="BOT",
sender_uuid="",
text=outbound_text,
ts=sent_ts,
delivered_ts=sent_ts,
read_source_service=identifier.service,
)
notice = "Shared via engage."
if force_send and not send_state["can_send"]:
notice = "Shared via engage with override."
response = _render_mitigation_panel(
request,
person,
plan,
notice_message=notice,
notice_level="success",
engage_preview=engage_preview,
engage_form=engage_form,
active_tab=active_tab,
)
response["HX-Trigger"] = json.dumps(
{
"gia-message-sent": {
"person_id": str(person.id),
"ts": sent_ts,
"text": outbound_text,
"author": "BOT",
}
}
)
return response
if action == "queue":
identifier = _resolve_person_identifier_target(
request.user,
person,
target_identifier_id=request.POST.get("target_identifier_id"),
target_service=request.POST.get("target_service"),
fallback_service=plan.conversation.platform_type,
)
if identifier is None:
return _render_mitigation_panel(
request,
person,
plan,
notice_message="No recipient identifier found.",
notice_level="danger",
engage_preview=engage_preview,
engage_form=engage_form,
active_tab=active_tab,
)
manipulation = _get_queue_manipulation(request.user, person)
if manipulation is None:
return _render_mitigation_panel(
request,
person,
plan,
notice_message="No enabled manipulation found for this recipient. Queue entry not created.",
notice_level="warning",
engage_preview=engage_preview,
engage_form=engage_form,
active_tab=active_tab,
)
session, _ = ChatSession.objects.get_or_create(
user=request.user,
identifier=identifier,
)
QueuedMessage.objects.create(
user=request.user,
session=session,
manipulation=manipulation,
ts=int(dj_timezone.now().timestamp() * 1000),
sender_uuid="",
text=outbound_text,
custom_author="BOT",
)
return _render_mitigation_panel(
request,
person,
plan,
notice_message="Engage text added to queue.",
notice_level="success",
engage_preview=engage_preview,
engage_form=engage_form,
active_tab=active_tab,
)
return _render_mitigation_panel(
request,
person,
plan,
notice_message="Unknown engage action.",
notice_level="warning",
engage_preview=engage_preview,
engage_form=engage_form,
active_tab=active_tab,
)
class AIWorkspaceAutoSettings(LoginRequiredMixin, View):
allowed_types = {"widget"}
def post(self, request, type, person_id, plan_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person, plan = _person_plan_or_404(request, person_id, plan_id)
auto_settings = _get_or_create_auto_settings(request.user, plan.conversation)
auto_settings.enabled = _is_truthy(request.POST.get("enabled"))
auto_settings.auto_pattern_recognition = _is_truthy(
request.POST.get("auto_pattern_recognition")
)
auto_settings.auto_create_mitigation = _is_truthy(
request.POST.get("auto_create_mitigation")
)
auto_settings.auto_create_corrections = _is_truthy(
request.POST.get("auto_create_corrections")
)
auto_settings.auto_notify_enabled = _is_truthy(
request.POST.get("auto_notify_enabled")
)
auto_settings.ntfy_topic_override = (
request.POST.get("ntfy_topic_override") or ""
).strip() or None
auto_settings.ntfy_url_override = (
request.POST.get("ntfy_url_override") or ""
).strip() or None
try:
auto_settings.sample_message_window = max(
10, min(int(request.POST.get("sample_message_window") or 40), 200)
)
except Exception:
auto_settings.sample_message_window = 40
try:
auto_settings.check_cooldown_seconds = max(
0, min(int(request.POST.get("check_cooldown_seconds") or 300), 86400)
)
except Exception:
auto_settings.check_cooldown_seconds = 300
auto_settings.save()
action = (request.POST.get("action") or "save").strip().lower()
if action == "run_now":
result = _run_auto_analysis_for_plan(
user=request.user,
person=person,
conversation=plan.conversation,
plan=plan,
auto_settings=auto_settings,
trigger="manual",
)
notice_message = result["summary"]
notice_level = "success" if result.get("ran") else "info"
else:
notice_message = "Automation settings saved."
notice_level = "success"
return _render_mitigation_panel(
request,
person,
plan,
notice_message=notice_message,
notice_level=notice_level,
active_tab="auto",
auto_settings=auto_settings,
)
class AIWorkspaceUpdateFundamentals(LoginRequiredMixin, View):
allowed_types = {"widget"}
def post(self, request, type, person_id, plan_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person, plan = _person_plan_or_404(request, person_id, plan_id)
fundamentals_text = request.POST.get("fundamentals_text") or ""
active_tab = _sanitize_active_tab(
request.POST.get("active_tab"), default="fundamentals"
)
plan.fundamental_items = _parse_fundamentals(fundamentals_text)
plan.save(update_fields=["fundamental_items", "updated_at"])
return _render_mitigation_panel(
request,
person,
plan,
notice_message="Fundamentals saved.",
notice_level="success",
active_tab=active_tab,
)
class AIWorkspaceUpdatePlanMeta(LoginRequiredMixin, View):
allowed_types = {"widget"}
def post(self, request, type, person_id, plan_id):
if type not in self.allowed_types:
return HttpResponseBadRequest("Invalid type specified")
person, plan = _person_plan_or_404(request, person_id, plan_id)
active_tab = _sanitize_active_tab(
request.POST.get("active_tab"), default="plan_board"
)
status_value = (request.POST.get("status") or "").strip()
creation_mode_value = (request.POST.get("creation_mode") or "").strip()
title_value = (request.POST.get("title") or "").strip()
objective_value = (request.POST.get("objective") or "").strip()
valid_statuses = {key for key, _label in PatternMitigationPlan.STATUS_CHOICES}
valid_modes = {
key for key, _label in PatternMitigationPlan.CREATION_MODE_CHOICES
}
update_fields = ["updated_at"]
if status_value in valid_statuses:
if plan.status != status_value:
plan.status = status_value
update_fields.append("status")
if creation_mode_value in valid_modes:
if plan.creation_mode != creation_mode_value:
plan.creation_mode = creation_mode_value
update_fields.append("creation_mode")
if plan.title != title_value:
plan.title = title_value[:255]
update_fields.append("title")
if plan.objective != objective_value:
plan.objective = objective_value
update_fields.append("objective")
if len(update_fields) > 1:
plan.save(update_fields=update_fields)
return _render_mitigation_panel(
request,
person,
plan,
notice_message="Plan metadata saved.",
notice_level="success",
active_tab=active_tab,
)