From 18351abb00b559c0ce56d84031da00833eeac2ff Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Tue, 3 Mar 2026 17:35:45 +0000 Subject: [PATCH] Implement workspace history reconciliation --- core/clients/signal.py | 11 ++ core/clients/signalapi.py | 60 ++++++- .../reconcile_workspace_metric_history.py | 28 +++ .../pages/ai-workspace-insight-graphs.html | 23 ++- core/templates/pages/tasks-hub.html | 4 +- core/templates/partials/compose-panel.html | 2 +- ...test_reconcile_workspace_metric_history.py | 23 +++ core/tests/test_signal_send_normalization.py | 74 ++++++++ core/tests/test_tasks_pages_management.py | 77 ++++---- core/tests/test_workspace_graph_sampling.py | 84 +++++++++ core/views/compose.py | 6 +- core/views/workspace.py | 53 ++++-- core/workspace/__init__.py | 3 + core/workspace/sampling.py | 165 ++++++++++++++++++ 14 files changed, 556 insertions(+), 57 deletions(-) create mode 100644 core/tests/test_signal_send_normalization.py create mode 100644 core/tests/test_workspace_graph_sampling.py create mode 100644 core/workspace/__init__.py create mode 100644 core/workspace/sampling.py diff --git a/core/clients/signal.py b/core/clients/signal.py index f992387..06352f6 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -1025,7 +1025,18 @@ class SignalClient(ClientBase): text=text, attachments=attachments, metadata=metadata, + detailed=True, ) + if isinstance(result, dict) and (not bool(result.get("ok"))): + status_value = int(result.get("status") or 0) + error_text = str(result.get("error") or "").strip() + recipient_value = str(result.get("recipient") or recipient).strip() + raise RuntimeError( + "signal_send_failed" + f" status={status_value or 'unknown'}" + f" recipient={recipient_value or 'unknown'}" + f" error={error_text or 'unknown'}" + ) if result is False or result is None: raise RuntimeError("signal_send_failed") transport.set_runtime_command_result( diff --git a/core/clients/signalapi.py b/core/clients/signalapi.py index 155cf46..b9e56c2 100644 --- a/core/clients/signalapi.py +++ b/core/clients/signalapi.py @@ -1,6 +1,7 @@ import asyncio import base64 import logging +import re import aiohttp import orjson @@ -9,6 +10,25 @@ from django.conf import settings from rest_framework import status log = logging.getLogger(__name__) +SIGNAL_UUID_PATTERN = re.compile( + r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", + re.IGNORECASE, +) + + +def normalize_signal_recipient(recipient: str) -> str: + raw = str(recipient or "").strip() + if not raw: + return "" + if SIGNAL_UUID_PATTERN.fullmatch(raw): + return raw + if raw.startswith("+"): + digits = re.sub(r"[^0-9]", "", raw) + return f"+{digits}" if digits else raw + digits_only = re.sub(r"[^0-9]", "", raw) + if digits_only and raw.isdigit(): + return f"+{digits_only}" + return raw async def start_typing(uuid): @@ -73,7 +93,9 @@ async def download_and_encode_base64(file_url, filename, content_type, session=N return None -async def send_message_raw(recipient_uuid, text=None, attachments=None, metadata=None): +async def send_message_raw( + recipient_uuid, text=None, attachments=None, metadata=None, detailed=False +): """ Sends a message using the Signal REST API, ensuring attachment links are not included in the text body. @@ -88,8 +110,9 @@ async def send_message_raw(recipient_uuid, text=None, attachments=None, metadata base = getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080").rstrip("/") url = f"{base}/v2/send" + normalized_recipient = normalize_signal_recipient(recipient_uuid) data = { - "recipients": [recipient_uuid], + "recipients": [normalized_recipient], "number": settings.SIGNAL_NUMBER, "base64_attachments": [], } @@ -168,8 +191,37 @@ async def send_message_raw(recipient_uuid, text=None, attachments=None, metadata ts = orjson.loads(response_text).get("timestamp", None) return ts if ts else False if index == len(payloads) - 1: + log.warning( + "Signal send failed status=%s recipient=%s body=%s", + response_status, + normalized_recipient, + response_text[:300], + ) + if detailed: + return { + "ok": False, + "status": int(response_status), + "error": str(response_text or "").strip()[:500], + "recipient": normalized_recipient, + } return False - if response_status not in {status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY}: + if response_status not in { + status.HTTP_400_BAD_REQUEST, + status.HTTP_422_UNPROCESSABLE_ENTITY, + }: + log.warning( + "Signal send failed early status=%s recipient=%s body=%s", + response_status, + normalized_recipient, + response_text[:300], + ) + if detailed: + return { + "ok": False, + "status": int(response_status), + "error": str(response_text or "").strip()[:500], + "recipient": normalized_recipient, + } return False log.warning( "signal send quote payload rejected (%s), trying fallback shape: %s", @@ -305,7 +357,7 @@ def send_message_raw_sync(recipient_uuid, text=None, attachments=None): base = getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080").rstrip("/") url = f"{base}/v2/send" data = { - "recipients": [recipient_uuid], + "recipients": [normalize_signal_recipient(recipient_uuid)], "number": settings.SIGNAL_NUMBER, "base64_attachments": [], } diff --git a/core/management/commands/reconcile_workspace_metric_history.py b/core/management/commands/reconcile_workspace_metric_history.py index d12b18c..45227fa 100644 --- a/core/management/commands/reconcile_workspace_metric_history.py +++ b/core/management/commands/reconcile_workspace_metric_history.py @@ -14,6 +14,7 @@ from core.models import ( WorkspaceMetricSnapshot, ) from core.views.workspace import _conversation_for_person +from core.workspace import compact_snapshot_rows def _score_from_lag(lag_ms, target_hours=4): @@ -219,6 +220,7 @@ class Command(BaseCommand): parser.add_argument("--limit", type=int, default=200000) parser.add_argument("--dry-run", action="store_true", default=False) parser.add_argument("--no-reset", action="store_true", default=False) + parser.add_argument("--skip-compact", action="store_true", default=False) def handle(self, *args, **options): days = max(1, int(options.get("days") or 365)) @@ -229,6 +231,7 @@ class Command(BaseCommand): limit = max(1, int(options.get("limit") or 200000)) dry_run = bool(options.get("dry_run")) reset = not bool(options.get("no_reset")) + compact_enabled = not bool(options.get("skip_compact")) today_start = dj_timezone.now().astimezone(timezone.utc).replace( hour=0, minute=0, @@ -250,6 +253,7 @@ class Command(BaseCommand): deleted = 0 snapshots_created = 0 checkpoints_total = 0 + compacted_deleted = 0 for person in people: identifiers_qs = PersonIdentifier.objects.filter(user=person.user, person=person) @@ -410,6 +414,28 @@ class Command(BaseCommand): "participant_feedback", ] ) + if compact_enabled: + snapshot_rows = list( + WorkspaceMetricSnapshot.objects.filter(conversation=conversation) + .order_by("computed_at", "id") + .values("id", "computed_at", "source_event_ts") + ) + now_ts_ms = int(dj_timezone.now().timestamp() * 1000) + keep_ids = compact_snapshot_rows( + snapshot_rows=snapshot_rows, + now_ts_ms=now_ts_ms, + cutoff_ts_ms=cutoff_ts, + ) + if keep_ids: + compacted_deleted += ( + WorkspaceMetricSnapshot.objects.filter(conversation=conversation) + .exclude(id__in=list(keep_ids)) + .delete()[0] + ) + else: + compacted_deleted += WorkspaceMetricSnapshot.objects.filter( + conversation=conversation + ).delete()[0] self.stdout.write( self.style.SUCCESS( @@ -418,6 +444,8 @@ class Command(BaseCommand): f"checkpoints={checkpoints_total} " f"created={snapshots_created} " f"deleted={deleted} " + f"compacted_deleted={compacted_deleted} " + f"compact_enabled={compact_enabled} " f"reset={reset} dry_run={dry_run} " f"days={days} step_messages={step_messages} limit={limit}" ) diff --git a/core/templates/pages/ai-workspace-insight-graphs.html b/core/templates/pages/ai-workspace-insight-graphs.html index feeca74..e0da4b0 100644 --- a/core/templates/pages/ai-workspace-insight-graphs.html +++ b/core/templates/pages/ai-workspace-insight-graphs.html @@ -14,8 +14,25 @@

Insight Graphs: {{ person.name }}

- Historical metrics for workspace {{ workspace_conversation.id }}. Points come from deterministic message-history snapshots (not only mitigation runs). + Historical metrics for workspace {{ workspace_conversation.id }}. Points are range-downsampled server-side with high-resolution recent data and progressively sparser older ranges.

+ {% include "partials/ai-insight-nav.html" with active_tab="graphs" %}
@@ -26,7 +43,9 @@

{{ graph.group_title }}

{{ graph.title }}

-

{{ graph.count }} point{{ graph.count|pluralize }}

+

+ {{ graph.count }} displayed of {{ graph.raw_count }} source point{{ graph.raw_count|pluralize }} +

diff --git a/core/templates/pages/tasks-hub.html b/core/templates/pages/tasks-hub.html index 8ece46b..8b4c359 100644 --- a/core/templates/pages/tasks-hub.html +++ b/core/templates/pages/tasks-hub.html @@ -66,13 +66,13 @@ - {% for row in person_identifiers %} + {% for row in person_identifier_rows %}
IdentifierService
{{ row.identifier }} {{ row.service }} {% if selected_project %} - {% if tuple(selected_project.id, row.service, row.identifier) in mapping_pairs %} + {% if row.mapped %} Linked {% else %}
diff --git a/core/templates/partials/compose-panel.html b/core/templates/partials/compose-panel.html index a175a5d..4cdfebd 100644 --- a/core/templates/partials/compose-panel.html +++ b/core/templates/partials/compose-panel.html @@ -130,7 +130,7 @@ Quick Insights - + Tasks diff --git a/core/tests/test_reconcile_workspace_metric_history.py b/core/tests/test_reconcile_workspace_metric_history.py index cbef23b..96a07d6 100644 --- a/core/tests/test_reconcile_workspace_metric_history.py +++ b/core/tests/test_reconcile_workspace_metric_history.py @@ -54,6 +54,7 @@ class ReconcileWorkspaceMetricHistoryCommandTests(TestCase): "36500", "--step-messages", "2", + "--skip-compact", ) conversation = _conversation_for_person(self.user, self.person) first_count = WorkspaceMetricSnapshot.objects.filter( @@ -72,6 +73,7 @@ class ReconcileWorkspaceMetricHistoryCommandTests(TestCase): "--step-messages", "2", "--no-reset", + "--skip-compact", ) second_count = WorkspaceMetricSnapshot.objects.filter( conversation=conversation @@ -107,6 +109,7 @@ class ReconcileWorkspaceMetricHistoryCommandTests(TestCase): "36500", "--step-messages", "2", + "--skip-compact", ) conversation = _conversation_for_person(self.user, self.person) first_count = WorkspaceMetricSnapshot.objects.filter( @@ -124,8 +127,28 @@ class ReconcileWorkspaceMetricHistoryCommandTests(TestCase): "--step-messages", "2", "--no-reset", + "--skip-compact", ) second_count = WorkspaceMetricSnapshot.objects.filter( conversation=conversation ).count() self.assertEqual(first_count, second_count) + + def test_reconcile_compacts_historical_snapshots_by_age_bucket(self): + call_command( + "reconcile_workspace_metric_history", + "--person-id", + str(self.person.id), + "--service", + "whatsapp", + "--days", + "36500", + "--step-messages", + "1", + ) + conversation = _conversation_for_person(self.user, self.person) + count_after_compact = WorkspaceMetricSnapshot.objects.filter( + conversation=conversation + ).count() + self.assertGreaterEqual(count_after_compact, 1) + self.assertLess(count_after_compact, 10) diff --git a/core/tests/test_signal_send_normalization.py b/core/tests/test_signal_send_normalization.py new file mode 100644 index 0000000..4d38633 --- /dev/null +++ b/core/tests/test_signal_send_normalization.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from asgiref.sync import async_to_sync +from django.test import TestCase +from unittest.mock import patch + +from core.clients import signalapi + + +class _FakeResponse: + def __init__(self, status_code: int, body: str): + self.status = int(status_code) + self._body = str(body) + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + async def text(self): + return self._body + + +class _FakeClientSession: + posted_payloads: list[dict] = [] + next_status = 400 + next_body = "invalid recipient" + + def __init__(self, *args, **kwargs): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + def post(self, url, json=None): + self.__class__.posted_payloads.append(dict(json or {})) + return _FakeResponse(self.__class__.next_status, self.__class__.next_body) + + +class SignalSendNormalizationTests(TestCase): + def test_normalize_signal_recipient_phone_and_uuid(self): + self.assertEqual("+447700900000", signalapi.normalize_signal_recipient("447700900000")) + self.assertEqual( + "+447700900000", signalapi.normalize_signal_recipient("+44 7700-900000") + ) + uuid_value = "756078fd-d447-426d-a620-581a86d64f51" + self.assertEqual(uuid_value, signalapi.normalize_signal_recipient(uuid_value)) + + def test_send_message_raw_returns_detailed_error_with_normalized_recipient(self): + _FakeClientSession.posted_payloads = [] + _FakeClientSession.next_status = 400 + _FakeClientSession.next_body = "invalid recipient format" + with patch("core.clients.signalapi.aiohttp.ClientSession", _FakeClientSession): + result = async_to_sync(signalapi.send_message_raw)( + recipient_uuid="447700900000", + text="hello", + attachments=[], + metadata={}, + detailed=True, + ) + + self.assertIsInstance(result, dict) + self.assertFalse(bool(result.get("ok"))) + self.assertEqual(400, int(result.get("status") or 0)) + self.assertEqual("+447700900000", str(result.get("recipient") or "")) + self.assertIn("invalid recipient", str(result.get("error") or "")) + self.assertGreaterEqual(len(_FakeClientSession.posted_payloads), 1) + first_payload = _FakeClientSession.posted_payloads[0] + self.assertEqual(["+447700900000"], first_payload.get("recipients")) + diff --git a/core/tests/test_tasks_pages_management.py b/core/tests/test_tasks_pages_management.py index 7360d3d..b263823 100644 --- a/core/tests/test_tasks_pages_management.py +++ b/core/tests/test_tasks_pages_management.py @@ -3,60 +3,65 @@ from __future__ import annotations from django.test import TestCase from django.urls import reverse -from core.models import ChatTaskSource, TaskEpic, TaskProject, User +from core.models import ChatTaskSource, Person, PersonIdentifier, TaskEpic, TaskProject, User class TasksPagesManagementTests(TestCase): def setUp(self): - self.user = User.objects.create_user("tasks-pages-user", "tasks-pages@example.com", "x") + self.user = User.objects.create_user( + "tasks-pages-user", "tasks-pages@example.com", "x" + ) self.client.force_login(self.user) + self.person = Person.objects.create(user=self.user, name="Scope Person") + self.pid_whatsapp = PersonIdentifier.objects.create( + user=self.user, + person=self.person, + service="whatsapp", + identifier="120363402761690215@g.us", + ) + self.pid_signal = PersonIdentifier.objects.create( + user=self.user, + person=self.person, + service="signal", + identifier="+15551230000", + ) - def test_tasks_hub_requires_group_scope_for_project_create(self): - create_response = self.client.post( + def test_tasks_hub_can_create_project_name_only(self): + response = self.client.post( reverse("tasks_hub"), - { - "action": "project_create", - "name": "Ops", - }, + {"action": "project_create", "name": "Ops"}, follow=True, ) - self.assertEqual(200, create_response.status_code) - self.assertFalse(TaskProject.objects.filter(user=self.user, name="Ops").exists()) - - def test_tasks_hub_can_create_scoped_project_and_delete(self): - create_response = self.client.post( - reverse("tasks_hub"), - { - "action": "project_create", - "name": "Ops", - "service": "whatsapp", - "channel_identifier": "120363402761690215@g.us", - }, - follow=True, - ) - self.assertEqual(200, create_response.status_code) + self.assertEqual(200, response.status_code) project = TaskProject.objects.get(user=self.user, name="Ops") self.assertIsNotNone(project) + self.assertFalse(ChatTaskSource.objects.filter(user=self.user, project=project).exists()) + + def test_tasks_hub_can_map_identifier_to_selected_project(self): + project = TaskProject.objects.create(user=self.user, name="Mapped") + response = self.client.post( + reverse("tasks_hub"), + { + "action": "project_map_identifier", + "project_id": str(project.id), + "person_identifier_id": str(self.pid_signal.id), + "person": str(self.person.id), + "service": "whatsapp", + "identifier": "120363402761690215@g.us", + }, + follow=True, + ) + self.assertEqual(200, response.status_code) self.assertTrue( ChatTaskSource.objects.filter( user=self.user, - service="whatsapp", - channel_identifier="120363402761690215@g.us", project=project, + service="signal", + channel_identifier="+15551230000", + enabled=True, ).exists() ) - delete_response = self.client.post( - reverse("tasks_hub"), - { - "action": "project_delete", - "project_id": str(project.id), - }, - follow=True, - ) - self.assertEqual(200, delete_response.status_code) - self.assertFalse(TaskProject.objects.filter(user=self.user, name="Ops").exists()) - def test_project_page_can_create_and_delete_epic(self): project = TaskProject.objects.create(user=self.user, name="Roadmap") create_response = self.client.post( diff --git a/core/tests/test_workspace_graph_sampling.py b/core/tests/test_workspace_graph_sampling.py new file mode 100644 index 0000000..82484b3 --- /dev/null +++ b/core/tests/test_workspace_graph_sampling.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +from django.test import TestCase + +from core.models import User, WorkspaceConversation, WorkspaceMetricSnapshot +from core.views.workspace import _history_points +from core.workspace import DENSITY_POINT_CAPS, compact_snapshot_rows, downsample_points + + +class WorkspaceGraphSamplingTests(TestCase): + def test_downsample_points_respects_density_caps(self): + base_ts = 1_700_000_000_000 + points = [] + for idx in range(1_000): + points.append( + { + "x": "", + "y": float(idx % 100), + "ts_ms": base_ts + (idx * 60_000), + } + ) + low = downsample_points(points, density="low") + high = downsample_points(points, density="high") + self.assertLessEqual(len(low), DENSITY_POINT_CAPS["low"]) + self.assertLessEqual(len(high), DENSITY_POINT_CAPS["high"]) + self.assertGreaterEqual(len(high), len(low)) + + def test_history_points_uses_source_event_ts_for_graph_x(self): + user = User.objects.create_user("graph-user", "graph@example.com", "x") + conversation = WorkspaceConversation.objects.create( + user=user, + title="Graph", + platform_type="whatsapp", + ) + base_ts = 1_700_000_000_000 + for idx in range(300): + WorkspaceMetricSnapshot.objects.create( + conversation=conversation, + source_event_ts=base_ts + (idx * 60_000), + stability_state=WorkspaceConversation.StabilityState.CALIBRATING, + stability_score=None, + stability_confidence=0.0, + stability_sample_messages=idx + 1, + stability_sample_days=1, + commitment_inbound_score=None, + commitment_outbound_score=None, + commitment_confidence=0.0, + inbound_messages=0, + outbound_messages=idx + 1, + reciprocity_score=None, + continuity_score=None, + response_score=None, + volatility_score=None, + inbound_response_score=None, + outbound_response_score=None, + balance_inbound_score=None, + balance_outbound_score=None, + ) + points = _history_points( + conversation, "stability_sample_messages", density="low" + ) + self.assertTrue(points) + self.assertLessEqual(len(points), DENSITY_POINT_CAPS["low"]) + first_x = str(points[0].get("x") or "") + self.assertIn("2023", first_x) + + def test_compact_snapshot_rows_drops_outside_cutoff_and_buckets(self): + rows = [] + now_ts = 1_900_000_000_000 + old_ts = now_ts - (400 * 24 * 60 * 60 * 1000) + for idx in range(30): + rows.append( + { + "id": idx + 1, + "source_event_ts": old_ts + (idx * 60_000), + "computed_at": None, + } + ) + keep = compact_snapshot_rows( + snapshot_rows=rows, + now_ts_ms=now_ts, + cutoff_ts_ms=now_ts - (365 * 24 * 60 * 60 * 1000), + ) + self.assertEqual(set(), keep) diff --git a/core/views/compose.py b/core/views/compose.py index 09b81d4..9702adc 100644 --- a/core/views/compose.py +++ b/core/views/compose.py @@ -2753,7 +2753,11 @@ def _panel_context( ), "compose_answer_suggestion_send_url": reverse("compose_answer_suggestion_send"), "compose_ws_url": ws_url, - "tasks_hub_url": reverse("tasks_hub"), + "tasks_hub_url": ( + f"{reverse('tasks_hub')}?{urlencode({'person': str(base['person'].id), 'service': base['service'], 'identifier': base['identifier'] or ''})}" + if base["person"] + else reverse("tasks_hub") + ), "tasks_group_url": reverse( "tasks_group", kwargs={ diff --git a/core/views/workspace.py b/core/views/workspace.py index f7ecdbd..a788b45 100644 --- a/core/views/workspace.py +++ b/core/views/workspace.py @@ -39,6 +39,7 @@ from core.models import ( WorkspaceConversation, WorkspaceMetricSnapshot, ) +from core.workspace import DENSITY_POINT_CAPS, downsample_points SEND_ENABLED_MODES = {"active", "instant"} OPERATION_LABELS = { @@ -960,21 +961,28 @@ def _metric_psychological_read(metric_slug, conversation): return "" -def _history_points(conversation, field_name): +def _history_points(conversation, field_name, density="medium"): rows = ( conversation.metric_snapshots.exclude(**{f"{field_name}__isnull": True}) .order_by("computed_at") - .values("computed_at", field_name) + .values("computed_at", "source_event_ts", field_name) ) - points = [] + raw_points = [] for row in rows: - points.append( + source_ts = int(row.get("source_event_ts") or 0) + if source_ts > 0: + x_value = datetime.fromtimestamp(source_ts / 1000, tz=timezone.utc).isoformat() + else: + x_value = row["computed_at"].isoformat() + raw_points.append( { - "x": row["computed_at"].isoformat(), + "x": x_value, "y": row[field_name], + "ts_ms": source_ts, + "computed_at": row.get("computed_at"), } ) - return points + return downsample_points(raw_points, density=density) def _metric_supports_history(metric_slug, metric_spec): @@ -983,10 +991,15 @@ def _metric_supports_history(metric_slug, metric_spec): return any(graph["slug"] == metric_slug for graph in INSIGHT_GRAPH_SPECS) -def _all_graph_payload(conversation): +def _all_graph_payload(conversation, density="medium"): graphs = [] for spec in INSIGHT_GRAPH_SPECS: - points = _history_points(conversation, spec["field"]) + raw_count = ( + conversation.metric_snapshots.exclude( + **{f"{spec['field']}__isnull": True} + ).count() + ) + points = _history_points(conversation, spec["field"], density=density) graphs.append( { "slug": spec["slug"], @@ -995,6 +1008,7 @@ def _all_graph_payload(conversation): "group_title": INSIGHT_GROUPS[spec["group"]]["title"], "points": points, "count": len(points), + "raw_count": raw_count, "y_min": spec["y_min"], "y_max": spec["y_max"], } @@ -1002,6 +1016,13 @@ def _all_graph_payload(conversation): return graphs +def _sanitize_graph_density(value: str) -> str: + density = str(value or "").strip().lower() + if density in DENSITY_POINT_CAPS: + return density + return "medium" + + def _information_overview_rows(conversation): latest_snapshot = conversation.metric_snapshots.first() rows = [] @@ -3668,9 +3689,12 @@ class AIWorkspaceInsightDetail(LoginRequiredMixin, View): value = _format_metric_value(conversation, metric, latest_snapshot) group = INSIGHT_GROUPS[spec["group"]] graph_applicable = _metric_supports_history(metric, spec) + graph_density = _sanitize_graph_density(request.GET.get("density")) points = [] if graph_applicable: - points = _history_points(conversation, spec["history_field"]) + points = _history_points( + conversation, spec["history_field"], density=graph_density + ) context = { "person": person, @@ -3682,6 +3706,8 @@ class AIWorkspaceInsightDetail(LoginRequiredMixin, View): "metric_group": group, "graph_points": points, "graph_applicable": graph_applicable, + "graph_density": graph_density, + "graph_density_caps": DENSITY_POINT_CAPS, **_workspace_nav_urls(person), } return render(request, "pages/ai-workspace-insight-detail.html", context) @@ -3696,11 +3722,14 @@ class AIWorkspaceInsightGraphs(LoginRequiredMixin, View): person = get_object_or_404(Person, pk=person_id, user=request.user) conversation = _conversation_for_person(request.user, person) - graph_cards = _all_graph_payload(conversation) + graph_density = _sanitize_graph_density(request.GET.get("density")) + graph_cards = _all_graph_payload(conversation, density=graph_density) context = { "person": person, "workspace_conversation": conversation, "graph_cards": graph_cards, + "graph_density": graph_density, + "graph_density_caps": DENSITY_POINT_CAPS, **_workspace_nav_urls(person), } return render(request, "pages/ai-workspace-insight-graphs.html", context) @@ -3717,9 +3746,10 @@ class AIWorkspaceInformation(LoginRequiredMixin, View): conversation = _conversation_for_person(request.user, person) latest_snapshot = conversation.metric_snapshots.first() directionality = _commitment_directionality_payload(conversation) + graph_density = _sanitize_graph_density(request.GET.get("density")) commitment_graph_cards = [ card - for card in _all_graph_payload(conversation) + for card in _all_graph_payload(conversation, density=graph_density) if card["group"] == "commitment" ] @@ -3743,6 +3773,7 @@ class AIWorkspaceInformation(LoginRequiredMixin, View): "directionality": directionality, "overview_rows": _information_overview_rows(conversation), "commitment_graph_cards": commitment_graph_cards, + "graph_density": graph_density, **_workspace_nav_urls(person), } return render(request, "pages/ai-workspace-information.html", context) diff --git a/core/workspace/__init__.py b/core/workspace/__init__.py new file mode 100644 index 0000000..737b970 --- /dev/null +++ b/core/workspace/__init__.py @@ -0,0 +1,3 @@ +from .sampling import DENSITY_POINT_CAPS, compact_snapshot_rows, downsample_points + +__all__ = ["DENSITY_POINT_CAPS", "compact_snapshot_rows", "downsample_points"] diff --git a/core/workspace/sampling.py b/core/workspace/sampling.py new file mode 100644 index 0000000..303d0bc --- /dev/null +++ b/core/workspace/sampling.py @@ -0,0 +1,165 @@ +from __future__ import annotations + +from datetime import datetime, timezone + +# Reasonable server-side caps per graph to keep payload/UI responsive. +DENSITY_POINT_CAPS = { + "low": 120, + "medium": 280, + "high": 560, +} + +MS_MINUTE = 60 * 1000 +MS_HOUR = 60 * MS_MINUTE +MS_DAY = 24 * MS_HOUR + + +def _effective_ts_ms(point: dict) -> int: + value = int(point.get("ts_ms") or 0) + if value > 0: + return value + dt_value = point.get("computed_at") + if isinstance(dt_value, datetime): + if dt_value.tzinfo is None: + dt_value = dt_value.replace(tzinfo=timezone.utc) + return int(dt_value.timestamp() * 1000) + return 0 + + +def _bucket_ms_for_age(age_ms: int) -> int: + if age_ms <= (1 * MS_DAY): + return 0 + if age_ms <= (7 * MS_DAY): + return 15 * MS_MINUTE + if age_ms <= (30 * MS_DAY): + return 2 * MS_HOUR + if age_ms <= (180 * MS_DAY): + return 12 * MS_HOUR + return 1 * MS_DAY + + +def _compress_to_target(points: list[dict], target: int) -> list[dict]: + if target <= 0 or len(points) <= target: + return points + if target <= 2: + return [points[0], points[-1]] + stride = max(1, int((len(points) - 2) / (target - 2))) + output = [points[0]] + idx = 1 + while idx < (len(points) - 1) and len(output) < (target - 1): + output.append(points[idx]) + idx += stride + output.append(points[-1]) + return output + + +def downsample_points(points: list[dict], density: str = "medium") -> list[dict]: + """ + Tiered time-range downsampling: + keep high-resolution recent data, progressively bucket older ranges. + """ + rows = [dict(row or {}) for row in list(points or [])] + if len(rows) <= 2: + return rows + rows.sort(key=_effective_ts_ms) + latest_ts = _effective_ts_ms(rows[-1]) + buckets: dict[tuple[int, int], dict] = {} + passthrough: list[dict] = [] + for row in rows: + ts_ms = _effective_ts_ms(row) + if ts_ms <= 0: + continue + age_ms = max(0, latest_ts - ts_ms) + bucket_ms = _bucket_ms_for_age(age_ms) + if bucket_ms <= 0: + passthrough.append( + { + "x": datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).isoformat(), + "y": row.get("y"), + "ts_ms": ts_ms, + } + ) + continue + bucket_key = (bucket_ms, int(ts_ms // bucket_ms)) + state = buckets.get(bucket_key) + y_value = row.get("y") + if state is None: + buckets[bucket_key] = { + "count": 1, + "sum": float(y_value) if y_value is not None else 0.0, + "last_ts": ts_ms, + "last_y": y_value, + "has_value": y_value is not None, + } + else: + state["count"] += 1 + if y_value is not None: + state["sum"] += float(y_value) + state["has_value"] = True + if ts_ms >= int(state["last_ts"]): + state["last_ts"] = ts_ms + state["last_y"] = y_value + output = list(passthrough) + for state in buckets.values(): + ts_ms = int(state["last_ts"]) + y_value = state["last_y"] + if bool(state["has_value"]) and int(state["count"]) > 0: + y_value = round(float(state["sum"]) / float(state["count"]), 3) + output.append( + { + "x": datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).isoformat(), + "y": y_value, + "ts_ms": ts_ms, + } + ) + output.sort(key=lambda row: int(row.get("ts_ms") or 0)) + if not output: + return [] + # Preserve first/last and reduce to density cap if still too large. + cap = int(DENSITY_POINT_CAPS.get(str(density or "").strip().lower(), DENSITY_POINT_CAPS["medium"])) + compact = _compress_to_target(output, cap) + return compact + + +def compact_snapshot_rows(snapshot_rows: list[dict], now_ts_ms: int, cutoff_ts_ms: int) -> set[int]: + """ + Returns IDs to keep using the same age-bucket policy as graph sampling. + Old rows below cutoff are dropped; remaining rows keep one representative + per age bucket (latest in bucket), while preserving newest and oldest. + """ + rows = [dict(row or {}) for row in list(snapshot_rows or [])] + if not rows: + return set() + enriched = [] + for row in rows: + ts_ms = int(row.get("source_event_ts") or 0) + if ts_ms <= 0: + computed_at = row.get("computed_at") + if isinstance(computed_at, datetime): + if computed_at.tzinfo is None: + computed_at = computed_at.replace(tzinfo=timezone.utc) + ts_ms = int(computed_at.timestamp() * 1000) + if ts_ms <= 0: + continue + if cutoff_ts_ms > 0 and ts_ms < int(cutoff_ts_ms): + continue + enriched.append((int(row.get("id") or 0), ts_ms)) + if not enriched: + return set() + enriched.sort(key=lambda item: item[1]) + keep_ids = {enriched[0][0], enriched[-1][0]} + bucket_map: dict[tuple[int, int], tuple[int, int]] = {} + latest_ts = int(now_ts_ms or enriched[-1][1]) + for snapshot_id, ts_ms in enriched: + age_ms = max(0, latest_ts - ts_ms) + bucket_ms = _bucket_ms_for_age(age_ms) + if bucket_ms <= 0: + keep_ids.add(snapshot_id) + continue + key = (bucket_ms, int(ts_ms // bucket_ms)) + current = bucket_map.get(key) + if current is None or ts_ms >= current[1]: + bucket_map[key] = (snapshot_id, ts_ms) + for snapshot_id, _ in bucket_map.values(): + keep_ids.add(snapshot_id) + return keep_ids