diff --git a/core/clients/signal.py b/core/clients/signal.py index f992387..06352f6 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -1025,7 +1025,18 @@ class SignalClient(ClientBase): text=text, attachments=attachments, metadata=metadata, + detailed=True, ) + if isinstance(result, dict) and (not bool(result.get("ok"))): + status_value = int(result.get("status") or 0) + error_text = str(result.get("error") or "").strip() + recipient_value = str(result.get("recipient") or recipient).strip() + raise RuntimeError( + "signal_send_failed" + f" status={status_value or 'unknown'}" + f" recipient={recipient_value or 'unknown'}" + f" error={error_text or 'unknown'}" + ) if result is False or result is None: raise RuntimeError("signal_send_failed") transport.set_runtime_command_result( diff --git a/core/clients/signalapi.py b/core/clients/signalapi.py index 155cf46..b9e56c2 100644 --- a/core/clients/signalapi.py +++ b/core/clients/signalapi.py @@ -1,6 +1,7 @@ import asyncio import base64 import logging +import re import aiohttp import orjson @@ -9,6 +10,25 @@ from django.conf import settings from rest_framework import status log = logging.getLogger(__name__) +SIGNAL_UUID_PATTERN = re.compile( + r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", + re.IGNORECASE, +) + + +def normalize_signal_recipient(recipient: str) -> str: + raw = str(recipient or "").strip() + if not raw: + return "" + if SIGNAL_UUID_PATTERN.fullmatch(raw): + return raw + if raw.startswith("+"): + digits = re.sub(r"[^0-9]", "", raw) + return f"+{digits}" if digits else raw + digits_only = re.sub(r"[^0-9]", "", raw) + if digits_only and raw.isdigit(): + return f"+{digits_only}" + return raw async def start_typing(uuid): @@ -73,7 +93,9 @@ async def download_and_encode_base64(file_url, filename, content_type, session=N return None -async def send_message_raw(recipient_uuid, text=None, attachments=None, metadata=None): +async def send_message_raw( + recipient_uuid, text=None, attachments=None, metadata=None, detailed=False +): """ Sends a message using the Signal REST API, ensuring attachment links are not included in the text body. @@ -88,8 +110,9 @@ async def send_message_raw(recipient_uuid, text=None, attachments=None, metadata base = getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080").rstrip("/") url = f"{base}/v2/send" + normalized_recipient = normalize_signal_recipient(recipient_uuid) data = { - "recipients": [recipient_uuid], + "recipients": [normalized_recipient], "number": settings.SIGNAL_NUMBER, "base64_attachments": [], } @@ -168,8 +191,37 @@ async def send_message_raw(recipient_uuid, text=None, attachments=None, metadata ts = orjson.loads(response_text).get("timestamp", None) return ts if ts else False if index == len(payloads) - 1: + log.warning( + "Signal send failed status=%s recipient=%s body=%s", + response_status, + normalized_recipient, + response_text[:300], + ) + if detailed: + return { + "ok": False, + "status": int(response_status), + "error": str(response_text or "").strip()[:500], + "recipient": normalized_recipient, + } return False - if response_status not in {status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY}: + if response_status not in { + status.HTTP_400_BAD_REQUEST, + status.HTTP_422_UNPROCESSABLE_ENTITY, + }: + log.warning( + "Signal send failed early status=%s recipient=%s body=%s", + response_status, + normalized_recipient, + response_text[:300], + ) + if detailed: + return { + "ok": False, + "status": int(response_status), + "error": str(response_text or "").strip()[:500], + "recipient": normalized_recipient, + } return False log.warning( "signal send quote payload rejected (%s), trying fallback shape: %s", @@ -305,7 +357,7 @@ def send_message_raw_sync(recipient_uuid, text=None, attachments=None): base = getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080").rstrip("/") url = f"{base}/v2/send" data = { - "recipients": [recipient_uuid], + "recipients": [normalize_signal_recipient(recipient_uuid)], "number": settings.SIGNAL_NUMBER, "base64_attachments": [], } diff --git a/core/management/commands/reconcile_workspace_metric_history.py b/core/management/commands/reconcile_workspace_metric_history.py index d12b18c..45227fa 100644 --- a/core/management/commands/reconcile_workspace_metric_history.py +++ b/core/management/commands/reconcile_workspace_metric_history.py @@ -14,6 +14,7 @@ from core.models import ( WorkspaceMetricSnapshot, ) from core.views.workspace import _conversation_for_person +from core.workspace import compact_snapshot_rows def _score_from_lag(lag_ms, target_hours=4): @@ -219,6 +220,7 @@ class Command(BaseCommand): parser.add_argument("--limit", type=int, default=200000) parser.add_argument("--dry-run", action="store_true", default=False) parser.add_argument("--no-reset", action="store_true", default=False) + parser.add_argument("--skip-compact", action="store_true", default=False) def handle(self, *args, **options): days = max(1, int(options.get("days") or 365)) @@ -229,6 +231,7 @@ class Command(BaseCommand): limit = max(1, int(options.get("limit") or 200000)) dry_run = bool(options.get("dry_run")) reset = not bool(options.get("no_reset")) + compact_enabled = not bool(options.get("skip_compact")) today_start = dj_timezone.now().astimezone(timezone.utc).replace( hour=0, minute=0, @@ -250,6 +253,7 @@ class Command(BaseCommand): deleted = 0 snapshots_created = 0 checkpoints_total = 0 + compacted_deleted = 0 for person in people: identifiers_qs = PersonIdentifier.objects.filter(user=person.user, person=person) @@ -410,6 +414,28 @@ class Command(BaseCommand): "participant_feedback", ] ) + if compact_enabled: + snapshot_rows = list( + WorkspaceMetricSnapshot.objects.filter(conversation=conversation) + .order_by("computed_at", "id") + .values("id", "computed_at", "source_event_ts") + ) + now_ts_ms = int(dj_timezone.now().timestamp() * 1000) + keep_ids = compact_snapshot_rows( + snapshot_rows=snapshot_rows, + now_ts_ms=now_ts_ms, + cutoff_ts_ms=cutoff_ts, + ) + if keep_ids: + compacted_deleted += ( + WorkspaceMetricSnapshot.objects.filter(conversation=conversation) + .exclude(id__in=list(keep_ids)) + .delete()[0] + ) + else: + compacted_deleted += WorkspaceMetricSnapshot.objects.filter( + conversation=conversation + ).delete()[0] self.stdout.write( self.style.SUCCESS( @@ -418,6 +444,8 @@ class Command(BaseCommand): f"checkpoints={checkpoints_total} " f"created={snapshots_created} " f"deleted={deleted} " + f"compacted_deleted={compacted_deleted} " + f"compact_enabled={compact_enabled} " f"reset={reset} dry_run={dry_run} " f"days={days} step_messages={step_messages} limit={limit}" ) diff --git a/core/templates/pages/ai-workspace-insight-graphs.html b/core/templates/pages/ai-workspace-insight-graphs.html index feeca74..e0da4b0 100644 --- a/core/templates/pages/ai-workspace-insight-graphs.html +++ b/core/templates/pages/ai-workspace-insight-graphs.html @@ -14,8 +14,25 @@
- Historical metrics for workspace {{ workspace_conversation.id }}. Points come from deterministic message-history snapshots (not only mitigation runs). + Historical metrics for workspace {{ workspace_conversation.id }}. Points are range-downsampled server-side with high-resolution recent data and progressively sparser older ranges.
+ {% include "partials/ai-insight-nav.html" with active_tab="graphs" %}{{ graph.group_title }}
{{ graph.title }}
-{{ graph.count }} point{{ graph.count|pluralize }}
++ {{ graph.count }} displayed of {{ graph.raw_count }} source point{{ graph.raw_count|pluralize }} +