From 9c14e51b436d1196fc8108a568b56ee66fd73ff7 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Tue, 3 Mar 2026 16:41:28 +0000 Subject: [PATCH] Implement executing tasks --- app/urls.py | 6 + core/clients/signal.py | 42 ++- core/clients/whatsapp.py | 115 ++++++++- .../commands/backfill_contact_availability.py | 123 +++++++++ core/management/commands/codex_worker.py | 127 +++++++++ .../recalculate_contact_availability.py | 243 ++++++++++++++++++ core/messaging/history.py | 19 +- ...ontactavailability_and_externalchatlink.py | 236 +++++++++++++++++ core/models.py | 162 +++++++++++- core/modules/router.py | 150 ++++++++++- core/presence/__init__.py | 18 ++ core/presence/engine.py | 175 +++++++++++++ core/presence/inference.py | 28 ++ core/presence/query.py | 60 +++++ core/tasks/engine.py | 72 +++++- core/tasks/providers/__init__.py | 19 ++ core/tasks/providers/base.py | 1 + core/tasks/providers/codex_cli.py | 117 +++++++++ core/tasks/providers/mock.py | 9 - core/templates/base.html | 6 + .../pages/availability-settings.html | 128 +++++++++ core/templates/pages/command-routing.html | 56 +++- core/templates/pages/tasks-settings.html | 95 ++++++- core/templates/partials/compose-panel.html | 142 +++++++++- .../templates/partials/signal-chats-list.html | 8 + .../partials/whatsapp-chats-list.html | 10 +- core/tests/test_availability_settings_page.py | 38 +++ .../test_backfill_contact_availability.py | 66 +++++ core/tests/test_codex_cli_provider.py | 60 +++++ core/tests/test_presence_engine.py | 140 ++++++++++ ...test_presence_query_and_compose_context.py | 50 ++++ core/tests/test_signal_text_extraction.py | 31 +++ .../test_whatsapp_reaction_and_recalc.py | 198 ++++++++++++++ core/views/availability.py | 147 +++++++++++ core/views/compose.py | 150 ++++++++++- core/views/signal.py | 59 +++++ core/views/tasks.py | 110 +++++++- core/views/whatsapp.py | 37 ++- scripts/quadlet/manage.sh | 61 ++++- scripts/quadlet/render_units.py | 125 +++++---- scripts/quadlet/watchdog.sh | 82 ++++++ stack.env.example | 10 +- 42 files changed, 3410 insertions(+), 121 deletions(-) create mode 100644 core/management/commands/backfill_contact_availability.py create mode 100644 core/management/commands/codex_worker.py create mode 100644 core/management/commands/recalculate_contact_availability.py create mode 100644 core/migrations/0033_contactavailability_and_externalchatlink.py create mode 100644 core/presence/__init__.py create mode 100644 core/presence/engine.py create mode 100644 core/presence/inference.py create mode 100644 core/presence/query.py create mode 100644 core/tasks/providers/codex_cli.py create mode 100644 core/templates/pages/availability-settings.html create mode 100644 core/tests/test_availability_settings_page.py create mode 100644 core/tests/test_backfill_contact_availability.py create mode 100644 core/tests/test_codex_cli_provider.py create mode 100644 core/tests/test_presence_engine.py create mode 100644 core/tests/test_presence_query_and_compose_context.py create mode 100644 core/tests/test_signal_text_extraction.py create mode 100644 core/tests/test_whatsapp_reaction_and_recalc.py create mode 100644 core/views/availability.py create mode 100755 scripts/quadlet/watchdog.sh diff --git a/app/urls.py b/app/urls.py index 369c9e7..b903d75 100644 --- a/app/urls.py +++ b/app/urls.py @@ -23,6 +23,7 @@ from two_factor.urls import urlpatterns as tf_urls from core.views import ( ais, automation, + availability, base, compose, groups, @@ -307,6 +308,11 @@ urlpatterns = [ tasks.TaskSettings.as_view(), name="tasks_settings", ), + path( + "settings/availability/", + availability.AvailabilitySettingsPage.as_view(), + name="availability_settings", + ), # AIs path( "ai/workspace/", diff --git a/core/clients/signal.py b/core/clients/signal.py index bd187a1..f992387 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -193,6 +193,28 @@ def _extract_signal_reaction(envelope): } +def _extract_signal_text(raw_payload, default_text=""): + text = str(default_text or "").strip() + if text: + return text + payload = dict(raw_payload or {}) + envelope = dict(payload.get("envelope") or {}) + candidates = [ + envelope.get("dataMessage"), + _get_nested(envelope, ("syncMessage", "sentMessage", "message")), + _get_nested(envelope, ("syncMessage", "sentMessage")), + payload.get("dataMessage"), + payload, + ] + for item in candidates: + if isinstance(item, dict): + for key in ("message", "text", "body", "caption"): + value = str(item.get(key) or "").strip() + if value: + return value + return "" + + def _typing_started(typing_payload): action = str(typing_payload.get("action") or "").strip().lower() if action in {"started", "start", "typing", "composing"}: @@ -368,6 +390,7 @@ class HandleMessage(Command): source_number = c.message.source_number source_uuid = c.message.source_uuid text = c.message.text + text = _extract_signal_text(raw, text) ts = c.message.timestamp source_value = c.message.source envelope = raw.get("envelope", {}) @@ -1209,14 +1232,17 @@ class SignalClient(ClientBase): if isinstance(sync_sent_message, dict) and sync_sent_message: raw_text = sync_sent_message.get("message") if isinstance(raw_text, dict): - text = str( - raw_text.get("message") - or raw_text.get("text") - or raw_text.get("body") - or "" - ).strip() + text = _extract_signal_text( + {"envelope": {"syncMessage": {"sentMessage": {"message": raw_text}}}}, + str( + raw_text.get("message") + or raw_text.get("text") + or raw_text.get("body") + or "" + ).strip(), + ) else: - text = str(raw_text or "").strip() + text = _extract_signal_text(payload, str(raw_text or "").strip()) destination_uuid = str( sync_sent_message.get("destinationUuid") @@ -1373,7 +1399,7 @@ class SignalClient(ClientBase): ) return - text = str(data_message.get("message") or "").strip() + text = _extract_signal_text(payload, str(data_message.get("message") or "").strip()) if not text: return diff --git a/core/clients/whatsapp.py b/core/clients/whatsapp.py index 90c4dba..820c31c 100644 --- a/core/clients/whatsapp.py +++ b/core/clients/whatsapp.py @@ -2355,8 +2355,29 @@ class WhatsAppClient(ClientBase): return "application/octet-stream" def _extract_reaction_event(self, message_obj): - node = self._pluck(message_obj, "reactionMessage") or self._pluck( - message_obj, "reaction_message" + node = ( + self._pluck(message_obj, "reactionMessage") + or self._pluck(message_obj, "reaction_message") + or self._pluck(message_obj, "ephemeralMessage", "message", "reactionMessage") + or self._pluck(message_obj, "ephemeral_message", "message", "reaction_message") + or self._pluck(message_obj, "viewOnceMessage", "message", "reactionMessage") + or self._pluck(message_obj, "view_once_message", "message", "reaction_message") + or self._pluck(message_obj, "viewOnceMessageV2", "message", "reactionMessage") + or self._pluck(message_obj, "view_once_message_v2", "message", "reaction_message") + or self._pluck( + message_obj, + "viewOnceMessageV2Extension", + "message", + "reactionMessage", + ) + or self._pluck( + message_obj, + "view_once_message_v2_extension", + "message", + "reaction_message", + ) + or self._pluck(message_obj, "protocolMessage", "reactionMessage") + or self._pluck(message_obj, "protocol_message", "reaction_message") ) if not node: return None @@ -2366,17 +2387,34 @@ class WhatsAppClient(ClientBase): target_msg_id = str( self._pluck(node, "key", "id") or self._pluck(node, "key", "ID") + or self._pluck(node, "messageKey", "id") + or self._pluck(node, "message_key", "id") or self._pluck(node, "targetMessageKey", "id") or self._pluck(node, "target_message_key", "id") + or self._pluck(node, "stanzaId") + or self._pluck(node, "stanza_id") or "" ).strip() - remove = bool(not emoji) + target_ts = self._normalize_timestamp( + self._pluck(node, "key", "messageTimestamp") + or self._pluck(node, "targetMessageKey", "messageTimestamp") + or self._pluck(node, "target_message_key", "message_timestamp") + or self._pluck(node, "targetTimestamp") + or self._pluck(node, "target_timestamp") + or 0 + ) + explicit_remove = self._pluck(node, "remove") or self._pluck(node, "isRemove") + if explicit_remove is None: + explicit_remove = self._pluck(node, "is_remove") + remove = bool(explicit_remove) if explicit_remove is not None else bool(not emoji) if not target_msg_id: return None return { "emoji": emoji, "target_message_id": target_msg_id, "remove": remove, + "target_ts": int(target_ts or 0), + "raw": self._proto_to_dict(node) or dict(node or {}) if isinstance(node, dict) else {}, } async def _download_event_media(self, event): @@ -2438,6 +2476,10 @@ class WhatsAppClient(ClientBase): async def _handle_message_event(self, event): event_obj = self._proto_to_dict(event) or event msg_obj = self._pluck(event_obj, "message") or self._pluck(event_obj, "Message") + if self._pluck(msg_obj, "protocolMessage") or self._pluck( + msg_obj, "protocol_message" + ): + return text = self._message_text(msg_obj, event_obj) if not text: self.log.debug( @@ -2482,7 +2524,7 @@ class WhatsAppClient(ClientBase): ).strip() ts = self._normalize_timestamp(raw_ts) - reaction_payload = self._extract_reaction_event(msg_obj) + reaction_payload = self._extract_reaction_event(msg_obj or event_obj) if reaction_payload: self.log.debug( "reaction-bridge whatsapp-inbound msg_id=%s target_id=%s emoji=%s remove=%s sender=%s chat=%s", @@ -2508,6 +2550,26 @@ class WhatsAppClient(ClientBase): ) ) for identifier in identifiers: + try: + await history.apply_reaction( + identifier.user, + identifier, + target_message_id=str( + reaction_payload.get("target_message_id") or "" + ), + target_ts=int(reaction_payload.get("target_ts") or 0), + emoji=str(reaction_payload.get("emoji") or ""), + source_service="whatsapp", + actor=str(sender or chat or ""), + remove=bool(reaction_payload.get("remove")), + payload={ + "event": "reaction", + "message_id": msg_id, + "raw": reaction_payload.get("raw") or {}, + }, + ) + except Exception as exc: + self.log.warning("whatsapp reaction local apply failed: %s", exc) try: await self.ur.xmpp.client.apply_external_reaction( identifier.user, @@ -2527,6 +2589,21 @@ class WhatsAppClient(ClientBase): ) except Exception as exc: self.log.warning("whatsapp reaction relay to XMPP failed: %s", exc) + try: + await self.ur.presence_changed( + self.service, + identifier=identifier.identifier, + state="available", + confidence=0.9, + ts=int(ts or int(time.time() * 1000)), + payload={ + "event": "reaction", + "inferred_from": "reaction", + "message_id": msg_id, + }, + ) + except Exception: + pass return self._remember_contact( @@ -2907,14 +2984,42 @@ class WhatsAppClient(ClientBase): is_unavailable = bool( self._pluck(event, "Unavailable") or self._pluck(event, "unavailable") ) + last_seen_raw = ( + self._pluck(event, "LastSeen") + or self._pluck(event, "lastSeen") + or self._pluck(event, "last_seen") + or self._pluck(event, "Timestamp") + or self._pluck(event, "timestamp") + or 0 + ) + last_seen_ts = self._normalize_timestamp(last_seen_raw) self._remember_contact(sender, jid=sender) for candidate in self._normalize_identifier_candidates(sender): + try: + await self.ur.presence_changed( + self.service, + identifier=candidate, + state=("unavailable" if is_unavailable else "available"), + confidence=0.9 if not is_unavailable else 0.8, + ts=int(last_seen_ts or int(time.time() * 1000)), + payload={ + "presence": ("offline" if is_unavailable else "online"), + "sender": str(sender), + "last_seen_ts": int(last_seen_ts or 0), + }, + ) + except Exception: + pass if is_unavailable: await self.ur.stopped_typing( self.service, identifier=candidate, - payload={"presence": "offline", "sender": str(sender)}, + payload={ + "presence": "offline", + "sender": str(sender), + "last_seen_ts": int(last_seen_ts or 0), + }, ) def _extract_pair_qr(self, event): diff --git a/core/management/commands/backfill_contact_availability.py b/core/management/commands/backfill_contact_availability.py new file mode 100644 index 0000000..b0d2b25 --- /dev/null +++ b/core/management/commands/backfill_contact_availability.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +from typing import Iterable + +from django.core.management.base import BaseCommand + +from core.models import Message, User +from core.presence import AvailabilitySignal, record_inferred_signal +from core.presence.inference import now_ms + + +class Command(BaseCommand): + help = "Backfill inferred contact availability events from historical message/read-receipt activity." + + def add_arguments(self, parser): + parser.add_argument("--days", type=int, default=30) + parser.add_argument("--limit", type=int, default=5000) + parser.add_argument("--service", default="") + parser.add_argument("--user-id", default="") + parser.add_argument("--dry-run", action="store_true", default=False) + + def _iter_messages(self, *, days: int, limit: int, service: str, user_id: str) -> Iterable[Message]: + cutoff_ts = now_ms() - (max(1, int(days)) * 24 * 60 * 60 * 1000) + qs = Message.objects.filter(ts__gte=cutoff_ts).select_related( + "user", "session", "session__identifier", "session__identifier__person" + ) + if service: + qs = qs.filter(source_service=str(service).strip().lower()) + if user_id: + qs = qs.filter(user_id=str(user_id).strip()) + return qs.order_by("ts")[: max(1, int(limit))] + + def handle(self, *args, **options): + days = max(1, int(options.get("days") or 30)) + limit = max(1, int(options.get("limit") or 5000)) + service_filter = str(options.get("service") or "").strip().lower() + user_filter = str(options.get("user_id") or "").strip() + dry_run = bool(options.get("dry_run")) + + created = 0 + scanned = 0 + + for msg in self._iter_messages(days=days, limit=limit, service=service_filter, user_id=user_filter): + scanned += 1 + identifier = getattr(getattr(msg, "session", None), "identifier", None) + person = getattr(identifier, "person", None) + user = getattr(msg, "user", None) + if not identifier or not person or not user: + continue + + service = str(getattr(msg, "source_service", "") or identifier.service or "").strip().lower() + if not service: + continue + + base_ts = int(getattr(msg, "ts", 0) or 0) + message_author = str(getattr(msg, "custom_author", "") or "").strip().upper() + outgoing = message_author in {"USER", "BOT"} + + candidates = [] + if base_ts > 0: + candidates.append( + { + "source_kind": "message_out" if outgoing else "message_in", + "availability_state": "available", + "confidence": 0.65 if outgoing else 0.75, + "ts": base_ts, + "payload": { + "origin": "backfill_contact_availability", + "message_id": str(msg.id), + "inferred_from": "message_activity", + }, + } + ) + + read_ts = int(getattr(msg, "read_ts", 0) or 0) + if read_ts > 0: + candidates.append( + { + "source_kind": "read_receipt", + "availability_state": "available", + "confidence": 0.95, + "ts": read_ts, + "payload": { + "origin": "backfill_contact_availability", + "message_id": str(msg.id), + "inferred_from": "read_receipt", + "read_by": str(getattr(msg, "read_by_identifier", "") or ""), + }, + } + ) + + for row in candidates: + exists = user.contact_availability_events.filter( + person=person, + person_identifier=identifier, + service=service, + source_kind=row["source_kind"], + ts=int(row["ts"]), + ).exists() + if exists: + continue + created += 1 + if dry_run: + continue + record_inferred_signal( + AvailabilitySignal( + user=user, + person=person, + person_identifier=identifier, + service=service, + source_kind=row["source_kind"], + availability_state=row["availability_state"], + confidence=float(row["confidence"]), + ts=int(row["ts"]), + payload=dict(row["payload"]), + ) + ) + + self.stdout.write( + self.style.SUCCESS( + f"backfill_contact_availability complete scanned={scanned} created={created} dry_run={dry_run} days={days} limit={limit}" + ) + ) diff --git a/core/management/commands/codex_worker.py b/core/management/commands/codex_worker.py new file mode 100644 index 0000000..dd3bf7d --- /dev/null +++ b/core/management/commands/codex_worker.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +import time + +from django.core.management.base import BaseCommand + +from core.models import ExternalSyncEvent, TaskProviderConfig +from core.tasks.providers import get_provider +from core.util import logs + +log = logs.get_logger("codex_worker") + + +class Command(BaseCommand): + help = "Process queued external sync events for worker-backed providers (codex_cli)." + + def add_arguments(self, parser): + parser.add_argument("--once", action="store_true", default=False) + parser.add_argument("--sleep-seconds", type=float, default=2.0) + parser.add_argument("--batch-size", type=int, default=20) + parser.add_argument("--provider", default="codex_cli") + + def _claim_batch(self, provider: str, batch_size: int) -> list[str]: + ids: list[str] = [] + rows = list( + ExternalSyncEvent.objects.filter( + provider=provider, + status__in=["pending", "retrying"], + ) + .order_by("updated_at")[: max(1, batch_size)] + .values_list("id", flat=True) + ) + for row_id in rows: + updated = ExternalSyncEvent.objects.filter( + id=row_id, + provider=provider, + status__in=["pending", "retrying"], + ).update(status="retrying") + if updated: + ids.append(str(row_id)) + return ids + + def _run_event(self, event: ExternalSyncEvent) -> None: + provider = get_provider(event.provider) + if not bool(getattr(provider, "run_in_worker", False)): + return + + cfg = ( + TaskProviderConfig.objects.filter( + user=event.user, + provider=event.provider, + enabled=True, + ) + .order_by("-updated_at") + .first() + ) + if cfg is None: + event.status = "failed" + event.error = "provider_disabled_or_missing" + event.save(update_fields=["status", "error", "updated_at"]) + return + + payload = dict(event.payload or {}) + action = str(payload.get("action") or "append_update").strip().lower() + provider_payload = dict(payload.get("provider_payload") or payload) + + if action == "create": + result = provider.create_task(dict(cfg.settings or {}), provider_payload) + elif action == "complete": + result = provider.mark_complete(dict(cfg.settings or {}), provider_payload) + elif action == "link_task": + result = provider.link_task(dict(cfg.settings or {}), provider_payload) + else: + result = provider.append_update(dict(cfg.settings or {}), provider_payload) + + event.status = "ok" if result.ok else "failed" + event.error = str(result.error or "") + event.payload = dict( + payload, + worker_processed=True, + result=dict(result.payload or {}), + ) + event.save(update_fields=["status", "error", "payload", "updated_at"]) + + if result.ok and result.external_key and event.task_id and not str(event.task.external_key or "").strip(): + event.task.external_key = str(result.external_key) + event.task.save(update_fields=["external_key"]) + + def handle(self, *args, **options): + once = bool(options.get("once")) + sleep_seconds = max(0.2, float(options.get("sleep_seconds") or 2.0)) + batch_size = max(1, int(options.get("batch_size") or 20)) + provider_name = str(options.get("provider") or "codex_cli").strip().lower() + + log.info( + "codex_worker started provider=%s once=%s sleep=%s batch_size=%s", + provider_name, + once, + sleep_seconds, + batch_size, + ) + + while True: + claimed_ids = self._claim_batch(provider_name, batch_size) + if not claimed_ids: + if once: + log.info("codex_worker exiting: no pending events") + return + time.sleep(sleep_seconds) + continue + + for row_id in claimed_ids: + event = ExternalSyncEvent.objects.filter(id=row_id).select_related("task", "user").first() + if event is None: + continue + try: + self._run_event(event) + except Exception as exc: + log.exception("codex_worker failed processing id=%s", row_id) + ExternalSyncEvent.objects.filter(id=row_id).update( + status="failed", + error=f"worker_exception:{exc}", + ) + + if once: + log.info("codex_worker processed %s event(s)", len(claimed_ids)) + return diff --git a/core/management/commands/recalculate_contact_availability.py b/core/management/commands/recalculate_contact_availability.py new file mode 100644 index 0000000..03d358f --- /dev/null +++ b/core/management/commands/recalculate_contact_availability.py @@ -0,0 +1,243 @@ +from __future__ import annotations + +from collections import defaultdict + +from django.core.management.base import BaseCommand + +from core.models import ContactAvailabilityEvent, ContactAvailabilitySpan, Message +from core.presence import AvailabilitySignal, record_native_signal +from core.presence.inference import now_ms + + +_SOURCE_ORDER = { + "message_in": 10, + "message_out": 20, + "read_receipt": 30, + "native_presence": 40, +} + + +class Command(BaseCommand): + help = ( + "Recalculate contact availability events/spans from persisted message, " + "read-receipt, and reaction history (deterministic rebuild)." + ) + + def add_arguments(self, parser): + parser.add_argument("--days", type=int, default=90) + parser.add_argument("--limit", type=int, default=20000) + parser.add_argument("--service", default="") + parser.add_argument("--user-id", default="") + parser.add_argument("--dry-run", action="store_true", default=False) + parser.add_argument("--no-reset", action="store_true", default=False) + + def _iter_messages(self, *, days: int, limit: int, service: str, user_id: str): + cutoff_ts = now_ms() - (max(1, int(days)) * 24 * 60 * 60 * 1000) + qs = Message.objects.filter(ts__gte=cutoff_ts).select_related( + "user", "session", "session__identifier", "session__identifier__person" + ) + if service: + qs = qs.filter(source_service=str(service).strip().lower()) + if user_id: + qs = qs.filter(user_id=str(user_id).strip()) + return qs.order_by("ts")[: max(1, int(limit))] + + def _build_event_rows(self, messages): + rows = [] + for msg in messages: + identifier = getattr(getattr(msg, "session", None), "identifier", None) + person = getattr(identifier, "person", None) + user = getattr(msg, "user", None) + if not identifier or not person or not user: + continue + + service = str( + getattr(msg, "source_service", "") or getattr(identifier, "service", "") + ).strip().lower() + if not service: + continue + + ts = int(getattr(msg, "ts", 0) or 0) + if ts > 0: + author = str(getattr(msg, "custom_author", "") or "").strip().upper() + outgoing = author in {"USER", "BOT"} + rows.append( + { + "user": user, + "person": person, + "person_identifier": identifier, + "service": service, + "source_kind": "message_out" if outgoing else "message_in", + "availability_state": "available", + "confidence": 0.65 if outgoing else 0.75, + "ts": ts, + "payload": { + "origin": "recalculate_contact_availability", + "message_id": str(msg.id), + "inferred_from": "message_activity", + }, + } + ) + + read_ts = int(getattr(msg, "read_ts", 0) or 0) + if read_ts > 0: + rows.append( + { + "user": user, + "person": person, + "person_identifier": identifier, + "service": service, + "source_kind": "read_receipt", + "availability_state": "available", + "confidence": 0.95, + "ts": read_ts, + "payload": { + "origin": "recalculate_contact_availability", + "message_id": str(msg.id), + "inferred_from": "read_receipt", + "read_by": str(getattr(msg, "read_by_identifier", "") or ""), + }, + } + ) + + reactions = list((getattr(msg, "receipt_payload", {}) or {}).get("reactions") or []) + for reaction in reactions: + item = dict(reaction or {}) + if bool(item.get("removed")): + continue + reaction_ts = int(item.get("updated_at") or 0) + if reaction_ts <= 0: + continue + rows.append( + { + "user": user, + "person": person, + "person_identifier": identifier, + "service": service, + "source_kind": "native_presence", + "availability_state": "available", + "confidence": 0.9, + "ts": reaction_ts, + "payload": { + "origin": "recalculate_contact_availability", + "message_id": str(msg.id), + "inferred_from": "reaction", + "emoji": str(item.get("emoji") or ""), + "actor": str(item.get("actor") or ""), + "source_service": str(item.get("source_service") or service), + }, + } + ) + + rows.sort( + key=lambda row: ( + str(getattr(row["user"], "id", "")), + str(getattr(row["person"], "id", "")), + str(row.get("service") or ""), + int(row.get("ts") or 0), + _SOURCE_ORDER.get(str(row.get("source_kind") or ""), 999), + str((row.get("payload") or {}).get("message_id") or ""), + ) + ) + return rows + + def handle(self, *args, **options): + days = max(1, int(options.get("days") or 90)) + limit = max(1, int(options.get("limit") or 20000)) + service_filter = str(options.get("service") or "").strip().lower() + user_filter = str(options.get("user_id") or "").strip() + dry_run = bool(options.get("dry_run")) + reset = not bool(options.get("no_reset")) + cutoff_ts = now_ms() - (days * 24 * 60 * 60 * 1000) + + messages = list( + self._iter_messages( + days=days, + limit=limit, + service=service_filter, + user_id=user_filter, + ) + ) + rows = self._build_event_rows(messages) + + keys_to_reset = set() + for row in rows: + keys_to_reset.add( + ( + str(getattr(row["user"], "id", "")), + str(getattr(row["person"], "id", "")), + str(row.get("service") or ""), + ) + ) + + deleted_events = 0 + deleted_spans = 0 + if reset and keys_to_reset and not dry_run: + for user_id, person_id, service in keys_to_reset: + deleted_events += ContactAvailabilityEvent.objects.filter( + user_id=user_id, + person_id=person_id, + service=service, + ts__gte=cutoff_ts, + ).delete()[0] + deleted_spans += ContactAvailabilitySpan.objects.filter( + user_id=user_id, + person_id=person_id, + service=service, + end_ts__gte=cutoff_ts, + ).delete()[0] + + created = 0 + dedup_seen = set() + for row in rows: + dedup_key = ( + str(getattr(row["user"], "id", "")), + str(getattr(row["person"], "id", "")), + str(getattr(row["person_identifier"], "id", "")), + str(row.get("service") or ""), + str(row.get("source_kind") or ""), + int(row.get("ts") or 0), + str((row.get("payload") or {}).get("message_id") or ""), + str((row.get("payload") or {}).get("inferred_from") or ""), + ) + if dedup_key in dedup_seen: + continue + dedup_seen.add(dedup_key) + + if not reset: + exists = ContactAvailabilityEvent.objects.filter( + user=row["user"], + person=row["person"], + person_identifier=row["person_identifier"], + service=row["service"], + source_kind=row["source_kind"], + ts=row["ts"], + ).exists() + if exists: + continue + + created += 1 + if dry_run: + continue + record_native_signal( + AvailabilitySignal( + user=row["user"], + person=row["person"], + person_identifier=row["person_identifier"], + service=row["service"], + source_kind=row["source_kind"], + availability_state=row["availability_state"], + confidence=float(row["confidence"]), + ts=int(row["ts"]), + payload=dict(row["payload"]), + ) + ) + + self.stdout.write( + self.style.SUCCESS( + "recalculate_contact_availability complete " + f"messages_scanned={len(messages)} candidates={len(rows)} " + f"created={created} deleted_events={deleted_events} deleted_spans={deleted_spans} " + f"reset={reset} dry_run={dry_run} days={days} limit={limit}" + ) + ) diff --git a/core/messaging/history.py b/core/messaging/history.py index 076acdd..41c4853 100644 --- a/core/messaging/history.py +++ b/core/messaging/history.py @@ -1,5 +1,6 @@ from asgiref.sync import sync_to_async from django.conf import settings +import uuid from core.messaging.utils import messages_to_string from core.models import ChatSession, Message, QueuedMessage @@ -316,9 +317,21 @@ async def apply_reaction( target = None target_uuid = str(target_message_id or "").strip() if target_uuid: - target = await sync_to_async( - lambda: queryset.filter(id=target_uuid).order_by("-ts").first() - )() + is_uuid = True + try: + uuid.UUID(str(target_uuid)) + except Exception: + is_uuid = False + if is_uuid: + target = await sync_to_async( + lambda: queryset.filter(id=target_uuid).order_by("-ts").first() + )() + if target is None: + target = await sync_to_async( + lambda: queryset.filter(source_message_id=target_uuid) + .order_by("-ts") + .first() + )() if target is None: try: diff --git a/core/migrations/0033_contactavailability_and_externalchatlink.py b/core/migrations/0033_contactavailability_and_externalchatlink.py new file mode 100644 index 0000000..93eefc1 --- /dev/null +++ b/core/migrations/0033_contactavailability_and_externalchatlink.py @@ -0,0 +1,236 @@ +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0032_commandvariantpolicy_store_document"), + ] + + operations = [ + migrations.CreateModel( + name="ContactAvailabilitySettings", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("enabled", models.BooleanField(default=True)), + ("show_in_chat", models.BooleanField(default=True)), + ("show_in_groups", models.BooleanField(default=True)), + ("inference_enabled", models.BooleanField(default=True)), + ("retention_days", models.PositiveIntegerField(default=90)), + ("fade_threshold_seconds", models.PositiveIntegerField(default=900)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ( + "user", + models.OneToOneField( + on_delete=django.db.models.deletion.CASCADE, + related_name="contact_availability_settings", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + ), + migrations.CreateModel( + name="ContactAvailabilityEvent", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ( + "service", + models.CharField( + choices=[("signal", "Signal"), ("whatsapp", "WhatsApp"), ("xmpp", "XMPP"), ("instagram", "Instagram"), ("web", "Web")], + max_length=255, + ), + ), + ( + "source_kind", + models.CharField( + choices=[ + ("native_presence", "Native Presence"), + ("read_receipt", "Read Receipt"), + ("typing_start", "Typing Start"), + ("typing_stop", "Typing Stop"), + ("message_in", "Message In"), + ("message_out", "Message Out"), + ("inferred_timeout", "Inferred Timeout"), + ], + max_length=32, + ), + ), + ( + "availability_state", + models.CharField( + choices=[("available", "Available"), ("unavailable", "Unavailable"), ("unknown", "Unknown"), ("fading", "Fading")], + max_length=32, + ), + ), + ("confidence", models.FloatField(default=0.0)), + ("ts", models.BigIntegerField(db_index=True)), + ("payload", models.JSONField(blank=True, default=dict)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "person", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="availability_events", + to="core.person", + ), + ), + ( + "person_identifier", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="availability_events", + to="core.personidentifier", + ), + ), + ( + "user", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="contact_availability_events", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "indexes": [ + models.Index(fields=["user", "person", "ts"], name="core_contac_user_id_0da9b2_idx"), + models.Index(fields=["user", "service", "ts"], name="core_contac_user_id_bce271_idx"), + models.Index(fields=["user", "availability_state", "ts"], name="core_contac_user_id_1b50b3_idx"), + ], + }, + ), + migrations.CreateModel( + name="ExternalChatLink", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("provider", models.CharField(default="codex_cli", max_length=64)), + ("external_chat_id", models.CharField(max_length=255)), + ("metadata", models.JSONField(blank=True, default=dict)), + ("enabled", models.BooleanField(default=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ( + "person", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="external_chat_links", + to="core.person", + ), + ), + ( + "person_identifier", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="external_chat_links", + to="core.personidentifier", + ), + ), + ( + "user", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="external_chat_links", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "indexes": [ + models.Index(fields=["user", "provider", "external_chat_id"], name="core_extern_user_id_f4a7b0_idx"), + models.Index(fields=["user", "provider", "enabled"], name="core_extern_user_id_7d2295_idx"), + ], + "constraints": [ + models.UniqueConstraint(fields=("user", "provider", "external_chat_id"), name="unique_external_chat_link_per_provider"), + ], + }, + ), + migrations.CreateModel( + name="ContactAvailabilitySpan", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ( + "service", + models.CharField( + choices=[("signal", "Signal"), ("whatsapp", "WhatsApp"), ("xmpp", "XMPP"), ("instagram", "Instagram"), ("web", "Web")], + max_length=255, + ), + ), + ( + "state", + models.CharField( + choices=[("available", "Available"), ("unavailable", "Unavailable"), ("unknown", "Unknown"), ("fading", "Fading")], + max_length=32, + ), + ), + ("start_ts", models.BigIntegerField(db_index=True)), + ("end_ts", models.BigIntegerField(db_index=True)), + ("confidence_start", models.FloatField(default=0.0)), + ("confidence_end", models.FloatField(default=0.0)), + ("payload", models.JSONField(blank=True, default=dict)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ( + "closing_event", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="closing_spans", + to="core.contactavailabilityevent", + ), + ), + ( + "opening_event", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="opening_spans", + to="core.contactavailabilityevent", + ), + ), + ( + "person", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="availability_spans", + to="core.person", + ), + ), + ( + "person_identifier", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="availability_spans", + to="core.personidentifier", + ), + ), + ( + "user", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="contact_availability_spans", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "indexes": [ + models.Index(fields=["user", "person", "start_ts"], name="core_contac_user_id_9cd15a_idx"), + models.Index(fields=["user", "person", "end_ts"], name="core_contac_user_id_88584a_idx"), + models.Index(fields=["user", "service", "start_ts"], name="core_contac_user_id_182ffb_idx"), + ], + }, + ), + ] diff --git a/core/models.py b/core/models.py index a97c5fa..435e9fe 100644 --- a/core/models.py +++ b/core/models.py @@ -20,14 +20,14 @@ SERVICE_CHOICES = ( ) CHANNEL_SERVICE_CHOICES = SERVICE_CHOICES + (("web", "Web"),) MBTI_CHOICES = ( - ("INTJ", "INTJ - Architect"), + ("INTJ", "INTJ - Architect"),# ;) ("INTP", "INTP - Logician"), ("ENTJ", "ENTJ - Commander"), ("ENTP", "ENTP - Debater"), ("INFJ", "INFJ - Advocate"), ("INFP", "INFP - Mediator"), ("ENFJ", "ENFJ - Protagonist"), - ("ENFP", "ENFP - Campaigner"), + ("ENFP", "ENFP - Campaigner"), # <3 ("ISTJ", "ISTJ - Logistician"), ("ISFJ", "ISFJ - Defender"), ("ESTJ", "ESTJ - Executive"), @@ -2227,6 +2227,164 @@ class TaskProviderConfig(models.Model): ] +class ContactAvailabilitySettings(models.Model): + user = models.OneToOneField( + User, + on_delete=models.CASCADE, + related_name="contact_availability_settings", + ) + enabled = models.BooleanField(default=True) + show_in_chat = models.BooleanField(default=True) + show_in_groups = models.BooleanField(default=True) + inference_enabled = models.BooleanField(default=True) + retention_days = models.PositiveIntegerField(default=90) + fade_threshold_seconds = models.PositiveIntegerField(default=900) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + +class ContactAvailabilityEvent(models.Model): + SOURCE_KIND_CHOICES = ( + ("native_presence", "Native Presence"), + ("read_receipt", "Read Receipt"), + ("typing_start", "Typing Start"), + ("typing_stop", "Typing Stop"), + ("message_in", "Message In"), + ("message_out", "Message Out"), + ("inferred_timeout", "Inferred Timeout"), + ) + STATE_CHOICES = ( + ("available", "Available"), + ("unavailable", "Unavailable"), + ("unknown", "Unknown"), + ("fading", "Fading"), + ) + + user = models.ForeignKey( + User, + on_delete=models.CASCADE, + related_name="contact_availability_events", + ) + person = models.ForeignKey( + Person, + on_delete=models.CASCADE, + related_name="availability_events", + ) + person_identifier = models.ForeignKey( + PersonIdentifier, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="availability_events", + ) + service = models.CharField(max_length=255, choices=CHANNEL_SERVICE_CHOICES) + source_kind = models.CharField(max_length=32, choices=SOURCE_KIND_CHOICES) + availability_state = models.CharField(max_length=32, choices=STATE_CHOICES) + confidence = models.FloatField(default=0.0) + ts = models.BigIntegerField(db_index=True) + payload = models.JSONField(default=dict, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + indexes = [ + models.Index(fields=["user", "person", "ts"]), + models.Index(fields=["user", "service", "ts"]), + models.Index(fields=["user", "availability_state", "ts"]), + ] + + +class ContactAvailabilitySpan(models.Model): + STATE_CHOICES = ContactAvailabilityEvent.STATE_CHOICES + + user = models.ForeignKey( + User, + on_delete=models.CASCADE, + related_name="contact_availability_spans", + ) + person = models.ForeignKey( + Person, + on_delete=models.CASCADE, + related_name="availability_spans", + ) + person_identifier = models.ForeignKey( + PersonIdentifier, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="availability_spans", + ) + service = models.CharField(max_length=255, choices=CHANNEL_SERVICE_CHOICES) + state = models.CharField(max_length=32, choices=STATE_CHOICES) + start_ts = models.BigIntegerField(db_index=True) + end_ts = models.BigIntegerField(db_index=True) + confidence_start = models.FloatField(default=0.0) + confidence_end = models.FloatField(default=0.0) + opening_event = models.ForeignKey( + ContactAvailabilityEvent, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="opening_spans", + ) + closing_event = models.ForeignKey( + ContactAvailabilityEvent, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="closing_spans", + ) + payload = models.JSONField(default=dict, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + indexes = [ + models.Index(fields=["user", "person", "start_ts"]), + models.Index(fields=["user", "person", "end_ts"]), + models.Index(fields=["user", "service", "start_ts"]), + ] + + +class ExternalChatLink(models.Model): + user = models.ForeignKey( + User, + on_delete=models.CASCADE, + related_name="external_chat_links", + ) + provider = models.CharField(max_length=64, default="codex_cli") + person = models.ForeignKey( + Person, + on_delete=models.CASCADE, + null=True, + blank=True, + related_name="external_chat_links", + ) + person_identifier = models.ForeignKey( + PersonIdentifier, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="external_chat_links", + ) + external_chat_id = models.CharField(max_length=255) + metadata = models.JSONField(default=dict, blank=True) + enabled = models.BooleanField(default=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + indexes = [ + models.Index(fields=["user", "provider", "external_chat_id"]), + models.Index(fields=["user", "provider", "enabled"]), + ] + constraints = [ + models.UniqueConstraint( + fields=["user", "provider", "external_chat_id"], + name="unique_external_chat_link_per_provider", + ) + ] + + class TaskCompletionPattern(models.Model): id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="task_completion_patterns") diff --git a/core/modules/router.py b/core/modules/router.py index efd1cef..8b9bf29 100644 --- a/core/modules/router.py +++ b/core/modules/router.py @@ -1,4 +1,5 @@ import asyncio +import re from asgiref.sync import sync_to_async from django.conf import settings @@ -13,6 +14,7 @@ from core.commands.base import CommandContext from core.commands.engine import process_inbound_message from core.messaging import history from core.models import PersonIdentifier +from core.presence import AvailabilitySignal, record_native_signal from core.realtime.typing_state import set_person_typing_state from core.translation.engine import process_inbound_translation from core.util import logs @@ -100,6 +102,32 @@ class UnifiedRouter(object): message_text = str(kwargs.get("text") or "").strip() if local_message is None: return + identifiers = await self._resolve_identifier_objects(protocol, identifier) + if identifiers: + outgoing = str(getattr(local_message, "custom_author", "") or "").strip().upper() in { + "USER", + "BOT", + } + source_kind = "message_out" if outgoing else "message_in" + confidence = 0.65 if outgoing else 0.75 + for row in identifiers: + record_native_signal( + AvailabilitySignal( + user=row.user, + person=row.person, + person_identifier=row, + service=str(protocol or "").strip().lower(), + source_kind=source_kind, + availability_state="available", + confidence=confidence, + ts=int(getattr(local_message, "ts", 0) or 0), + payload={ + "origin": "router.message_received", + "message_id": str(getattr(local_message, "id", "") or ""), + "outgoing": outgoing, + }, + ) + ) channel_identifier = "" if isinstance(identifier, PersonIdentifier): channel_identifier = str(identifier.identifier or "").strip() @@ -127,6 +155,31 @@ class UnifiedRouter(object): await process_inbound_assist(local_message) except Exception as exc: self.log.warning("Assist/task processing failed: %s", exc) + await self._refresh_workspace_metrics_for_identifiers(identifiers) + + async def _refresh_workspace_metrics_for_identifiers(self, identifiers): + if not identifiers: + return + seen = set() + for row in identifiers: + person = getattr(row, "person", None) + user = getattr(row, "user", None) + if person is None or user is None: + continue + person_key = str(getattr(person, "id", "") or "") + if not person_key or person_key in seen: + continue + seen.add(person_key) + try: + from core.views.workspace import _conversation_for_person + + await sync_to_async(_conversation_for_person)(user, person) + except Exception as exc: + self.log.warning( + "Workspace metrics refresh failed for person=%s: %s", + person_key, + exc, + ) async def _resolve_identifier_objects(self, protocol, identifier): if isinstance(identifier, PersonIdentifier): @@ -134,9 +187,28 @@ class UnifiedRouter(object): value = str(identifier or "").strip() if not value: return [] + variants = [value] + bare = value.split("@", 1)[0].strip() + if bare and bare not in variants: + variants.append(bare) + if protocol == "signal": + digits = re.sub(r"[^0-9]", "", value) + if digits and digits not in variants: + variants.append(digits) + if digits: + plus = f"+{digits}" + if plus not in variants: + variants.append(plus) + elif protocol == "whatsapp" and bare: + direct = f"{bare}@s.whatsapp.net" + group = f"{bare}@g.us" + if direct not in variants: + variants.append(direct) + if group not in variants: + variants.append(group) return await sync_to_async(list)( PersonIdentifier.objects.filter( - identifier=value, + identifier__in=variants, service=protocol, ) ) @@ -160,12 +232,75 @@ class UnifiedRouter(object): read_by_identifier=read_by or row.identifier, payload=payload, ) + record_native_signal( + AvailabilitySignal( + user=row.user, + person=row.person, + person_identifier=row, + service=str(protocol or "").strip().lower(), + source_kind="read_receipt", + availability_state="available", + confidence=0.95, + ts=int(read_ts or 0), + payload={ + "origin": "router.message_read", + "message_timestamps": [int(v) for v in list(timestamps or []) if str(v).isdigit()], + "read_by": str(read_by or row.identifier), + }, + ) + ) + await self._refresh_workspace_metrics_for_identifiers(identifiers) + + async def presence_changed(self, protocol, *args, **kwargs): + identifier = kwargs.get("identifier") + state = str(kwargs.get("state") or "unknown").strip().lower() + if state not in {"available", "unavailable", "unknown", "fading"}: + state = "unknown" + try: + confidence = float(kwargs.get("confidence") or 0.6) + except Exception: + confidence = 0.6 + try: + ts = int(kwargs.get("ts") or 0) + except Exception: + ts = 0 + payload = dict(kwargs.get("payload") or {}) + + identifiers = await self._resolve_identifier_objects(protocol, identifier) + for row in identifiers: + record_native_signal( + AvailabilitySignal( + user=row.user, + person=row.person, + person_identifier=row, + service=str(protocol or "").strip().lower(), + source_kind="native_presence", + availability_state=state, + confidence=confidence, + ts=ts, + payload=payload, + ) + ) + await self._refresh_workspace_metrics_for_identifiers(identifiers) async def started_typing(self, protocol, *args, **kwargs): self.log.info(f"Started typing ({protocol}) {args} {kwargs}") identifier = kwargs.get("identifier") identifiers = await self._resolve_identifier_objects(protocol, identifier) for src in identifiers: + record_native_signal( + AvailabilitySignal( + user=src.user, + person=src.person, + person_identifier=src, + service=str(protocol or "").strip().lower(), + source_kind="typing_start", + availability_state="available", + confidence=0.9, + ts=0, + payload={"origin": "router.started_typing"}, + ) + ) if protocol != "xmpp": set_person_typing_state( user_id=src.user_id, @@ -201,6 +336,19 @@ class UnifiedRouter(object): identifier = kwargs.get("identifier") identifiers = await self._resolve_identifier_objects(protocol, identifier) for src in identifiers: + record_native_signal( + AvailabilitySignal( + user=src.user, + person=src.person, + person_identifier=src, + service=str(protocol or "").strip().lower(), + source_kind="typing_stop", + availability_state="fading", + confidence=0.5, + ts=0, + payload={"origin": "router.stopped_typing"}, + ) + ) if protocol != "xmpp": set_person_typing_state( user_id=src.user_id, diff --git a/core/presence/__init__.py b/core/presence/__init__.py new file mode 100644 index 0000000..0830f6e --- /dev/null +++ b/core/presence/__init__.py @@ -0,0 +1,18 @@ +from .engine import ( + AvailabilitySignal, + ensure_fading_state, + get_settings, + record_inferred_signal, + record_native_signal, +) +from .query import latest_state_for_people, spans_for_range + +__all__ = [ + "AvailabilitySignal", + "ensure_fading_state", + "get_settings", + "record_inferred_signal", + "record_native_signal", + "latest_state_for_people", + "spans_for_range", +] diff --git a/core/presence/engine.py b/core/presence/engine.py new file mode 100644 index 0000000..5d51e21 --- /dev/null +++ b/core/presence/engine.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from django.db import transaction + +from core.models import ( + ContactAvailabilityEvent, + ContactAvailabilitySettings, + ContactAvailabilitySpan, + Person, + PersonIdentifier, + User, +) +from .inference import fade_confidence, now_ms, should_fade + +POSITIVE_SOURCE_KINDS = {"native_presence", "read_receipt", "typing_start", "message_in"} + + +@dataclass(slots=True) +class AvailabilitySignal: + user: User + person: Person + person_identifier: PersonIdentifier | None + service: str + source_kind: str + availability_state: str + confidence: float = 0.8 + ts: int = 0 + payload: dict | None = None + + +def get_settings(user: User) -> ContactAvailabilitySettings: + settings_row, _ = ContactAvailabilitySettings.objects.get_or_create(user=user) + return settings_row + + +def _normalize_ts(value: int | None) -> int: + try: + ts = int(value or 0) + except Exception: + ts = 0 + return ts if ts > 0 else now_ms() + + +def _upsert_spans_for_event(event: ContactAvailabilityEvent) -> None: + prev = ( + ContactAvailabilitySpan.objects.filter( + user=event.user, + person=event.person, + service=event.service, + ) + .order_by("-end_ts", "-id") + .first() + ) + + if prev and prev.state == event.availability_state: + prev.end_ts = max(int(prev.end_ts or 0), int(event.ts or 0)) + prev.confidence_end = float(event.confidence or 0.0) + prev.closing_event = event + prev.payload = dict(prev.payload or {}) + prev.payload.update({"extended_by": str(event.source_kind or "")}) + prev.save( + update_fields=[ + "end_ts", + "confidence_end", + "closing_event", + "payload", + "updated_at", + ] + ) + return + + ContactAvailabilitySpan.objects.create( + user=event.user, + person=event.person, + person_identifier=event.person_identifier, + service=event.service, + state=event.availability_state, + start_ts=int(event.ts or 0), + end_ts=int(event.ts or 0), + confidence_start=float(event.confidence or 0.0), + confidence_end=float(event.confidence or 0.0), + opening_event=event, + closing_event=event, + payload=dict(event.payload or {}), + ) + + +@transaction.atomic +def record_native_signal(signal: AvailabilitySignal) -> ContactAvailabilityEvent | None: + settings_row = get_settings(signal.user) + if not settings_row.enabled: + return None + + event = ContactAvailabilityEvent.objects.create( + user=signal.user, + person=signal.person, + person_identifier=signal.person_identifier, + service=str(signal.service or "").strip().lower() or "signal", + source_kind=str(signal.source_kind or "").strip() or "native_presence", + availability_state=str(signal.availability_state or "unknown").strip() or "unknown", + confidence=float(signal.confidence or 0.0), + ts=_normalize_ts(signal.ts), + payload=dict(signal.payload or {}), + ) + _upsert_spans_for_event(event) + _prune_old_data(signal.user, settings_row.retention_days) + return event + + +def record_inferred_signal(signal: AvailabilitySignal) -> ContactAvailabilityEvent | None: + settings_row = get_settings(signal.user) + if not settings_row.enabled or not settings_row.inference_enabled: + return None + return record_native_signal(signal) + + +def _prune_old_data(user: User, retention_days: int) -> None: + days = max(1, int(retention_days or 90)) + cutoff = now_ms() - (days * 24 * 60 * 60 * 1000) + ContactAvailabilityEvent.objects.filter(user=user, ts__lt=cutoff).delete() + ContactAvailabilitySpan.objects.filter(user=user, end_ts__lt=cutoff).delete() + + +def ensure_fading_state( + *, + user: User, + person: Person, + person_identifier: PersonIdentifier | None, + service: str, + at_ts: int | None = None, +) -> ContactAvailabilityEvent | None: + settings_row = get_settings(user) + if not settings_row.enabled or not settings_row.inference_enabled: + return None + + current_ts = _normalize_ts(at_ts) + latest = ( + ContactAvailabilityEvent.objects.filter( + user=user, + person=person, + service=str(service or "").strip().lower(), + ) + .order_by("-ts", "-id") + .first() + ) + if latest is None: + return None + if latest.availability_state in {"fading", "unavailable"}: + return None + if latest.source_kind not in POSITIVE_SOURCE_KINDS: + return None + if not should_fade(int(latest.ts or 0), current_ts, settings_row.fade_threshold_seconds): + return None + + elapsed = max(0, current_ts - int(latest.ts or 0)) + payload = { + "inferred_from": latest.source_kind, + "last_signal_ts": int(latest.ts or 0), + "elapsed_ms": elapsed, + } + return record_inferred_signal( + AvailabilitySignal( + user=user, + person=person, + person_identifier=person_identifier, + service=service, + source_kind="inferred_timeout", + availability_state="fading", + confidence=fade_confidence(elapsed, settings_row.fade_threshold_seconds), + ts=current_ts, + payload=payload, + ) + ) diff --git a/core/presence/inference.py b/core/presence/inference.py new file mode 100644 index 0000000..0a01a9d --- /dev/null +++ b/core/presence/inference.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import time + + +def now_ms() -> int: + return int(time.time() * 1000) + + +def fade_confidence(elapsed_ms: int, fade_threshold_seconds: int) -> float: + """ + Convert elapsed inactivity to a confidence score for fading state. + + Confidence starts at 0.7 when fading begins and decays toward 0.2 over + 4x fade threshold windows. + """ + threshold_ms = max(1, int(fade_threshold_seconds or 900) * 1000) + if elapsed_ms <= threshold_ms: + return 0.7 + over = min(4.0, float(elapsed_ms - threshold_ms) / float(threshold_ms)) + return max(0.2, 0.7 - (over * 0.125)) + + +def should_fade(last_event_ts: int, now_ts: int, fade_threshold_seconds: int) -> bool: + if last_event_ts <= 0: + return False + threshold_ms = max(1, int(fade_threshold_seconds or 900) * 1000) + return int(now_ts) - int(last_event_ts) >= threshold_ms diff --git a/core/presence/query.py b/core/presence/query.py new file mode 100644 index 0000000..82f548c --- /dev/null +++ b/core/presence/query.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from django.db.models import Q + +from core.models import ContactAvailabilityEvent, ContactAvailabilitySpan, Person, User +from .engine import ensure_fading_state +from .inference import now_ms + + +def spans_for_range( + *, + user: User, + person: Person, + start_ts: int, + end_ts: int, + service: str = "", + limit: int = 200, +): + qs = ContactAvailabilitySpan.objects.filter( + user=user, + person=person, + ).filter( + Q(start_ts__lte=end_ts) & Q(end_ts__gte=start_ts) + ) + if service: + qs = qs.filter(service=str(service).strip().lower()) + + ensure_fading_state( + user=user, + person=person, + person_identifier=None, + service=(str(service or "").strip().lower() or "signal"), + at_ts=now_ms(), + ) + + return list(qs.order_by("-end_ts")[: max(1, min(int(limit or 200), 500))]) + + +def latest_state_for_people(*, user: User, person_ids: list, service: str = "") -> dict: + out = {} + if not person_ids: + return out + qs = ContactAvailabilityEvent.objects.filter(user=user, person_id__in=person_ids) + if service: + qs = qs.filter(service=str(service).strip().lower()) + rows = list(qs.order_by("person_id", "-ts", "-id")) + seen = set() + for row in rows: + person_key = str(row.person_id) + if person_key in seen: + continue + seen.add(person_key) + out[person_key] = { + "state": str(row.availability_state or "unknown"), + "confidence": float(row.confidence or 0.0), + "service": str(row.service or ""), + "ts": int(row.ts or 0), + "source_kind": str(row.source_kind or ""), + } + return out diff --git a/core/tasks/engine.py b/core/tasks/engine.py index a85fc66..6307ee3 100644 --- a/core/tasks/engine.py +++ b/core/tasks/engine.py @@ -4,6 +4,7 @@ import re from asgiref.sync import sync_to_async from django.conf import settings +from django.db.models import Q from core.clients.transport import send_message_raw from core.messaging import ai as ai_runner @@ -14,11 +15,13 @@ from core.models import ( DerivedTask, DerivedTaskEvent, ExternalSyncEvent, + ExternalChatLink, Message, + PersonIdentifier, TaskCompletionPattern, TaskProviderConfig, ) -from core.tasks.providers.mock import get_provider +from core.tasks.providers import get_provider _TASK_HINT_RE = re.compile(r"\b(todo|task|action|need to|please)\b", re.IGNORECASE) _COMPLETION_RE = re.compile(r"\b(done|completed|fixed)\s*#([A-Za-z0-9_-]+)\b", re.IGNORECASE) @@ -218,6 +221,64 @@ async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: s provider_settings = dict(getattr(cfg, "settings", {}) or {}) provider = get_provider(provider_name) idempotency_key = f"{provider_name}:{task.id}:{event.id}" + variants = _channel_variants(task.source_service or "", task.source_channel or "") + person_identifier = None + if variants: + person_identifier = await sync_to_async( + lambda: PersonIdentifier.objects.filter( + user=task.user, + service=task.source_service, + identifier__in=variants, + ) + .select_related("person") + .order_by("-id") + .first() + )() + external_chat_id = "" + if person_identifier is not None: + link = await sync_to_async( + lambda: ExternalChatLink.objects.filter( + user=task.user, + provider=provider_name, + enabled=True, + ) + .filter( + Q(person_identifier=person_identifier) + | Q(person=person_identifier.person) + ) + .order_by("-updated_at", "-id") + .first() + )() + if link is not None: + external_chat_id = str(link.external_chat_id or "").strip() + + # Worker-backed providers are queued and executed by `manage.py codex_worker`. + if bool(getattr(provider, "run_in_worker", False)): + await sync_to_async(ExternalSyncEvent.objects.update_or_create)( + idempotency_key=idempotency_key, + defaults={ + "user": task.user, + "task": task, + "task_event": event, + "provider": provider_name, + "status": "pending", + "payload": { + "action": action, + "provider_payload": { + "task_id": str(task.id), + "title": task.title, + "external_key": task.external_key, + "reference_code": task.reference_code, + "source_service": str(task.source_service or ""), + "source_channel": str(task.source_channel or ""), + "external_chat_id": external_chat_id, + "payload": event.payload, + }, + }, + "error": "", + }, + ) + return if action == "create": result = provider.create_task(provider_settings, { @@ -225,18 +286,27 @@ async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: s "title": task.title, "external_key": task.external_key, "reference_code": task.reference_code, + "source_service": str(task.source_service or ""), + "source_channel": str(task.source_channel or ""), + "external_chat_id": external_chat_id, }) elif action == "complete": result = provider.mark_complete(provider_settings, { "task_id": str(task.id), "external_key": task.external_key, "reference_code": task.reference_code, + "source_service": str(task.source_service or ""), + "source_channel": str(task.source_channel or ""), + "external_chat_id": external_chat_id, }) else: result = provider.append_update(provider_settings, { "task_id": str(task.id), "external_key": task.external_key, "reference_code": task.reference_code, + "source_service": str(task.source_service or ""), + "source_channel": str(task.source_channel or ""), + "external_chat_id": external_chat_id, "payload": event.payload, }) diff --git a/core/tasks/providers/__init__.py b/core/tasks/providers/__init__.py index e69de29..6f1cb9d 100644 --- a/core/tasks/providers/__init__.py +++ b/core/tasks/providers/__init__.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from .base import TaskProvider +from .codex_cli import CodexCLITaskProvider +from .mock import MockTaskProvider + +PROVIDERS = { + "mock": MockTaskProvider(), + "codex_cli": CodexCLITaskProvider(), +} + + +def get_provider(name: str) -> TaskProvider: + key = str(name or "").strip().lower() + return PROVIDERS.get(key, PROVIDERS["mock"]) + + +def list_providers() -> list[TaskProvider]: + return list(PROVIDERS.values()) diff --git a/core/tasks/providers/base.py b/core/tasks/providers/base.py index 375945e..b75b4de 100644 --- a/core/tasks/providers/base.py +++ b/core/tasks/providers/base.py @@ -13,6 +13,7 @@ class ProviderResult: class TaskProvider: name = "base" + run_in_worker = False def healthcheck(self, config: dict) -> ProviderResult: raise NotImplementedError diff --git a/core/tasks/providers/codex_cli.py b/core/tasks/providers/codex_cli.py new file mode 100644 index 0000000..4b25441 --- /dev/null +++ b/core/tasks/providers/codex_cli.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import json +import subprocess + +from .base import ProviderResult, TaskProvider + + +class CodexCLITaskProvider(TaskProvider): + name = "codex_cli" + run_in_worker = True + + def _timeout(self, config: dict) -> int: + try: + return max(1, int(config.get("timeout_seconds") or 60)) + except Exception: + return 60 + + def _command(self, config: dict) -> str: + return str(config.get("command") or "codex").strip() or "codex" + + def _workspace(self, config: dict) -> str: + return str(config.get("workspace_root") or "").strip() + + def _profile(self, config: dict) -> str: + return str(config.get("default_profile") or "").strip() + + def _run(self, config: dict, op: str, payload: dict) -> ProviderResult: + cmd = [self._command(config), "task-sync", "--op", str(op)] + workspace = self._workspace(config) + if workspace: + cmd.extend(["--workspace", workspace]) + profile = self._profile(config) + if profile: + cmd.extend(["--profile", profile]) + command_timeout = self._timeout(config) + data = json.dumps(dict(payload or {}), separators=(",", ":")) + cmd.extend(["--payload-json", data]) + + try: + completed = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=command_timeout, + check=False, + cwd=workspace if workspace else None, + ) + except subprocess.TimeoutExpired: + return ProviderResult( + ok=False, + error=f"codex_cli_timeout_{command_timeout}s", + payload={"op": op, "timeout_seconds": command_timeout}, + ) + except Exception as exc: + return ProviderResult(ok=False, error=f"codex_cli_exec_error:{exc}", payload={"op": op}) + + stdout = str(completed.stdout or "").strip() + stderr = str(completed.stderr or "").strip() + parsed = {} + if stdout: + try: + parsed = json.loads(stdout) + if not isinstance(parsed, dict): + parsed = {"raw_stdout": stdout} + except Exception: + parsed = {"raw_stdout": stdout} + + ext = ( + str(parsed.get("external_key") or "").strip() + or str(parsed.get("task_id") or "").strip() + or str(payload.get("external_key") or "").strip() + ) + + ok = completed.returncode == 0 + out_payload = { + "op": op, + "returncode": int(completed.returncode), + "stdout": stdout[:4000], + "stderr": stderr[:4000], + } + out_payload.update(parsed) + return ProviderResult(ok=ok, external_key=ext, error=("" if ok else stderr[:4000]), payload=out_payload) + + def healthcheck(self, config: dict) -> ProviderResult: + command = self._command(config) + try: + completed = subprocess.run( + [command, "--version"], + capture_output=True, + text=True, + timeout=max(1, min(20, self._timeout(config))), + check=False, + ) + except Exception as exc: + return ProviderResult(ok=False, error=f"codex_cli_unavailable:{exc}") + return ProviderResult( + ok=(completed.returncode == 0), + payload={ + "returncode": int(completed.returncode), + "stdout": str(completed.stdout or "").strip()[:1000], + "stderr": str(completed.stderr or "").strip()[:1000], + }, + error=("" if completed.returncode == 0 else str(completed.stderr or "").strip()[:1000]), + ) + + def create_task(self, config: dict, payload: dict) -> ProviderResult: + return self._run(config, "create", payload) + + def append_update(self, config: dict, payload: dict) -> ProviderResult: + return self._run(config, "append_update", payload) + + def mark_complete(self, config: dict, payload: dict) -> ProviderResult: + return self._run(config, "mark_complete", payload) + + def link_task(self, config: dict, payload: dict) -> ProviderResult: + return self._run(config, "link_task", payload) diff --git a/core/tasks/providers/mock.py b/core/tasks/providers/mock.py index c526394..044dae3 100644 --- a/core/tasks/providers/mock.py +++ b/core/tasks/providers/mock.py @@ -23,12 +23,3 @@ class MockTaskProvider(TaskProvider): def link_task(self, config: dict, payload: dict) -> ProviderResult: return ProviderResult(ok=True, external_key=str(payload.get("external_key") or ""), payload={"action": "link_task"}) - - -PROVIDERS = { - "mock": MockTaskProvider(), -} - - -def get_provider(name: str) -> TaskProvider: - return PROVIDERS.get(str(name or "").strip().lower(), PROVIDERS["mock"]) diff --git a/core/templates/base.html b/core/templates/base.html index ecaeaae..dc80ec6 100644 --- a/core/templates/base.html +++ b/core/templates/base.html @@ -377,6 +377,9 @@ Sessions + + Documents + @@ -401,6 +404,9 @@ Task Settings + + Availability + Translation diff --git a/core/templates/pages/availability-settings.html b/core/templates/pages/availability-settings.html new file mode 100644 index 0000000..ce2efce --- /dev/null +++ b/core/templates/pages/availability-settings.html @@ -0,0 +1,128 @@ +{% extends "base.html" %} + +{% block content %} +
+
+

Availability Settings

+
+ {% csrf_token %} +
+
+
+
+
+
+ + +
+
+ + +
+
+ +
+ +
+

Timeline Filters

+
+
+ +
+ +
+
+
+ +
+ +
+
+
+ +
+ +
+
+
+ +
+ +
+
+
+ + +
+
+ + +
+
+ +
+ +
+

Availability Events

+ + + + {% for row in events %} + + + + + + + + + {% empty %} + + {% endfor %} + +
tspersonservicesourcestateconfidence
{{ row.ts }}{{ row.person.name }}{{ row.service }}{{ row.source_kind }}{{ row.availability_state }}{{ row.confidence|floatformat:2 }}
No events in range.
+
+ +
+

Availability Spans

+ + + + {% for row in spans %} + + + + + + + + + {% empty %} + + {% endfor %} + +
personservicestatestartendconfidence
{{ row.person.name }}{{ row.service }}{{ row.state }}{{ row.start_ts }}{{ row.end_ts }}{{ row.confidence_start|floatformat:2 }} -> {{ row.confidence_end|floatformat:2 }}
No spans in range.
+
+
+
+{% endblock %} diff --git a/core/templates/pages/command-routing.html b/core/templates/pages/command-routing.html index abe3fe2..9da5c62 100644 --- a/core/templates/pages/command-routing.html +++ b/core/templates/pages/command-routing.html @@ -150,10 +150,10 @@ {% if variant.warn_verbatim_plan %} -

- Warning: {{ variant.variant_label }} is in verbatim mode with plan fanout enabled. +

+ Warning: {{ variant.variant_label }} is in verbatim mode with plan fanout enabled. Recipients will get raw transcript-style output. -

+
{% endif %} @@ -188,12 +188,15 @@

Effective Destinations

{% if profile.enabled_egress_bindings %} -