from __future__ import annotations

import json
import statistics
from dataclasses import dataclass
from typing import Any


def safe_int(value: Any, default: int = 0) -> int:
    """Coerce *value* to ``int``, falling back to ``int(default)``.

    Only the errors ``int()`` raises for unconvertible input are caught:
    ``TypeError`` (e.g. ``None``), ``ValueError`` (unparsable strings, NaN)
    and ``OverflowError`` (infinities). Unrelated errors are not hidden.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return int(default)


def parse_payload(value: Any) -> dict:
    """Return *value* as a new dict.

    A dict is shallow-copied and a string is parsed as JSON. Every other
    input yields ``{}``. That includes a blank string, malformed JSON, JSON
    whose top level is not an object, and any other type.
    """
    if isinstance(value, dict):
        return dict(value)
    if not isinstance(value, str):
        return {}
    text = value.strip()
    if not text:
        return {}
    try:
        loaded = json.loads(text)
    except (ValueError, RecursionError):
        # json.JSONDecodeError subclasses ValueError; RecursionError covers
        # pathologically deep nesting.
        return {}
    return dict(loaded) if isinstance(loaded, dict) else {}


def _positive_ints(values: list[int] | None) -> list[int]:
    """Return the entries of *values* that coerce to a positive int, as ints."""
    return [n for n in (safe_int(v, 0) for v in values or []) if n > 0]


def median_ms(values: list[int]) -> int:
    """Return the median of the positive entries of *values*, truncated to int.

    Non-positive and non-numeric entries are ignored. Returns 0 when nothing
    is left after filtering.
    """
    clean = _positive_ints(values)
    if not clean:
        return 0
    return int(statistics.median(clean))


def z_score(value: int, baseline_samples: list[int]) -> float:
    """Score *value* against the positive entries of *baseline_samples*.

    The score is ``(value - median) / population_stdev`` over the filtered
    baseline. Returns 0.0 when fewer than two baseline samples remain or
    their standard deviation is zero.
    """
    clean = _positive_ints(baseline_samples)
    if len(clean) < 2:
        return 0.0
    baseline = statistics.median(clean)
    stdev = statistics.pstdev(clean)
    if stdev <= 0:
        return 0.0
    return float((float(value) - float(baseline)) / float(stdev))


@dataclass
class CompositionState:
    """Mutable per-session bookkeeping kept by ``ComposingTracker``."""

    started_ts: int  # timestamp of the first start of this composition
    last_started_ts: int  # timestamp of the most recent (re)start
    stopped_ts: int = 0  # 0 while composing; otherwise the latest stop ts
    revision: int = 1  # incremented each time composing resumes after a stop


class ComposingTracker:
    """Track in-memory composing state per session.

    A composition begins with ``observe_started``. It ends in one of two ways:

    * ``observe_message`` is called for the session; or
    * ``observe_stopped`` arrives at least ``window_ms`` after the
      composition's first start, and is reported as abandoned.
    """

    def __init__(self, window_ms: int = 300000):
        # A falsy window_ms (0/None) falls back to the default. Any other
        # value is clamped to a minimum of 1000.
        self.window_ms = max(1000, int(window_ms or 300000))
        self._state: dict[str, CompositionState] = {}

    def observe_started(self, session_id: str, ts: int) -> CompositionState:
        """Record a composing start for *session_id* and return its state.

        A new session gets a fresh state with revision 1. Restarting after a
        stop bumps ``revision`` and clears ``stopped_ts``. A start while
        already composing leaves the state untouched.

        Raises:
            ValueError: if *session_id* is empty or blank.
        """
        key = str(session_id or "").strip()
        if not key:
            raise ValueError("session_id is required")
        safe_ts_value = max(0, safe_int(ts, 0))
        state = self._state.get(key)
        if state is None:
            state = CompositionState(
                started_ts=safe_ts_value,
                last_started_ts=safe_ts_value,
                revision=1,
            )
            self._state[key] = state
            return state
        if state.stopped_ts > 0:
            state.revision += 1
            state.last_started_ts = safe_ts_value
            state.stopped_ts = 0
        return state

    def observe_stopped(self, session_id: str, ts: int) -> dict | None:
        """Record a composing stop for *session_id*.

        The duration is measured from the composition's first start. If it
        is at least ``window_ms``, the state is discarded and an abandonment
        report is returned. Otherwise the stop timestamp is stored and
        ``None`` is returned. Unknown sessions also return ``None``.
        """
        key = str(session_id or "").strip()
        state = self._state.get(key)
        if state is None:
            return None
        safe_ts_value = max(0, safe_int(ts, 0))
        duration_ms = max(0, safe_ts_value - int(state.started_ts or 0))
        if duration_ms >= self.window_ms:
            self._state.pop(key, None)
            return {
                "started_ts": int(state.started_ts or 0),
                "stopped_ts": safe_ts_value,
                "duration_ms": duration_ms,
                "revision": int(state.revision or 1),
                "abandoned": True,
            }
        state.stopped_ts = safe_ts_value
        return None

    def observe_message(self, session_id: str) -> CompositionState | None:
        """Forget *session_id*'s composition and return it (``None`` if absent)."""
        key = str(session_id or "").strip()
        if not key:
            return None
        return self._state.pop(key, None)


def extract_metric_samples(rows: list[dict]) -> dict[str, list[int]]:
    """Derive raw metric samples from event rows.

    Rows are processed in ``(ts, kind, session_id)`` order. Each row is read
    via ``row.get`` for ``kind``, ``session_id``, ``ts``, ``payload`` and
    ``origin_message_id``. ``payload`` may be a dict or a JSON string.

    Returned keys:

    * ``delay_b``: ``message_read`` ts minus ``message_delivered`` ts per
      message id, using the last delivery and last read seen for that id.
    * ``delay_c``: ``message_sent`` ts minus the first ``composing_started``
      ts of that session's pending composition.
    * ``delay_f``: ``composing_started`` ts minus the session's latest
      ``presence_available`` ts.
    * ``revision``: the number of ``composing_started`` events in each
      composition that ended in ``message_sent``.
    * ``abandoned_rate``: at most one value, the ``composing_abandoned``
      count over the ``composing_started`` count, in parts per thousand.

    A ``ts`` of 0 (missing or unparsable) is treated as unknown and is never
    used as the starting point of a delay sample.
    """
    delivered_by_message: dict[str, int] = {}
    read_by_message: dict[str, int] = {}
    delay_c_samples: list[int] = []
    delay_f_samples: list[int] = []
    revision_samples: list[int] = []
    abandoned_started = 0
    abandoned_total = 0
    composition_by_session: dict[str, dict[str, int]] = {}
    presence_by_session: dict[str, int] = {}

    ordered = sorted(
        rows or [],
        key=lambda item: (
            safe_int(item.get("ts"), 0),
            str(item.get("kind") or ""),
            str(item.get("session_id") or ""),
        ),
    )
    for row in ordered:
        kind = str(row.get("kind") or "").strip().lower()
        session_id = str(row.get("session_id") or "").strip()
        ts = safe_int(row.get("ts"), 0)
        payload = parse_payload(row.get("payload"))
        message_id = str(
            payload.get("message_id")
            or payload.get("origin_message_id")
            or row.get("origin_message_id")
            or ""
        ).strip()

        if kind == "message_delivered" and message_id:
            delivered_by_message[message_id] = ts
            continue
        if kind == "message_read" and message_id:
            read_by_message[message_id] = ts
            continue
        if kind == "presence_available" and session_id:
            presence_by_session[session_id] = ts
            continue
        if kind == "composing_started" and session_id:
            abandoned_started += 1
            state = composition_by_session.get(session_id)
            if state is None:
                composition_by_session[session_id] = {"started_ts": ts, "revision": 1}
            else:
                state["revision"] = int(state.get("revision", 1)) + 1
            # NOTE(review): the presence timestamp is not consumed, so every
            # composing_started after one presence_available adds a sample
            # (including revision restarts). Confirm this is intended.
            presence_ts = presence_by_session.get(session_id)
            if presence_ts:
                delta = ts - int(presence_ts)
                if delta >= 0:
                    delay_f_samples.append(delta)
            continue
        if kind == "composing_abandoned":
            abandoned_total += 1
            if session_id:
                composition_by_session.pop(session_id, None)
            continue
        if kind == "message_sent" and session_id:
            state = composition_by_session.pop(session_id, None)
            if state is None:
                continue
            started_ts = int(state.get("started_ts") or 0)
            # A zero start means the composing_started row had no usable ts.
            # Measuring from the epoch would yield a huge bogus delay.
            if started_ts > 0 and ts >= started_ts:
                delay_c_samples.append(ts - started_ts)
            revision_samples.append(max(1, int(state.get("revision") or 1)))

    delay_b_samples: list[int] = []
    for message_id, delivered_ts in delivered_by_message.items():
        read_ts = safe_int(read_by_message.get(message_id), 0)
        # Skip deliveries with an unknown (0) timestamp, for the same reason
        # as delay_c above. This also implies read_ts > 0.
        if delivered_ts > 0 and read_ts >= delivered_ts:
            delay_b_samples.append(read_ts - delivered_ts)

    abandoned_rate_samples: list[int] = []
    if abandoned_started > 0:
        abandoned_rate_samples.append(
            int(round((float(abandoned_total) / float(abandoned_started)) * 1000))
        )

    return {
        "delay_b": delay_b_samples,
        "delay_c": delay_c_samples,
        "delay_f": delay_f_samples,
        "revision": revision_samples,
        "abandoned_rate": abandoned_rate_samples,
    }


def summarize_metrics(window_rows: list[dict], baseline_rows: list[dict]) -> dict[str, dict]:
    """Compare the window's metrics against a baseline.

    For each metric with at least one window sample, the result includes:

    * ``value_ms``: the window median;
    * ``baseline_ms``: the baseline median;
    * ``z_score``: the window median scored against the baseline samples,
      rounded to 6 places;
    * ``sample_n``: the raw window sample count.

    The ``*_ms`` key names are used for every metric, including
    ``revision`` and ``abandoned_rate``, which are not durations.
    """
    window_samples = extract_metric_samples(window_rows)
    baseline_samples = extract_metric_samples(baseline_rows)
    metrics: dict[str, dict] = {}
    for metric in ("delay_b", "delay_c", "delay_f", "revision", "abandoned_rate"):
        samples = list(window_samples.get(metric) or [])
        if not samples:
            continue
        baseline = list(baseline_samples.get(metric) or [])
        value = median_ms(samples)
        baseline_value = median_ms(baseline)
        metrics[metric] = {
            "value_ms": int(value),
            "baseline_ms": int(baseline_value),
            "z_score": float(round(z_score(value, baseline), 6)),
            "sample_n": len(samples),
        }
    return metrics