diff --git a/core/management/commands/reconcile_workspace_metric_history.py b/core/management/commands/reconcile_workspace_metric_history.py new file mode 100644 index 0000000..d12b18c --- /dev/null +++ b/core/management/commands/reconcile_workspace_metric_history.py @@ -0,0 +1,424 @@ +from __future__ import annotations + +import statistics +from datetime import datetime, timezone + +from django.core.management.base import BaseCommand +from django.utils import timezone as dj_timezone + +from core.models import ( + Message, + Person, + PersonIdentifier, + WorkspaceConversation, + WorkspaceMetricSnapshot, +) +from core.views.workspace import _conversation_for_person + + +def _score_from_lag(lag_ms, target_hours=4): + if lag_ms is None: + return 50.0 + target_ms = max(1, int(target_hours)) * 60 * 60 * 1000 + return max(0.0, min(100.0, 100.0 / (1.0 + (float(lag_ms) / target_ms)))) + + +def _median_or_none(values): + if not values: + return None + return float(statistics.median(values)) + + +def _calibrating_payload(last_ts=None): + return { + "source_event_ts": int(last_ts) if last_ts else None, + "stability_state": WorkspaceConversation.StabilityState.CALIBRATING, + "stability_score": None, + "stability_confidence": 0.0, + "stability_sample_messages": 0, + "stability_sample_days": 0, + "commitment_inbound_score": None, + "commitment_outbound_score": None, + "commitment_confidence": 0.0, + "inbound_messages": 0, + "outbound_messages": 0, + "reciprocity_score": None, + "continuity_score": None, + "response_score": None, + "volatility_score": None, + "inbound_response_score": None, + "outbound_response_score": None, + "balance_inbound_score": None, + "balance_outbound_score": None, + } + + +def _compute_payload(rows, identifier_values): + if not rows: + return _calibrating_payload(None) + + inbound_count = 0 + outbound_count = 0 + daily_counts = {} + inbound_response_lags = [] + outbound_response_lags = [] + pending_in_ts = None + pending_out_ts = None + first_ts = int(rows[0]["ts"] or 0) + last_ts = int(rows[-1]["ts"] or 0) + latest_service = str(rows[-1].get("session__identifier__service") or "").strip().lower() + + for row in rows: + ts = int(row.get("ts") or 0) + sender = str(row.get("sender_uuid") or "").strip() + author = str(row.get("custom_author") or "").strip().upper() + if author in {"USER", "BOT"}: + is_inbound = False + elif author == "OTHER": + is_inbound = True + else: + is_inbound = sender in identifier_values + direction = "in" if is_inbound else "out" + day_key = datetime.fromtimestamp(ts / 1000, tz=timezone.utc).date().isoformat() + daily_counts[day_key] = daily_counts.get(day_key, 0) + 1 + + if direction == "in": + inbound_count += 1 + if pending_out_ts is not None and ts >= pending_out_ts: + inbound_response_lags.append(ts - pending_out_ts) + pending_out_ts = None + pending_in_ts = ts + else: + outbound_count += 1 + if pending_in_ts is not None and ts >= pending_in_ts: + outbound_response_lags.append(ts - pending_in_ts) + pending_in_ts = None + pending_out_ts = ts + + message_count = len(rows) + span_days = max(1, int(((last_ts - first_ts) / (24 * 60 * 60 * 1000)) + 1)) + sample_days = len(daily_counts) + + total_messages = max(1, inbound_count + outbound_count) + reciprocity_score = 100.0 * ( + 1.0 - abs(inbound_count - outbound_count) / total_messages + ) + continuity_score = 100.0 * min(1.0, sample_days / max(1, span_days)) + out_resp_score = _score_from_lag(_median_or_none(outbound_response_lags)) + in_resp_score = _score_from_lag(_median_or_none(inbound_response_lags)) + response_score = (out_resp_score + in_resp_score) / 2.0 + + daily_values = list(daily_counts.values()) + if len(daily_values) > 1: + mean_daily = statistics.mean(daily_values) + stdev_daily = statistics.pstdev(daily_values) + cv = (stdev_daily / mean_daily) if mean_daily else 1.0 + volatility_score = max(0.0, 100.0 * (1.0 - min(cv, 1.5) / 1.5)) + else: + volatility_score = 60.0 + + stability_score = ( + (0.35 * reciprocity_score) + + (0.25 * continuity_score) + + (0.20 * response_score) + + (0.20 * volatility_score) + ) + + balance_out = 100.0 * min(1.0, outbound_count / max(1, inbound_count)) + balance_in = 100.0 * min(1.0, inbound_count / max(1, outbound_count)) + commitment_out = (0.60 * out_resp_score) + (0.40 * balance_out) + commitment_in = (0.60 * in_resp_score) + (0.40 * balance_in) + + msg_conf = min(1.0, message_count / 200.0) + day_conf = min(1.0, sample_days / 30.0) + pair_conf = min( + 1.0, (len(inbound_response_lags) + len(outbound_response_lags)) / 40.0 + ) + confidence = (0.50 * msg_conf) + (0.30 * day_conf) + (0.20 * pair_conf) + + if message_count < 20 or sample_days < 3 or confidence < 0.25: + stability_state = WorkspaceConversation.StabilityState.CALIBRATING + stability_score_value = None + commitment_in_value = None + commitment_out_value = None + else: + stability_score_value = round(stability_score, 2) + commitment_in_value = round(commitment_in, 2) + commitment_out_value = round(commitment_out, 2) + if stability_score_value >= 70: + stability_state = WorkspaceConversation.StabilityState.STABLE + elif stability_score_value >= 50: + stability_state = WorkspaceConversation.StabilityState.WATCH + else: + stability_state = WorkspaceConversation.StabilityState.FRAGILE + + feedback_state = "balanced" + if outbound_count > (inbound_count * 1.5): + feedback_state = "withdrawing" + elif inbound_count > (outbound_count * 1.5): + feedback_state = "overextending" + + payload = { + "source_event_ts": last_ts, + "stability_state": stability_state, + "stability_score": float(stability_score_value) + if stability_score_value is not None + else None, + "stability_confidence": round(confidence, 3), + "stability_sample_messages": message_count, + "stability_sample_days": sample_days, + "commitment_inbound_score": float(commitment_in_value) + if commitment_in_value is not None + else None, + "commitment_outbound_score": float(commitment_out_value) + if commitment_out_value is not None + else None, + "commitment_confidence": round(confidence, 3), + "inbound_messages": inbound_count, + "outbound_messages": outbound_count, + "reciprocity_score": round(reciprocity_score, 3), + "continuity_score": round(continuity_score, 3), + "response_score": round(response_score, 3), + "volatility_score": round(volatility_score, 3), + "inbound_response_score": round(in_resp_score, 3), + "outbound_response_score": round(out_resp_score, 3), + "balance_inbound_score": round(balance_in, 3), + "balance_outbound_score": round(balance_out, 3), + } + return payload, latest_service, feedback_state + + +def _payload_signature(payload: dict) -> tuple: + return ( + int(payload.get("source_event_ts") or 0), + str(payload.get("stability_state") or ""), + payload.get("stability_score"), + float(payload.get("stability_confidence") or 0.0), + int(payload.get("stability_sample_messages") or 0), + int(payload.get("stability_sample_days") or 0), + payload.get("commitment_inbound_score"), + payload.get("commitment_outbound_score"), + float(payload.get("commitment_confidence") or 0.0), + int(payload.get("inbound_messages") or 0), + int(payload.get("outbound_messages") or 0), + ) + + +class Command(BaseCommand): + help = ( + "Reconcile AI Workspace metric history by deterministically rebuilding " + "WorkspaceMetricSnapshot points from message history." + ) + + def add_arguments(self, parser): + parser.add_argument("--days", type=int, default=365) + parser.add_argument("--service", default="") + parser.add_argument("--user-id", default="") + parser.add_argument("--person-id", default="") + parser.add_argument("--step-messages", type=int, default=2) + parser.add_argument("--limit", type=int, default=200000) + parser.add_argument("--dry-run", action="store_true", default=False) + parser.add_argument("--no-reset", action="store_true", default=False) + + def handle(self, *args, **options): + days = max(1, int(options.get("days") or 365)) + service = str(options.get("service") or "").strip().lower() + user_id = str(options.get("user_id") or "").strip() + person_id = str(options.get("person_id") or "").strip() + step_messages = max(1, int(options.get("step_messages") or 2)) + limit = max(1, int(options.get("limit") or 200000)) + dry_run = bool(options.get("dry_run")) + reset = not bool(options.get("no_reset")) + today_start = dj_timezone.now().astimezone(timezone.utc).replace( + hour=0, + minute=0, + second=0, + microsecond=0, + ) + cutoff_ts = int( + (today_start.timestamp() * 1000) - (days * 24 * 60 * 60 * 1000) + ) + + people_qs = Person.objects.all() + if user_id: + people_qs = people_qs.filter(user_id=user_id) + if person_id: + people_qs = people_qs.filter(id=person_id) + people = list(people_qs.order_by("user_id", "name", "id")) + + conversations_scanned = 0 + deleted = 0 + snapshots_created = 0 + checkpoints_total = 0 + + for person in people: + identifiers_qs = PersonIdentifier.objects.filter(user=person.user, person=person) + if service: + identifiers_qs = identifiers_qs.filter(service=service) + identifiers = list(identifiers_qs) + if not identifiers: + continue + identifier_values = { + str(row.identifier or "").strip() for row in identifiers if row.identifier + } + if not identifier_values: + continue + + rows = list( + Message.objects.filter( + user=person.user, + session__identifier__in=identifiers, + ts__gte=cutoff_ts, + ) + .order_by("ts", "id") + .values( + "id", + "ts", + "sender_uuid", + "custom_author", + "session__identifier__service", + )[:limit] + ) + if not rows: + continue + + conversation = _conversation_for_person(person.user, person) + conversations_scanned += 1 + + if reset and not dry_run: + deleted += WorkspaceMetricSnapshot.objects.filter( + conversation=conversation + ).delete()[0] + + existing_signatures = set() + if not reset: + existing_signatures = set( + _payload_signature( + { + "source_event_ts": row.source_event_ts, + "stability_state": row.stability_state, + "stability_score": row.stability_score, + "stability_confidence": row.stability_confidence, + "stability_sample_messages": row.stability_sample_messages, + "stability_sample_days": row.stability_sample_days, + "commitment_inbound_score": row.commitment_inbound_score, + "commitment_outbound_score": row.commitment_outbound_score, + "commitment_confidence": row.commitment_confidence, + "inbound_messages": row.inbound_messages, + "outbound_messages": row.outbound_messages, + } + ) + for row in WorkspaceMetricSnapshot.objects.filter( + conversation=conversation + ).only( + "source_event_ts", + "stability_state", + "stability_score", + "stability_confidence", + "stability_sample_messages", + "stability_sample_days", + "commitment_inbound_score", + "commitment_outbound_score", + "commitment_confidence", + "inbound_messages", + "outbound_messages", + ) + ) + + checkpoints = list(range(step_messages, len(rows) + 1, step_messages)) + if not checkpoints or checkpoints[-1] != len(rows): + checkpoints.append(len(rows)) + checkpoints_total += len(checkpoints) + + latest_payload = None + latest_service = "" + latest_feedback_state = "balanced" + + for stop in checkpoints: + computed = _compute_payload(rows[:stop], identifier_values) + payload = computed[0] + latest_payload = payload + latest_service = computed[1] + latest_feedback_state = computed[2] + signature = _payload_signature(payload) + if not reset and signature in existing_signatures: + continue + snapshots_created += 1 + if dry_run: + continue + WorkspaceMetricSnapshot.objects.create(conversation=conversation, **payload) + existing_signatures.add(signature) + + if not latest_payload: + continue + + feedback = dict(conversation.participant_feedback or {}) + feedback[str(person.id)] = { + "state": latest_feedback_state, + "inbound_messages": int(latest_payload.get("inbound_messages") or 0), + "outbound_messages": int(latest_payload.get("outbound_messages") or 0), + "sample_messages": int( + latest_payload.get("stability_sample_messages") or 0 + ), + "sample_days": int(latest_payload.get("stability_sample_days") or 0), + "updated_at": dj_timezone.now().isoformat(), + } + if not dry_run: + conversation.platform_type = latest_service or conversation.platform_type + conversation.last_event_ts = latest_payload.get("source_event_ts") + conversation.stability_state = str( + latest_payload.get("stability_state") + or WorkspaceConversation.StabilityState.CALIBRATING + ) + conversation.stability_score = latest_payload.get("stability_score") + conversation.stability_confidence = float( + latest_payload.get("stability_confidence") or 0.0 + ) + conversation.stability_sample_messages = int( + latest_payload.get("stability_sample_messages") or 0 + ) + conversation.stability_sample_days = int( + latest_payload.get("stability_sample_days") or 0 + ) + conversation.commitment_inbound_score = latest_payload.get( + "commitment_inbound_score" + ) + conversation.commitment_outbound_score = latest_payload.get( + "commitment_outbound_score" + ) + conversation.commitment_confidence = float( + latest_payload.get("commitment_confidence") or 0.0 + ) + now_ts = dj_timezone.now() + conversation.stability_last_computed_at = now_ts + conversation.commitment_last_computed_at = now_ts + conversation.participant_feedback = feedback + conversation.save( + update_fields=[ + "platform_type", + "last_event_ts", + "stability_state", + "stability_score", + "stability_confidence", + "stability_sample_messages", + "stability_sample_days", + "stability_last_computed_at", + "commitment_inbound_score", + "commitment_outbound_score", + "commitment_confidence", + "commitment_last_computed_at", + "participant_feedback", + ] + ) + + self.stdout.write( + self.style.SUCCESS( + "reconcile_workspace_metric_history complete " + f"conversations_scanned={conversations_scanned} " + f"checkpoints={checkpoints_total} " + f"created={snapshots_created} " + f"deleted={deleted} " + f"reset={reset} dry_run={dry_run} " + f"days={days} step_messages={step_messages} limit={limit}" + ) + ) diff --git a/core/templates/pages/ai-workspace-information.html b/core/templates/pages/ai-workspace-information.html index 952a59d..bba06fa 100644 --- a/core/templates/pages/ai-workspace-information.html +++ b/core/templates/pages/ai-workspace-information.html @@ -13,7 +13,7 @@
Commitment directionality and underlying metric factors.
+Commitment directionality and underlying metric factors from deterministic message-history snapshots.
{% include "partials/ai-insight-nav.html" with active_tab="information" %}- Historical metrics for workspace {{ workspace_conversation.id }} + Historical metrics for workspace {{ workspace_conversation.id }}. Points come from deterministic message-history snapshots (not only mitigation runs).
{% include "partials/ai-insight-nav.html" with active_tab="graphs" %}Combined explanation for each metric collection group and what it can - imply in relationship dynamics. + imply in relationship dynamics. Scoring is deterministic from message + history and can be backfilled via metric history reconciliation.
{% include "partials/ai-insight-nav.html" with active_tab="help" %} diff --git a/core/templates/pages/availability-settings.html b/core/templates/pages/availability-settings.html index ce2efce..1a8e0bf 100644 --- a/core/templates/pages/availability-settings.html +++ b/core/templates/pages/availability-settings.html @@ -23,102 +23,45 @@ - -| ts | person | service | source | state | confidence | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Contact | +Service | +Total | +Available | +Fading | +Unavailable | +Unknown | +Native | +Read | +Typing | +Msg Activity | +Timeout | +Last Event TS | +|||||
| {{ row.ts }} | -{{ row.person.name }} | +{{ row.person__name }} | {{ row.service }} | -{{ row.source_kind }} | -{{ row.availability_state }} | -{{ row.confidence|floatformat:2 }} | +{{ row.total_events }} | +{{ row.available_events }} | +{{ row.fading_events }} | +{{ row.unavailable_events }} | +{{ row.unknown_events }} | +{{ row.native_presence_events }} | +{{ row.read_receipt_events }} | +{{ row.typing_events }} | +{{ row.message_activity_events }} | +{{ row.inferred_timeout_events }} | +{{ row.last_event_ts }} |
| No events in range. | |||||||||||||||||
| person | service | state | start | end | confidence | |||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| {{ row.person.name }} | -{{ row.service }} | -{{ row.state }} | -{{ row.start_ts }} | -{{ row.end_ts }} | -{{ row.confidence_start|floatformat:2 }} -> {{ row.confidence_end|floatformat:2 }} | -|||||||
| No spans in range. | ||||||||||||
| No availability events found. | ||||||||||||
{{ service_label }} · {{ identifier }}
+| Project | Epic | Channel | Enabled | |
|---|---|---|---|---|
| {{ row.project.name }} | +{% if row.epic %}{{ row.epic.name }}{% else %}-{% endif %} | +
+ {{ row.service }} · {{ row.channel_identifier }}{{ channel_display_name }} + {% endif %} + |
+ {{ row.enabled }} | +Open Project | +
| No mappings for this group. | ||||
| Ref | Title | Project | Status | |
|---|---|---|---|---|
| #{{ row.reference_code }} | +{{ row.title }} | +{{ row.project.name }}{% if row.epic %} / {{ row.epic.name }}{% endif %} | +{{ row.status_snapshot }} | +Open | +
| No tasks yet. | ||||
Immutable tasks derived from chat activity.
-