Files
GIA/core/workspace/sampling.py

174 lines
5.7 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
# Reasonable server-side caps per graph to keep payload/UI responsive.
# Keys are the client-requested density levels accepted by
# downsample_points(); unknown levels fall back to "medium" there.
DENSITY_POINT_CAPS = {
    "low": 120,
    "medium": 280,
    "high": 560,
}

# Millisecond duration constants used by the age-bucket policy below.
MS_MINUTE = 60 * 1000
MS_HOUR = 60 * MS_MINUTE
MS_DAY = 24 * MS_HOUR
def _effective_ts_ms(point: dict) -> int:
value = int(point.get("ts_ms") or 0)
if value > 0:
return value
dt_value = point.get("computed_at")
if isinstance(dt_value, datetime):
if dt_value.tzinfo is None:
dt_value = dt_value.replace(tzinfo=timezone.utc)
return int(dt_value.timestamp() * 1000)
return 0
def _bucket_ms_for_age(age_ms: int) -> int:
    """Map a sample's age to the aggregation bucket width in milliseconds.

    Returns 0 for data up to one day old (keep full resolution); older data
    gets progressively wider buckets, capped at one bucket per day.
    """
    # (max age, bucket width) tiers, checked newest-first.
    tiers = (
        (1 * MS_DAY, 0),
        (7 * MS_DAY, 15 * MS_MINUTE),
        (30 * MS_DAY, 2 * MS_HOUR),
        (180 * MS_DAY, 12 * MS_HOUR),
    )
    for max_age_ms, bucket_ms in tiers:
        if age_ms <= max_age_ms:
            return bucket_ms
    return 1 * MS_DAY
def _compress_to_target(points: list[dict], target: int) -> list[dict]:
if target <= 0 or len(points) <= target:
return points
if target <= 2:
return [points[0], points[-1]]
stride = max(1, int((len(points) - 2) / (target - 2)))
output = [points[0]]
idx = 1
while idx < (len(points) - 1) and len(output) < (target - 1):
output.append(points[idx])
idx += stride
output.append(points[-1])
return output
def downsample_points(points: list[dict], density: str = "medium") -> list[dict]:
    """Downsample graph points with a tiered time-range policy.

    Points under one day old (relative to the newest point) are kept at full
    resolution; older points are aggregated into progressively wider time
    buckets (see ``_bucket_ms_for_age``). Each bucket emits one point whose
    y is the mean of the bucket's non-null values, rounded to 3 decimals,
    stamped at the bucket's latest timestamp. Finally the series is capped
    to ``DENSITY_POINT_CAPS[density]`` entries, preserving first/last.

    Args:
        points: rows with optional "ts_ms", "computed_at" and "y" keys.
        density: "low" | "medium" | "high"; unknown values fall back to
            "medium".

    Returns:
        Points shaped as {"x": ISO-8601 UTC string, "y": value, "ts_ms": int},
        sorted by timestamp. NOTE: inputs of length <= 2 are returned as
        shallow copies without this normalization (pre-existing behavior,
        preserved for callers).
    """
    rows = [dict(row or {}) for row in list(points or [])]
    if len(rows) <= 2:
        return rows
    rows.sort(key=_effective_ts_ms)
    latest_ts = _effective_ts_ms(rows[-1])
    buckets: dict[tuple[int, int], dict] = {}
    passthrough: list[dict] = []
    for row in rows:
        ts_ms = _effective_ts_ms(row)
        if ts_ms <= 0:
            # No usable timestamp: drop the row.
            continue
        age_ms = max(0, latest_ts - ts_ms)
        bucket_ms = _bucket_ms_for_age(age_ms)
        if bucket_ms <= 0:
            # Recent data: keep at full resolution.
            passthrough.append(
                {
                    "x": datetime.fromtimestamp(
                        ts_ms / 1000, tz=timezone.utc
                    ).isoformat(),
                    "y": row.get("y"),
                    "ts_ms": ts_ms,
                }
            )
            continue
        bucket_key = (bucket_ms, int(ts_ms // bucket_ms))
        y_value = row.get("y")
        state = buckets.get(bucket_key)
        if state is None:
            buckets[bucket_key] = {
                "sum": float(y_value) if y_value is not None else 0.0,
                # Count only rows that contribute a value, so None rows do
                # not drag the bucket mean toward zero (bug fix: the old
                # code divided by the total row count).
                "value_count": 1 if y_value is not None else 0,
                "last_ts": ts_ms,
                "last_y": y_value,
            }
        else:
            if y_value is not None:
                state["sum"] += float(y_value)
                state["value_count"] += 1
            if ts_ms >= int(state["last_ts"]):
                state["last_ts"] = ts_ms
                state["last_y"] = y_value
    output = list(passthrough)
    for state in buckets.values():
        ts_ms = int(state["last_ts"])
        value_count = int(state["value_count"])
        if value_count > 0:
            y_value = round(float(state["sum"]) / float(value_count), 3)
        else:
            # All-None bucket: keep the latest raw y (None).
            y_value = state["last_y"]
        output.append(
            {
                "x": datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).isoformat(),
                "y": y_value,
                "ts_ms": ts_ms,
            }
        )
    output.sort(key=lambda item: int(item.get("ts_ms") or 0))
    if not output:
        return []
    # Preserve first/last and reduce to density cap if still too large.
    cap = int(
        DENSITY_POINT_CAPS.get(
            str(density or "").strip().lower(), DENSITY_POINT_CAPS["medium"]
        )
    )
    return _compress_to_target(output, cap)
def compact_snapshot_rows(
    snapshot_rows: list[dict], now_ts_ms: int, cutoff_ts_ms: int
) -> set[int]:
    """
    Returns IDs to keep using the same age-bucket policy as graph sampling.
    Old rows below cutoff are dropped; remaining rows keep one representative
    per age bucket (latest in bucket), while preserving newest and oldest.
    """
    # Resolve each row to (snapshot_id, ts_ms), dropping rows with no
    # usable timestamp or older than the cutoff.
    candidates: list[tuple[int, int]] = []
    for raw in list(snapshot_rows or []):
        row = dict(raw or {})
        ts_ms = int(row.get("source_event_ts") or 0)
        if ts_ms <= 0:
            stamp = row.get("computed_at")
            if isinstance(stamp, datetime):
                if stamp.tzinfo is None:
                    # Naive datetimes are treated as UTC.
                    stamp = stamp.replace(tzinfo=timezone.utc)
                ts_ms = int(stamp.timestamp() * 1000)
        if ts_ms <= 0:
            continue
        if cutoff_ts_ms > 0 and ts_ms < int(cutoff_ts_ms):
            continue
        candidates.append((int(row.get("id") or 0), ts_ms))
    if not candidates:
        return set()
    candidates.sort(key=lambda item: item[1])
    # Always retain the oldest and newest surviving rows.
    keep_ids = {candidates[0][0], candidates[-1][0]}
    reference_ts = int(now_ts_ms or candidates[-1][1])
    winners: dict[tuple[int, int], tuple[int, int]] = {}
    for snapshot_id, ts_ms in candidates:
        bucket_ms = _bucket_ms_for_age(max(0, reference_ts - ts_ms))
        if bucket_ms <= 0:
            # Recent rows are kept individually.
            keep_ids.add(snapshot_id)
            continue
        key = (bucket_ms, int(ts_ms // bucket_ms))
        best = winners.get(key)
        if best is None or ts_ms >= best[1]:
            winners[key] = (snapshot_id, ts_ms)
    keep_ids.update(snapshot_id for snapshot_id, _ in winners.values())
    return keep_ids