verbatim mode with plan fanout enabled.
Recipients will get raw transcript-style output.
- diff --git a/app/urls.py b/app/urls.py
index 369c9e7..b903d75 100644
--- a/app/urls.py
+++ b/app/urls.py
@@ -23,6 +23,7 @@ from two_factor.urls import urlpatterns as tf_urls
from core.views import (
ais,
automation,
+ availability,
base,
compose,
groups,
@@ -307,6 +308,11 @@ urlpatterns = [
tasks.TaskSettings.as_view(),
name="tasks_settings",
),
+ path(
+ "settings/availability/",
+ availability.AvailabilitySettingsPage.as_view(),
+ name="availability_settings",
+ ),
# AIs
path(
"ai/workspace/",
diff --git a/core/clients/signal.py b/core/clients/signal.py
index bd187a1..f992387 100644
--- a/core/clients/signal.py
+++ b/core/clients/signal.py
@@ -193,6 +193,28 @@ def _extract_signal_reaction(envelope):
}
+def _extract_signal_text(raw_payload, default_text=""):
+ text = str(default_text or "").strip()
+ if text:
+ return text
+ payload = dict(raw_payload or {})
+ envelope = dict(payload.get("envelope") or {})
+ candidates = [
+ envelope.get("dataMessage"),
+ _get_nested(envelope, ("syncMessage", "sentMessage", "message")),
+ _get_nested(envelope, ("syncMessage", "sentMessage")),
+ payload.get("dataMessage"),
+ payload,
+ ]
+ for item in candidates:
+ if isinstance(item, dict):
+ for key in ("message", "text", "body", "caption"):
+ value = str(item.get(key) or "").strip()
+ if value:
+ return value
+ return ""
+
+
def _typing_started(typing_payload):
action = str(typing_payload.get("action") or "").strip().lower()
if action in {"started", "start", "typing", "composing"}:
@@ -368,6 +390,7 @@ class HandleMessage(Command):
source_number = c.message.source_number
source_uuid = c.message.source_uuid
text = c.message.text
+ text = _extract_signal_text(raw, text)
ts = c.message.timestamp
source_value = c.message.source
envelope = raw.get("envelope", {})
@@ -1209,14 +1232,17 @@ class SignalClient(ClientBase):
if isinstance(sync_sent_message, dict) and sync_sent_message:
raw_text = sync_sent_message.get("message")
if isinstance(raw_text, dict):
- text = str(
- raw_text.get("message")
- or raw_text.get("text")
- or raw_text.get("body")
- or ""
- ).strip()
+ text = _extract_signal_text(
+ {"envelope": {"syncMessage": {"sentMessage": {"message": raw_text}}}},
+ str(
+ raw_text.get("message")
+ or raw_text.get("text")
+ or raw_text.get("body")
+ or ""
+ ).strip(),
+ )
else:
- text = str(raw_text or "").strip()
+ text = _extract_signal_text(payload, str(raw_text or "").strip())
destination_uuid = str(
sync_sent_message.get("destinationUuid")
@@ -1373,7 +1399,7 @@ class SignalClient(ClientBase):
)
return
- text = str(data_message.get("message") or "").strip()
+ text = _extract_signal_text(payload, str(data_message.get("message") or "").strip())
if not text:
return
diff --git a/core/clients/whatsapp.py b/core/clients/whatsapp.py
index 90c4dba..820c31c 100644
--- a/core/clients/whatsapp.py
+++ b/core/clients/whatsapp.py
@@ -2355,8 +2355,29 @@ class WhatsAppClient(ClientBase):
return "application/octet-stream"
def _extract_reaction_event(self, message_obj):
- node = self._pluck(message_obj, "reactionMessage") or self._pluck(
- message_obj, "reaction_message"
+ node = (
+ self._pluck(message_obj, "reactionMessage")
+ or self._pluck(message_obj, "reaction_message")
+ or self._pluck(message_obj, "ephemeralMessage", "message", "reactionMessage")
+ or self._pluck(message_obj, "ephemeral_message", "message", "reaction_message")
+ or self._pluck(message_obj, "viewOnceMessage", "message", "reactionMessage")
+ or self._pluck(message_obj, "view_once_message", "message", "reaction_message")
+ or self._pluck(message_obj, "viewOnceMessageV2", "message", "reactionMessage")
+ or self._pluck(message_obj, "view_once_message_v2", "message", "reaction_message")
+ or self._pluck(
+ message_obj,
+ "viewOnceMessageV2Extension",
+ "message",
+ "reactionMessage",
+ )
+ or self._pluck(
+ message_obj,
+ "view_once_message_v2_extension",
+ "message",
+ "reaction_message",
+ )
+ or self._pluck(message_obj, "protocolMessage", "reactionMessage")
+ or self._pluck(message_obj, "protocol_message", "reaction_message")
)
if not node:
return None
@@ -2366,17 +2387,34 @@ class WhatsAppClient(ClientBase):
target_msg_id = str(
self._pluck(node, "key", "id")
or self._pluck(node, "key", "ID")
+ or self._pluck(node, "messageKey", "id")
+ or self._pluck(node, "message_key", "id")
or self._pluck(node, "targetMessageKey", "id")
or self._pluck(node, "target_message_key", "id")
+ or self._pluck(node, "stanzaId")
+ or self._pluck(node, "stanza_id")
or ""
).strip()
- remove = bool(not emoji)
+ target_ts = self._normalize_timestamp(
+ self._pluck(node, "key", "messageTimestamp")
+ or self._pluck(node, "targetMessageKey", "messageTimestamp")
+ or self._pluck(node, "target_message_key", "message_timestamp")
+ or self._pluck(node, "targetTimestamp")
+ or self._pluck(node, "target_timestamp")
+ or 0
+ )
+ explicit_remove = self._pluck(node, "remove") or self._pluck(node, "isRemove")
+ if explicit_remove is None:
+ explicit_remove = self._pluck(node, "is_remove")
+ remove = bool(explicit_remove) if explicit_remove is not None else bool(not emoji)
if not target_msg_id:
return None
return {
"emoji": emoji,
"target_message_id": target_msg_id,
"remove": remove,
+ "target_ts": int(target_ts or 0),
+ "raw": self._proto_to_dict(node) or dict(node or {}) if isinstance(node, dict) else {},
}
async def _download_event_media(self, event):
@@ -2438,6 +2476,10 @@ class WhatsAppClient(ClientBase):
async def _handle_message_event(self, event):
event_obj = self._proto_to_dict(event) or event
msg_obj = self._pluck(event_obj, "message") or self._pluck(event_obj, "Message")
+ if self._pluck(msg_obj, "protocolMessage") or self._pluck(
+ msg_obj, "protocol_message"
+ ):
+ return
text = self._message_text(msg_obj, event_obj)
if not text:
self.log.debug(
@@ -2482,7 +2524,7 @@ class WhatsAppClient(ClientBase):
).strip()
ts = self._normalize_timestamp(raw_ts)
- reaction_payload = self._extract_reaction_event(msg_obj)
+ reaction_payload = self._extract_reaction_event(msg_obj or event_obj)
if reaction_payload:
self.log.debug(
"reaction-bridge whatsapp-inbound msg_id=%s target_id=%s emoji=%s remove=%s sender=%s chat=%s",
@@ -2508,6 +2550,26 @@ class WhatsAppClient(ClientBase):
)
)
for identifier in identifiers:
+ try:
+ await history.apply_reaction(
+ identifier.user,
+ identifier,
+ target_message_id=str(
+ reaction_payload.get("target_message_id") or ""
+ ),
+ target_ts=int(reaction_payload.get("target_ts") or 0),
+ emoji=str(reaction_payload.get("emoji") or ""),
+ source_service="whatsapp",
+ actor=str(sender or chat or ""),
+ remove=bool(reaction_payload.get("remove")),
+ payload={
+ "event": "reaction",
+ "message_id": msg_id,
+ "raw": reaction_payload.get("raw") or {},
+ },
+ )
+ except Exception as exc:
+ self.log.warning("whatsapp reaction local apply failed: %s", exc)
try:
await self.ur.xmpp.client.apply_external_reaction(
identifier.user,
@@ -2527,6 +2589,21 @@ class WhatsAppClient(ClientBase):
)
except Exception as exc:
self.log.warning("whatsapp reaction relay to XMPP failed: %s", exc)
+ try:
+ await self.ur.presence_changed(
+ self.service,
+ identifier=identifier.identifier,
+ state="available",
+ confidence=0.9,
+ ts=int(ts or int(time.time() * 1000)),
+ payload={
+ "event": "reaction",
+ "inferred_from": "reaction",
+ "message_id": msg_id,
+ },
+ )
+ except Exception:
+ pass
return
self._remember_contact(
@@ -2907,14 +2984,42 @@ class WhatsAppClient(ClientBase):
is_unavailable = bool(
self._pluck(event, "Unavailable") or self._pluck(event, "unavailable")
)
+ last_seen_raw = (
+ self._pluck(event, "LastSeen")
+ or self._pluck(event, "lastSeen")
+ or self._pluck(event, "last_seen")
+ or self._pluck(event, "Timestamp")
+ or self._pluck(event, "timestamp")
+ or 0
+ )
+ last_seen_ts = self._normalize_timestamp(last_seen_raw)
self._remember_contact(sender, jid=sender)
for candidate in self._normalize_identifier_candidates(sender):
+ try:
+ await self.ur.presence_changed(
+ self.service,
+ identifier=candidate,
+ state=("unavailable" if is_unavailable else "available"),
+ confidence=0.9 if not is_unavailable else 0.8,
+ ts=int(last_seen_ts or int(time.time() * 1000)),
+ payload={
+ "presence": ("offline" if is_unavailable else "online"),
+ "sender": str(sender),
+ "last_seen_ts": int(last_seen_ts or 0),
+ },
+ )
+ except Exception:
+ pass
if is_unavailable:
await self.ur.stopped_typing(
self.service,
identifier=candidate,
- payload={"presence": "offline", "sender": str(sender)},
+ payload={
+ "presence": "offline",
+ "sender": str(sender),
+ "last_seen_ts": int(last_seen_ts or 0),
+ },
)
def _extract_pair_qr(self, event):
diff --git a/core/management/commands/backfill_contact_availability.py b/core/management/commands/backfill_contact_availability.py
new file mode 100644
index 0000000..b0d2b25
--- /dev/null
+++ b/core/management/commands/backfill_contact_availability.py
@@ -0,0 +1,123 @@
+from __future__ import annotations
+
+from typing import Iterable
+
+from django.core.management.base import BaseCommand
+
+from core.models import Message, User
+from core.presence import AvailabilitySignal, record_inferred_signal
+from core.presence.inference import now_ms
+
+
+class Command(BaseCommand):
+ help = "Backfill inferred contact availability events from historical message/read-receipt activity."
+
+ def add_arguments(self, parser):
+ parser.add_argument("--days", type=int, default=30)
+ parser.add_argument("--limit", type=int, default=5000)
+ parser.add_argument("--service", default="")
+ parser.add_argument("--user-id", default="")
+ parser.add_argument("--dry-run", action="store_true", default=False)
+
+ def _iter_messages(self, *, days: int, limit: int, service: str, user_id: str) -> Iterable[Message]:
+ cutoff_ts = now_ms() - (max(1, int(days)) * 24 * 60 * 60 * 1000)
+ qs = Message.objects.filter(ts__gte=cutoff_ts).select_related(
+ "user", "session", "session__identifier", "session__identifier__person"
+ )
+ if service:
+ qs = qs.filter(source_service=str(service).strip().lower())
+ if user_id:
+ qs = qs.filter(user_id=str(user_id).strip())
+ return qs.order_by("ts")[: max(1, int(limit))]
+
+ def handle(self, *args, **options):
+ days = max(1, int(options.get("days") or 30))
+ limit = max(1, int(options.get("limit") or 5000))
+ service_filter = str(options.get("service") or "").strip().lower()
+ user_filter = str(options.get("user_id") or "").strip()
+ dry_run = bool(options.get("dry_run"))
+
+ created = 0
+ scanned = 0
+
+ for msg in self._iter_messages(days=days, limit=limit, service=service_filter, user_id=user_filter):
+ scanned += 1
+ identifier = getattr(getattr(msg, "session", None), "identifier", None)
+ person = getattr(identifier, "person", None)
+ user = getattr(msg, "user", None)
+ if not identifier or not person or not user:
+ continue
+
+ service = str(getattr(msg, "source_service", "") or identifier.service or "").strip().lower()
+ if not service:
+ continue
+
+ base_ts = int(getattr(msg, "ts", 0) or 0)
+ message_author = str(getattr(msg, "custom_author", "") or "").strip().upper()
+ outgoing = message_author in {"USER", "BOT"}
+
+ candidates = []
+ if base_ts > 0:
+ candidates.append(
+ {
+ "source_kind": "message_out" if outgoing else "message_in",
+ "availability_state": "available",
+ "confidence": 0.65 if outgoing else 0.75,
+ "ts": base_ts,
+ "payload": {
+ "origin": "backfill_contact_availability",
+ "message_id": str(msg.id),
+ "inferred_from": "message_activity",
+ },
+ }
+ )
+
+ read_ts = int(getattr(msg, "read_ts", 0) or 0)
+ if read_ts > 0:
+ candidates.append(
+ {
+ "source_kind": "read_receipt",
+ "availability_state": "available",
+ "confidence": 0.95,
+ "ts": read_ts,
+ "payload": {
+ "origin": "backfill_contact_availability",
+ "message_id": str(msg.id),
+ "inferred_from": "read_receipt",
+ "read_by": str(getattr(msg, "read_by_identifier", "") or ""),
+ },
+ }
+ )
+
+ for row in candidates:
+ exists = user.contact_availability_events.filter(
+ person=person,
+ person_identifier=identifier,
+ service=service,
+ source_kind=row["source_kind"],
+ ts=int(row["ts"]),
+ ).exists()
+ if exists:
+ continue
+ created += 1
+ if dry_run:
+ continue
+ record_inferred_signal(
+ AvailabilitySignal(
+ user=user,
+ person=person,
+ person_identifier=identifier,
+ service=service,
+ source_kind=row["source_kind"],
+ availability_state=row["availability_state"],
+ confidence=float(row["confidence"]),
+ ts=int(row["ts"]),
+ payload=dict(row["payload"]),
+ )
+ )
+
+ self.stdout.write(
+ self.style.SUCCESS(
+ f"backfill_contact_availability complete scanned={scanned} created={created} dry_run={dry_run} days={days} limit={limit}"
+ )
+ )
diff --git a/core/management/commands/codex_worker.py b/core/management/commands/codex_worker.py
new file mode 100644
index 0000000..dd3bf7d
--- /dev/null
+++ b/core/management/commands/codex_worker.py
@@ -0,0 +1,127 @@
+from __future__ import annotations
+
+import time
+
+from django.core.management.base import BaseCommand
+
+from core.models import ExternalSyncEvent, TaskProviderConfig
+from core.tasks.providers import get_provider
+from core.util import logs
+
+log = logs.get_logger("codex_worker")
+
+
+class Command(BaseCommand):
+ help = "Process queued external sync events for worker-backed providers (codex_cli)."
+
+ def add_arguments(self, parser):
+ parser.add_argument("--once", action="store_true", default=False)
+ parser.add_argument("--sleep-seconds", type=float, default=2.0)
+ parser.add_argument("--batch-size", type=int, default=20)
+ parser.add_argument("--provider", default="codex_cli")
+
+ def _claim_batch(self, provider: str, batch_size: int) -> list[str]:
+ ids: list[str] = []
+ rows = list(
+ ExternalSyncEvent.objects.filter(
+ provider=provider,
+ status__in=["pending", "retrying"],
+ )
+ .order_by("updated_at")[: max(1, batch_size)]
+ .values_list("id", flat=True)
+ )
+ for row_id in rows:
+ updated = ExternalSyncEvent.objects.filter(
+ id=row_id,
+ provider=provider,
+ status__in=["pending", "retrying"],
+ ).update(status="retrying")
+ if updated:
+ ids.append(str(row_id))
+ return ids
+
+ def _run_event(self, event: ExternalSyncEvent) -> None:
+ provider = get_provider(event.provider)
+ if not bool(getattr(provider, "run_in_worker", False)):
+ return
+
+ cfg = (
+ TaskProviderConfig.objects.filter(
+ user=event.user,
+ provider=event.provider,
+ enabled=True,
+ )
+ .order_by("-updated_at")
+ .first()
+ )
+ if cfg is None:
+ event.status = "failed"
+ event.error = "provider_disabled_or_missing"
+ event.save(update_fields=["status", "error", "updated_at"])
+ return
+
+ payload = dict(event.payload or {})
+ action = str(payload.get("action") or "append_update").strip().lower()
+ provider_payload = dict(payload.get("provider_payload") or payload)
+
+ if action == "create":
+ result = provider.create_task(dict(cfg.settings or {}), provider_payload)
+ elif action == "complete":
+ result = provider.mark_complete(dict(cfg.settings or {}), provider_payload)
+ elif action == "link_task":
+ result = provider.link_task(dict(cfg.settings or {}), provider_payload)
+ else:
+ result = provider.append_update(dict(cfg.settings or {}), provider_payload)
+
+ event.status = "ok" if result.ok else "failed"
+ event.error = str(result.error or "")
+ event.payload = dict(
+ payload,
+ worker_processed=True,
+ result=dict(result.payload or {}),
+ )
+ event.save(update_fields=["status", "error", "payload", "updated_at"])
+
+ if result.ok and result.external_key and event.task_id and not str(event.task.external_key or "").strip():
+ event.task.external_key = str(result.external_key)
+ event.task.save(update_fields=["external_key"])
+
+ def handle(self, *args, **options):
+ once = bool(options.get("once"))
+ sleep_seconds = max(0.2, float(options.get("sleep_seconds") or 2.0))
+ batch_size = max(1, int(options.get("batch_size") or 20))
+ provider_name = str(options.get("provider") or "codex_cli").strip().lower()
+
+ log.info(
+ "codex_worker started provider=%s once=%s sleep=%s batch_size=%s",
+ provider_name,
+ once,
+ sleep_seconds,
+ batch_size,
+ )
+
+ while True:
+ claimed_ids = self._claim_batch(provider_name, batch_size)
+ if not claimed_ids:
+ if once:
+ log.info("codex_worker exiting: no pending events")
+ return
+ time.sleep(sleep_seconds)
+ continue
+
+ for row_id in claimed_ids:
+ event = ExternalSyncEvent.objects.filter(id=row_id).select_related("task", "user").first()
+ if event is None:
+ continue
+ try:
+ self._run_event(event)
+ except Exception as exc:
+ log.exception("codex_worker failed processing id=%s", row_id)
+ ExternalSyncEvent.objects.filter(id=row_id).update(
+ status="failed",
+ error=f"worker_exception:{exc}",
+ )
+
+ if once:
+ log.info("codex_worker processed %s event(s)", len(claimed_ids))
+ return
diff --git a/core/management/commands/recalculate_contact_availability.py b/core/management/commands/recalculate_contact_availability.py
new file mode 100644
index 0000000..03d358f
--- /dev/null
+++ b/core/management/commands/recalculate_contact_availability.py
@@ -0,0 +1,243 @@
+from __future__ import annotations
+
+from collections import defaultdict
+
+from django.core.management.base import BaseCommand
+
+from core.models import ContactAvailabilityEvent, ContactAvailabilitySpan, Message
+from core.presence import AvailabilitySignal, record_native_signal
+from core.presence.inference import now_ms
+
+
+_SOURCE_ORDER = {
+ "message_in": 10,
+ "message_out": 20,
+ "read_receipt": 30,
+ "native_presence": 40,
+}
+
+
+class Command(BaseCommand):
+ help = (
+ "Recalculate contact availability events/spans from persisted message, "
+ "read-receipt, and reaction history (deterministic rebuild)."
+ )
+
+ def add_arguments(self, parser):
+ parser.add_argument("--days", type=int, default=90)
+ parser.add_argument("--limit", type=int, default=20000)
+ parser.add_argument("--service", default="")
+ parser.add_argument("--user-id", default="")
+ parser.add_argument("--dry-run", action="store_true", default=False)
+ parser.add_argument("--no-reset", action="store_true", default=False)
+
+ def _iter_messages(self, *, days: int, limit: int, service: str, user_id: str):
+ cutoff_ts = now_ms() - (max(1, int(days)) * 24 * 60 * 60 * 1000)
+ qs = Message.objects.filter(ts__gte=cutoff_ts).select_related(
+ "user", "session", "session__identifier", "session__identifier__person"
+ )
+ if service:
+ qs = qs.filter(source_service=str(service).strip().lower())
+ if user_id:
+ qs = qs.filter(user_id=str(user_id).strip())
+ return qs.order_by("ts")[: max(1, int(limit))]
+
+ def _build_event_rows(self, messages):
+ rows = []
+ for msg in messages:
+ identifier = getattr(getattr(msg, "session", None), "identifier", None)
+ person = getattr(identifier, "person", None)
+ user = getattr(msg, "user", None)
+ if not identifier or not person or not user:
+ continue
+
+ service = str(
+ getattr(msg, "source_service", "") or getattr(identifier, "service", "")
+ ).strip().lower()
+ if not service:
+ continue
+
+ ts = int(getattr(msg, "ts", 0) or 0)
+ if ts > 0:
+ author = str(getattr(msg, "custom_author", "") or "").strip().upper()
+ outgoing = author in {"USER", "BOT"}
+ rows.append(
+ {
+ "user": user,
+ "person": person,
+ "person_identifier": identifier,
+ "service": service,
+ "source_kind": "message_out" if outgoing else "message_in",
+ "availability_state": "available",
+ "confidence": 0.65 if outgoing else 0.75,
+ "ts": ts,
+ "payload": {
+ "origin": "recalculate_contact_availability",
+ "message_id": str(msg.id),
+ "inferred_from": "message_activity",
+ },
+ }
+ )
+
+ read_ts = int(getattr(msg, "read_ts", 0) or 0)
+ if read_ts > 0:
+ rows.append(
+ {
+ "user": user,
+ "person": person,
+ "person_identifier": identifier,
+ "service": service,
+ "source_kind": "read_receipt",
+ "availability_state": "available",
+ "confidence": 0.95,
+ "ts": read_ts,
+ "payload": {
+ "origin": "recalculate_contact_availability",
+ "message_id": str(msg.id),
+ "inferred_from": "read_receipt",
+ "read_by": str(getattr(msg, "read_by_identifier", "") or ""),
+ },
+ }
+ )
+
+ reactions = list((getattr(msg, "receipt_payload", {}) or {}).get("reactions") or [])
+ for reaction in reactions:
+ item = dict(reaction or {})
+ if bool(item.get("removed")):
+ continue
+ reaction_ts = int(item.get("updated_at") or 0)
+ if reaction_ts <= 0:
+ continue
+ rows.append(
+ {
+ "user": user,
+ "person": person,
+ "person_identifier": identifier,
+ "service": service,
+ "source_kind": "native_presence",
+ "availability_state": "available",
+ "confidence": 0.9,
+ "ts": reaction_ts,
+ "payload": {
+ "origin": "recalculate_contact_availability",
+ "message_id": str(msg.id),
+ "inferred_from": "reaction",
+ "emoji": str(item.get("emoji") or ""),
+ "actor": str(item.get("actor") or ""),
+ "source_service": str(item.get("source_service") or service),
+ },
+ }
+ )
+
+ rows.sort(
+ key=lambda row: (
+ str(getattr(row["user"], "id", "")),
+ str(getattr(row["person"], "id", "")),
+ str(row.get("service") or ""),
+ int(row.get("ts") or 0),
+ _SOURCE_ORDER.get(str(row.get("source_kind") or ""), 999),
+ str((row.get("payload") or {}).get("message_id") or ""),
+ )
+ )
+ return rows
+
+ def handle(self, *args, **options):
+ days = max(1, int(options.get("days") or 90))
+ limit = max(1, int(options.get("limit") or 20000))
+ service_filter = str(options.get("service") or "").strip().lower()
+ user_filter = str(options.get("user_id") or "").strip()
+ dry_run = bool(options.get("dry_run"))
+ reset = not bool(options.get("no_reset"))
+ cutoff_ts = now_ms() - (days * 24 * 60 * 60 * 1000)
+
+ messages = list(
+ self._iter_messages(
+ days=days,
+ limit=limit,
+ service=service_filter,
+ user_id=user_filter,
+ )
+ )
+ rows = self._build_event_rows(messages)
+
+ keys_to_reset = set()
+ for row in rows:
+ keys_to_reset.add(
+ (
+ str(getattr(row["user"], "id", "")),
+ str(getattr(row["person"], "id", "")),
+ str(row.get("service") or ""),
+ )
+ )
+
+ deleted_events = 0
+ deleted_spans = 0
+ if reset and keys_to_reset and not dry_run:
+ for user_id, person_id, service in keys_to_reset:
+ deleted_events += ContactAvailabilityEvent.objects.filter(
+ user_id=user_id,
+ person_id=person_id,
+ service=service,
+ ts__gte=cutoff_ts,
+ ).delete()[0]
+ deleted_spans += ContactAvailabilitySpan.objects.filter(
+ user_id=user_id,
+ person_id=person_id,
+ service=service,
+ end_ts__gte=cutoff_ts,
+ ).delete()[0]
+
+ created = 0
+ dedup_seen = set()
+ for row in rows:
+ dedup_key = (
+ str(getattr(row["user"], "id", "")),
+ str(getattr(row["person"], "id", "")),
+ str(getattr(row["person_identifier"], "id", "")),
+ str(row.get("service") or ""),
+ str(row.get("source_kind") or ""),
+ int(row.get("ts") or 0),
+ str((row.get("payload") or {}).get("message_id") or ""),
+ str((row.get("payload") or {}).get("inferred_from") or ""),
+ )
+ if dedup_key in dedup_seen:
+ continue
+ dedup_seen.add(dedup_key)
+
+ if not reset:
+ exists = ContactAvailabilityEvent.objects.filter(
+ user=row["user"],
+ person=row["person"],
+ person_identifier=row["person_identifier"],
+ service=row["service"],
+ source_kind=row["source_kind"],
+ ts=row["ts"],
+ ).exists()
+ if exists:
+ continue
+
+ created += 1
+ if dry_run:
+ continue
+ record_native_signal(
+ AvailabilitySignal(
+ user=row["user"],
+ person=row["person"],
+ person_identifier=row["person_identifier"],
+ service=row["service"],
+ source_kind=row["source_kind"],
+ availability_state=row["availability_state"],
+ confidence=float(row["confidence"]),
+ ts=int(row["ts"]),
+ payload=dict(row["payload"]),
+ )
+ )
+
+ self.stdout.write(
+ self.style.SUCCESS(
+ "recalculate_contact_availability complete "
+ f"messages_scanned={len(messages)} candidates={len(rows)} "
+ f"created={created} deleted_events={deleted_events} deleted_spans={deleted_spans} "
+ f"reset={reset} dry_run={dry_run} days={days} limit={limit}"
+ )
+ )
diff --git a/core/messaging/history.py b/core/messaging/history.py
index 076acdd..41c4853 100644
--- a/core/messaging/history.py
+++ b/core/messaging/history.py
@@ -1,5 +1,6 @@
from asgiref.sync import sync_to_async
from django.conf import settings
+import uuid
from core.messaging.utils import messages_to_string
from core.models import ChatSession, Message, QueuedMessage
@@ -316,9 +317,21 @@ async def apply_reaction(
target = None
target_uuid = str(target_message_id or "").strip()
if target_uuid:
- target = await sync_to_async(
- lambda: queryset.filter(id=target_uuid).order_by("-ts").first()
- )()
+ is_uuid = True
+ try:
+ uuid.UUID(str(target_uuid))
+ except Exception:
+ is_uuid = False
+ if is_uuid:
+ target = await sync_to_async(
+ lambda: queryset.filter(id=target_uuid).order_by("-ts").first()
+ )()
+ if target is None:
+ target = await sync_to_async(
+ lambda: queryset.filter(source_message_id=target_uuid)
+ .order_by("-ts")
+ .first()
+ )()
if target is None:
try:
diff --git a/core/migrations/0033_contactavailability_and_externalchatlink.py b/core/migrations/0033_contactavailability_and_externalchatlink.py
new file mode 100644
index 0000000..93eefc1
--- /dev/null
+++ b/core/migrations/0033_contactavailability_and_externalchatlink.py
@@ -0,0 +1,236 @@
+from django.conf import settings
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("core", "0032_commandvariantpolicy_store_document"),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name="ContactAvailabilitySettings",
+ fields=[
+ ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
+ ("enabled", models.BooleanField(default=True)),
+ ("show_in_chat", models.BooleanField(default=True)),
+ ("show_in_groups", models.BooleanField(default=True)),
+ ("inference_enabled", models.BooleanField(default=True)),
+ ("retention_days", models.PositiveIntegerField(default=90)),
+ ("fade_threshold_seconds", models.PositiveIntegerField(default=900)),
+ ("created_at", models.DateTimeField(auto_now_add=True)),
+ ("updated_at", models.DateTimeField(auto_now=True)),
+ (
+ "user",
+ models.OneToOneField(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="contact_availability_settings",
+ to=settings.AUTH_USER_MODEL,
+ ),
+ ),
+ ],
+ ),
+ migrations.CreateModel(
+ name="ContactAvailabilityEvent",
+ fields=[
+ ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
+ (
+ "service",
+ models.CharField(
+ choices=[("signal", "Signal"), ("whatsapp", "WhatsApp"), ("xmpp", "XMPP"), ("instagram", "Instagram"), ("web", "Web")],
+ max_length=255,
+ ),
+ ),
+ (
+ "source_kind",
+ models.CharField(
+ choices=[
+ ("native_presence", "Native Presence"),
+ ("read_receipt", "Read Receipt"),
+ ("typing_start", "Typing Start"),
+ ("typing_stop", "Typing Stop"),
+ ("message_in", "Message In"),
+ ("message_out", "Message Out"),
+ ("inferred_timeout", "Inferred Timeout"),
+ ],
+ max_length=32,
+ ),
+ ),
+ (
+ "availability_state",
+ models.CharField(
+ choices=[("available", "Available"), ("unavailable", "Unavailable"), ("unknown", "Unknown"), ("fading", "Fading")],
+ max_length=32,
+ ),
+ ),
+ ("confidence", models.FloatField(default=0.0)),
+ ("ts", models.BigIntegerField(db_index=True)),
+ ("payload", models.JSONField(blank=True, default=dict)),
+ ("created_at", models.DateTimeField(auto_now_add=True)),
+ (
+ "person",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="availability_events",
+ to="core.person",
+ ),
+ ),
+ (
+ "person_identifier",
+ models.ForeignKey(
+ blank=True,
+ null=True,
+ on_delete=django.db.models.deletion.SET_NULL,
+ related_name="availability_events",
+ to="core.personidentifier",
+ ),
+ ),
+ (
+ "user",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="contact_availability_events",
+ to=settings.AUTH_USER_MODEL,
+ ),
+ ),
+ ],
+ options={
+ "indexes": [
+ models.Index(fields=["user", "person", "ts"], name="core_contac_user_id_0da9b2_idx"),
+ models.Index(fields=["user", "service", "ts"], name="core_contac_user_id_bce271_idx"),
+ models.Index(fields=["user", "availability_state", "ts"], name="core_contac_user_id_1b50b3_idx"),
+ ],
+ },
+ ),
+ migrations.CreateModel(
+ name="ExternalChatLink",
+ fields=[
+ ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
+ ("provider", models.CharField(default="codex_cli", max_length=64)),
+ ("external_chat_id", models.CharField(max_length=255)),
+ ("metadata", models.JSONField(blank=True, default=dict)),
+ ("enabled", models.BooleanField(default=True)),
+ ("created_at", models.DateTimeField(auto_now_add=True)),
+ ("updated_at", models.DateTimeField(auto_now=True)),
+ (
+ "person",
+ models.ForeignKey(
+ blank=True,
+ null=True,
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="external_chat_links",
+ to="core.person",
+ ),
+ ),
+ (
+ "person_identifier",
+ models.ForeignKey(
+ blank=True,
+ null=True,
+ on_delete=django.db.models.deletion.SET_NULL,
+ related_name="external_chat_links",
+ to="core.personidentifier",
+ ),
+ ),
+ (
+ "user",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="external_chat_links",
+ to=settings.AUTH_USER_MODEL,
+ ),
+ ),
+ ],
+ options={
+ "indexes": [
+ models.Index(fields=["user", "provider", "external_chat_id"], name="core_extern_user_id_f4a7b0_idx"),
+ models.Index(fields=["user", "provider", "enabled"], name="core_extern_user_id_7d2295_idx"),
+ ],
+ "constraints": [
+ models.UniqueConstraint(fields=("user", "provider", "external_chat_id"), name="unique_external_chat_link_per_provider"),
+ ],
+ },
+ ),
+ migrations.CreateModel(
+ name="ContactAvailabilitySpan",
+ fields=[
+ ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
+ (
+ "service",
+ models.CharField(
+ choices=[("signal", "Signal"), ("whatsapp", "WhatsApp"), ("xmpp", "XMPP"), ("instagram", "Instagram"), ("web", "Web")],
+ max_length=255,
+ ),
+ ),
+ (
+ "state",
+ models.CharField(
+ choices=[("available", "Available"), ("unavailable", "Unavailable"), ("unknown", "Unknown"), ("fading", "Fading")],
+ max_length=32,
+ ),
+ ),
+ ("start_ts", models.BigIntegerField(db_index=True)),
+ ("end_ts", models.BigIntegerField(db_index=True)),
+ ("confidence_start", models.FloatField(default=0.0)),
+ ("confidence_end", models.FloatField(default=0.0)),
+ ("payload", models.JSONField(blank=True, default=dict)),
+ ("created_at", models.DateTimeField(auto_now_add=True)),
+ ("updated_at", models.DateTimeField(auto_now=True)),
+ (
+ "closing_event",
+ models.ForeignKey(
+ blank=True,
+ null=True,
+ on_delete=django.db.models.deletion.SET_NULL,
+ related_name="closing_spans",
+ to="core.contactavailabilityevent",
+ ),
+ ),
+ (
+ "opening_event",
+ models.ForeignKey(
+ blank=True,
+ null=True,
+ on_delete=django.db.models.deletion.SET_NULL,
+ related_name="opening_spans",
+ to="core.contactavailabilityevent",
+ ),
+ ),
+ (
+ "person",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="availability_spans",
+ to="core.person",
+ ),
+ ),
+ (
+ "person_identifier",
+ models.ForeignKey(
+ blank=True,
+ null=True,
+ on_delete=django.db.models.deletion.SET_NULL,
+ related_name="availability_spans",
+ to="core.personidentifier",
+ ),
+ ),
+ (
+ "user",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="contact_availability_spans",
+ to=settings.AUTH_USER_MODEL,
+ ),
+ ),
+ ],
+ options={
+ "indexes": [
+ models.Index(fields=["user", "person", "start_ts"], name="core_contac_user_id_9cd15a_idx"),
+ models.Index(fields=["user", "person", "end_ts"], name="core_contac_user_id_88584a_idx"),
+ models.Index(fields=["user", "service", "start_ts"], name="core_contac_user_id_182ffb_idx"),
+ ],
+ },
+ ),
+ ]
diff --git a/core/models.py b/core/models.py
index a97c5fa..435e9fe 100644
--- a/core/models.py
+++ b/core/models.py
@@ -20,14 +20,14 @@ SERVICE_CHOICES = (
)
CHANNEL_SERVICE_CHOICES = SERVICE_CHOICES + (("web", "Web"),)
MBTI_CHOICES = (
- ("INTJ", "INTJ - Architect"),
+ ("INTJ", "INTJ - Architect"),# ;)
("INTP", "INTP - Logician"),
("ENTJ", "ENTJ - Commander"),
("ENTP", "ENTP - Debater"),
("INFJ", "INFJ - Advocate"),
("INFP", "INFP - Mediator"),
("ENFJ", "ENFJ - Protagonist"),
- ("ENFP", "ENFP - Campaigner"),
+ ("ENFP", "ENFP - Campaigner"), # <3
("ISTJ", "ISTJ - Logistician"),
("ISFJ", "ISFJ - Defender"),
("ESTJ", "ESTJ - Executive"),
@@ -2227,6 +2227,164 @@ class TaskProviderConfig(models.Model):
]
+class ContactAvailabilitySettings(models.Model):
+ user = models.OneToOneField(
+ User,
+ on_delete=models.CASCADE,
+ related_name="contact_availability_settings",
+ )
+ enabled = models.BooleanField(default=True)
+ show_in_chat = models.BooleanField(default=True)
+ show_in_groups = models.BooleanField(default=True)
+ inference_enabled = models.BooleanField(default=True)
+ retention_days = models.PositiveIntegerField(default=90)
+ fade_threshold_seconds = models.PositiveIntegerField(default=900)
+ created_at = models.DateTimeField(auto_now_add=True)
+ updated_at = models.DateTimeField(auto_now=True)
+
+
+class ContactAvailabilityEvent(models.Model):
+ SOURCE_KIND_CHOICES = (
+ ("native_presence", "Native Presence"),
+ ("read_receipt", "Read Receipt"),
+ ("typing_start", "Typing Start"),
+ ("typing_stop", "Typing Stop"),
+ ("message_in", "Message In"),
+ ("message_out", "Message Out"),
+ ("inferred_timeout", "Inferred Timeout"),
+ )
+ STATE_CHOICES = (
+ ("available", "Available"),
+ ("unavailable", "Unavailable"),
+ ("unknown", "Unknown"),
+ ("fading", "Fading"),
+ )
+
+ user = models.ForeignKey(
+ User,
+ on_delete=models.CASCADE,
+ related_name="contact_availability_events",
+ )
+ person = models.ForeignKey(
+ Person,
+ on_delete=models.CASCADE,
+ related_name="availability_events",
+ )
+ person_identifier = models.ForeignKey(
+ PersonIdentifier,
+ on_delete=models.SET_NULL,
+ null=True,
+ blank=True,
+ related_name="availability_events",
+ )
+ service = models.CharField(max_length=255, choices=CHANNEL_SERVICE_CHOICES)
+ source_kind = models.CharField(max_length=32, choices=SOURCE_KIND_CHOICES)
+ availability_state = models.CharField(max_length=32, choices=STATE_CHOICES)
+ confidence = models.FloatField(default=0.0)
+ ts = models.BigIntegerField(db_index=True)
+ payload = models.JSONField(default=dict, blank=True)
+ created_at = models.DateTimeField(auto_now_add=True)
+
+ class Meta:
+ indexes = [
+ models.Index(fields=["user", "person", "ts"]),
+ models.Index(fields=["user", "service", "ts"]),
+ models.Index(fields=["user", "availability_state", "ts"]),
+ ]
+
+
+class ContactAvailabilitySpan(models.Model):
+ STATE_CHOICES = ContactAvailabilityEvent.STATE_CHOICES
+
+ user = models.ForeignKey(
+ User,
+ on_delete=models.CASCADE,
+ related_name="contact_availability_spans",
+ )
+ person = models.ForeignKey(
+ Person,
+ on_delete=models.CASCADE,
+ related_name="availability_spans",
+ )
+ person_identifier = models.ForeignKey(
+ PersonIdentifier,
+ on_delete=models.SET_NULL,
+ null=True,
+ blank=True,
+ related_name="availability_spans",
+ )
+ service = models.CharField(max_length=255, choices=CHANNEL_SERVICE_CHOICES)
+ state = models.CharField(max_length=32, choices=STATE_CHOICES)
+ start_ts = models.BigIntegerField(db_index=True)
+ end_ts = models.BigIntegerField(db_index=True)
+ confidence_start = models.FloatField(default=0.0)
+ confidence_end = models.FloatField(default=0.0)
+ opening_event = models.ForeignKey(
+ ContactAvailabilityEvent,
+ on_delete=models.SET_NULL,
+ null=True,
+ blank=True,
+ related_name="opening_spans",
+ )
+ closing_event = models.ForeignKey(
+ ContactAvailabilityEvent,
+ on_delete=models.SET_NULL,
+ null=True,
+ blank=True,
+ related_name="closing_spans",
+ )
+ payload = models.JSONField(default=dict, blank=True)
+ created_at = models.DateTimeField(auto_now_add=True)
+ updated_at = models.DateTimeField(auto_now=True)
+
+ class Meta:
+ indexes = [
+ models.Index(fields=["user", "person", "start_ts"]),
+ models.Index(fields=["user", "person", "end_ts"]),
+ models.Index(fields=["user", "service", "start_ts"]),
+ ]
+
+
+class ExternalChatLink(models.Model):
+ user = models.ForeignKey(
+ User,
+ on_delete=models.CASCADE,
+ related_name="external_chat_links",
+ )
+ provider = models.CharField(max_length=64, default="codex_cli")
+ person = models.ForeignKey(
+ Person,
+ on_delete=models.CASCADE,
+ null=True,
+ blank=True,
+ related_name="external_chat_links",
+ )
+ person_identifier = models.ForeignKey(
+ PersonIdentifier,
+ on_delete=models.SET_NULL,
+ null=True,
+ blank=True,
+ related_name="external_chat_links",
+ )
+ external_chat_id = models.CharField(max_length=255)
+ metadata = models.JSONField(default=dict, blank=True)
+ enabled = models.BooleanField(default=True)
+ created_at = models.DateTimeField(auto_now_add=True)
+ updated_at = models.DateTimeField(auto_now=True)
+
+ class Meta:
+ indexes = [
+ models.Index(fields=["user", "provider", "external_chat_id"]),
+ models.Index(fields=["user", "provider", "enabled"]),
+ ]
+ constraints = [
+ models.UniqueConstraint(
+ fields=["user", "provider", "external_chat_id"],
+ name="unique_external_chat_link_per_provider",
+ )
+ ]
+
+
class TaskCompletionPattern(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="task_completion_patterns")
diff --git a/core/modules/router.py b/core/modules/router.py
index efd1cef..8b9bf29 100644
--- a/core/modules/router.py
+++ b/core/modules/router.py
@@ -1,4 +1,5 @@
import asyncio
+import re
from asgiref.sync import sync_to_async
from django.conf import settings
@@ -13,6 +14,7 @@ from core.commands.base import CommandContext
from core.commands.engine import process_inbound_message
from core.messaging import history
from core.models import PersonIdentifier
+from core.presence import AvailabilitySignal, record_native_signal
from core.realtime.typing_state import set_person_typing_state
from core.translation.engine import process_inbound_translation
from core.util import logs
@@ -100,6 +102,32 @@ class UnifiedRouter(object):
message_text = str(kwargs.get("text") or "").strip()
if local_message is None:
return
+ identifiers = await self._resolve_identifier_objects(protocol, identifier)
+ if identifiers:
+ outgoing = str(getattr(local_message, "custom_author", "") or "").strip().upper() in {
+ "USER",
+ "BOT",
+ }
+ source_kind = "message_out" if outgoing else "message_in"
+ confidence = 0.65 if outgoing else 0.75
+ for row in identifiers:
+ record_native_signal(
+ AvailabilitySignal(
+ user=row.user,
+ person=row.person,
+ person_identifier=row,
+ service=str(protocol or "").strip().lower(),
+ source_kind=source_kind,
+ availability_state="available",
+ confidence=confidence,
+ ts=int(getattr(local_message, "ts", 0) or 0),
+ payload={
+ "origin": "router.message_received",
+ "message_id": str(getattr(local_message, "id", "") or ""),
+ "outgoing": outgoing,
+ },
+ )
+ )
channel_identifier = ""
if isinstance(identifier, PersonIdentifier):
channel_identifier = str(identifier.identifier or "").strip()
@@ -127,6 +155,31 @@ class UnifiedRouter(object):
await process_inbound_assist(local_message)
except Exception as exc:
self.log.warning("Assist/task processing failed: %s", exc)
+ await self._refresh_workspace_metrics_for_identifiers(identifiers)
+
+ async def _refresh_workspace_metrics_for_identifiers(self, identifiers):
+ if not identifiers:
+ return
+ seen = set()
+ for row in identifiers:
+ person = getattr(row, "person", None)
+ user = getattr(row, "user", None)
+ if person is None or user is None:
+ continue
+ person_key = str(getattr(person, "id", "") or "")
+ if not person_key or person_key in seen:
+ continue
+ seen.add(person_key)
+ try:
+ from core.views.workspace import _conversation_for_person
+
+ await sync_to_async(_conversation_for_person)(user, person)
+ except Exception as exc:
+ self.log.warning(
+ "Workspace metrics refresh failed for person=%s: %s",
+ person_key,
+ exc,
+ )
async def _resolve_identifier_objects(self, protocol, identifier):
if isinstance(identifier, PersonIdentifier):
@@ -134,9 +187,28 @@ class UnifiedRouter(object):
value = str(identifier or "").strip()
if not value:
return []
+ variants = [value]
+ bare = value.split("@", 1)[0].strip()
+ if bare and bare not in variants:
+ variants.append(bare)
+ if protocol == "signal":
+ digits = re.sub(r"[^0-9]", "", value)
+ if digits and digits not in variants:
+ variants.append(digits)
+ if digits:
+ plus = f"+{digits}"
+ if plus not in variants:
+ variants.append(plus)
+ elif protocol == "whatsapp" and bare:
+ direct = f"{bare}@s.whatsapp.net"
+ group = f"{bare}@g.us"
+ if direct not in variants:
+ variants.append(direct)
+ if group not in variants:
+ variants.append(group)
return await sync_to_async(list)(
PersonIdentifier.objects.filter(
- identifier=value,
+ identifier__in=variants,
service=protocol,
)
)
@@ -160,12 +232,75 @@ class UnifiedRouter(object):
read_by_identifier=read_by or row.identifier,
payload=payload,
)
+ record_native_signal(
+ AvailabilitySignal(
+ user=row.user,
+ person=row.person,
+ person_identifier=row,
+ service=str(protocol or "").strip().lower(),
+ source_kind="read_receipt",
+ availability_state="available",
+ confidence=0.95,
+ ts=int(read_ts or 0),
+ payload={
+ "origin": "router.message_read",
+ "message_timestamps": [int(v) for v in list(timestamps or []) if str(v).isdigit()],
+ "read_by": str(read_by or row.identifier),
+ },
+ )
+ )
+ await self._refresh_workspace_metrics_for_identifiers(identifiers)
+
+ async def presence_changed(self, protocol, *args, **kwargs):
+ identifier = kwargs.get("identifier")
+ state = str(kwargs.get("state") or "unknown").strip().lower()
+ if state not in {"available", "unavailable", "unknown", "fading"}:
+ state = "unknown"
+ try:
+ confidence = float(kwargs.get("confidence") or 0.6)
+ except Exception:
+ confidence = 0.6
+ try:
+ ts = int(kwargs.get("ts") or 0)
+ except Exception:
+ ts = 0
+ payload = dict(kwargs.get("payload") or {})
+
+ identifiers = await self._resolve_identifier_objects(protocol, identifier)
+ for row in identifiers:
+ record_native_signal(
+ AvailabilitySignal(
+ user=row.user,
+ person=row.person,
+ person_identifier=row,
+ service=str(protocol or "").strip().lower(),
+ source_kind="native_presence",
+ availability_state=state,
+ confidence=confidence,
+ ts=ts,
+ payload=payload,
+ )
+ )
+ await self._refresh_workspace_metrics_for_identifiers(identifiers)
async def started_typing(self, protocol, *args, **kwargs):
self.log.info(f"Started typing ({protocol}) {args} {kwargs}")
identifier = kwargs.get("identifier")
identifiers = await self._resolve_identifier_objects(protocol, identifier)
for src in identifiers:
+ record_native_signal(
+ AvailabilitySignal(
+ user=src.user,
+ person=src.person,
+ person_identifier=src,
+ service=str(protocol or "").strip().lower(),
+ source_kind="typing_start",
+ availability_state="available",
+ confidence=0.9,
+ ts=0,
+ payload={"origin": "router.started_typing"},
+ )
+ )
if protocol != "xmpp":
set_person_typing_state(
user_id=src.user_id,
@@ -201,6 +336,19 @@ class UnifiedRouter(object):
identifier = kwargs.get("identifier")
identifiers = await self._resolve_identifier_objects(protocol, identifier)
for src in identifiers:
+ record_native_signal(
+ AvailabilitySignal(
+ user=src.user,
+ person=src.person,
+ person_identifier=src,
+ service=str(protocol or "").strip().lower(),
+ source_kind="typing_stop",
+ availability_state="fading",
+ confidence=0.5,
+ ts=0,
+ payload={"origin": "router.stopped_typing"},
+ )
+ )
if protocol != "xmpp":
set_person_typing_state(
user_id=src.user_id,
diff --git a/core/presence/__init__.py b/core/presence/__init__.py
new file mode 100644
index 0000000..0830f6e
--- /dev/null
+++ b/core/presence/__init__.py
@@ -0,0 +1,18 @@
+from .engine import (
+ AvailabilitySignal,
+ ensure_fading_state,
+ get_settings,
+ record_inferred_signal,
+ record_native_signal,
+)
+from .query import latest_state_for_people, spans_for_range
+
+__all__ = [
+ "AvailabilitySignal",
+ "ensure_fading_state",
+ "get_settings",
+ "record_inferred_signal",
+ "record_native_signal",
+ "latest_state_for_people",
+ "spans_for_range",
+]
diff --git a/core/presence/engine.py b/core/presence/engine.py
new file mode 100644
index 0000000..5d51e21
--- /dev/null
+++ b/core/presence/engine.py
@@ -0,0 +1,175 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+from django.db import transaction
+
+from core.models import (
+ ContactAvailabilityEvent,
+ ContactAvailabilitySettings,
+ ContactAvailabilitySpan,
+ Person,
+ PersonIdentifier,
+ User,
+)
+from .inference import fade_confidence, now_ms, should_fade
+
+POSITIVE_SOURCE_KINDS = {"native_presence", "read_receipt", "typing_start", "message_in"}
+
+
+@dataclass(slots=True)
+class AvailabilitySignal:
+ user: User
+ person: Person
+ person_identifier: PersonIdentifier | None
+ service: str
+ source_kind: str
+ availability_state: str
+ confidence: float = 0.8
+ ts: int = 0
+ payload: dict | None = None
+
+
+def get_settings(user: User) -> ContactAvailabilitySettings:
+ settings_row, _ = ContactAvailabilitySettings.objects.get_or_create(user=user)
+ return settings_row
+
+
+def _normalize_ts(value: int | None) -> int:
+ try:
+ ts = int(value or 0)
+ except Exception:
+ ts = 0
+ return ts if ts > 0 else now_ms()
+
+
+def _upsert_spans_for_event(event: ContactAvailabilityEvent) -> None:
+ prev = (
+ ContactAvailabilitySpan.objects.filter(
+ user=event.user,
+ person=event.person,
+ service=event.service,
+ )
+ .order_by("-end_ts", "-id")
+ .first()
+ )
+
+ if prev and prev.state == event.availability_state:
+ prev.end_ts = max(int(prev.end_ts or 0), int(event.ts or 0))
+ prev.confidence_end = float(event.confidence or 0.0)
+ prev.closing_event = event
+ prev.payload = dict(prev.payload or {})
+ prev.payload.update({"extended_by": str(event.source_kind or "")})
+ prev.save(
+ update_fields=[
+ "end_ts",
+ "confidence_end",
+ "closing_event",
+ "payload",
+ "updated_at",
+ ]
+ )
+ return
+
+ ContactAvailabilitySpan.objects.create(
+ user=event.user,
+ person=event.person,
+ person_identifier=event.person_identifier,
+ service=event.service,
+ state=event.availability_state,
+ start_ts=int(event.ts or 0),
+ end_ts=int(event.ts or 0),
+ confidence_start=float(event.confidence or 0.0),
+ confidence_end=float(event.confidence or 0.0),
+ opening_event=event,
+ closing_event=event,
+ payload=dict(event.payload or {}),
+ )
+
+
+@transaction.atomic
+def record_native_signal(signal: AvailabilitySignal) -> ContactAvailabilityEvent | None:
+ settings_row = get_settings(signal.user)
+ if not settings_row.enabled:
+ return None
+
+ event = ContactAvailabilityEvent.objects.create(
+ user=signal.user,
+ person=signal.person,
+ person_identifier=signal.person_identifier,
+ service=str(signal.service or "").strip().lower() or "signal",
+ source_kind=str(signal.source_kind or "").strip() or "native_presence",
+ availability_state=str(signal.availability_state or "unknown").strip() or "unknown",
+ confidence=float(signal.confidence or 0.0),
+ ts=_normalize_ts(signal.ts),
+ payload=dict(signal.payload or {}),
+ )
+ _upsert_spans_for_event(event)
+ _prune_old_data(signal.user, settings_row.retention_days)
+ return event
+
+
+def record_inferred_signal(signal: AvailabilitySignal) -> ContactAvailabilityEvent | None:
+ settings_row = get_settings(signal.user)
+ if not settings_row.enabled or not settings_row.inference_enabled:
+ return None
+ return record_native_signal(signal)
+
+
+def _prune_old_data(user: User, retention_days: int) -> None:
+ days = max(1, int(retention_days or 90))
+ cutoff = now_ms() - (days * 24 * 60 * 60 * 1000)
+ ContactAvailabilityEvent.objects.filter(user=user, ts__lt=cutoff).delete()
+ ContactAvailabilitySpan.objects.filter(user=user, end_ts__lt=cutoff).delete()
+
+
+def ensure_fading_state(
+ *,
+ user: User,
+ person: Person,
+ person_identifier: PersonIdentifier | None,
+ service: str,
+ at_ts: int | None = None,
+) -> ContactAvailabilityEvent | None:
+ settings_row = get_settings(user)
+ if not settings_row.enabled or not settings_row.inference_enabled:
+ return None
+
+ current_ts = _normalize_ts(at_ts)
+ latest = (
+ ContactAvailabilityEvent.objects.filter(
+ user=user,
+ person=person,
+ service=str(service or "").strip().lower(),
+ )
+ .order_by("-ts", "-id")
+ .first()
+ )
+ if latest is None:
+ return None
+ if latest.availability_state in {"fading", "unavailable"}:
+ return None
+ if latest.source_kind not in POSITIVE_SOURCE_KINDS:
+ return None
+ if not should_fade(int(latest.ts or 0), current_ts, settings_row.fade_threshold_seconds):
+ return None
+
+ elapsed = max(0, current_ts - int(latest.ts or 0))
+ payload = {
+ "inferred_from": latest.source_kind,
+ "last_signal_ts": int(latest.ts or 0),
+ "elapsed_ms": elapsed,
+ }
+ return record_inferred_signal(
+ AvailabilitySignal(
+ user=user,
+ person=person,
+ person_identifier=person_identifier,
+ service=service,
+ source_kind="inferred_timeout",
+ availability_state="fading",
+ confidence=fade_confidence(elapsed, settings_row.fade_threshold_seconds),
+ ts=current_ts,
+ payload=payload,
+ )
+ )
diff --git a/core/presence/inference.py b/core/presence/inference.py
new file mode 100644
index 0000000..0a01a9d
--- /dev/null
+++ b/core/presence/inference.py
@@ -0,0 +1,28 @@
+from __future__ import annotations
+
+import time
+
+
+def now_ms() -> int:
+ return int(time.time() * 1000)
+
+
+def fade_confidence(elapsed_ms: int, fade_threshold_seconds: int) -> float:
+ """
+ Convert elapsed inactivity to a confidence score for fading state.
+
+ Confidence starts at 0.7 when fading begins and decays toward 0.2 over
+ 4x fade threshold windows.
+ """
+ threshold_ms = max(1, int(fade_threshold_seconds or 900) * 1000)
+ if elapsed_ms <= threshold_ms:
+ return 0.7
+ over = min(4.0, float(elapsed_ms - threshold_ms) / float(threshold_ms))
+ return max(0.2, 0.7 - (over * 0.125))
+
+
+def should_fade(last_event_ts: int, now_ts: int, fade_threshold_seconds: int) -> bool:
+ if last_event_ts <= 0:
+ return False
+ threshold_ms = max(1, int(fade_threshold_seconds or 900) * 1000)
+ return int(now_ts) - int(last_event_ts) >= threshold_ms
diff --git a/core/presence/query.py b/core/presence/query.py
new file mode 100644
index 0000000..82f548c
--- /dev/null
+++ b/core/presence/query.py
@@ -0,0 +1,60 @@
+from __future__ import annotations
+
+from django.db.models import Q
+
+from core.models import ContactAvailabilityEvent, ContactAvailabilitySpan, Person, User
+from .engine import ensure_fading_state
+from .inference import now_ms
+
+
+def spans_for_range(
+ *,
+ user: User,
+ person: Person,
+ start_ts: int,
+ end_ts: int,
+ service: str = "",
+ limit: int = 200,
+):
+ qs = ContactAvailabilitySpan.objects.filter(
+ user=user,
+ person=person,
+ ).filter(
+ Q(start_ts__lte=end_ts) & Q(end_ts__gte=start_ts)
+ )
+ if service:
+ qs = qs.filter(service=str(service).strip().lower())
+
+ ensure_fading_state(
+ user=user,
+ person=person,
+ person_identifier=None,
+ service=(str(service or "").strip().lower() or "signal"),
+ at_ts=now_ms(),
+ )
+
+ return list(qs.order_by("-end_ts")[: max(1, min(int(limit or 200), 500))])
+
+
+def latest_state_for_people(*, user: User, person_ids: list, service: str = "") -> dict:
+ out = {}
+ if not person_ids:
+ return out
+ qs = ContactAvailabilityEvent.objects.filter(user=user, person_id__in=person_ids)
+ if service:
+ qs = qs.filter(service=str(service).strip().lower())
+ rows = list(qs.order_by("person_id", "-ts", "-id"))
+ seen = set()
+ for row in rows:
+ person_key = str(row.person_id)
+ if person_key in seen:
+ continue
+ seen.add(person_key)
+ out[person_key] = {
+ "state": str(row.availability_state or "unknown"),
+ "confidence": float(row.confidence or 0.0),
+ "service": str(row.service or ""),
+ "ts": int(row.ts or 0),
+ "source_kind": str(row.source_kind or ""),
+ }
+ return out
diff --git a/core/tasks/engine.py b/core/tasks/engine.py
index a85fc66..6307ee3 100644
--- a/core/tasks/engine.py
+++ b/core/tasks/engine.py
@@ -4,6 +4,7 @@ import re
from asgiref.sync import sync_to_async
from django.conf import settings
+from django.db.models import Q
from core.clients.transport import send_message_raw
from core.messaging import ai as ai_runner
@@ -14,11 +15,13 @@ from core.models import (
DerivedTask,
DerivedTaskEvent,
ExternalSyncEvent,
+ ExternalChatLink,
Message,
+ PersonIdentifier,
TaskCompletionPattern,
TaskProviderConfig,
)
-from core.tasks.providers.mock import get_provider
+from core.tasks.providers import get_provider
_TASK_HINT_RE = re.compile(r"\b(todo|task|action|need to|please)\b", re.IGNORECASE)
_COMPLETION_RE = re.compile(r"\b(done|completed|fixed)\s*#([A-Za-z0-9_-]+)\b", re.IGNORECASE)
@@ -218,6 +221,64 @@ async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: s
provider_settings = dict(getattr(cfg, "settings", {}) or {})
provider = get_provider(provider_name)
idempotency_key = f"{provider_name}:{task.id}:{event.id}"
+ variants = _channel_variants(task.source_service or "", task.source_channel or "")
+ person_identifier = None
+ if variants:
+ person_identifier = await sync_to_async(
+ lambda: PersonIdentifier.objects.filter(
+ user=task.user,
+ service=task.source_service,
+ identifier__in=variants,
+ )
+ .select_related("person")
+ .order_by("-id")
+ .first()
+ )()
+ external_chat_id = ""
+ if person_identifier is not None:
+ link = await sync_to_async(
+ lambda: ExternalChatLink.objects.filter(
+ user=task.user,
+ provider=provider_name,
+ enabled=True,
+ )
+ .filter(
+ Q(person_identifier=person_identifier)
+ | Q(person=person_identifier.person)
+ )
+ .order_by("-updated_at", "-id")
+ .first()
+ )()
+ if link is not None:
+ external_chat_id = str(link.external_chat_id or "").strip()
+
+ # Worker-backed providers are queued and executed by `manage.py codex_worker`.
+ if bool(getattr(provider, "run_in_worker", False)):
+ await sync_to_async(ExternalSyncEvent.objects.update_or_create)(
+ idempotency_key=idempotency_key,
+ defaults={
+ "user": task.user,
+ "task": task,
+ "task_event": event,
+ "provider": provider_name,
+ "status": "pending",
+ "payload": {
+ "action": action,
+ "provider_payload": {
+ "task_id": str(task.id),
+ "title": task.title,
+ "external_key": task.external_key,
+ "reference_code": task.reference_code,
+ "source_service": str(task.source_service or ""),
+ "source_channel": str(task.source_channel or ""),
+ "external_chat_id": external_chat_id,
+ "payload": event.payload,
+ },
+ },
+ "error": "",
+ },
+ )
+ return
if action == "create":
result = provider.create_task(provider_settings, {
@@ -225,18 +286,27 @@ async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: s
"title": task.title,
"external_key": task.external_key,
"reference_code": task.reference_code,
+ "source_service": str(task.source_service or ""),
+ "source_channel": str(task.source_channel or ""),
+ "external_chat_id": external_chat_id,
})
elif action == "complete":
result = provider.mark_complete(provider_settings, {
"task_id": str(task.id),
"external_key": task.external_key,
"reference_code": task.reference_code,
+ "source_service": str(task.source_service or ""),
+ "source_channel": str(task.source_channel or ""),
+ "external_chat_id": external_chat_id,
})
else:
result = provider.append_update(provider_settings, {
"task_id": str(task.id),
"external_key": task.external_key,
"reference_code": task.reference_code,
+ "source_service": str(task.source_service or ""),
+ "source_channel": str(task.source_channel or ""),
+ "external_chat_id": external_chat_id,
"payload": event.payload,
})
diff --git a/core/tasks/providers/__init__.py b/core/tasks/providers/__init__.py
index e69de29..6f1cb9d 100644
--- a/core/tasks/providers/__init__.py
+++ b/core/tasks/providers/__init__.py
@@ -0,0 +1,19 @@
+from __future__ import annotations
+
+from .base import TaskProvider
+from .codex_cli import CodexCLITaskProvider
+from .mock import MockTaskProvider
+
+PROVIDERS = {
+ "mock": MockTaskProvider(),
+ "codex_cli": CodexCLITaskProvider(),
+}
+
+
+def get_provider(name: str) -> TaskProvider:
+ key = str(name or "").strip().lower()
+ return PROVIDERS.get(key, PROVIDERS["mock"])
+
+
+def list_providers() -> list[TaskProvider]:
+ return list(PROVIDERS.values())
diff --git a/core/tasks/providers/base.py b/core/tasks/providers/base.py
index 375945e..b75b4de 100644
--- a/core/tasks/providers/base.py
+++ b/core/tasks/providers/base.py
@@ -13,6 +13,7 @@ class ProviderResult:
class TaskProvider:
name = "base"
+ run_in_worker = False
def healthcheck(self, config: dict) -> ProviderResult:
raise NotImplementedError
diff --git a/core/tasks/providers/codex_cli.py b/core/tasks/providers/codex_cli.py
new file mode 100644
index 0000000..4b25441
--- /dev/null
+++ b/core/tasks/providers/codex_cli.py
@@ -0,0 +1,117 @@
+from __future__ import annotations
+
+import json
+import subprocess
+
+from .base import ProviderResult, TaskProvider
+
+
+class CodexCLITaskProvider(TaskProvider):
+ name = "codex_cli"
+ run_in_worker = True
+
+ def _timeout(self, config: dict) -> int:
+ try:
+ return max(1, int(config.get("timeout_seconds") or 60))
+ except Exception:
+ return 60
+
+ def _command(self, config: dict) -> str:
+ return str(config.get("command") or "codex").strip() or "codex"
+
+ def _workspace(self, config: dict) -> str:
+ return str(config.get("workspace_root") or "").strip()
+
+ def _profile(self, config: dict) -> str:
+ return str(config.get("default_profile") or "").strip()
+
+ def _run(self, config: dict, op: str, payload: dict) -> ProviderResult:
+ cmd = [self._command(config), "task-sync", "--op", str(op)]
+ workspace = self._workspace(config)
+ if workspace:
+ cmd.extend(["--workspace", workspace])
+ profile = self._profile(config)
+ if profile:
+ cmd.extend(["--profile", profile])
+ command_timeout = self._timeout(config)
+ data = json.dumps(dict(payload or {}), separators=(",", ":"))
+ cmd.extend(["--payload-json", data])
+
+ try:
+ completed = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ timeout=command_timeout,
+ check=False,
+ cwd=workspace if workspace else None,
+ )
+ except subprocess.TimeoutExpired:
+ return ProviderResult(
+ ok=False,
+ error=f"codex_cli_timeout_{command_timeout}s",
+ payload={"op": op, "timeout_seconds": command_timeout},
+ )
+ except Exception as exc:
+ return ProviderResult(ok=False, error=f"codex_cli_exec_error:{exc}", payload={"op": op})
+
+ stdout = str(completed.stdout or "").strip()
+ stderr = str(completed.stderr or "").strip()
+ parsed = {}
+ if stdout:
+ try:
+ parsed = json.loads(stdout)
+ if not isinstance(parsed, dict):
+ parsed = {"raw_stdout": stdout}
+ except Exception:
+ parsed = {"raw_stdout": stdout}
+
+ ext = (
+ str(parsed.get("external_key") or "").strip()
+ or str(parsed.get("task_id") or "").strip()
+ or str(payload.get("external_key") or "").strip()
+ )
+
+ ok = completed.returncode == 0
+ out_payload = {
+ "op": op,
+ "returncode": int(completed.returncode),
+ "stdout": stdout[:4000],
+ "stderr": stderr[:4000],
+ }
+ out_payload.update(parsed)
+ return ProviderResult(ok=ok, external_key=ext, error=("" if ok else stderr[:4000]), payload=out_payload)
+
+ def healthcheck(self, config: dict) -> ProviderResult:
+ command = self._command(config)
+ try:
+ completed = subprocess.run(
+ [command, "--version"],
+ capture_output=True,
+ text=True,
+ timeout=max(1, min(20, self._timeout(config))),
+ check=False,
+ )
+ except Exception as exc:
+ return ProviderResult(ok=False, error=f"codex_cli_unavailable:{exc}")
+ return ProviderResult(
+ ok=(completed.returncode == 0),
+ payload={
+ "returncode": int(completed.returncode),
+ "stdout": str(completed.stdout or "").strip()[:1000],
+ "stderr": str(completed.stderr or "").strip()[:1000],
+ },
+ error=("" if completed.returncode == 0 else str(completed.stderr or "").strip()[:1000]),
+ )
+
+ def create_task(self, config: dict, payload: dict) -> ProviderResult:
+ return self._run(config, "create", payload)
+
+ def append_update(self, config: dict, payload: dict) -> ProviderResult:
+ return self._run(config, "append_update", payload)
+
+ def mark_complete(self, config: dict, payload: dict) -> ProviderResult:
+ return self._run(config, "mark_complete", payload)
+
+ def link_task(self, config: dict, payload: dict) -> ProviderResult:
+ return self._run(config, "link_task", payload)
diff --git a/core/tasks/providers/mock.py b/core/tasks/providers/mock.py
index c526394..044dae3 100644
--- a/core/tasks/providers/mock.py
+++ b/core/tasks/providers/mock.py
@@ -23,12 +23,3 @@ class MockTaskProvider(TaskProvider):
def link_task(self, config: dict, payload: dict) -> ProviderResult:
return ProviderResult(ok=True, external_key=str(payload.get("external_key") or ""), payload={"action": "link_task"})
-
-
-PROVIDERS = {
- "mock": MockTaskProvider(),
-}
-
-
-def get_provider(name: str) -> TaskProvider:
- return PROVIDERS.get(str(name or "").strip().lower(), PROVIDERS["mock"])
diff --git a/core/templates/base.html b/core/templates/base.html
index ecaeaae..dc80ec6 100644
--- a/core/templates/base.html
+++ b/core/templates/base.html
@@ -377,6 +377,9 @@
Sessions
+
+ Documents
+
@@ -401,6 +404,9 @@
Task Settings
+
+ Availability
+
Translation
diff --git a/core/templates/pages/availability-settings.html b/core/templates/pages/availability-settings.html
new file mode 100644
index 0000000..ce2efce
--- /dev/null
+++ b/core/templates/pages/availability-settings.html
@@ -0,0 +1,128 @@
+{% extends "base.html" %}
+
+{% block content %}
+Availability Settings
+
+
+
+
+ Availability Events
+
+
+
+
+ {% for row in events %}
+ ts person service source state confidence
+
+ {% empty %}
+ {{ row.ts }}
+ {{ row.person.name }}
+ {{ row.service }}
+ {{ row.source_kind }}
+ {{ row.availability_state }}
+ {{ row.confidence|floatformat:2 }}
+
+ {% endfor %}
+
+ No events in range. Availability Spans
+
+
+
+
+ {% for row in spans %}
+ person service state start end confidence
+
+ {% empty %}
+ {{ row.person.name }}
+ {{ row.service }}
+ {{ row.state }}
+ {{ row.start_ts }}
+ {{ row.end_ts }}
+ {{ row.confidence_start|floatformat:2 }} -> {{ row.confidence_end|floatformat:2 }}
+
+ {% endfor %}
+
+ No spans in range.
- Warning: {{ variant.variant_label }} is in verbatim mode with plan fanout enabled.
+ verbatim mode with plan fanout enabled.
Recipients will get raw transcript-style output.
-
{{ row.channel_identifier }}{{ row.channel_identifier }}
+ {{ profile.enabled_egress_bindings|length }} enabled egress destination{{ profile.enabled_egress_bindings|length|pluralize }}.
+{{ profile.enabled_egress_bindings|length }} enabled egress destination{{ profile.enabled_egress_bindings|length|pluralize }}.
{% else %}| Provider | Person | Identifier | External Chat | Enabled | |
|---|---|---|---|---|---|
| {{ row.provider }} | +{% if row.person %}{{ row.person.name }}{% else %}-{% endif %} | +{% if row.person_identifier %}{{ row.person_identifier.service }} ยท {{ row.person_identifier.identifier }}{% else %}-{% endif %} | +{{ row.external_chat_id }} | +{{ row.enabled }} | ++ + | +
| No external chat links. | |||||