diff --git a/app/urls.py b/app/urls.py index b903d75..b155778 100644 --- a/app/urls.py +++ b/app/urls.py @@ -193,6 +193,11 @@ urlpatterns = [ compose.ComposeSend.as_view(), name="compose_send", ), + path( + "compose/react/", + compose.ComposeReact.as_view(), + name="compose_react", + ), path( "compose/cancel-send/", compose.ComposeCancelSend.as_view(), @@ -303,11 +308,26 @@ urlpatterns = [ tasks.TaskDetail.as_view(), name="tasks_task", ), + path( + "tasks/codex/submit/", + tasks.TaskCodexSubmit.as_view(), + name="tasks_codex_submit", + ), path( "settings/tasks/", tasks.TaskSettings.as_view(), name="tasks_settings", ), + path( + "settings/codex/", + tasks.CodexSettingsPage.as_view(), + name="codex_settings", + ), + path( + "settings/codex/approval/", + tasks.CodexApprovalAction.as_view(), + name="codex_approval", + ), path( "settings/availability/", availability.AvailabilitySettingsPage.as_view(), diff --git a/core/commands/engine.py b/core/commands/engine.py index c11cb3c..baf1f7c 100644 --- a/core/commands/engine.py +++ b/core/commands/engine.py @@ -1,18 +1,20 @@ from __future__ import annotations from asgiref.sync import sync_to_async -from django.conf import settings from core.commands.base import CommandContext, CommandResult from core.commands.handlers.bp import ( BPCommandHandler, bp_reply_is_optional_for_trigger, + bp_subcommands_enabled, bp_trigger_matches, ) +from core.commands.handlers.codex import CodexCommandHandler, codex_trigger_matches +from core.commands.policies import ensure_variant_policies_for_profile from core.commands.registry import get as get_handler from core.commands.registry import register from core.messaging.reply_sync import is_mirrored_origin -from core.models import CommandChannelBinding, CommandProfile, Message +from core.models import CommandAction, CommandChannelBinding, CommandProfile, Message from core.util import logs log = logs.get_logger("command_engine") @@ -36,17 +38,178 @@ def _channel_variants(service: str, channel_identifier: str) -> list[str]: return variants +def _canonical_channel_identifier(service: str, channel_identifier: str) -> str: + value = str(channel_identifier or "").strip() + if not value: + return "" + if str(service or "").strip().lower() == "whatsapp": + return value.split("@", 1)[0].strip() + return value + + +def _effective_bootstrap_scope( + ctx: CommandContext, + trigger_message: Message, +) -> tuple[str, str]: + service = str(ctx.service or "").strip().lower() + identifier = str(ctx.channel_identifier or "").strip() + if service != "web": + return service, identifier + session_identifier = getattr(getattr(trigger_message, "session", None), "identifier", None) + fallback_service = str(getattr(session_identifier, "service", "") or "").strip().lower() + fallback_identifier = str(getattr(session_identifier, "identifier", "") or "").strip() + if fallback_service and fallback_identifier and fallback_service != "web": + return fallback_service, fallback_identifier + return service, identifier + + +def _ensure_bp_profile(user_id: int) -> CommandProfile: + profile, _ = CommandProfile.objects.get_or_create( + user_id=user_id, + slug="bp", + defaults={ + "name": "Business Plan", + "enabled": True, + "trigger_token": ".bp", + "reply_required": True, + "exact_match_only": True, + "window_scope": "conversation", + "visibility_mode": "status_in_source", + }, + ) + updated = False + if not profile.enabled: + profile.enabled = True + updated = True + if updated: + profile.save(update_fields=["enabled", "updated_at"]) + if str(profile.trigger_token or "").strip() != ".bp": + profile.trigger_token = ".bp" + profile.save(update_fields=["trigger_token", "updated_at"]) + for action_type, position in (("extract_bp", 0), ("save_document", 1), ("post_result", 2)): + action, created = CommandAction.objects.get_or_create( + profile=profile, + action_type=action_type, + defaults={"enabled": True, "position": position}, + ) + if (not created) and (not action.enabled): + action.enabled = True + action.save(update_fields=["enabled", "updated_at"]) + ensure_variant_policies_for_profile(profile) + return profile + + +def _ensure_codex_profile(user_id: int) -> CommandProfile: + profile, _ = CommandProfile.objects.get_or_create( + user_id=user_id, + slug="codex", + defaults={ + "name": "Codex", + "enabled": True, + "trigger_token": ".codex", + "reply_required": False, + "exact_match_only": False, + "window_scope": "conversation", + "visibility_mode": "status_in_source", + }, + ) + if not profile.enabled: + profile.enabled = True + profile.save(update_fields=["enabled", "updated_at"]) + if str(profile.trigger_token or "").strip() != ".codex": + profile.trigger_token = ".codex" + profile.save(update_fields=["trigger_token", "updated_at"]) + return profile + + +def _ensure_profile_for_slug(user_id: int, slug: str) -> CommandProfile | None: + if slug == "bp": + return _ensure_bp_profile(user_id) + if slug == "codex": + return _ensure_codex_profile(user_id) + return None + + +def _detected_bootstrap_slugs(message_text: str) -> list[str]: + slugs: list[str] = [] + if bp_trigger_matches(message_text, ".bp", False): + slugs.append("bp") + if codex_trigger_matches(message_text, ".codex", False): + slugs.append("codex") + return slugs + + +def _auto_setup_profile_bindings_for_first_command( + ctx: CommandContext, + trigger_message: Message, +) -> None: + author = str(getattr(trigger_message, "custom_author", "") or "").strip().upper() + if author != "USER": + return + slugs = _detected_bootstrap_slugs(ctx.message_text) + if not slugs: + return + service, identifier = _effective_bootstrap_scope(ctx, trigger_message) + service = str(service or "").strip().lower() + canonical = _canonical_channel_identifier(service, identifier) + variants = _channel_variants(service, canonical) + if not service or not variants: + return + for slug in slugs: + profile = _ensure_profile_for_slug(ctx.user_id, slug) + if profile is None: + continue + already_enabled = CommandChannelBinding.objects.filter( + profile=profile, + enabled=True, + direction="ingress", + service=service, + channel_identifier__in=variants, + ).exists() + if already_enabled: + continue + for direction in ("ingress", "egress"): + binding, _ = CommandChannelBinding.objects.get_or_create( + profile=profile, + direction=direction, + service=service, + channel_identifier=canonical, + defaults={"enabled": True}, + ) + if not binding.enabled: + binding.enabled = True + binding.save(update_fields=["enabled", "updated_at"]) + alternate_variants = [value for value in variants if value != canonical] + if alternate_variants: + CommandChannelBinding.objects.filter( + profile=profile, + direction=direction, + service=service, + channel_identifier__in=alternate_variants, + ).update(enabled=False) + + def ensure_handlers_registered(): global _REGISTERED if _REGISTERED: return register(BPCommandHandler()) + register(CodexCommandHandler()) _REGISTERED = True async def _eligible_profiles(ctx: CommandContext) -> list[CommandProfile]: def _load(): + trigger = ( + Message.objects.select_related("session", "session__identifier") + .filter(id=ctx.message_id, user_id=ctx.user_id) + .first() + ) direct_variants = _channel_variants(ctx.service, ctx.channel_identifier) + source_channel = str(getattr(trigger, "source_chat_id", "") or "").strip() + for expanded in _channel_variants(ctx.service, source_channel): + if expanded and expanded not in direct_variants: + direct_variants.append(expanded) if not direct_variants: return [] direct = list( @@ -65,15 +228,13 @@ async def _eligible_profiles(ctx: CommandContext) -> list[CommandProfile]: # underlying conversation is mapped to a platform identifier. if str(ctx.service or "").strip().lower() != "web": return [] - trigger = ( - Message.objects.select_related("session", "session__identifier") - .filter(id=ctx.message_id, user_id=ctx.user_id) - .first() - ) identifier = getattr(getattr(trigger, "session", None), "identifier", None) fallback_service = str(getattr(identifier, "service", "") or "").strip().lower() fallback_identifier = str(getattr(identifier, "identifier", "") or "").strip() fallback_variants = _channel_variants(fallback_service, fallback_identifier) + for expanded in _channel_variants(fallback_service, source_channel): + if expanded and expanded not in fallback_variants: + fallback_variants.append(expanded) if not fallback_service or not fallback_variants: return [] return list( @@ -91,12 +252,18 @@ async def _eligible_profiles(ctx: CommandContext) -> list[CommandProfile]: def _matches_trigger(profile: CommandProfile, text: str) -> bool: - if profile.slug == "bp" and bool(getattr(settings, "BP_SUBCOMMANDS_V1", True)): + if profile.slug == "bp" and bp_subcommands_enabled(): return bp_trigger_matches( message_text=text, trigger_token=profile.trigger_token, exact_match_only=profile.exact_match_only, ) + if profile.slug == "codex": + return codex_trigger_matches( + message_text=text, + trigger_token=profile.trigger_token, + exact_match_only=profile.exact_match_only, + ) body = str(text or "").strip() trigger = str(profile.trigger_token or "").strip() if not trigger: @@ -115,6 +282,10 @@ async def process_inbound_message(ctx: CommandContext) -> list[CommandResult]: return [] if is_mirrored_origin(trigger_message.message_meta): return [] + await sync_to_async(_auto_setup_profile_bindings_for_first_command)( + ctx, + trigger_message, + ) profiles = await _eligible_profiles(ctx) results: list[CommandResult] = [] @@ -124,7 +295,7 @@ async def process_inbound_message(ctx: CommandContext) -> list[CommandResult]: if profile.reply_required and trigger_message.reply_to_id is None: if ( profile.slug == "bp" - and bool(getattr(settings, "BP_SUBCOMMANDS_V1", True)) + and bp_subcommands_enabled() and bp_reply_is_optional_for_trigger(ctx.message_text) ): pass diff --git a/core/commands/handlers/bp.py b/core/commands/handlers/bp.py index e0624cd..a238a27 100644 --- a/core/commands/handlers/bp.py +++ b/core/commands/handlers/bp.py @@ -23,8 +23,15 @@ from core.models import ( Message, ) -_BP_SET_RE = re.compile(r"^\s*#bp\s+set#(?P.*)$", re.IGNORECASE | re.DOTALL) -_BP_SET_RANGE_RE = re.compile(r"^\s*#bp\s+set\s+range#(?:.*)$", re.IGNORECASE | re.DOTALL) +_BP_ROOT_RE = re.compile(r"^\s*(?:\.bp\b|#bp#?)\s*$", re.IGNORECASE) +_BP_SET_RE = re.compile( + r"^\s*(?:\.bp\s+set\b|#bp\s+set#?)(?P.*)$", + re.IGNORECASE | re.DOTALL, +) +_BP_SET_RANGE_RE = re.compile( + r"^\s*(?:\.bp\s+set\s+range\b|#bp\s+set\s+range#?)(?:.*)$", + re.IGNORECASE | re.DOTALL, +) class BPParsedCommand(dict): @@ -49,17 +56,26 @@ def parse_bp_subcommand(text: str) -> BPParsedCommand: return BPParsedCommand(command=None, remainder_text="") +def bp_subcommands_enabled() -> bool: + raw = getattr(settings, "BP_SUBCOMMANDS_V1", True) + if raw is None: + return True + return bool(raw) + + def bp_trigger_matches(message_text: str, trigger_token: str, exact_match_only: bool) -> bool: body = str(message_text or "").strip() trigger = str(trigger_token or "").strip() parsed = parse_bp_subcommand(body) - if parsed.command and bool(getattr(settings, "BP_SUBCOMMANDS_V1", True)): + if parsed.command and bp_subcommands_enabled(): + return True + if _BP_ROOT_RE.match(body): return True if not trigger: return False if exact_match_only: - return body == trigger - return trigger in body + return body.lower() == trigger.lower() + return trigger.lower() in body.lower() def bp_reply_is_optional_for_trigger(message_text: str) -> bool: @@ -119,7 +135,8 @@ class BPCommandHandler(CommandHandler): "generation_mode": str(policy.generation_mode or "verbatim"), "send_plan_to_egress": bool(policy.send_plan_to_egress) and ("post_result" in action_types), - "send_status_to_source": bool(policy.send_status_to_source), + "send_status_to_source": bool(policy.send_status_to_source) + or str(profile.visibility_mode or "") == "status_in_source", "send_status_to_egress": bool(policy.send_status_to_egress), "store_document": bool(getattr(policy, "store_document", True)), } @@ -614,7 +631,7 @@ class BPCommandHandler(CommandHandler): return CommandResult(ok=False, status="skipped", error=run.error) parsed = parse_bp_subcommand(ctx.message_text) - if parsed.command and bool(getattr(settings, "BP_SUBCOMMANDS_V1", True)): + if parsed.command and bp_subcommands_enabled(): return await self._execute_set_or_range( trigger=trigger, run=run, diff --git a/core/commands/handlers/codex.py b/core/commands/handlers/codex.py new file mode 100644 index 0000000..283103c --- /dev/null +++ b/core/commands/handlers/codex.py @@ -0,0 +1,498 @@ +from __future__ import annotations + +import hashlib +import re + +from asgiref.sync import sync_to_async +from django.utils import timezone + +from core.commands.base import CommandContext, CommandHandler, CommandResult +from core.commands.delivery import post_status_in_source +from core.messaging.text_export import plain_text_blob +from core.models import ( + ChatTaskSource, + CodexPermissionRequest, + CodexRun, + CommandProfile, + DerivedTask, + ExternalSyncEvent, + Message, + TaskProject, + TaskProviderConfig, +) +from core.tasks.codex_support import channel_variants, resolve_external_chat_id + +_CODEX_DEFAULT_RE = re.compile( + r"^\s*(?:\.codex\b|#codex#?)(?P.*)$", + re.IGNORECASE | re.DOTALL, +) +_CODEX_PLAN_RE = re.compile( + r"^\s*(?:\.codex\s+plan\b|#codex\s+plan#?)(?P.*)$", + re.IGNORECASE | re.DOTALL, +) +_CODEX_STATUS_RE = re.compile(r"^\s*(?:\.codex\s+status\b|#codex\s+status#?)\s*$", re.IGNORECASE) +_CODEX_APPROVE_DENY_RE = re.compile( + r"^\s*(?:\.codex|#codex)\s+(?Papprove|deny)\s+(?P[A-Za-z0-9._:-]+)#?\s*$", + re.IGNORECASE, +) +_PROJECT_TOKEN_RE = re.compile(r"\[\s*project\s*:\s*([^\]]+)\]", re.IGNORECASE) +_REFERENCE_RE = re.compile(r"(? str | None: + value = self.get("command") + return str(value) if value else None + + @property + def body_text(self) -> str: + return str(self.get("body_text") or "") + + @property + def approval_key(self) -> str: + return str(self.get("approval_key") or "") + + + +def parse_codex_command(text: str) -> CodexParsedCommand: + body = str(text or "") + m = _CODEX_APPROVE_DENY_RE.match(body) + if m: + return CodexParsedCommand( + command=str(m.group("action") or "").strip().lower(), + body_text="", + approval_key=str(m.group("approval_key") or "").strip(), + ) + if _CODEX_STATUS_RE.match(body): + return CodexParsedCommand(command="status", body_text="", approval_key="") + m = _CODEX_PLAN_RE.match(body) + if m: + return CodexParsedCommand( + command="plan", + body_text=str(m.group("body") or "").strip(), + approval_key="", + ) + m = _CODEX_DEFAULT_RE.match(body) + if m: + return CodexParsedCommand( + command="default", + body_text=str(m.group("body") or "").strip(), + approval_key="", + ) + return CodexParsedCommand(command=None, body_text="", approval_key="") + + +def codex_trigger_matches(message_text: str, trigger_token: str, exact_match_only: bool) -> bool: + body = str(message_text or "").strip() + parsed = parse_codex_command(body) + if parsed.command: + return True + trigger = str(trigger_token or "").strip() + if not trigger: + return False + if exact_match_only: + return body.lower() == trigger.lower() + return trigger.lower() in body.lower() + + +class CodexCommandHandler(CommandHandler): + slug = "codex" + + async def _load_trigger(self, message_id: str) -> Message | None: + return await sync_to_async( + lambda: Message.objects.select_related("user", "session", "session__identifier", "reply_to") + .filter(id=message_id) + .first() + )() + + def _effective_scope(self, trigger: Message) -> tuple[str, str]: + service = str(getattr(trigger, "source_service", "") or "").strip().lower() + channel = str(getattr(trigger, "source_chat_id", "") or "").strip() + identifier = getattr(getattr(trigger, "session", None), "identifier", None) + fallback_service = str(getattr(identifier, "service", "") or "").strip().lower() + fallback_identifier = str(getattr(identifier, "identifier", "") or "").strip() + if service == "web" and fallback_service and fallback_identifier and fallback_service != "web": + return fallback_service, fallback_identifier + return service or "web", channel + + async def _mapped_sources(self, user, service: str, channel: str) -> list[ChatTaskSource]: + variants = channel_variants(service, channel) + if not variants: + return [] + return await sync_to_async(list)( + ChatTaskSource.objects.filter( + user=user, + enabled=True, + service=service, + channel_identifier__in=variants, + ).select_related("project", "epic") + ) + + async def _linked_task_from_reply(self, user, reply_to: Message | None) -> DerivedTask | None: + if reply_to is None: + return None + by_origin = await sync_to_async( + lambda: DerivedTask.objects.filter(user=user, origin_message=reply_to) + .select_related("project", "epic") + .order_by("-created_at") + .first() + )() + if by_origin is not None: + return by_origin + return await sync_to_async( + lambda: DerivedTask.objects.filter(user=user, events__source_message=reply_to) + .select_related("project", "epic") + .order_by("-created_at") + .first() + )() + + def _extract_project_token(self, body_text: str) -> tuple[str, str]: + text = str(body_text or "") + m = _PROJECT_TOKEN_RE.search(text) + if not m: + return "", text + token = str(m.group(1) or "").strip() + cleaned = _PROJECT_TOKEN_RE.sub("", text).strip() + return token, cleaned + + def _extract_reference(self, body_text: str) -> str: + m = _REFERENCE_RE.search(str(body_text or "")) + if not m: + return "" + return str(m.group(1) or "").strip() + + async def _resolve_task(self, user, reference_code: str, reply_task: DerivedTask | None) -> DerivedTask | None: + if reference_code: + return await sync_to_async( + lambda: DerivedTask.objects.filter(user=user, reference_code=reference_code) + .select_related("project", "epic") + .order_by("-created_at") + .first() + )() + return reply_task + + async def _resolve_project( + self, + *, + user, + service: str, + channel: str, + task: DerivedTask | None, + reply_task: DerivedTask | None, + project_token: str, + ) -> tuple[TaskProject | None, str]: + if task is not None: + return task.project, "" + if reply_task is not None: + return reply_task.project, "" + if project_token: + project = await sync_to_async( + lambda: TaskProject.objects.filter(user=user, name__iexact=project_token).first() + )() + if project is not None: + return project, "" + return None, f"project_not_found:{project_token}" + + mapped = await self._mapped_sources(user, service, channel) + project_ids = sorted({str(row.project_id) for row in mapped if row.project_id}) + if len(project_ids) == 1: + project = next((row.project for row in mapped if str(row.project_id) == project_ids[0]), None) + return project, "" + if len(project_ids) > 1: + return None, "project_required:[project:Name]" + return None, "project_unresolved" + + async def _post_source_status(self, trigger: Message, text: str, suffix: str) -> None: + await post_status_in_source( + trigger_message=trigger, + text=text, + origin_tag=f"codex-status:{suffix}", + ) + + async def _run_status(self, trigger: Message, service: str, channel: str, project: TaskProject | None) -> CommandResult: + def _load_runs(): + qs = CodexRun.objects.filter(user=trigger.user) + if service: + qs = qs.filter(source_service=service) + if channel: + qs = qs.filter(source_channel=channel) + if project is not None: + qs = qs.filter(project=project) + return list(qs.order_by("-created_at")[:10]) + + runs = await sync_to_async(_load_runs)() + if not runs: + await self._post_source_status(trigger, "[codex] no recent runs for this scope.", "empty") + return CommandResult(ok=True, status="ok", payload={"count": 0}) + lines = ["[codex] recent runs:"] + for row in runs: + ref = str(getattr(getattr(row, "task", None), "reference_code", "") or "-") + summary = str((row.result_payload or {}).get("summary") or "").strip() + summary_part = f" · {summary}" if summary else "" + lines.append(f"- {row.status} run={row.id} task=#{ref}{summary_part}") + await self._post_source_status(trigger, "\n".join(lines), "runs") + return CommandResult(ok=True, status="ok", payload={"count": len(runs)}) + + async def _run_approval_action( + self, + trigger: Message, + parsed: CodexParsedCommand, + current_service: str, + current_channel: str, + ) -> CommandResult: + cfg = await sync_to_async( + lambda: TaskProviderConfig.objects.filter(user=trigger.user, provider="codex_cli").first() + )() + settings_payload = dict(getattr(cfg, "settings", {}) or {}) + approver_service = str(settings_payload.get("approver_service") or "").strip().lower() + approver_identifier = str(settings_payload.get("approver_identifier") or "").strip() + if not approver_service or not approver_identifier: + return CommandResult(ok=False, status="failed", error="approver_channel_not_configured") + + if str(current_service or "").strip().lower() != approver_service or str(current_channel or "").strip() not in set( + channel_variants(approver_service, approver_identifier) + ): + return CommandResult(ok=False, status="failed", error="approval_command_not_allowed_in_this_channel") + + approval_key = parsed.approval_key + request = await sync_to_async( + lambda: CodexPermissionRequest.objects.select_related("codex_run", "external_sync_event") + .filter(user=trigger.user, approval_key=approval_key) + .first() + )() + if request is None: + return CommandResult(ok=False, status="failed", error="approval_key_not_found") + + now = timezone.now() + if parsed.command == "approve": + request.status = "approved" + request.resolved_at = now + request.resolved_by_identifier = current_channel + request.resolution_note = "approved via command" + await sync_to_async(request.save)( + update_fields=[ + "status", + "resolved_at", + "resolved_by_identifier", + "resolution_note", + ] + ) + run = request.codex_run + run.status = "approved_waiting_resume" + run.error = "" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + source_service = str(run.source_service or "") + source_channel = str(run.source_channel or "") + provider_payload = dict(run.request_payload.get("provider_payload") or {}) + provider_payload.update( + { + "mode": "approval_response", + "approval_key": approval_key, + "resume_payload": dict(request.resume_payload or {}), + "codex_run_id": str(run.id), + "source_service": source_service, + "source_channel": source_channel, + } + ) + await sync_to_async(ExternalSyncEvent.objects.update_or_create)( + idempotency_key=f"codex_approval:{approval_key}:approved", + defaults={ + "user": trigger.user, + "task_id": run.task_id, + "task_event_id": run.derived_task_event_id, + "provider": "codex_cli", + "status": "pending", + "payload": { + "action": "append_update", + "provider_payload": provider_payload, + }, + "error": "", + }, + ) + return CommandResult(ok=True, status="ok", payload={"approval_key": approval_key, "resolution": "approved"}) + + request.status = "denied" + request.resolved_at = now + request.resolved_by_identifier = current_channel + request.resolution_note = "denied via command" + await sync_to_async(request.save)( + update_fields=["status", "resolved_at", "resolved_by_identifier", "resolution_note"] + ) + run = request.codex_run + run.status = "denied" + run.error = "approval_denied" + await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"]) + await sync_to_async(ExternalSyncEvent.objects.update_or_create)( + idempotency_key=f"codex_approval:{approval_key}:denied", + defaults={ + "user": trigger.user, + "task_id": run.task_id, + "task_event_id": run.derived_task_event_id, + "provider": "codex_cli", + "status": "failed", + "payload": { + "action": "append_update", + "provider_payload": { + "mode": "approval_response", + "approval_key": approval_key, + "codex_run_id": str(run.id), + }, + }, + "error": "approval_denied", + }, + ) + return CommandResult(ok=True, status="ok", payload={"approval_key": approval_key, "resolution": "denied"}) + + async def _create_submission( + self, + *, + trigger: Message, + mode: str, + body_text: str, + task: DerivedTask, + project: TaskProject, + ) -> CommandResult: + cfg = await sync_to_async( + lambda: TaskProviderConfig.objects.filter(user=trigger.user, provider="codex_cli", enabled=True).first() + )() + if cfg is None: + return CommandResult(ok=False, status="failed", error="provider_disabled_or_missing") + + service, channel = self._effective_scope(trigger) + external_chat_id = await sync_to_async(resolve_external_chat_id)( + user=trigger.user, + provider="codex_cli", + service=service, + channel=channel, + ) + payload = { + "task_id": str(task.id), + "reference_code": str(task.reference_code or ""), + "title": str(task.title or ""), + "external_key": str(task.external_key or ""), + "project_name": str(getattr(project, "name", "") or ""), + "epic_name": str(getattr(getattr(task, "epic", None), "name", "") or ""), + "source_service": service, + "source_channel": channel, + "external_chat_id": external_chat_id, + "origin_message_id": str(getattr(task, "origin_message_id", "") or ""), + "trigger_message_id": str(trigger.id), + "mode": mode, + "command_text": str(body_text or ""), + } + if mode == "plan": + anchor = trigger.reply_to + if anchor is None: + return CommandResult(ok=False, status="failed", error="reply_required_for_codex_plan") + rows = await sync_to_async(list)( + Message.objects.filter( + user=trigger.user, + session=trigger.session, + ts__gte=int(anchor.ts or 0), + ts__lte=int(trigger.ts or 0), + ) + .order_by("ts") + .select_related("session", "session__identifier", "session__identifier__person") + ) + payload["reply_context"] = { + "anchor_message_id": str(anchor.id), + "trigger_message_id": str(trigger.id), + "message_ids": [str(row.id) for row in rows], + "content": plain_text_blob(rows), + } + + run = await sync_to_async(CodexRun.objects.create)( + user=trigger.user, + task=task, + source_message=trigger, + project=project, + epic=getattr(task, "epic", None), + source_service=service, + source_channel=channel, + external_chat_id=external_chat_id, + status="queued", + request_payload={"action": "append_update", "provider_payload": dict(payload)}, + result_payload={}, + error="", + ) + payload["codex_run_id"] = str(run.id) + run.request_payload = {"action": "append_update", "provider_payload": dict(payload)} + await sync_to_async(run.save)(update_fields=["request_payload", "updated_at"]) + + idempotency_key = f"codex_cmd:{trigger.id}:{mode}:{task.id}:{hashlib.sha1(str(body_text or '').encode('utf-8')).hexdigest()[:12]}" + await sync_to_async(ExternalSyncEvent.objects.update_or_create)( + idempotency_key=idempotency_key, + defaults={ + "user": trigger.user, + "task": task, + "task_event": None, + "provider": "codex_cli", + "status": "pending", + "payload": { + "action": "append_update", + "provider_payload": dict(payload), + }, + "error": "", + }, + ) + return CommandResult(ok=True, status="ok", payload={"codex_run_id": str(run.id)}) + + async def execute(self, ctx: CommandContext) -> CommandResult: + trigger = await self._load_trigger(ctx.message_id) + if trigger is None: + return CommandResult(ok=False, status="failed", error="trigger_not_found") + + profile = await sync_to_async( + lambda: CommandProfile.objects.filter(user=trigger.user, slug=self.slug, enabled=True).first() + )() + if profile is None: + return CommandResult(ok=False, status="skipped", error="profile_missing") + + parsed = parse_codex_command(ctx.message_text) + if not parsed.command: + return CommandResult(ok=False, status="skipped", error="codex_command_not_matched") + + service, channel = self._effective_scope(trigger) + + if parsed.command == "status": + project = None + reply_task = await self._linked_task_from_reply(trigger.user, trigger.reply_to) + if reply_task is not None: + project = reply_task.project + return await self._run_status(trigger, service, channel, project) + + if parsed.command in {"approve", "deny"}: + return await self._run_approval_action( + trigger, + parsed, + current_service=str(ctx.service or ""), + current_channel=str(ctx.channel_identifier or ""), + ) + + project_token, cleaned_body = self._extract_project_token(parsed.body_text) + reference_code = self._extract_reference(cleaned_body) + reply_task = await self._linked_task_from_reply(trigger.user, trigger.reply_to) + task = await self._resolve_task(trigger.user, reference_code, reply_task) + if task is None: + return CommandResult(ok=False, status="failed", error="task_target_required") + + project, project_error = await self._resolve_project( + user=trigger.user, + service=service, + channel=channel, + task=task, + reply_task=reply_task, + project_token=project_token, + ) + if project is None: + return CommandResult(ok=False, status="failed", error=project_error or "project_unresolved") + + mode = "plan" if parsed.command == "plan" else "default" + return await self._create_submission( + trigger=trigger, + mode=mode, + body_text=cleaned_body, + task=task, + project=project, + ) diff --git a/core/commands/policies.py b/core/commands/policies.py index 107f05d..0acb448 100644 --- a/core/commands/policies.py +++ b/core/commands/policies.py @@ -8,19 +8,19 @@ BP_VARIANT_KEYS = ("bp", "bp_set", "bp_set_range") BP_VARIANT_META = { "bp": { "name": "bp", - "trigger_token": "#bp#", + "trigger_token": ".bp", "template_supported": True, "position": 0, }, "bp_set": { "name": "bp set", - "trigger_token": "#bp set#", + "trigger_token": ".bp set", "template_supported": False, "position": 1, }, "bp_set_range": { "name": "bp set range", - "trigger_token": "#bp set range#", + "trigger_token": ".bp set range", "template_supported": False, "position": 2, }, @@ -63,6 +63,9 @@ def ensure_variant_policies_for_profile( result: dict[str, CommandVariantPolicy] = {} if str(profile.slug or "").strip() == "bp": + # Keep source-chat status visible for BP to avoid "silent success" confusion. + if str(profile.visibility_mode or "").strip() == "status_in_source": + CommandVariantPolicy.objects.filter(profile=profile).update(send_status_to_source=True) for key in BP_VARIANT_KEYS: meta = BP_VARIANT_META.get(key, {}) defaults = _bp_defaults(profile, key, post_result_enabled) diff --git a/core/management/commands/codex_worker.py b/core/management/commands/codex_worker.py index dd3bf7d..c4a40bb 100644 --- a/core/management/commands/codex_worker.py +++ b/core/management/commands/codex_worker.py @@ -1,10 +1,13 @@ from __future__ import annotations import time +import uuid +from asgiref.sync import async_to_sync from django.core.management.base import BaseCommand -from core.models import ExternalSyncEvent, TaskProviderConfig +from core.clients.transport import send_message_raw +from core.models import CodexPermissionRequest, CodexRun, ExternalSyncEvent, TaskProviderConfig from core.tasks.providers import get_provider from core.util import logs @@ -58,11 +61,36 @@ class Command(BaseCommand): event.status = "failed" event.error = "provider_disabled_or_missing" event.save(update_fields=["status", "error", "updated_at"]) + provider_payload = dict((event.payload or {}).get("provider_payload") or {}) + run_id = str(provider_payload.get("codex_run_id") or "").strip() + if run_id: + CodexRun.objects.filter(id=run_id, user=event.user).update( + status="failed", + error="provider_disabled_or_missing", + ) return payload = dict(event.payload or {}) action = str(payload.get("action") or "append_update").strip().lower() provider_payload = dict(payload.get("provider_payload") or payload) + run_id = str(provider_payload.get("codex_run_id") or payload.get("codex_run_id") or "").strip() + codex_run = None + if run_id: + codex_run = CodexRun.objects.filter(id=run_id, user=event.user).first() + if codex_run is None and event.task_id: + codex_run = ( + CodexRun.objects.filter( + user=event.user, + task_id=event.task_id, + status__in=["queued", "running", "approved_waiting_resume"], + ) + .order_by("-updated_at") + .first() + ) + if codex_run is not None: + codex_run.status = "running" + codex_run.error = "" + codex_run.save(update_fields=["status", "error", "updated_at"]) if action == "create": result = provider.create_task(dict(cfg.settings or {}), provider_payload) @@ -73,14 +101,106 @@ class Command(BaseCommand): else: result = provider.append_update(dict(cfg.settings or {}), provider_payload) + result_payload = dict(result.payload or {}) + requires_approval = bool(result_payload.get("requires_approval")) + if requires_approval: + approval_key = str(result_payload.get("approval_key") or uuid.uuid4().hex[:12]).strip() + permission_request = dict(result_payload.get("permission_request") or {}) + summary = str(result_payload.get("summary") or permission_request.get("summary") or "").strip() + requested_permissions = permission_request.get("requested_permissions") + if not isinstance(requested_permissions, (list, dict)): + requested_permissions = permission_request or {} + resume_payload = result_payload.get("resume_payload") + if not isinstance(resume_payload, dict): + resume_payload = {} + event.status = "waiting_approval" + event.error = "" + event.payload = dict(payload, worker_processed=True, result=result_payload) + event.save(update_fields=["status", "error", "payload", "updated_at"]) + if codex_run is not None: + codex_run.status = "waiting_approval" + codex_run.result_payload = dict(result_payload) + codex_run.error = "" + codex_run.save(update_fields=["status", "result_payload", "error", "updated_at"]) + CodexPermissionRequest.objects.update_or_create( + approval_key=approval_key, + defaults={ + "user": event.user, + "codex_run": codex_run if codex_run is not None else CodexRun.objects.create( + user=event.user, + task=event.task, + derived_task_event=event.task_event, + source_service=str(provider_payload.get("source_service") or ""), + source_channel=str(provider_payload.get("source_channel") or ""), + external_chat_id=str(provider_payload.get("external_chat_id") or ""), + status="waiting_approval", + request_payload=dict(payload or {}), + result_payload=dict(result_payload), + error="", + ), + "external_sync_event": event, + "summary": summary, + "requested_permissions": requested_permissions if isinstance(requested_permissions, dict) else { + "items": list(requested_permissions or []) + }, + "resume_payload": dict(resume_payload or {}), + "status": "pending", + "resolved_at": None, + "resolved_by_identifier": "", + "resolution_note": "", + }, + ) + approver_service = str((cfg.settings or {}).get("approver_service") or "").strip().lower() + approver_identifier = str((cfg.settings or {}).get("approver_identifier") or "").strip() + requested_text = result_payload.get("permission_request") or result_payload.get("requested_permissions") or {} + if approver_service and approver_identifier: + try: + async_to_sync(send_message_raw)( + approver_service, + approver_identifier, + text=( + f"[codex approval] key={approval_key}\\n" + f"summary={summary or 'Codex run requires approval'}\\n" + f"requested={requested_text}\\n" + f"use: .codex approve {approval_key} or .codex deny {approval_key}" + ), + attachments=[], + metadata={"origin_tag": f"codex-approval:{approval_key}"}, + ) + except Exception: + log.exception("failed to notify approver channel for approval_key=%s", approval_key) + else: + source_service = str(provider_payload.get("source_service") or "").strip().lower() + source_channel = str(provider_payload.get("source_channel") or "").strip() + if source_service and source_channel: + try: + async_to_sync(send_message_raw)( + source_service, + source_channel, + text=( + "[codex approval] approval is pending but no approver channel is configured. " + "Set approver_service and approver_identifier in Codex settings." + ), + attachments=[], + metadata={"origin_tag": "codex-approval-missing-target"}, + ) + except Exception: + log.exception("failed to notify source channel for missing approver target") + return + event.status = "ok" if result.ok else "failed" event.error = str(result.error or "") event.payload = dict( payload, worker_processed=True, - result=dict(result.payload or {}), + result=result_payload, ) event.save(update_fields=["status", "error", "payload", "updated_at"]) + if codex_run is not None: + codex_run.status = "ok" if result.ok else "failed" + codex_run.error = str(result.error or "") + codex_run.result_payload = result_payload + codex_run.save(update_fields=["status", "error", "result_payload", "updated_at"]) if result.ok and result.external_key and event.task_id and not str(event.task.external_key or "").strip(): event.task.external_key = str(result.external_key) diff --git a/core/migrations/0034_codexrun_codexpermissionrequest_and_more.py b/core/migrations/0034_codexrun_codexpermissionrequest_and_more.py new file mode 100644 index 0000000..49c74aa --- /dev/null +++ b/core/migrations/0034_codexrun_codexpermissionrequest_and_more.py @@ -0,0 +1,187 @@ +import uuid + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0033_contactavailability_and_externalchatlink"), + ] + + operations = [ + migrations.AlterField( + model_name="externalsyncevent", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("ok", "OK"), + ("failed", "Failed"), + ("retrying", "Retrying"), + ("waiting_approval", "Waiting Approval"), + ], + default="pending", + max_length=32, + ), + ), + migrations.CreateModel( + name="CodexRun", + fields=[ + ("id", models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ("source_service", models.CharField(blank=True, default="", max_length=255)), + ("source_channel", models.CharField(blank=True, default="", max_length=255)), + ("external_chat_id", models.CharField(blank=True, default="", max_length=255)), + ( + "status", + models.CharField( + choices=[ + ("queued", "Queued"), + ("running", "Running"), + ("waiting_approval", "Waiting Approval"), + ("approved_waiting_resume", "Approved Waiting Resume"), + ("denied", "Denied"), + ("ok", "OK"), + ("failed", "Failed"), + ("cancelled", "Cancelled"), + ], + default="queued", + max_length=32, + ), + ), + ("request_payload", models.JSONField(blank=True, default=dict)), + ("result_payload", models.JSONField(blank=True, default=dict)), + ("error", models.TextField(blank=True, default="")), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ( + "derived_task_event", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="codex_runs", + to="core.derivedtaskevent", + ), + ), + ( + "epic", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="codex_runs", + to="core.taskepic", + ), + ), + ( + "project", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="codex_runs", + to="core.taskproject", + ), + ), + ( + "source_message", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="codex_runs", + to="core.message", + ), + ), + ( + "task", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="codex_runs", + to="core.derivedtask", + ), + ), + ( + "user", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="codex_runs", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "indexes": [ + models.Index(fields=["user", "status", "updated_at"], name="core_codexr_user_id_dddd7f_idx"), + models.Index( + fields=["user", "source_service", "source_channel", "created_at"], + name="core_codexr_user_id_a70a53_idx", + ), + ], + }, + ), + migrations.CreateModel( + name="CodexPermissionRequest", + fields=[ + ("id", models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ("approval_key", models.CharField(max_length=255, unique=True)), + ("summary", models.TextField(blank=True, default="")), + ("requested_permissions", models.JSONField(blank=True, default=dict)), + ("resume_payload", models.JSONField(blank=True, default=dict)), + ( + "status", + models.CharField( + choices=[ + ("pending", "Pending"), + ("approved", "Approved"), + ("denied", "Denied"), + ("expired", "Expired"), + ], + default="pending", + max_length=16, + ), + ), + ("requested_at", models.DateTimeField(auto_now_add=True)), + ("resolved_at", models.DateTimeField(blank=True, null=True)), + ("resolved_by_identifier", models.CharField(blank=True, default="", max_length=255)), + ("resolution_note", models.TextField(blank=True, default="")), + ( + "codex_run", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="permission_requests", + to="core.codexrun", + ), + ), + ( + "external_sync_event", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="codex_permission_requests", + to="core.externalsyncevent", + ), + ), + ( + "user", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="codex_permission_requests", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "indexes": [ + models.Index(fields=["user", "status", "requested_at"], name="core_codexp_user_id_ba71e9_idx"), + models.Index(fields=["approval_key"], name="core_codexp_approva_83035d_idx"), + ], + }, + ), + ] diff --git a/core/models.py b/core/models.py index 435e9fe..a388f24 100644 --- a/core/models.py +++ b/core/models.py @@ -2176,6 +2176,7 @@ class ExternalSyncEvent(models.Model): ("ok", "OK"), ("failed", "Failed"), ("retrying", "Retrying"), + ("waiting_approval", "Waiting Approval"), ) id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) @@ -2227,6 +2228,111 @@ class TaskProviderConfig(models.Model): ] +class CodexRun(models.Model): + STATUS_CHOICES = ( + ("queued", "Queued"), + ("running", "Running"), + ("waiting_approval", "Waiting Approval"), + ("approved_waiting_resume", "Approved Waiting Resume"), + ("denied", "Denied"), + ("ok", "OK"), + ("failed", "Failed"), + ("cancelled", "Cancelled"), + ) + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="codex_runs") + task = models.ForeignKey( + DerivedTask, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="codex_runs", + ) + derived_task_event = models.ForeignKey( + DerivedTaskEvent, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="codex_runs", + ) + source_message = models.ForeignKey( + Message, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="codex_runs", + ) + project = models.ForeignKey( + TaskProject, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="codex_runs", + ) + epic = models.ForeignKey( + TaskEpic, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="codex_runs", + ) + source_service = models.CharField(max_length=255, blank=True, default="") + source_channel = models.CharField(max_length=255, blank=True, default="") + external_chat_id = models.CharField(max_length=255, blank=True, default="") + status = models.CharField(max_length=32, choices=STATUS_CHOICES, default="queued") + request_payload = models.JSONField(default=dict, blank=True) + result_payload = models.JSONField(default=dict, blank=True) + error = models.TextField(blank=True, default="") + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + indexes = [ + models.Index(fields=["user", "status", "updated_at"]), + models.Index(fields=["user", "source_service", "source_channel", "created_at"]), + ] + + +class CodexPermissionRequest(models.Model): + STATUS_CHOICES = ( + ("pending", "Pending"), + ("approved", "Approved"), + ("denied", "Denied"), + ("expired", "Expired"), + ) + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="codex_permission_requests") + codex_run = models.ForeignKey( + CodexRun, + on_delete=models.CASCADE, + related_name="permission_requests", + ) + external_sync_event = models.ForeignKey( + ExternalSyncEvent, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="codex_permission_requests", + ) + approval_key = models.CharField(max_length=255, unique=True) + summary = models.TextField(blank=True, default="") + requested_permissions = models.JSONField(default=dict, blank=True) + resume_payload = models.JSONField(default=dict, blank=True) + status = models.CharField(max_length=16, choices=STATUS_CHOICES, default="pending") + requested_at = models.DateTimeField(auto_now_add=True) + resolved_at = models.DateTimeField(null=True, blank=True) + resolved_by_identifier = models.CharField(max_length=255, blank=True, default="") + resolution_note = models.TextField(blank=True, default="") + + class Meta: + indexes = [ + models.Index(fields=["user", "status", "requested_at"]), + models.Index(fields=["approval_key"]), + ] + + class ContactAvailabilitySettings(models.Model): user = models.OneToOneField( User, diff --git a/core/tasks/codex_support.py b/core/tasks/codex_support.py new file mode 100644 index 0000000..3d1f850 --- /dev/null +++ b/core/tasks/codex_support.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +import re +from typing import Any + +from django.db.models import Q + +from core.models import ExternalChatLink, PersonIdentifier + + +def channel_variants(service: str, channel: str) -> list[str]: + value = str(channel or "").strip() + if not value: + return [] + variants = [value] + service_key = str(service or "").strip().lower() + if service_key == "whatsapp": + bare = value.split("@", 1)[0].strip() + if bare and bare not in variants: + variants.append(bare) + direct = f"{bare}@s.whatsapp.net" if bare else "" + if direct and direct not in variants: + variants.append(direct) + group = f"{bare}@g.us" if bare else "" + if group and group not in variants: + variants.append(group) + if service_key == "signal": + digits = re.sub(r"[^0-9]", "", value) + if digits and digits not in variants: + variants.append(digits) + if digits: + plus = f"+{digits}" + if plus not in variants: + variants.append(plus) + return variants + + +def resolve_external_chat_id(*, user, provider: str, service: str, channel: str) -> str: + variants = channel_variants(service, channel) + if not variants: + return "" + person_identifier = ( + PersonIdentifier.objects.filter( + user=user, + service=service, + identifier__in=variants, + ) + .select_related("person") + .order_by("-id") + .first() + ) + if person_identifier is None: + return "" + link = ( + ExternalChatLink.objects.filter( + user=user, + provider=provider, + enabled=True, + ) + .filter(Q(person_identifier=person_identifier) | Q(person=person_identifier.person)) + .order_by("-updated_at", "-id") + .first() + ) + return str(getattr(link, "external_chat_id", "") or "").strip() + + +def compact_json_snippet(payload: Any, limit: int = 800) -> str: + text = str(payload or "").strip() + if len(text) <= limit: + return text + return text[:limit].rstrip() + "..." diff --git a/core/tasks/engine.py b/core/tasks/engine.py index 6307ee3..6388246 100644 --- a/core/tasks/engine.py +++ b/core/tasks/engine.py @@ -4,7 +4,6 @@ import re from asgiref.sync import sync_to_async from django.conf import settings -from django.db.models import Q from core.clients.transport import send_message_raw from core.messaging import ai as ai_runner @@ -12,21 +11,36 @@ from core.models import ( AI, Chat, ChatTaskSource, + CodexRun, DerivedTask, DerivedTaskEvent, ExternalSyncEvent, - ExternalChatLink, Message, - PersonIdentifier, TaskCompletionPattern, + TaskEpic, TaskProviderConfig, ) from core.tasks.providers import get_provider +from core.tasks.codex_support import resolve_external_chat_id _TASK_HINT_RE = re.compile(r"\b(todo|task|action|need to|please)\b", re.IGNORECASE) _COMPLETION_RE = re.compile(r"\b(done|completed|fixed)\s*#([A-Za-z0-9_-]+)\b", re.IGNORECASE) _BALANCED_HINT_RE = re.compile(r"\b(todo|task|action item|action)\b", re.IGNORECASE) _BROAD_HINT_RE = re.compile(r"\b(todo|task|action|need to|please|reminder)\b", re.IGNORECASE) +_PREFIX_HEAD_TRIM = " \t\r\n`'\"([{<*#-–—_>.,:;!/?\\|" +_LIST_TASKS_RE = re.compile( + r"^\s*(?:\.l(?:\s+list(?:\s+tasks?)?)?|\.list(?:\s+tasks?)?)\s*$", + re.IGNORECASE, +) +_UNDO_TASK_RE = re.compile( + r"^\s*\.undo(?:\s+(?:#)?(?P[A-Za-z0-9_-]+))?\s*$", + re.IGNORECASE, +) +_EPIC_CREATE_RE = re.compile( + r"^\s*(?:\.epic\b|epic)\s*[:\-]?\s*(?P.+?)\s*$", + re.IGNORECASE | re.DOTALL, +) +_EPIC_TOKEN_RE = re.compile(r"\[\s*epic\s*:\s*([^\]]+?)\s*\]", re.IGNORECASE) def _channel_variants(service: str, channel: str) -> list[str]: @@ -57,27 +71,44 @@ def _channel_variants(service: str, channel: str) -> list[str]: async def _resolve_source_mappings(message: Message) -> list[ChatTaskSource]: - variants = _channel_variants(message.source_service or "", message.source_chat_id or "") - if str(message.source_service or "").strip().lower() == "signal": - signal_value = str(message.source_chat_id or "").strip() - if signal_value: - companions = await sync_to_async(list)( + lookup_service = str(message.source_service or "").strip().lower() + variants = _channel_variants(lookup_service, message.source_chat_id or "") + session_identifier = getattr(getattr(message, "session", None), "identifier", None) + canonical_service = str(getattr(session_identifier, "service", "") or "").strip().lower() + canonical_identifier = str(getattr(session_identifier, "identifier", "") or "").strip() + if lookup_service == "web" and canonical_service and canonical_service != "web": + lookup_service = canonical_service + variants = _channel_variants(lookup_service, message.source_chat_id or "") + for expanded in _channel_variants(lookup_service, canonical_identifier): + if expanded and expanded not in variants: + variants.append(expanded) + elif canonical_service and canonical_identifier and canonical_service == lookup_service: + for expanded in _channel_variants(canonical_service, canonical_identifier): + if expanded and expanded not in variants: + variants.append(expanded) + if lookup_service == "signal": + companions: list[str] = [] + for value in list(variants): + signal_value = str(value or "").strip() + if not signal_value: + continue + companions += await sync_to_async(list)( Chat.objects.filter(source_uuid=signal_value).values_list("source_number", flat=True) ) companions += await sync_to_async(list)( Chat.objects.filter(source_number=signal_value).values_list("source_uuid", flat=True) ) - for candidate in companions: - for expanded in _channel_variants("signal", str(candidate or "").strip()): - if expanded and expanded not in variants: - variants.append(expanded) + for candidate in companions: + for expanded in _channel_variants("signal", str(candidate or "").strip()): + if expanded and expanded not in variants: + variants.append(expanded) if not variants: return [] return await sync_to_async(list)( ChatTaskSource.objects.filter( user=message.user, enabled=True, - service=message.source_service, + service=lookup_service, channel_identifier__in=variants, ).select_related("project", "epic") ) @@ -107,6 +138,58 @@ def _parse_prefixes(raw) -> list[str]: return rows or ["task:", "todo:", "action:"] +def _prefix_roots(prefixes: list[str]) -> list[str]: + roots: list[str] = [] + for value in prefixes: + token = str(value or "").strip().lower() + if not token: + continue + token = token.lstrip(_PREFIX_HEAD_TRIM) + match = re.match(r"([a-z0-9]+)", token) + if not match: + continue + root = str(match.group(1) or "").strip() + if root and root not in roots: + roots.append(root) + return roots + + +def _has_task_prefix(text: str, prefixes: list[str]) -> bool: + body = str(text or "").strip().lower() + if not body: + return False + if any(body.startswith(prefix) for prefix in prefixes): + return True + trimmed = body.lstrip(_PREFIX_HEAD_TRIM) + roots = _prefix_roots(prefixes) + if not trimmed or not roots: + return False + for root in roots: + if re.match(rf"^{re.escape(root)}\b(?:\s*[:\-–—#>.,;!]*\s*|\s+)", trimmed): + return True + return False + + +def _strip_task_prefix(text: str, prefixes: list[str]) -> str: + body = str(text or "").strip() + if not body: + return "" + trimmed = body.lstrip(_PREFIX_HEAD_TRIM) + roots = _prefix_roots(prefixes) + if not trimmed or not roots: + return body + for root in roots: + match = re.match( + rf"^{re.escape(root)}\b(?:\s*[:\-–—#>.,;!]*\s*|\s+)(.+)$", + trimmed, + flags=re.IGNORECASE | re.DOTALL, + ) + if match: + cleaned = str(match.group(1) or "").strip() + return cleaned or body + return body + + def _normalize_flags(raw: dict | None) -> dict: row = dict(raw or {}) return { @@ -157,7 +240,7 @@ def _is_task_candidate(text: str, flags: dict) -> bool: return False body_lower = body.lower() prefixes = list(flags.get("allowed_prefixes") or []) - has_prefix = any(body_lower.startswith(prefix) for prefix in prefixes) + has_prefix = _has_task_prefix(body_lower, prefixes) if bool(flags.get("require_prefix")) and not has_prefix: return False mode = str(flags.get("match_mode") or "balanced").strip().lower() @@ -207,10 +290,13 @@ async def _derive_title(message: Message) -> str: async def _derive_title_with_flags(message: Message, flags: dict) -> str: + prefixes = list(flags.get("allowed_prefixes") or []) if not bool(flags.get("ai_title_enabled", True)): - text = str(message.text or "").strip() + text = _strip_task_prefix(str(message.text or "").strip(), prefixes) return (text or "Untitled task")[:255] - return await _derive_title(message) + title = await _derive_title(message) + cleaned = _strip_task_prefix(str(title or "").strip(), prefixes) + return (cleaned or title or "Untitled task")[:255] async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: str) -> None: @@ -221,36 +307,51 @@ async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: s provider_settings = dict(getattr(cfg, "settings", {}) or {}) provider = get_provider(provider_name) idempotency_key = f"{provider_name}:{task.id}:{event.id}" - variants = _channel_variants(task.source_service or "", task.source_channel or "") - person_identifier = None - if variants: - person_identifier = await sync_to_async( - lambda: PersonIdentifier.objects.filter( - user=task.user, - service=task.source_service, - identifier__in=variants, - ) - .select_related("person") - .order_by("-id") - .first() - )() - external_chat_id = "" - if person_identifier is not None: - link = await sync_to_async( - lambda: ExternalChatLink.objects.filter( - user=task.user, - provider=provider_name, - enabled=True, - ) - .filter( - Q(person_identifier=person_identifier) - | Q(person=person_identifier.person) - ) - .order_by("-updated_at", "-id") - .first() - )() - if link is not None: - external_chat_id = str(link.external_chat_id or "").strip() + external_chat_id = await sync_to_async(resolve_external_chat_id)( + user=task.user, + provider=provider_name, + service=str(task.source_service or ""), + channel=str(task.source_channel or ""), + ) + cached_project = task._state.fields_cache.get("project") + cached_epic = task._state.fields_cache.get("epic") + project_name = str(getattr(cached_project, "name", "") or "") + epic_name = str(getattr(cached_epic, "name", "") or "") + request_payload = { + "task_id": str(task.id), + "reference_code": str(task.reference_code or ""), + "title": str(task.title or ""), + "external_key": str(task.external_key or ""), + "project_name": project_name, + "epic_name": epic_name, + "source_service": str(task.source_service or ""), + "source_channel": str(task.source_channel or ""), + "external_chat_id": external_chat_id, + "origin_message_id": str(getattr(task, "origin_message_id", "") or ""), + "trigger_message_id": str(getattr(event, "source_message_id", "") or getattr(task, "origin_message_id", "") or ""), + "mode": "default", + "payload": event.payload, + } + codex_run = await sync_to_async(CodexRun.objects.create)( + user=task.user, + task_id=task.id, + derived_task_event_id=event.id, + source_message_id=(event.source_message_id or task.origin_message_id), + project_id=task.project_id, + epic_id=task.epic_id, + source_service=str(task.source_service or ""), + source_channel=str(task.source_channel or ""), + external_chat_id=external_chat_id, + status="queued", + request_payload={ + "action": action, + "provider_payload": dict(request_payload), + "idempotency_key": idempotency_key, + }, + result_payload={}, + error="", + ) + request_payload["codex_run_id"] = str(codex_run.id) # Worker-backed providers are queued and executed by `manage.py codex_worker`. if bool(getattr(provider, "run_in_worker", False)): @@ -264,16 +365,7 @@ async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: s "status": "pending", "payload": { "action": action, - "provider_payload": { - "task_id": str(task.id), - "title": task.title, - "external_key": task.external_key, - "reference_code": task.reference_code, - "source_service": str(task.source_service or ""), - "source_channel": str(task.source_channel or ""), - "external_chat_id": external_chat_id, - "payload": event.payload, - }, + "provider_payload": dict(request_payload), }, "error": "", }, @@ -281,34 +373,11 @@ async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: s return if action == "create": - result = provider.create_task(provider_settings, { - "task_id": str(task.id), - "title": task.title, - "external_key": task.external_key, - "reference_code": task.reference_code, - "source_service": str(task.source_service or ""), - "source_channel": str(task.source_channel or ""), - "external_chat_id": external_chat_id, - }) + result = provider.create_task(provider_settings, dict(request_payload)) elif action == "complete": - result = provider.mark_complete(provider_settings, { - "task_id": str(task.id), - "external_key": task.external_key, - "reference_code": task.reference_code, - "source_service": str(task.source_service or ""), - "source_channel": str(task.source_channel or ""), - "external_chat_id": external_chat_id, - }) + result = provider.mark_complete(provider_settings, dict(request_payload)) else: - result = provider.append_update(provider_settings, { - "task_id": str(task.id), - "external_key": task.external_key, - "reference_code": task.reference_code, - "source_service": str(task.source_service or ""), - "source_channel": str(task.source_channel or ""), - "external_chat_id": external_chat_id, - "payload": event.payload, - }) + result = provider.append_update(provider_settings, dict(request_payload)) status = "ok" if result.ok else "failed" await sync_to_async(ExternalSyncEvent.objects.update_or_create)( @@ -323,6 +392,10 @@ async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: s "error": str(result.error or ""), }, ) + codex_run.status = status + codex_run.result_payload = dict(result.payload or {}) + codex_run.error = str(result.error or "") + await sync_to_async(codex_run.save)(update_fields=["status", "result_payload", "error", "updated_at"]) if result.ok and result.external_key and not task.external_key: task.external_key = str(result.external_key) await sync_to_async(task.save)(update_fields=["external_key"]) @@ -338,6 +411,121 @@ async def _completion_regex(message: Message) -> re.Pattern: return re.compile(r"\\b(?:" + "|".join(re.escape(p) for p in phrases) + r")\\s*#([A-Za-z0-9_-]+)\\b", re.IGNORECASE) +async def _send_scope_message(source: ChatTaskSource, message: Message, text: str) -> None: + await send_message_raw( + source.service or message.source_service or "web", + source.channel_identifier or message.source_chat_id or "", + text=text, + attachments=[], + metadata={"origin": "task_scope_command"}, + ) + + +async def _handle_scope_task_commands(message: Message, sources: list[ChatTaskSource], text: str) -> bool: + if not sources: + return False + body = str(text or "").strip() + source = sources[0] + if _LIST_TASKS_RE.match(body): + open_rows = await sync_to_async(list)( + DerivedTask.objects.filter( + user=message.user, + project=source.project, + source_service=source.service, + source_channel=source.channel_identifier, + ) + .exclude(status_snapshot="completed") + .order_by("-created_at")[:20] + ) + if not open_rows: + await _send_scope_message(source, message, "[task] no open tasks in this chat.") + return True + lines = ["[task] open tasks:"] + for row in open_rows: + lines.append(f"- #{row.reference_code} {row.title}") + await _send_scope_message(source, message, "\n".join(lines)) + return True + + undo_match = _UNDO_TASK_RE.match(body) + if undo_match: + reference = str(undo_match.group("reference") or "").strip() + if reference: + task = await sync_to_async( + lambda: DerivedTask.objects.filter( + user=message.user, + project=source.project, + source_service=source.service, + source_channel=source.channel_identifier, + reference_code=reference, + ) + .order_by("-created_at") + .first() + )() + else: + task = await sync_to_async( + lambda: DerivedTask.objects.filter( + user=message.user, + project=source.project, + source_service=source.service, + source_channel=source.channel_identifier, + ) + .order_by("-created_at") + .first() + )() + if task is None: + await _send_scope_message(source, message, "[task] nothing to undo in this chat.") + return True + ref = str(task.reference_code or "") + title = str(task.title or "") + await sync_to_async(task.delete)() + await _send_scope_message(source, message, f"[task] removed #{ref}: {title}") + return True + + return False + + +def _extract_epic_name_from_text(text: str) -> str: + body = str(text or "") + match = _EPIC_TOKEN_RE.search(body) + if not match: + return "" + return str(match.group(1) or "").strip() + + +def _strip_epic_token(text: str) -> str: + body = str(text or "") + cleaned = _EPIC_TOKEN_RE.sub("", body) + return re.sub(r"\s{2,}", " ", cleaned).strip() + + +async def _handle_epic_create_command(message: Message, sources: list[ChatTaskSource], text: str) -> bool: + match = _EPIC_CREATE_RE.match(str(text or "")) + if not match or not sources: + return False + name = str(match.group("name") or "").strip() + if not name: + return True + source = sources[0] + epic, created = await sync_to_async(TaskEpic.objects.get_or_create)( + project=source.project, + name=name, + ) + state = "created" if created else "already exists" + await _send_scope_message( + source, + message, + ( + f"[epic] {state}: {epic.name}\n" + "WhatsApp usage:\n" + "- create epic: epic: (or .epic )\n" + "- add task to epic: task: [epic:]\n" + "- list tasks: .l list tasks\n" + "- undo latest task: .undo" + ), + ) + return True + + async def process_inbound_task_intelligence(message: Message) -> None: if message is None: return @@ -350,6 +538,10 @@ async def process_inbound_task_intelligence(message: Message) -> None: sources = await _resolve_source_mappings(message) if not sources: return + if await _handle_scope_task_commands(message, sources, text): + return + if await _handle_epic_create_command(message, sources, text): + return completion_allowed = any(bool(_effective_flags(source).get("completion_enabled")) for source in sources) completion_rx = await _completion_regex(message) if completion_allowed else None @@ -399,21 +591,37 @@ async def process_inbound_task_intelligence(message: Message) -> None: flags = _effective_flags(source) if not bool(flags.get("derive_enabled", True)): continue - if not _is_task_candidate(text, flags): + task_text = _strip_epic_token(text) + if not _is_task_candidate(task_text, flags): continue - title = await _derive_title_with_flags(message, flags) + epic = source.epic + epic_name = _extract_epic_name_from_text(text) + if epic_name: + epic, _ = await sync_to_async(TaskEpic.objects.get_or_create)( + project=source.project, + name=epic_name, + ) + cloned_message = message + if task_text != text: + cloned_message = Message( + user=message.user, + text=task_text, + source_service=message.source_service, + source_chat_id=message.source_chat_id, + ) + title = await _derive_title_with_flags(cloned_message, flags) reference = await sync_to_async(_next_reference)(message.user, source.project) task = await sync_to_async(DerivedTask.objects.create)( user=message.user, project=source.project, - epic=source.epic, + epic=epic, title=title, - source_service=message.source_service or "web", - source_channel=message.source_chat_id or "", + source_service=source.service or message.source_service or "web", + source_channel=source.channel_identifier or message.source_chat_id or "", origin_message=message, reference_code=reference, status_snapshot="open", - immutable_payload={"origin_text": text, "flags": flags}, + immutable_payload={"origin_text": text, "task_text": task_text, "flags": flags}, ) event = await sync_to_async(DerivedTaskEvent.objects.create)( task=task, @@ -426,8 +634,8 @@ async def process_inbound_task_intelligence(message: Message) -> None: if bool(flags.get("announce_task_id", False)): try: await send_message_raw( - message.source_service or "web", - message.source_chat_id or "", + source.service or message.source_service or "web", + source.channel_identifier or message.source_chat_id or "", text=f"[task] Created #{task.reference_code}: {task.title}", attachments=[], metadata={"origin": "task_announce"}, @@ -435,3 +643,22 @@ async def process_inbound_task_intelligence(message: Message) -> None: except Exception: # Announcement is best-effort and should not block derivation. pass + scope_count = await sync_to_async( + lambda: DerivedTask.objects.filter( + user=message.user, + project=source.project, + source_service=source.service, + source_channel=source.channel_identifier, + ).count() + )() + if scope_count > 0 and scope_count % 10 == 0: + try: + await send_message_raw( + source.service or message.source_service or "web", + source.channel_identifier or message.source_chat_id or "", + text="[task] tip: use .l list tasks to review tasks. use .undo to uncreate the latest task.", + attachments=[], + metadata={"origin": "task_reminder"}, + ) + except Exception: + pass diff --git a/core/tasks/providers/codex_cli.py b/core/tasks/providers/codex_cli.py index 4b25441..bb5214b 100644 --- a/core/tasks/providers/codex_cli.py +++ b/core/tasks/providers/codex_cli.py @@ -66,6 +66,14 @@ class CodexCLITaskProvider(TaskProvider): except Exception: parsed = {"raw_stdout": stdout} + parsed_status = str(parsed.get("status") or "").strip().lower() + permission_request = parsed.get("permission_request") + requires_approval = bool( + parsed.get("requires_approval") + or parsed_status in {"requires_approval", "waiting_approval"} + or permission_request + ) + ext = ( str(parsed.get("external_key") or "").strip() or str(parsed.get("task_id") or "").strip() @@ -78,6 +86,8 @@ class CodexCLITaskProvider(TaskProvider): "returncode": int(completed.returncode), "stdout": stdout[:4000], "stderr": stderr[:4000], + "parsed_status": parsed_status, + "requires_approval": requires_approval, } out_payload.update(parsed) return ProviderResult(ok=ok, external_key=ext, error=("" if ok else stderr[:4000]), payload=out_payload) diff --git a/core/templates/pages/codex-settings.html b/core/templates/pages/codex-settings.html new file mode 100644 index 0000000..eb3b1be --- /dev/null +++ b/core/templates/pages/codex-settings.html @@ -0,0 +1,144 @@ +{% extends "base.html" %} + +{% block content %} +
+
+

Codex Status

+

Global per-user Codex task-sync status, runs, and approvals.

+ +
+
+ Provider codex_cli + Health {% if health and health.ok %}online{% else %}offline{% endif %} + Pending {{ queue_counts.pending }} + Waiting Approval {{ queue_counts.waiting_approval }} +
+ {% if health and health.error %} +

Healthcheck error: {{ health.error }}

+ {% endif %} +

Config snapshot: command={{ provider_settings.command }}, workspace={{ provider_settings.workspace_root|default:"-" }}, profile={{ provider_settings.default_profile|default:"-" }}, instance={{ provider_settings.instance_label }}, approver={{ provider_settings.approver_service }} {{ provider_settings.approver_identifier }}.

+

Edit in Task Settings.

+
+ +
+

Run Filters

+
+
+
+ + +
+
+ + +
+
+ + +
+
+ +
+ +
+
+
+ + +
+
+ +
+
+ +
+

Runs

+ + + + {% for run in runs %} + + + + + + + + + + + {% empty %} + + {% endfor %} + +
WhenStatusService/ChannelProjectTaskSummaryFilesLinks
{{ run.created_at }}{{ run.status }}{{ run.source_service }} · {{ run.source_channel }}{{ run.project.name|default:"-" }}{% if run.task %}#{{ run.task.reference_code }}{% else %}-{% endif %}{{ run.result_payload.summary|default:"-" }}{{ run.result_payload.files_modified_count|default:"0" }} +
+ Details +

Request

+
{{ run.request_payload }}
+

Result

+
{{ run.result_payload }}
+

Error {{ run.error|default:"-" }}

+
+
No runs.
+
+ +
+

Permission Queue

+ + + + {% for row in permission_requests %} + + + + + + + + + + + {% empty %} + + {% endfor %} + +
RequestedApproval KeyStatusSummaryPermissionsRunTaskActions
{{ row.requested_at }}{{ row.approval_key }}{{ row.status }}{{ row.summary|default:"-" }}
{{ row.requested_permissions }}
{{ row.codex_run_id }}{% if row.codex_run.task %}#{{ row.codex_run.task.reference_code }}{% else %}-{% endif %} + {% if row.status == 'pending' %} +
+ {% csrf_token %} + + + +
+
+ {% csrf_token %} + + + +
+ {% else %} + - + {% endif %} +
No permission requests.
+
+
+
+ +{% endblock %} diff --git a/core/templates/pages/command-routing.html b/core/templates/pages/command-routing.html index d397d1f..55197db 100644 --- a/core/templates/pages/command-routing.html +++ b/core/templates/pages/command-routing.html @@ -38,7 +38,7 @@
- +
@@ -446,4 +446,30 @@ border-top: 1px solid #dbdbdb; } + {% endblock %} diff --git a/core/templates/pages/tasks-detail.html b/core/templates/pages/tasks-detail.html index f913713..10eee0e 100644 --- a/core/templates/pages/tasks-detail.html +++ b/core/templates/pages/tasks-detail.html @@ -9,7 +9,15 @@ · Source message {{ task.origin_message_id }} {% endif %}

- +
+ Back +
+ {% csrf_token %} + + + +
+

Events

@@ -58,6 +66,44 @@
+
+

Codex Runs

+ + + + {% for row in codex_runs %} + + + + + + + + {% empty %} + + {% endfor %} + +
WhenStatusSummaryFilesError
{{ row.updated_at }}{{ row.status }}{{ row.result_payload.summary|default:"-" }}{{ row.result_payload.files_modified_count|default:"0" }}{{ row.error|default:"" }}
No Codex runs.
+
+
+

Permission Requests

+ + + + {% for row in permission_requests %} + + + + + + + + {% empty %} + + {% endfor %} + +
WhenApproval KeyStatusSummaryResolved
{{ row.requested_at }}{{ row.approval_key }}{{ row.status }}{{ row.summary|default:"-" }}{{ row.resolved_at|default:"-" }}
No permission requests.
+