"""Behavioral timing graphs for a person's chat sessions.

Turns raw ``Message`` rows and ``ConversationEvent`` streams into per-day
time-series metrics grouped into two families:

* MS (Message State): sent -> delivered -> read -> typing -> replied delays.
* PS (Presence State): offline duration and return-to-typing behavior.

All timestamps handled here are epoch **milliseconds** (every conversion in
this module divides by 1000 before calling ``datetime.fromtimestamp`` and
multiplies by 1000 on the way back).
"""

from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any, Callable

from django.utils import timezone as dj_timezone

from core.models import ChatSession, ConversationEvent, Message, PersonIdentifier
from core.workspace.sampling import downsample_points

# Millisecond unit constants used by duration formatting / bucketing.
MS_MINUTE = 60 * 1000
MS_HOUR = 60 * MS_MINUTE
MS_DAY = 24 * MS_HOUR

# Supported graph ranges; a value of 0 ("all") disables the cutoff filter in
# _build_metric_graph.
GRAPH_RANGE_DAYS = {
    "30d": 30,
    "90d": 90,
    "365d": 365,
    "all": 0,
}

# Display metadata for the two metric families. Keys ("ms"/"ps") must match
# the "group" field of each entry in BEHAVIORAL_METRIC_SPECS.
BEHAVIORAL_GROUPS = {
    "ms": {
        "title": "Message State (MS)",
        "eyebrow": "sent -> delivered -> read -> typing -> replied",
        "summary": (
            "Tracks the message journey from send through delivery, reading, "
            "typing, revision, abandonment, and response."
        ),
    },
    "ps": {
        "title": "Presence State (PS)",
        "eyebrow": "unavailable -> available -> typing -> sent/stopped",
        "summary": (
            "Tracks return-from-absence behavior: how long someone stayed away, "
            "how fast they typed on return, and whether that typing converted into "
            "a sent response."
        ),
    },
}

# One spec per metric. "slug" is the sample-store key used by the
# _session_*_samples extractors; "unit" drives format_metric_value;
# "aggregate" ("mean"/"sum") drives _aggregate_bucket. The "calculation" and
# "psychology" strings are user-facing copy rendered by the UI.
BEHAVIORAL_METRIC_SPECS = (
    {
        "slug": "ms_sent_to_delivered",
        "group": "ms",
        "title": "Delivered Time",
        "menu_title": "Delivered time",
        "icon": "fa-solid fa-truck-fast",
        "unit": "duration",
        "aggregate": "mean",
        "state_label": "Delay A",
        "calculation": (
            "Average time from outbound message send until the recipient device is "
            "reported as delivered."
        ),
        "psychology": (
            "Mostly technical. Persistent growth can still indicate longer device "
            "unavailability or deliberate disconnection when it repeats over time."
        ),
    },
    {
        "slug": "ms_delivered_to_read",
        "group": "ms",
        "title": "Read Time",
        "menu_title": "Read time",
        "icon": "fa-solid fa-eye",
        "unit": "duration",
        "aggregate": "mean",
        "state_label": "Delay B",
        "calculation": (
            "Average time from delivery receipt until read receipt for outbound "
            "messages."
        ),
        "psychology": (
            "Attention allocation. Shrinking read time usually means rising "
            "salience; growing read time usually means declining priority."
        ),
    },
    {
        "slug": "ms_read_to_typing",
        "group": "ms",
        "title": "Decision Latency",
        "menu_title": "Decision latency",
        "icon": "fa-solid fa-hourglass-half",
        "unit": "duration",
        "aggregate": "mean",
        "state_label": "Delay C.1",
        "calculation": (
            "Average time from read receipt until the next inbound typing start in "
            "the same session."
        ),
        "psychology": (
            "How long they appraise before engaging. High values often reflect "
            "deliberation, emotional load, or uncertainty."
        ),
    },
    {
        "slug": "ms_read_to_replied",
        "group": "ms",
        "title": "Reply Time",
        "menu_title": "Responded time",
        "icon": "fa-solid fa-reply",
        "unit": "duration",
        "aggregate": "mean",
        "state_label": "Delay C",
        "calculation": (
            "Average time from read receipt until the next inbound message in the "
            "same session."
        ),
        "psychology": (
            "Full deliberation-to-response window. Rising values can mean lower "
            "priority, higher emotional weight, or more careful self-regulation."
        ),
    },
    {
        "slug": "ms_typing_duration",
        "group": "ms",
        "title": "Typing Time",
        "menu_title": "Typing time",
        "icon": "fa-solid fa-keyboard",
        "unit": "duration",
        "aggregate": "mean",
        "state_label": "Delay C.2",
        "calculation": (
            "Average time between the first inbound typing start in a cycle and the "
            "inbound message that resolves that cycle."
        ),
        "psychology": (
            "Effort and backstage editing. Longer typing usually means more "
            "investment, greater complexity, or stronger impression management."
        ),
    },
    {
        "slug": "ms_revision_cycles",
        "group": "ms",
        "title": "Revision Cycles",
        "menu_title": "Revision cycles",
        "icon": "fa-solid fa-rotate",
        "unit": "cycles",
        "aggregate": "mean",
        "state_label": "Delay C.3",
        "calculation": (
            "Average number of typing revisions before an inbound message is sent."
        ),
        "psychology": (
            "Repeated stop/start cycles imply self-censorship, uncertainty, or "
            "heightened concern about how the message will land."
        ),
    },
    {
        "slug": "ms_aborted_count",
        "group": "ms",
        "title": "Aborted Messages",
        "menu_title": "Aborted messages",
        "icon": "fa-solid fa-ban",
        "unit": "count",
        "aggregate": "sum",
        "state_label": "Delay C.4",
        "calculation": (
            "Count of inbound composing cycles that end in a synthetic "
            "composing_abandoned event."
        ),
        "psychology": (
            "Approach without expression. Repeated abandonment often signals "
            "suppression, avoidance, or unresolved ambivalence."
        ),
    },
    {
        "slug": "ms_aborted_rate",
        "group": "ms",
        "title": "Aborted Rate",
        "menu_title": "Aborted rate",
        "icon": "fa-solid fa-percent",
        "unit": "percent",
        "aggregate": "mean",
        "state_label": "Delay C.4",
        "calculation": (
            "Share of inbound typing cycles that end abandoned instead of sent."
        ),
        "psychology": (
            "A steadier relational suppression signal than raw counts because it "
            "normalises for overall typing volume."
        ),
    },
    {
        "slug": "ps_offline_duration",
        "group": "ps",
        "title": "Offline Duration",
        "menu_title": "Offline duration",
        "icon": "fa-solid fa-moon",
        "unit": "duration",
        "aggregate": "mean",
        "state_label": "Delay E",
        "calculation": (
            "Average time between presence_unavailable and the next "
            "presence_available event."
        ),
        "psychology": (
            "Context window. This is not engagement by itself, but it frames how "
            "meaningful their return behavior is."
        ),
    },
    {
        "slug": "ps_available_to_typing",
        "group": "ps",
        "title": "Typing On Return",
        "menu_title": "Available to typing",
        "icon": "fa-solid fa-bolt",
        "unit": "duration",
        "aggregate": "mean",
        "state_label": "Delay F",
        "calculation": (
            "Average time from a return-to-available event until the first inbound "
            "typing start after that return."
        ),
        "psychology": (
            "Priority on return. Very short values after long absences are strong "
            "signals of attentional rank."
        ),
    },
    {
        "slug": "ps_typing_duration",
        "group": "ps",
        "title": "Typing After Return",
        "menu_title": "Typing after return",
        "icon": "fa-solid fa-keyboard",
        "unit": "duration",
        "aggregate": "mean",
        "state_label": "Delay G.1",
        "calculation": (
            "Average typing duration for the first composing cycle that starts "
            "after a presence return."
        ),
        "psychology": (
            "How much effort or self-editing happens once the contact actively "
            "chooses your conversation as part of their return sequence."
        ),
    },
    {
        "slug": "ps_aborted_count",
        "group": "ps",
        "title": "Aborted On Return",
        "menu_title": "Aborted after return",
        "icon": "fa-solid fa-circle-stop",
        "unit": "count",
        "aggregate": "sum",
        "state_label": "Delay G.2",
        "calculation": (
            "Count of first post-return composing cycles that are abandoned "
            "instead of sent."
        ),
        "psychology": (
            "A high-signal approach-avoidance marker: they returned, chose your "
            "thread, started to type, and still withdrew."
        ),
    },
    {
        "slug": "ps_aborted_rate",
        "group": "ps",
        "title": "Aborted On Return Rate",
        "menu_title": "Aborted on return rate",
        "icon": "fa-solid fa-chart-pie",
        "unit": "percent",
        "aggregate": "mean",
        "state_label": "Delay G.2",
        "calculation": (
            "Share of first post-return composing cycles that end abandoned."
        ),
        "psychology": (
            "One of the strongest signals of hesitation specifically during "
            "re-entry into the conversation."
        ),
    },
)

# Fast slug -> spec lookup for callers outside this module.
BEHAVIORAL_METRIC_MAP = {
    spec["slug"]: spec for spec in BEHAVIORAL_METRIC_SPECS
}

# Subset of metrics surfaced as summary cards in the payload.
BEHAVIORAL_SUMMARY_SLUGS = (
    "ms_sent_to_delivered",
    "ms_delivered_to_read",
    "ms_read_to_replied",
    "ms_typing_duration",
    "ms_aborted_rate",
    "ps_available_to_typing",
)


def sanitize_graph_range(value: str) -> str:
    """Normalize a user-supplied range key, falling back to ``"90d"``."""
    candidate = str(value or "").strip().lower()
    if candidate in GRAPH_RANGE_DAYS:
        return candidate
    return "90d"


def _safe_int(value: Any, default: int = 0) -> int:
    """Coerce *value* to int, returning *default* on any conversion failure.

    Broad ``except`` is deliberate: inputs come from DB rows / JSON payloads
    and may be None, strings, or other junk.
    """
    try:
        return int(value)
    except Exception:
        return int(default)


def _format_duration(value_ms: float | int | None) -> str:
    """Render a millisecond duration as a compact human label (s/m/h/d)."""
    if value_ms is None:
        return "-"
    # Negative inputs clamp to zero; round before truncating to seconds.
    value = max(0, int(round(float(value_ms))))
    if value < MS_MINUTE:
        return f"{value // 1000}s"
    if value < MS_HOUR:
        minutes = float(value) / float(MS_MINUTE)
        return f"{minutes:.1f}m"
    if value < MS_DAY:
        hours = float(value) / float(MS_HOUR)
        return f"{hours:.1f}h"
    days = float(value) / float(MS_DAY)
    return f"{days:.1f}d"


def format_metric_value(spec: dict, value: float | int | None) -> str:
    """Format *value* for display according to the spec's ``unit`` field."""
    if value is None:
        return "-"
    unit = str(spec.get("unit") or "").strip().lower()
    if unit == "duration":
        return _format_duration(value)
    if unit == "percent":
        # Percent values are already scaled to 0-100 by _aggregate_bucket.
        return f"{float(value):.1f}%"
    if unit == "cycles":
        return f"{float(value):.2f} cycles"
    if unit == "count":
        return str(int(round(float(value))))
    return str(value)


def _format_metric_delta(spec: dict, current: float | int | None, previous: float | int | None) -> str:
    """Describe the change between the last two graph points, per unit.

    Returns "" when either value is missing or the unit is unknown, and
    "steady" when the change is negligible.
    """
    if current is None or previous is None:
        return ""
    delta = float(current) - float(previous)
    if abs(delta) < 0.0001:
        return "steady"
    unit = str(spec.get("unit") or "").strip().lower()
    if unit == "duration":
        direction = "longer" if delta > 0 else "shorter"
        return f"{_format_duration(abs(delta))} {direction}"
    if unit == "percent":
        # "pp" = percentage points (absolute difference of two percentages).
        direction = "higher" if delta > 0 else "lower"
        return f"{abs(delta):.1f}pp {direction}"
    if unit == "cycles":
        direction = "higher" if delta > 0 else "lower"
        return f"{abs(delta):.2f} cycles {direction}"
    if unit == "count":
        direction = "more" if delta > 0 else "fewer"
        return f"{int(round(abs(delta)))} {direction}"
    return ""


def _bucket_label(ts_ms: int) -> str:
    """Return the UTC calendar-day label (YYYY-MM-DD) for an epoch-ms value."""
    dt_value = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return dt_value.strftime("%Y-%m-%d")


def _bucket_start_ms(ts_ms: int) -> int:
    """Truncate an epoch-ms timestamp to UTC midnight of the same day (ms)."""
    dt_value = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    bucket = datetime(dt_value.year, dt_value.month, dt_value.day, tzinfo=timezone.utc)
    return int(bucket.timestamp() * 1000)


def _graph_geometry(points: list[dict]) -> dict[str, Any]:
    """Project graph points into a fixed 100x48 SVG viewbox.

    Returns polyline point pairs, a closed area path, hover markers (last 10
    only), and the y-axis extremes. Points with ``y is None`` are dropped.
    """
    clean = [row for row in list(points or []) if row.get("y") is not None]
    if not clean:
        return {
            "polyline": "",
            "area": "",
            "markers": [],
            "y_min": None,
            "y_max": None,
        }
    values = [float(row["y"]) for row in clean]
    y_min = min(values)
    y_max = max(values)
    # A flat series would divide by ~0 below; widen the range artificially.
    if abs(y_max - y_min) < 0.0001:
        y_min -= 1.0
        y_max += 1.0
    width = 100.0
    height = 48.0
    pad_x = 4.0
    pad_y = 5.0
    usable_width = max(1.0, width - (pad_x * 2.0))
    usable_height = max(1.0, height - (pad_y * 2.0))
    points_attr = []
    markers = []
    for idx, row in enumerate(clean):
        x = pad_x
        if len(clean) > 1:
            # Spread points evenly across the usable width (index-based, not
            # timestamp-based spacing).
            x += usable_width * (float(idx) / float(len(clean) - 1))
        # SVG y grows downward, so invert the ratio against the height.
        y_ratio = (float(row["y"]) - y_min) / float(y_max - y_min)
        y = height - pad_y - (usable_height * y_ratio)
        points_attr.append(f"{x:.2f},{y:.2f}")
        markers.append(
            {
                "x": f"{x:.2f}",
                "y": f"{y:.2f}",
                "value_label": row.get("value_label") or "",
                "label": row.get("label") or "",
            }
        )
    area = ""
    if points_attr:
        # Close the fill path down to the baseline under first/last points.
        first_x = points_attr[0].split(",")[0]
        last_x = points_attr[-1].split(",")[0]
        baseline = f"{height - pad_y:.2f}"
        area = (
            f"M {first_x},{baseline} "
            + " L ".join(points_attr)
            + f" L {last_x},{baseline} Z"
        )
    return {
        "polyline": " ".join(points_attr),
        "area": area,
        # Only the most recent 10 markers are exposed to the template.
        "markers": markers[-10:],
        "y_min": y_min,
        "y_max": y_max,
    }


def _collect_messages(user, session_ids: list[str], person_identifiers: set[str]) -> dict[str, list[dict]]:
    """Load messages for the sessions, grouped by session id, ordered by ts.

    Direction is derived here: a message whose ``sender_uuid`` matches one of
    the person's identifiers is inbound ("in"); everything else is treated as
    outbound ("out").
    """
    rows = (
        Message.objects.filter(user=user, session_id__in=session_ids)
        .order_by("session_id", "ts")
        .values(
            "id",
            "session_id",
            "ts",
            "sender_uuid",
            "delivered_ts",
            "read_ts",
        )
    )
    grouped: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        sender = str(row.get("sender_uuid") or "").strip()
        direction = "in" if sender and sender in person_identifiers else "out"
        grouped[str(row.get("session_id") or "")].append(
            {
                "id": str(row.get("id") or ""),
                "ts": _safe_int(row.get("ts"), 0),
                "direction": direction,
                "delivered_ts": _safe_int(row.get("delivered_ts"), 0),
                "read_ts": _safe_int(row.get("read_ts"), 0),
            }
        )
    return grouped


def _collect_events(user, session_ids: list[str]) -> dict[str, list[dict]]:
    """Load the relevant conversation events, grouped by session id.

    Only the event types consumed by _session_event_samples are fetched;
    ordering by (ts, created_at) keeps same-timestamp events in insert order.
    """
    rows = (
        ConversationEvent.objects.filter(
            user=user,
            session_id__in=session_ids,
            event_type__in=[
                "message_created",
                "typing_started",
                "typing_stopped",
                "composing_abandoned",
                "presence_available",
                "presence_unavailable",
                "read_receipt",
            ],
        )
        .order_by("session_id", "ts", "created_at")
        .values(
            "session_id",
            "ts",
            "event_type",
            "direction",
            "payload",
        )
    )
    grouped: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        # Payload may be NULL or a non-dict JSON value; normalize to a dict.
        payload = row.get("payload") or {}
        if not isinstance(payload, dict):
            payload = {}
        grouped[str(row.get("session_id") or "")].append(
            {
                "session_id": str(row.get("session_id") or ""),
                "ts": _safe_int(row.get("ts"), 0),
                "event_type": str(row.get("event_type") or "").strip().lower(),
                "direction": str(row.get("direction") or "").strip().lower(),
                "payload": payload,
            }
        )
    return grouped


def _append_sample(store: dict[str, list[dict]], slug: str, ts_ms: int, value: float) -> None:
    """Append one (ts_ms, value) sample under *slug*, ignoring bad timestamps."""
    if ts_ms <= 0:
        return
    store[slug].append({"ts_ms": int(ts_ms), "value": float(value)})


def _session_message_samples(messages: list[dict], store: dict[str, list[dict]]) -> None:
    """Extract MS delay samples from one session's ts-ordered message list.

    For each outbound message this records sent->delivered and
    delivered->read delays, then pairs its read receipt with the first
    inbound message at or after the read timestamp to record read->replied.
    ``inbound_index`` only moves forward, so each inbound message is matched
    at most once.
    """
    inbound_messages = [row for row in messages if row.get("direction") == "in"]
    if not inbound_messages:
        inbound_messages = []
    inbound_index = 0
    for message in messages:
        if message.get("direction") != "out":
            continue
        sent_ts = _safe_int(message.get("ts"), 0)
        delivered_ts = _safe_int(message.get("delivered_ts"), 0)
        read_ts = _safe_int(message.get("read_ts"), 0)
        # Guard against missing or out-of-order receipts before sampling.
        if delivered_ts > 0 and delivered_ts >= sent_ts:
            _append_sample(
                store,
                "ms_sent_to_delivered",
                delivered_ts,
                delivered_ts - sent_ts,
            )
        if read_ts > 0 and delivered_ts > 0 and read_ts >= delivered_ts:
            _append_sample(
                store,
                "ms_delivered_to_read",
                read_ts,
                read_ts - delivered_ts,
            )
        if read_ts <= 0:
            continue
        # Advance to the first inbound message not earlier than the read ts.
        while inbound_index < len(inbound_messages):
            candidate = inbound_messages[inbound_index]
            if _safe_int(candidate.get("ts"), 0) >= read_ts:
                break
            inbound_index += 1
        if inbound_index >= len(inbound_messages):
            continue
        response = inbound_messages[inbound_index]
        response_ts = _safe_int(response.get("ts"), 0)
        if response_ts >= read_ts:
            _append_sample(
                store,
                "ms_read_to_replied",
                response_ts,
                response_ts - read_ts,
            )


def _session_event_samples(events: list[dict], store: dict[str, list[dict]]) -> None:
    """Run the typing/presence state machine over one session's events.

    Tracks four pieces of state while scanning in order:
    * ``typing_state`` - the open inbound composing cycle, if any
      (start ts, max revision seen, whether it began right after a return).
    * ``pending_read_ts`` - last read_receipt ts awaiting a typing start.
    * ``available_state`` - last presence return, consumed by the first
      typing start after it.
    * ``unavailable_ts`` - last presence_unavailable ts for offline spans.

    Rate metrics (ms/ps_aborted_rate) are emitted as 0/1 outcomes per cycle;
    _aggregate_bucket later means them and scales to percent.
    """
    typing_state: dict[str, Any] | None = None
    pending_read_ts = 0
    available_state: dict[str, Any] | None = None
    unavailable_ts = 0
    for event in events:
        event_type = str(event.get("event_type") or "")
        ts_ms = _safe_int(event.get("ts"), 0)
        direction = str(event.get("direction") or "")
        payload = event.get("payload") or {}
        if not isinstance(payload, dict):
            payload = {}
        if event_type == "presence_unavailable":
            # Going offline also invalidates any unconsumed return marker.
            unavailable_ts = ts_ms
            available_state = None
            continue
        if event_type == "presence_available":
            if unavailable_ts > 0 and ts_ms >= unavailable_ts:
                _append_sample(
                    store,
                    "ps_offline_duration",
                    ts_ms,
                    ts_ms - unavailable_ts,
                )
            # Arm the return marker for the next inbound typing start.
            available_state = {
                "ts": ts_ms,
                "consumed": False,
            }
            unavailable_ts = 0
            continue
        if event_type == "read_receipt":
            pending_read_ts = ts_ms
            continue
        if event_type == "typing_started" and direction == "in":
            # "revision" in the payload presumably counts composing restarts
            # within the cycle — TODO confirm against the event producer.
            revision = max(1, _safe_int(payload.get("revision"), 1))
            if typing_state is None:
                # Opening a new composing cycle.
                after_return = bool(
                    available_state
                    and not bool(available_state.get("consumed"))
                    and ts_ms >= _safe_int(available_state.get("ts"), 0)
                )
                if after_return:
                    available_ts = _safe_int(available_state.get("ts"), 0)
                    _append_sample(
                        store,
                        "ps_available_to_typing",
                        ts_ms,
                        ts_ms - available_ts,
                    )
                    # Only the first cycle after a return counts as PS.
                    available_state["consumed"] = True
                if pending_read_ts > 0 and ts_ms >= pending_read_ts:
                    _append_sample(
                        store,
                        "ms_read_to_typing",
                        ts_ms,
                        ts_ms - pending_read_ts,
                    )
                    pending_read_ts = 0
                typing_state = {
                    "started_ts": ts_ms,
                    "revision": revision,
                    "after_return": after_return,
                }
            else:
                # Restart within an open cycle: keep the highest revision.
                typing_state["revision"] = max(
                    int(typing_state.get("revision") or 1),
                    revision,
                )
            continue
        if event_type == "message_created" and direction == "in":
            if typing_state is None:
                # Inbound message without observed typing: nothing to sample,
                # but the pending read no longer has a reply to wait for.
                pending_read_ts = 0
                continue
            # Cycle resolved by a sent message: duration, revisions, and a
            # zero (non-aborted) outcome for the rate metrics.
            duration_ms = max(0, ts_ms - _safe_int(typing_state.get("started_ts"), 0))
            _append_sample(store, "ms_typing_duration", ts_ms, duration_ms)
            _append_sample(
                store,
                "ms_revision_cycles",
                ts_ms,
                max(1, _safe_int(typing_state.get("revision"), 1)),
            )
            _append_sample(store, "ms_aborted_rate", ts_ms, 0)
            if typing_state.get("after_return"):
                _append_sample(store, "ps_typing_duration", ts_ms, duration_ms)
                _append_sample(store, "ps_aborted_rate", ts_ms, 0)
            typing_state = None
            pending_read_ts = 0
            continue
        if event_type == "composing_abandoned" and direction == "in":
            if typing_state is None:
                # Abandon with no tracked cycle still counts toward MS
                # aborted metrics, but carries no revision/PS information.
                _append_sample(store, "ms_aborted_count", ts_ms, 1)
                _append_sample(store, "ms_aborted_rate", ts_ms, 1)
                continue
            # Cycle resolved by abandonment: aborted outcome plus revisions.
            _append_sample(store, "ms_aborted_count", ts_ms, 1)
            _append_sample(store, "ms_aborted_rate", ts_ms, 1)
            _append_sample(
                store,
                "ms_revision_cycles",
                ts_ms,
                max(1, _safe_int(typing_state.get("revision"), 1)),
            )
            if typing_state.get("after_return"):
                _append_sample(store, "ps_aborted_count", ts_ms, 1)
                _append_sample(store, "ps_aborted_rate", ts_ms, 1)
            typing_state = None
            continue


def _aggregate_bucket(spec: dict, rows: list[dict]) -> float | int | None:
    """Aggregate one day's samples per the spec: sum, mean, or mean*100.

    Percent metrics carry 0/1 outcome samples, so the mean is scaled to a
    0-100 percentage here.
    """
    if not rows:
        return None
    values = [float(row.get("value") or 0.0) for row in rows]
    aggregate = str(spec.get("aggregate") or "mean").strip().lower()
    if aggregate == "sum":
        return sum(values)
    mean = sum(values) / float(len(values))
    if str(spec.get("unit") or "").strip().lower() == "percent":
        return mean * 100.0
    return mean


def _build_metric_graph(spec: dict, samples: list[dict], range_key: str, density: str) -> dict[str, Any]:
    """Build the full graph payload for one metric.

    Pipeline: filter samples to the requested range -> bucket by UTC day ->
    aggregate each bucket -> downsample to *density* -> relabel the surviving
    points -> compute SVG geometry. Returns the spec merged with points,
    current value/delta labels, and geometry fields.
    """
    range_days = int(GRAPH_RANGE_DAYS.get(range_key, 90))
    cutoff_ts = 0
    if range_days > 0:
        cutoff_dt = dj_timezone.now() - timedelta(days=range_days)
        cutoff_ts = int(cutoff_dt.timestamp() * 1000)
    # cutoff_ts == 0 means "all time" (no lower bound).
    filtered = [
        dict(row)
        for row in list(samples or [])
        if _safe_int(row.get("ts_ms"), 0) > 0
        and (cutoff_ts <= 0 or _safe_int(row.get("ts_ms"), 0) >= cutoff_ts)
    ]
    buckets: dict[int, list[dict]] = defaultdict(list)
    for row in filtered:
        buckets[_bucket_start_ms(_safe_int(row.get("ts_ms"), 0))].append(row)
    raw_points = []
    for bucket_ts in sorted(buckets.keys()):
        value = _aggregate_bucket(spec, buckets[bucket_ts])
        if value is None:
            continue
        raw_points.append(
            {
                "x": datetime.fromtimestamp(
                    bucket_ts / 1000, tz=timezone.utc
                ).isoformat(),
                "y": round(float(value), 3),
                "ts_ms": bucket_ts,
                "label": _bucket_label(bucket_ts),
                "value_label": format_metric_value(spec, value),
            }
        )
    points = downsample_points(raw_points, density=density)
    # Downsampling may merge points, so labels are recomputed from the
    # surviving ts/y values.
    for row in points:
        row["label"] = _bucket_label(_safe_int(row.get("ts_ms"), 0))
        row["value_label"] = format_metric_value(spec, row.get("y"))
    latest_value = points[-1]["y"] if points else None
    previous_value = points[-2]["y"] if len(points) > 1 else None
    geometry = _graph_geometry(points)
    return {
        **spec,
        "points": points,
        "raw_count": len(filtered),
        "count": len(points),
        "current_value": latest_value,
        "current_value_label": format_metric_value(spec, latest_value),
        "delta_label": _format_metric_delta(spec, latest_value, previous_value),
        "latest_bucket_label": points[-1]["label"] if points else "",
        "has_data": bool(points),
        "polyline": geometry["polyline"],
        "area_path": geometry["area"],
        "markers": geometry["markers"],
        "y_min_label": format_metric_value(spec, geometry["y_min"]),
        "y_max_label": format_metric_value(spec, geometry["y_max"]),
    }


def build_behavioral_graph_payload(*, user, person, range_key: str = "90d", density: str = "medium") -> dict[str, Any]:
    """Build the complete behavioral-graphs payload for *person*.

    Loads all of the person's chat sessions and identifiers, extracts timing
    samples per session, and renders one graph per metric spec. The returned
    dict carries groups, graphs, summary cards, the (assumed pre-sanitized)
    range key/label, and coverage counts.
    """
    sessions = list(
        ChatSession.objects.filter(user=user, identifier__person=person)
        .select_related("identifier")
        .order_by("identifier__service", "identifier__identifier")
    )
    session_ids = [str(session.id) for session in sessions]
    # Identifier strings classify message direction in _collect_messages.
    identifiers = set(
        PersonIdentifier.objects.filter(user=user, person=person).values_list(
            "identifier", flat=True
        )
    )
    samples: dict[str, list[dict]] = defaultdict(list)
    messages_by_session = _collect_messages(user, session_ids, identifiers)
    events_by_session = _collect_events(user, session_ids)
    # Sessions are processed independently; samples accumulate per metric slug.
    for session_id in session_ids:
        _session_message_samples(messages_by_session.get(session_id, []), samples)
        _session_event_samples(events_by_session.get(session_id, []), samples)
    graphs = [
        _build_metric_graph(
            spec,
            samples.get(spec["slug"], []),
            range_key=range_key,
            density=density,
        )
        for spec in BEHAVIORAL_METRIC_SPECS
    ]
    summary_cards = [
        graph for graph in graphs if graph["slug"] in BEHAVIORAL_SUMMARY_SLUGS
    ]
    return {
        "groups": BEHAVIORAL_GROUPS,
        "graphs": graphs,
        "summary_cards": summary_cards,
        "range_key": range_key,
        "range_label": {
            "30d": "Last 30 days",
            "90d": "Last 90 days",
            "365d": "Last year",
            "all": "All time",
        }.get(range_key, "Last 90 days"),
        "coverage": {
            "session_count": len(session_ids),
            "message_count": sum(len(rows) for rows in messages_by_session.values()),
            "event_count": sum(len(rows) for rows in events_by_session.values()),
            "services": sorted(
                {
                    str(getattr(session.identifier, "service", "") or "").strip().lower()
                    for session in sessions
                    if getattr(session, "identifier", None) is not None
                }
            ),
        },
    }


def get_behavioral_metric_graph(*, user, person, metric_slug: str, range_key: str = "90d", density: str = "medium") -> dict[str, Any]:
    """Return the full payload plus the single graph matching *metric_slug*.

    Raises:
        KeyError: if no graph exists for the (trimmed) slug.

    Note: this builds every metric's graph even though only one is needed.
    """
    payload = build_behavioral_graph_payload(
        user=user,
        person=person,
        range_key=range_key,
        density=density,
    )
    metric = next(
        (row for row in payload["graphs"] if row["slug"] == str(metric_slug or "").strip()),
        None,
    )
    if metric is None:
        raise KeyError(metric_slug)
    return {
        **payload,
        "metric": metric,
    }


def build_behavioral_metric_groups(item_builder: Callable[[dict], dict[str, Any]]) -> list[dict[str, Any]]:
    """Group metric specs by family, mapping each through *item_builder*.

    *item_builder* receives a raw spec dict and returns whatever shape the
    caller needs (e.g. a menu entry).
    """
    groups = []
    for group_key, group in BEHAVIORAL_GROUPS.items():
        groups.append(
            {
                "key": group_key,
                "title": group["title"],
                "eyebrow": group["eyebrow"],
                "items": [
                    item_builder(spec)
                    for spec in BEHAVIORAL_METRIC_SPECS
                    if spec["group"] == group_key
                ],
            }
        )
    return groups