Improve tasks and backdate insights
This commit is contained in:
424
core/management/commands/reconcile_workspace_metric_history.py
Normal file
424
core/management/commands/reconcile_workspace_metric_history.py
Normal file
@@ -0,0 +1,424 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import statistics
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.utils import timezone as dj_timezone
|
||||
|
||||
from core.models import (
|
||||
Message,
|
||||
Person,
|
||||
PersonIdentifier,
|
||||
WorkspaceConversation,
|
||||
WorkspaceMetricSnapshot,
|
||||
)
|
||||
from core.views.workspace import _conversation_for_person
|
||||
|
||||
|
||||
def _score_from_lag(lag_ms, target_hours=4):
|
||||
if lag_ms is None:
|
||||
return 50.0
|
||||
target_ms = max(1, int(target_hours)) * 60 * 60 * 1000
|
||||
return max(0.0, min(100.0, 100.0 / (1.0 + (float(lag_ms) / target_ms))))
|
||||
|
||||
|
||||
def _median_or_none(values):
|
||||
if not values:
|
||||
return None
|
||||
return float(statistics.median(values))
|
||||
|
||||
|
||||
def _calibrating_payload(last_ts=None):
    """Build the placeholder payload used while a conversation is calibrating.

    Args:
        last_ts: Timestamp of the latest source event. Any falsy value
            (``None``, ``0``) is stored as ``None``; anything else is
            converted with ``int``.

    Returns:
        A dict with the calibrating stability state, every score set to
        ``None`` and every counter and confidence set to zero.
    """
    source_ts = None
    if last_ts:
        source_ts = int(last_ts)

    return dict(
        source_event_ts=source_ts,
        stability_state=WorkspaceConversation.StabilityState.CALIBRATING,
        stability_score=None,
        stability_confidence=0.0,
        stability_sample_messages=0,
        stability_sample_days=0,
        commitment_inbound_score=None,
        commitment_outbound_score=None,
        commitment_confidence=0.0,
        inbound_messages=0,
        outbound_messages=0,
        reciprocity_score=None,
        continuity_score=None,
        response_score=None,
        volatility_score=None,
        inbound_response_score=None,
        outbound_response_score=None,
        balance_inbound_score=None,
        balance_outbound_score=None,
    )
|
||||
|
||||
|
||||
def _compute_payload(rows, identifier_values):
    """Derive the workspace metric payload for an ordered run of messages.

    Args:
        rows: Sequence of message dicts carrying ``ts`` (milliseconds),
            ``sender_uuid``, ``custom_author`` and
            ``session__identifier__service``. The first and last rows are
            taken as the bounds of the time span, so the rows are expected
            in ascending ``ts`` order.
        identifier_values: Collection of identifier strings. When
            ``custom_author`` does not decide the direction, a message whose
            sender is in this collection counts as inbound.

    Returns:
        A ``(payload, latest_service, feedback_state)`` tuple. ``payload`` is
        the metric dict, ``latest_service`` is the lower-cased service of the
        last row and ``feedback_state`` is one of ``"balanced"``,
        ``"withdrawing"`` or ``"overextending"``. Empty input yields the
        calibrating payload with an empty service and ``"balanced"``.
    """
    if not rows:
        # Keep the same 3-tuple shape as the main path; callers index the
        # result positionally.
        return _calibrating_payload(None), "", "balanced"

    inbound_count = 0
    outbound_count = 0
    daily_counts = {}
    inbound_response_lags = []
    outbound_response_lags = []
    pending_in_ts = None
    pending_out_ts = None
    first_ts = int(rows[0]["ts"] or 0)
    last_ts = int(rows[-1]["ts"] or 0)
    latest_service = (
        str(rows[-1].get("session__identifier__service") or "").strip().lower()
    )

    for row in rows:
        ts = int(row.get("ts") or 0)
        sender = str(row.get("sender_uuid") or "").strip()
        author = str(row.get("custom_author") or "").strip().upper()
        # An explicit author tag wins; otherwise fall back to the sender.
        if author in {"USER", "BOT"}:
            is_inbound = False
        elif author == "OTHER":
            is_inbound = True
        else:
            is_inbound = sender in identifier_values

        day_key = datetime.fromtimestamp(ts / 1000, tz=timezone.utc).date().isoformat()
        daily_counts[day_key] = daily_counts.get(day_key, 0) + 1

        if is_inbound:
            inbound_count += 1
            # An inbound message answers the pending outbound one, if any.
            if pending_out_ts is not None and ts >= pending_out_ts:
                inbound_response_lags.append(ts - pending_out_ts)
                pending_out_ts = None
            pending_in_ts = ts
        else:
            outbound_count += 1
            # An outbound message answers the pending inbound one, if any.
            if pending_in_ts is not None and ts >= pending_in_ts:
                outbound_response_lags.append(ts - pending_in_ts)
                pending_in_ts = None
            pending_out_ts = ts

    message_count = len(rows)
    span_days = max(1, int(((last_ts - first_ts) / (24 * 60 * 60 * 1000)) + 1))
    sample_days = len(daily_counts)

    total_messages = max(1, inbound_count + outbound_count)
    reciprocity_score = 100.0 * (
        1.0 - abs(inbound_count - outbound_count) / total_messages
    )
    continuity_score = 100.0 * min(1.0, sample_days / max(1, span_days))
    out_resp_score = _score_from_lag(_median_or_none(outbound_response_lags))
    in_resp_score = _score_from_lag(_median_or_none(inbound_response_lags))
    response_score = (out_resp_score + in_resp_score) / 2.0

    daily_values = list(daily_counts.values())
    if len(daily_values) > 1:
        mean_daily = statistics.mean(daily_values)
        stdev_daily = statistics.pstdev(daily_values)
        # Coefficient of variation of messages per active day, capped at 1.5.
        cv = (stdev_daily / mean_daily) if mean_daily else 1.0
        volatility_score = max(0.0, 100.0 * (1.0 - min(cv, 1.5) / 1.5))
    else:
        # A single active day gives no spread to measure; use a fixed value.
        volatility_score = 60.0

    stability_score = (
        (0.35 * reciprocity_score)
        + (0.25 * continuity_score)
        + (0.20 * response_score)
        + (0.20 * volatility_score)
    )

    balance_out = 100.0 * min(1.0, outbound_count / max(1, inbound_count))
    balance_in = 100.0 * min(1.0, inbound_count / max(1, outbound_count))
    commitment_out = (0.60 * out_resp_score) + (0.40 * balance_out)
    commitment_in = (0.60 * in_resp_score) + (0.40 * balance_in)

    msg_conf = min(1.0, message_count / 200.0)
    day_conf = min(1.0, sample_days / 30.0)
    pair_conf = min(
        1.0, (len(inbound_response_lags) + len(outbound_response_lags)) / 40.0
    )
    confidence = (0.50 * msg_conf) + (0.30 * day_conf) + (0.20 * pair_conf)

    if message_count < 20 or sample_days < 3 or confidence < 0.25:
        # Too little data: report the state but withhold the headline scores.
        stability_state = WorkspaceConversation.StabilityState.CALIBRATING
        stability_score_value = None
        commitment_in_value = None
        commitment_out_value = None
    else:
        stability_score_value = round(stability_score, 2)
        commitment_in_value = round(commitment_in, 2)
        commitment_out_value = round(commitment_out, 2)
        if stability_score_value >= 70:
            stability_state = WorkspaceConversation.StabilityState.STABLE
        elif stability_score_value >= 50:
            stability_state = WorkspaceConversation.StabilityState.WATCH
        else:
            stability_state = WorkspaceConversation.StabilityState.FRAGILE

    feedback_state = "balanced"
    if outbound_count > (inbound_count * 1.5):
        feedback_state = "withdrawing"
    elif inbound_count > (outbound_count * 1.5):
        feedback_state = "overextending"

    payload = {
        "source_event_ts": last_ts,
        "stability_state": stability_state,
        "stability_score": float(stability_score_value)
        if stability_score_value is not None
        else None,
        "stability_confidence": round(confidence, 3),
        "stability_sample_messages": message_count,
        "stability_sample_days": sample_days,
        "commitment_inbound_score": float(commitment_in_value)
        if commitment_in_value is not None
        else None,
        "commitment_outbound_score": float(commitment_out_value)
        if commitment_out_value is not None
        else None,
        # Commitment reuses the stability confidence value.
        "commitment_confidence": round(confidence, 3),
        "inbound_messages": inbound_count,
        "outbound_messages": outbound_count,
        "reciprocity_score": round(reciprocity_score, 3),
        "continuity_score": round(continuity_score, 3),
        "response_score": round(response_score, 3),
        "volatility_score": round(volatility_score, 3),
        "inbound_response_score": round(in_resp_score, 3),
        "outbound_response_score": round(out_resp_score, 3),
        "balance_inbound_score": round(balance_in, 3),
        "balance_outbound_score": round(balance_out, 3),
    }
    return payload, latest_service, feedback_state
|
||||
|
||||
|
||||
def _payload_signature(payload: dict) -> tuple:
|
||||
return (
|
||||
int(payload.get("source_event_ts") or 0),
|
||||
str(payload.get("stability_state") or ""),
|
||||
payload.get("stability_score"),
|
||||
float(payload.get("stability_confidence") or 0.0),
|
||||
int(payload.get("stability_sample_messages") or 0),
|
||||
int(payload.get("stability_sample_days") or 0),
|
||||
payload.get("commitment_inbound_score"),
|
||||
payload.get("commitment_outbound_score"),
|
||||
float(payload.get("commitment_confidence") or 0.0),
|
||||
int(payload.get("inbound_messages") or 0),
|
||||
int(payload.get("outbound_messages") or 0),
|
||||
)
|
||||
|
||||
|
||||
class Command(BaseCommand):
    """Rebuild workspace metric snapshots from stored message history.

    For each matching person the command loads their messages inside the
    look-back window, recomputes the metric payload at message-count
    checkpoints, records a ``WorkspaceMetricSnapshot`` for each checkpoint
    that is not skipped, and copies the final payload onto the conversation.
    """

    help = (
        "Reconcile AI Workspace metric history by deterministically rebuilding "
        "WorkspaceMetricSnapshot points from message history."
    )

    def add_arguments(self, parser):
        """Register the command-line options accepted by this command."""
        option_specs = (
            ("--days", {"type": int, "default": 365}),
            ("--service", {"default": ""}),
            ("--user-id", {"default": ""}),
            ("--person-id", {"default": ""}),
            ("--step-messages", {"type": int, "default": 2}),
            ("--limit", {"type": int, "default": 200000}),
            ("--dry-run", {"action": "store_true", "default": False}),
            ("--no-reset", {"action": "store_true", "default": False}),
        )
        for flag, spec in option_specs:
            parser.add_argument(flag, **spec)

    @staticmethod
    def _load_existing_signatures(conversation):
        """Return the signature set of snapshots already stored for a conversation."""
        signature_fields = (
            "source_event_ts",
            "stability_state",
            "stability_score",
            "stability_confidence",
            "stability_sample_messages",
            "stability_sample_days",
            "commitment_inbound_score",
            "commitment_outbound_score",
            "commitment_confidence",
            "inbound_messages",
            "outbound_messages",
        )
        stored = WorkspaceMetricSnapshot.objects.filter(
            conversation=conversation
        ).only(*signature_fields)
        return {
            _payload_signature(
                {name: getattr(snapshot, name) for name in signature_fields}
            )
            for snapshot in stored
        }

    @staticmethod
    def _persist_latest_state(conversation, payload, service, feedback):
        """Copy the final payload onto the conversation and save those fields."""
        field = payload.get
        computed_at = dj_timezone.now()
        updates = {
            # Keep the current platform when the latest row has no service.
            "platform_type": service or conversation.platform_type,
            "last_event_ts": field("source_event_ts"),
            "stability_state": str(
                field("stability_state")
                or WorkspaceConversation.StabilityState.CALIBRATING
            ),
            "stability_score": field("stability_score"),
            "stability_confidence": float(field("stability_confidence") or 0.0),
            "stability_sample_messages": int(
                field("stability_sample_messages") or 0
            ),
            "stability_sample_days": int(field("stability_sample_days") or 0),
            "stability_last_computed_at": computed_at,
            "commitment_inbound_score": field("commitment_inbound_score"),
            "commitment_outbound_score": field("commitment_outbound_score"),
            "commitment_confidence": float(field("commitment_confidence") or 0.0),
            "commitment_last_computed_at": computed_at,
            "participant_feedback": feedback,
        }
        for attr, value in updates.items():
            setattr(conversation, attr, value)
        conversation.save(update_fields=list(updates))

    def handle(self, *args, **options):
        """Run the reconciliation and print a one-line summary.

        Options read: ``days`` (look-back window counted from today's UTC
        midnight), ``service``, ``user_id``, ``person_id``, ``step_messages``
        (checkpoint spacing), ``limit`` (maximum messages per person),
        ``dry_run`` (count only, write nothing) and ``no_reset`` (keep
        existing snapshots and skip payloads whose signature already exists).
        """
        opt = options.get
        days = max(1, int(opt("days") or 365))
        service = str(opt("service") or "").strip().lower()
        user_id = str(opt("user_id") or "").strip()
        person_id = str(opt("person_id") or "").strip()
        step_messages = max(1, int(opt("step_messages") or 2))
        limit = max(1, int(opt("limit") or 200000))
        dry_run = bool(opt("dry_run"))
        reset = not opt("no_reset")

        midnight_utc = dj_timezone.now().astimezone(timezone.utc).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        window_ms = days * 24 * 60 * 60 * 1000
        cutoff_ts = int((midnight_utc.timestamp() * 1000) - window_ms)

        people_qs = Person.objects.all()
        if user_id:
            people_qs = people_qs.filter(user_id=user_id)
        if person_id:
            people_qs = people_qs.filter(id=person_id)
        people = list(people_qs.order_by("user_id", "name", "id"))

        scanned = 0
        removed_total = 0
        created_total = 0
        checkpoint_total = 0

        for person in people:
            identifiers_qs = PersonIdentifier.objects.filter(
                user=person.user, person=person
            )
            if service:
                identifiers_qs = identifiers_qs.filter(service=service)
            identifiers = list(identifiers_qs)
            identifier_values = {
                str(item.identifier).strip()
                for item in identifiers
                if item.identifier
            }
            # An empty identifier list also yields an empty set, so this one
            # guard covers both "no identifiers" and "no usable values".
            if not identifier_values:
                continue

            message_qs = Message.objects.filter(
                user=person.user,
                session__identifier__in=identifiers,
                ts__gte=cutoff_ts,
            ).order_by("ts", "id")
            rows = list(
                message_qs.values(
                    "id",
                    "ts",
                    "sender_uuid",
                    "custom_author",
                    "session__identifier__service",
                )[:limit]
            )
            if not rows:
                continue

            conversation = _conversation_for_person(person.user, person)
            scanned += 1

            if reset and not dry_run:
                removed, _ = WorkspaceMetricSnapshot.objects.filter(
                    conversation=conversation
                ).delete()
                removed_total += removed

            if reset:
                existing_signatures = set()
            else:
                existing_signatures = self._load_existing_signatures(conversation)

            row_count = len(rows)
            checkpoints = list(range(step_messages, row_count + 1, step_messages))
            # Always finish with a checkpoint covering every loaded message.
            if not checkpoints or checkpoints[-1] != row_count:
                checkpoints.append(row_count)
            checkpoint_total += len(checkpoints)

            latest_payload = None
            latest_service = ""
            latest_feedback_state = "balanced"

            for stop in checkpoints:
                (
                    latest_payload,
                    latest_service,
                    latest_feedback_state,
                ) = _compute_payload(rows[:stop], identifier_values)
                signature = _payload_signature(latest_payload)
                if reset or signature not in existing_signatures:
                    created_total += 1
                    if not dry_run:
                        WorkspaceMetricSnapshot.objects.create(
                            conversation=conversation, **latest_payload
                        )
                        existing_signatures.add(signature)

            if not latest_payload:
                continue

            read_latest = latest_payload.get
            feedback = dict(conversation.participant_feedback or {})
            feedback[str(person.id)] = {
                "state": latest_feedback_state,
                "inbound_messages": int(read_latest("inbound_messages") or 0),
                "outbound_messages": int(read_latest("outbound_messages") or 0),
                "sample_messages": int(
                    read_latest("stability_sample_messages") or 0
                ),
                "sample_days": int(read_latest("stability_sample_days") or 0),
                "updated_at": dj_timezone.now().isoformat(),
            }
            if dry_run:
                continue
            self._persist_latest_state(
                conversation, latest_payload, latest_service, feedback
            )

        summary = (
            "reconcile_workspace_metric_history complete "
            f"conversations_scanned={scanned} "
            f"checkpoints={checkpoint_total} "
            f"created={created_total} "
            f"deleted={removed_total} "
            f"reset={reset} dry_run={dry_run} "
            f"days={days} step_messages={step_messages} limit={limit}"
        )
        self.stdout.write(self.style.SUCCESS(summary))
|
||||
Reference in New Issue
Block a user