from __future__ import annotations

from datetime import datetime, timezone

# Reasonable server-side caps per graph to keep payload/UI responsive.
DENSITY_POINT_CAPS = {
    "low": 120,
    "medium": 280,
    "high": 560,
}

MS_MINUTE = 60 * 1000
MS_HOUR = 60 * MS_MINUTE
MS_DAY = 24 * MS_HOUR


def _datetime_to_ms(value: object) -> int:
    """Convert a ``datetime`` to epoch milliseconds; any other value yields 0.

    Naive datetimes are interpreted as UTC.
    """
    if not isinstance(value, datetime):
        return 0
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return int(value.timestamp() * 1000)


def _effective_ts_ms(point: dict) -> int:
    """Return the point's timestamp in epoch milliseconds.

    Prefers a positive ``ts_ms``; otherwise falls back to ``computed_at`` when
    it is a datetime. Returns 0 when neither is usable.
    """
    value = int(point.get("ts_ms") or 0)
    if value > 0:
        return value
    return _datetime_to_ms(point.get("computed_at"))


def _bucket_ms_for_age(age_ms: int) -> int:
    """Return the bucket width (ms) for a point that is ``age_ms`` old.

    0 means "do not bucket" (keep at full resolution). Tiers:
    <= 1 day: raw, <= 7 days: 15 min, <= 30 days: 2 h, <= 180 days: 12 h,
    anything older: 1 day.
    """
    if age_ms <= (1 * MS_DAY):
        return 0
    if age_ms <= (7 * MS_DAY):
        return 15 * MS_MINUTE
    if age_ms <= (30 * MS_DAY):
        return 2 * MS_HOUR
    if age_ms <= (180 * MS_DAY):
        return 12 * MS_HOUR
    return 1 * MS_DAY


def _compress_to_target(points: list[dict], target: int) -> list[dict]:
    """Thin ``points`` to at most ``target`` entries, keeping first and last.

    Interior points are taken at evenly spaced indices across the whole
    series, so no region is disproportionately dropped. A floored fixed
    stride would exhaust the budget early and discard the newest data just
    before the final point. A non-positive ``target`` disables thinning.
    """
    if target <= 0 or len(points) <= target:
        return points
    if target <= 2:
        return [points[0], points[-1]]
    interior_span = len(points) - 2
    interior_slots = target - 2
    # len(points) > target, so interior_span > interior_slots: the effective
    # step exceeds 1, indices are strictly increasing and stay within
    # 1..len(points) - 2 (never duplicating the first/last point).
    interior = [
        points[1 + (slot * interior_span) // interior_slots]
        for slot in range(interior_slots)
    ]
    return [points[0], *interior, points[-1]]


def _output_point(ts_ms: int, y_value: object) -> dict:
    """Build one output point: ISO-8601 UTC ``x``, ``y`` and ``ts_ms``."""
    return {
        "x": datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).isoformat(),
        "y": y_value,
        "ts_ms": ts_ms,
    }


def downsample_points(points: list[dict], density: str = "medium") -> list[dict]:
    """
    Tiered time-range downsampling: keep high-resolution recent data,
    progressively bucket older ranges.

    Ages are measured relative to the newest point. Points at most one day
    older than it are kept as-is. Older points are grouped into buckets whose
    width grows with age (see ``_bucket_ms_for_age``). Each bucket becomes one
    point placed at the bucket's latest timestamp. Its ``y`` is the mean of
    the bucket's non-None values (rounded to 3 decimals), or the latest raw
    ``y`` when the bucket has no values. The result is finally thinned to the
    density cap (``DENSITY_POINT_CAPS``; unknown densities use "medium"),
    keeping the first and last points.

    Inputs with two or fewer rows are returned as shallow copies, unchanged.
    Rows without a usable timestamp are dropped.
    """
    rows = [dict(row or {}) for row in list(points or [])]
    if len(rows) <= 2:
        return rows

    rows.sort(key=_effective_ts_ms)
    latest_ts = _effective_ts_ms(rows[-1])

    passthrough: list[dict] = []
    # Keyed by (bucket width, bucket index) so buckets from different tiers
    # never merge even when their indices coincide.
    buckets: dict[tuple[int, int], dict] = {}
    for row in rows:
        ts_ms = _effective_ts_ms(row)
        if ts_ms <= 0:
            continue
        y_value = row.get("y")
        bucket_ms = _bucket_ms_for_age(max(0, latest_ts - ts_ms))
        if bucket_ms <= 0:
            passthrough.append(_output_point(ts_ms, y_value))
            continue
        bucket_key = (bucket_ms, int(ts_ms // bucket_ms))
        state = buckets.get(bucket_key)
        if state is None:
            state = {"value_count": 0, "sum": 0.0, "last_ts": ts_ms, "last_y": y_value}
            buckets[bucket_key] = state
        if y_value is not None:
            state["sum"] += float(y_value)
            # Only real values count toward the mean; None rows must not
            # dilute it.
            state["value_count"] += 1
        if ts_ms >= state["last_ts"]:
            state["last_ts"] = ts_ms
            state["last_y"] = y_value

    output = list(passthrough)
    for state in buckets.values():
        if state["value_count"] > 0:
            y_value = round(state["sum"] / state["value_count"], 3)
        else:
            y_value = state["last_y"]
        output.append(_output_point(int(state["last_ts"]), y_value))
    output.sort(key=lambda point: int(point.get("ts_ms") or 0))
    if not output:
        return []

    # Preserve first/last and reduce to density cap if still too large.
    cap = int(
        DENSITY_POINT_CAPS.get(
            str(density or "").strip().lower(), DENSITY_POINT_CAPS["medium"]
        )
    )
    return _compress_to_target(output, cap)


def compact_snapshot_rows(
    snapshot_rows: list[dict], now_ts_ms: int, cutoff_ts_ms: int
) -> set[int]:
    """
    Returns IDs to keep using the same age-bucket policy as graph sampling.

    Old rows below cutoff are dropped; remaining rows keep one representative
    per age bucket (latest in bucket), while preserving newest and oldest.

    A row's timestamp is ``source_event_ts`` (epoch ms) when positive, else
    ``computed_at`` when it is a datetime (naive values treated as UTC). Rows
    with neither are ignored. Ages are measured from ``now_ts_ms``, or from
    the newest row when ``now_ts_ms`` is falsy. A ``cutoff_ts_ms`` <= 0
    disables the cutoff. Rows without an ``id`` contribute id 0.
    """
    rows = [dict(row or {}) for row in list(snapshot_rows or [])]
    if not rows:
        return set()

    enriched: list[tuple[int, int]] = []
    for row in rows:
        ts_ms = int(row.get("source_event_ts") or 0)
        if ts_ms <= 0:
            ts_ms = _datetime_to_ms(row.get("computed_at"))
        if ts_ms <= 0:
            continue
        if cutoff_ts_ms > 0 and ts_ms < int(cutoff_ts_ms):
            continue
        enriched.append((int(row.get("id") or 0), ts_ms))
    if not enriched:
        return set()

    enriched.sort(key=lambda item: item[1])
    keep_ids = {enriched[0][0], enriched[-1][0]}
    # (bucket width, bucket index) -> (id, ts) of the latest row seen so far.
    bucket_map: dict[tuple[int, int], tuple[int, int]] = {}
    latest_ts = int(now_ts_ms or enriched[-1][1])
    for snapshot_id, ts_ms in enriched:
        bucket_ms = _bucket_ms_for_age(max(0, latest_ts - ts_ms))
        if bucket_ms <= 0:
            keep_ids.add(snapshot_id)
            continue
        key = (bucket_ms, int(ts_ms // bucket_ms))
        current = bucket_map.get(key)
        if current is None or ts_ms >= current[1]:
            bucket_map[key] = (snapshot_id, ts_ms)
    keep_ids.update(snapshot_id for snapshot_id, _ in bucket_map.values())
    return keep_ids