166 lines
5.6 KiB
Python
166 lines
5.6 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
# Maximum number of points returned per graph for each density setting.
# Enforced server-side so payloads stay small and the UI stays responsive.
DENSITY_POINT_CAPS: dict[str, int] = {"low": 120, "medium": 280, "high": 560}
|
|
|
|
# Durations expressed in milliseconds; each unit is derived from the one below.
MS_MINUTE = 60_000
MS_HOUR = MS_MINUTE * 60
MS_DAY = MS_HOUR * 24
|
|
|
|
|
|
def _effective_ts_ms(point: dict) -> int:
|
|
value = int(point.get("ts_ms") or 0)
|
|
if value > 0:
|
|
return value
|
|
dt_value = point.get("computed_at")
|
|
if isinstance(dt_value, datetime):
|
|
if dt_value.tzinfo is None:
|
|
dt_value = dt_value.replace(tzinfo=timezone.utc)
|
|
return int(dt_value.timestamp() * 1000)
|
|
return 0
|
|
|
|
|
|
def _bucket_ms_for_age(age_ms: int) -> int:
    """Return the bucket width, in milliseconds, for a sample of the given age.

    Tiers (upper bounds are inclusive):
      * up to 1 day    -> 0 (no bucketing)
      * up to 7 days   -> 15 minutes
      * up to 30 days  -> 2 hours
      * up to 180 days -> 12 hours
      * older          -> 1 day
    """
    # (maximum age, bucket width) pairs, ordered from youngest to oldest tier.
    tiers = (
        (1 * MS_DAY, 0),
        (7 * MS_DAY, 15 * MS_MINUTE),
        (30 * MS_DAY, 2 * MS_HOUR),
        (180 * MS_DAY, 12 * MS_HOUR),
    )
    for max_age_ms, width_ms in tiers:
        if age_ms <= max_age_ms:
            return width_ms
    return 1 * MS_DAY
|
|
|
|
|
|
def _compress_to_target(points: list[dict], target: int) -> list[dict]:
|
|
if target <= 0 or len(points) <= target:
|
|
return points
|
|
if target <= 2:
|
|
return [points[0], points[-1]]
|
|
stride = max(1, int((len(points) - 2) / (target - 2)))
|
|
output = [points[0]]
|
|
idx = 1
|
|
while idx < (len(points) - 1) and len(output) < (target - 1):
|
|
output.append(points[idx])
|
|
idx += stride
|
|
output.append(points[-1])
|
|
return output
|
|
|
|
|
|
def _graph_point(ts_ms: int, y_value) -> dict:
    """Build one output point: ISO-8601 UTC ``x``, ``y`` and the raw ``ts_ms``."""
    return {
        "x": datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).isoformat(),
        "y": y_value,
        "ts_ms": ts_ms,
    }


def downsample_points(points: list[dict], density: str = "medium") -> list[dict]:
    """Downsample a time series: recent data stays dense, older data is bucketed.

    Each row's timestamp is resolved with ``_effective_ts_ms``; rows without a
    positive timestamp are dropped. Age is measured against the newest row.
    Rows whose age maps to a bucket width of 0 (see ``_bucket_ms_for_age``)
    pass through one-to-one. Older rows are grouped into fixed-width buckets,
    and each bucket yields a single point placed at the bucket's latest
    timestamp. Its ``y`` is the mean of the bucket's non-null ``y`` values,
    rounded to 3 decimals, or ``None`` when the bucket holds no values.

    The result is sorted by time and finally thinned to the point cap that
    ``DENSITY_POINT_CAPS`` defines for ``density``.

    Args:
        points: Rows carrying ``y`` plus ``ts_ms`` and/or ``computed_at``.
            ``None`` is treated as an empty list.
        density: Key into ``DENSITY_POINT_CAPS`` (case/whitespace-insensitive);
            unknown or empty values fall back to ``"medium"``.

    Returns:
        A list of ``{"x", "y", "ts_ms"}`` dicts in ascending time order.
    """
    rows = [dict(row or {}) for row in (points or [])]
    if len(rows) <= 2:
        # NOTE(review): inputs this small are returned as shallow copies of the
        # caller's rows, NOT normalised to the x/y/ts_ms shape used below.
        return rows

    rows.sort(key=_effective_ts_ms)
    latest_ts = _effective_ts_ms(rows[-1])

    passthrough: list[dict] = []
    # Keyed by (bucket width, bucket index) so tiers of different widths can
    # never collide on the same index.
    buckets: dict[tuple[int, int], dict] = {}

    for row in rows:
        ts_ms = _effective_ts_ms(row)
        if ts_ms <= 0:
            continue

        y_value = row.get("y")
        bucket_ms = _bucket_ms_for_age(max(0, latest_ts - ts_ms))
        if bucket_ms <= 0:
            passthrough.append(_graph_point(ts_ms, y_value))
            continue

        bucket_key = (bucket_ms, int(ts_ms // bucket_ms))
        state = buckets.get(bucket_key)
        if state is None:
            state = buckets[bucket_key] = {
                "value_count": 0,
                "sum": 0.0,
                "last_ts": ts_ms,
                "last_y": y_value,
            }
        elif ts_ms >= state["last_ts"]:
            state["last_ts"] = ts_ms
            state["last_y"] = y_value

        # Only rows that actually carry a value take part in the mean; counting
        # null rows in the denominator would drag the average toward zero.
        if y_value is not None:
            state["value_count"] += 1
            state["sum"] += float(y_value)

    output = list(passthrough)
    for state in buckets.values():
        if state["value_count"] > 0:
            y_value = round(state["sum"] / state["value_count"], 3)
        else:
            y_value = state["last_y"]
        output.append(_graph_point(int(state["last_ts"]), y_value))

    if not output:
        return []
    output.sort(key=lambda point: point["ts_ms"])

    # Preserve first/last and reduce to the density cap if still too large.
    density_key = str(density or "").strip().lower()
    cap = int(DENSITY_POINT_CAPS.get(density_key, DENSITY_POINT_CAPS["medium"]))
    return _compress_to_target(output, cap)
|
|
|
|
|
|
def compact_snapshot_rows(snapshot_rows: list[dict], now_ts_ms: int, cutoff_ts_ms: int) -> set[int]:
    """Select the snapshot IDs to retain under the graph age-bucket policy.

    A row's timestamp is ``source_event_ts`` when positive, otherwise its
    ``computed_at`` datetime (naive values read as UTC). Rows with no usable
    timestamp, or older than a positive ``cutoff_ts_ms``, are never kept.
    Of the remainder, the oldest and newest rows are always kept, every row
    young enough to need no bucketing is kept, and each age bucket keeps its
    latest row.

    Args:
        snapshot_rows: Rows with ``id`` plus ``source_event_ts`` and/or
            ``computed_at``. ``None`` is treated as empty.
        now_ts_ms: Reference "now" for computing age; when falsy, the newest
            surviving row's timestamp is used instead.
        cutoff_ts_ms: Rows strictly older than this are dropped; a
            non-positive value disables the cutoff.

    Returns:
        The set of row IDs to keep (a missing ``id`` is recorded as 0).
    """
    timed: list[tuple[int, int]] = []
    for raw in snapshot_rows or []:
        row = dict(raw or {})
        ts_ms = int(row.get("source_event_ts") or 0)
        if ts_ms <= 0:
            computed_at = row.get("computed_at")
            if isinstance(computed_at, datetime):
                if computed_at.tzinfo is None:
                    computed_at = computed_at.replace(tzinfo=timezone.utc)
                ts_ms = int(computed_at.timestamp() * 1000)
        if ts_ms <= 0:
            continue
        if cutoff_ts_ms > 0 and ts_ms < int(cutoff_ts_ms):
            continue
        timed.append((int(row.get("id") or 0), ts_ms))

    if not timed:
        return set()

    timed.sort(key=lambda pair: pair[1])
    oldest_id, newest_id = timed[0][0], timed[-1][0]
    keep_ids = {oldest_id, newest_id}

    reference_ts = int(now_ts_ms or timed[-1][1])
    latest_in_bucket: dict[tuple[int, int], int] = {}
    for snapshot_id, ts_ms in timed:
        bucket_ms = _bucket_ms_for_age(max(0, reference_ts - ts_ms))
        if bucket_ms <= 0:
            keep_ids.add(snapshot_id)
        else:
            # Rows arrive in ascending timestamp order, so a plain overwrite
            # leaves each bucket holding its latest row.
            latest_in_bucket[(bucket_ms, int(ts_ms // bucket_ms))] = snapshot_id

    keep_ids.update(latest_in_bucket.values())
    return keep_ids
|