# Source: GIA/core/events/behavior.py
# (repository viewer listing: 214 lines, 7.1 KiB, Python)

from __future__ import annotations
import json
import statistics
from dataclasses import dataclass
from typing import Any
def safe_int(value: Any, default: int = 0) -> int:
    """Coerce ``value`` to ``int``, falling back to ``int(default)``.

    Any ``Exception`` raised by ``int(value)`` is swallowed on purpose so that
    callers can pass missing or malformed data. The fallback conversion is not
    guarded, so a ``default`` that cannot be converted still raises.
    """
    try:
        converted = int(value)
    except Exception:
        converted = int(default)
    return converted
def parse_payload(value: Any) -> dict:
    """Return an event payload as a new ``dict``.

    * A ``dict`` is shallow-copied.
    * A ``str`` is stripped and decoded as JSON; the result is used only when
      it decodes to a JSON object.
    * Everything else (other types, blank strings, invalid JSON, JSON that is
      not an object) yields an empty ``dict``.
    """
    candidate = value
    if isinstance(value, str):
        stripped = value.strip()
        if not stripped:
            return {}
        try:
            candidate = json.loads(stripped)
        except Exception:
            # Malformed payloads are treated as "no payload" rather than errors.
            return {}
    if isinstance(candidate, dict):
        return dict(candidate)
    return {}
def median_ms(values: list[int]) -> int:
    """Return the median of the strictly positive entries in ``values``.

    Entries for which ``safe_int`` yields a value ``<= 0`` (including entries
    that cannot be converted) are ignored. The median is truncated to ``int``;
    the result is ``0`` when no positive entry remains.
    """
    positives: list[int] = []
    for raw in values:
        if safe_int(raw, 0) > 0:
            positives.append(int(raw))
    if positives:
        return int(statistics.median(positives))
    return 0
def z_score(value: int, baseline_samples: list[int]) -> float:
    """Score ``value`` against the positive entries of ``baseline_samples``.

    The score is ``(value - median) / pstdev``, where both statistics are taken
    over the baseline entries that ``safe_int`` reports as ``> 0``. Returns
    ``0.0`` when fewer than two such entries exist or when their population
    standard deviation is not positive.
    """
    positives = [int(sample) for sample in baseline_samples if safe_int(sample, 0) > 0]
    if len(positives) >= 2:
        spread = statistics.pstdev(positives)
        if spread > 0:
            # The centre is the median, not the mean that pstdev is based on.
            centre = statistics.median(positives)
            return float((float(value) - float(centre)) / float(spread))
    return 0.0
@dataclass
class CompositionState:
    """Mutable per-session record of a composing episode.

    The field notes describe how ``ComposingTracker`` in this module reads and
    writes them. Timestamps are plain integers.
    """

    # Timestamp stored when the record is first created for a session.
    started_ts: int
    # Timestamp of the most recent start observation for the session.
    last_started_ts: int
    # Timestamp of the last stop observation; 0 means "not stopped".
    stopped_ts: int = 0
    # Starts at 1 and is incremented when a start follows a recorded stop.
    revision: int = 1
class ComposingTracker:
    """Track in-progress compositions per session, held in memory.

    One ``CompositionState`` is kept per non-empty session id. It is created by
    ``observe_started`` and removed either by ``observe_message`` or by an
    ``observe_stopped`` call whose elapsed time reaches ``window_ms``.
    """

    def __init__(self, window_ms: int = 300000):
        """Store the window; falsy values mean 300000, and the minimum is 1000."""
        requested = int(window_ms or 300000)
        self.window_ms = max(1000, requested)
        self._state: dict[str, CompositionState] = {}

    def observe_started(self, session_id: str, ts: int) -> CompositionState:
        """Record a composing-start event and return the session's state.

        Raises:
            ValueError: if ``session_id`` is empty after stripping.
        """
        session_key = str(session_id or "").strip()
        if not session_key:
            raise ValueError("session_id is required")
        now = max(0, safe_int(ts, 0))
        existing = self._state.get(session_key)
        if existing is not None:
            # A start that follows a recorded stop counts as a new revision.
            if existing.stopped_ts > 0:
                existing.revision += 1
            # NOTE(review): last_started_ts/stopped_ts are refreshed on every
            # repeated start, not only after a stop - confirm this is intended.
            existing.last_started_ts = now
            existing.stopped_ts = 0
            return existing
        fresh = CompositionState(
            started_ts=now,
            last_started_ts=now,
            revision=1,
        )
        self._state[session_key] = fresh
        return fresh

    def observe_stopped(self, session_id: str, ts: int) -> dict | None:
        """Record a composing-stop event.

        Returns ``None`` for unknown sessions and for stops that occur less
        than ``window_ms`` after the session's first start (the stop time is
        then stored on the state). Otherwise the state is dropped and a summary
        dict flagged ``"abandoned": True`` is returned.
        """
        session_key = str(session_id or "").strip()
        tracked = self._state.get(session_key)
        if tracked is None:
            return None
        now = max(0, safe_int(ts, 0))
        # Elapsed time is measured from the first start, not the latest one.
        first_start = int(tracked.started_ts or 0)
        elapsed = max(0, now - first_start)
        if elapsed < self.window_ms:
            tracked.stopped_ts = now
            return None
        self._state.pop(session_key, None)
        return {
            "started_ts": first_start,
            "stopped_ts": now,
            "duration_ms": elapsed,
            "revision": int(tracked.revision or 1),
            "abandoned": True,
        }

    def observe_message(self, session_id: str) -> CompositionState | None:
        """Remove and return the session's state, or ``None`` if there is none."""
        session_key = str(session_id or "").strip()
        return self._state.pop(session_key, None) if session_key else None
def extract_metric_samples(rows: list[dict]) -> dict[str, list[int]]:
    """Replay event rows in timestamp order and collect raw metric samples.

    Rows are dicts read through ``kind``, ``session_id``, ``ts``, ``payload``
    and ``origin_message_id``. They are processed sorted by
    ``(ts, kind, session_id)``.

    Returns a dict with these lists:

    * ``delay_b``: ``message_read`` ts minus ``message_delivered`` ts, per
      message id, when a positive read ts not earlier than delivery exists.
    * ``delay_c``: ``message_sent`` ts minus the ts of the session's first
      ``composing_started`` since its composition was last cleared.
    * ``delay_f``: ``composing_started`` ts minus the session's latest
      ``presence_available`` ts (skipped when that ts is missing or 0).
    * ``revision``: number of ``composing_started`` events folded into a
      composition by the time its message was sent (at least 1).
    * ``abandoned_rate``: at most one sample, the ``composing_abandoned`` count
      divided by the ``composing_started`` count, times 1000, rounded.
    """
    delivered_at: dict[str, int] = {}
    read_at: dict[str, int] = {}
    last_presence: dict[str, int] = {}
    open_compositions: dict[str, dict[str, int]] = {}
    compose_to_send: list[int] = []
    presence_to_compose: list[int] = []
    revisions: list[int] = []
    started_count = 0
    abandoned_count = 0

    def _replay_order(item: dict) -> tuple[int, str, str]:
        # Ties on ts are broken by kind, then session id, for determinism.
        return (
            safe_int(item.get("ts"), 0),
            str(item.get("kind") or ""),
            str(item.get("session_id") or ""),
        )

    ordered = list(rows or [])
    ordered.sort(key=_replay_order)

    for row in ordered:
        kind = str(row.get("kind") or "").strip().lower()
        session_id = str(row.get("session_id") or "").strip()
        ts = safe_int(row.get("ts"), 0)
        payload = parse_payload(row.get("payload"))
        raw_message_id = (
            payload.get("message_id")
            or payload.get("origin_message_id")
            or row.get("origin_message_id")
            or ""
        )
        message_id = str(raw_message_id).strip()

        if kind == "message_delivered" and message_id:
            delivered_at[message_id] = ts
        elif kind == "message_read" and message_id:
            read_at[message_id] = ts
        elif kind == "presence_available" and session_id:
            last_presence[session_id] = ts
        elif kind == "composing_started" and session_id:
            started_count += 1
            current = open_compositions.get(session_id)
            if current is None:
                open_compositions[session_id] = {"started_ts": ts, "revision": 1}
            else:
                current["revision"] = int(current.get("revision", 1)) + 1
            # NOTE(review): the presence gap is sampled for every start event,
            # including repeated starts in one composition - confirm intended.
            seen_at = last_presence.get(session_id)
            if seen_at:
                gap = ts - int(seen_at)
                if gap >= 0:
                    presence_to_compose.append(gap)
        elif kind == "composing_abandoned":
            # Abandonments are counted even when the row carries no session id.
            abandoned_count += 1
            if session_id:
                open_compositions.pop(session_id, None)
        elif kind == "message_sent" and session_id:
            finished = open_compositions.pop(session_id, None)
            if finished is not None:
                gap = ts - int(finished.get("started_ts") or 0)
                if gap >= 0:
                    compose_to_send.append(gap)
                revisions.append(max(1, int(finished.get("revision") or 1)))

    deliver_to_read: list[int] = []
    for delivered_id, delivered_ts in delivered_at.items():
        read_ts = read_at.get(delivered_id, 0)
        if read_ts > 0 and read_ts >= delivered_ts:
            deliver_to_read.append(read_ts - delivered_ts)

    abandoned_rate: list[int] = []
    if started_count > 0:
        ratio = float(abandoned_count) / float(started_count)
        abandoned_rate.append(int(round(ratio * 1000)))

    return {
        "delay_b": deliver_to_read,
        "delay_c": compose_to_send,
        "delay_f": presence_to_compose,
        "revision": revisions,
        "abandoned_rate": abandoned_rate,
    }
def summarize_metrics(window_rows: list[dict], baseline_rows: list[dict]) -> dict[str, dict]:
    """Summarise each metric of ``window_rows`` against ``baseline_rows``.

    Both row sets go through ``extract_metric_samples``. A metric with no
    window samples is left out. Every other metric maps to a dict holding the
    window median (``value_ms``), the baseline median (``baseline_ms``), the
    z-score of the window median against the baseline samples rounded to six
    decimals (``z_score``), and the window sample count (``sample_n``).
    """
    current = extract_metric_samples(window_rows)
    reference = extract_metric_samples(baseline_rows)
    summary: dict[str, dict] = {}
    for name in ("delay_b", "delay_c", "delay_f", "revision", "abandoned_rate"):
        observed = list(current.get(name) or [])
        if observed:
            history = list(reference.get(name) or [])
            observed_median = median_ms(observed)
            history_median = median_ms(history)
            score = z_score(observed_median, history)
            summary[name] = {
                "value_ms": int(observed_median),
                "baseline_ms": int(history_median),
                "z_score": float(round(score, 6)),
                "sample_n": len(observed),
            }
    return summary