from __future__ import annotations import re from asgiref.sync import sync_to_async from core.gateway.commands import ( GatewayCommandContext, GatewayCommandRoute, dispatch_gateway_command, ) from core.models import ( CodexPermissionRequest, CodexRun, DerivedTask, ExternalSyncEvent, Person, TaskProject, User, ) from core.tasks.engine import create_task_record_and_sync, mark_task_completed_and_sync APPROVAL_PROVIDER_COMMANDS = { ".claude": "claude", ".codex": "codex_cli", } APPROVAL_EVENT_PREFIX = "codex_approval" ACTION_TO_STATUS = {"approve": "approved", "reject": "denied"} TASK_COMMAND_MATCH_RE = re.compile(r"^\s*(?:\.tasks\b|\.l\b|\.list\b)", re.IGNORECASE) def gateway_help_lines() -> list[str]: return [ "Gateway commands:", " .contacts — list contacts", " .whoami — show current user", " .help — show this help", "Approval commands:", " .approval list-pending [all] — list pending approval requests", " .approval approve — approve a request", " .approval reject — reject a request", " .approval status — check request status", "Task commands:", " .l — shortcut for open task list", " .tasks list [status] [limit] — list tasks", " .tasks add :: — create task in project", " .tasks show #<ref> — show task details", " .tasks complete #<ref> — mark task complete", " .tasks undo #<ref> — remove task", ] def _resolve_request_provider(request): event = getattr(request, "external_sync_event", None) if event is None: return "" return str(getattr(event, "provider", "") or "").strip() async def _apply_approval_decision(request, decision): status = ACTION_TO_STATUS.get(decision, decision) request.status = status await sync_to_async(request.save)(update_fields=["status"]) run = None if request.codex_run_id: run = await sync_to_async(CodexRun.objects.get)(pk=request.codex_run_id) run.status = "approved_waiting_resume" if status == "approved" else status await sync_to_async(run.save)(update_fields=["status"]) if request.external_sync_event_id: evt = await sync_to_async(ExternalSyncEvent.objects.get)( pk=request.external_sync_event_id ) evt.status = "ok" await sync_to_async(evt.save)(update_fields=["status"]) user = await sync_to_async(User.objects.get)(pk=request.user_id) task = None if run is not None and run.task_id: task = await sync_to_async(DerivedTask.objects.get)(pk=run.task_id) ikey = f"{APPROVAL_EVENT_PREFIX}:{request.approval_key}:{status}" await sync_to_async(ExternalSyncEvent.objects.get_or_create)( idempotency_key=ikey, defaults={ "user": user, "task": task, "provider": "codex_cli", "status": "pending", "payload": {}, "error": "", }, ) async def _approval_list_pending(user, scope, emit): _ = scope requests = await sync_to_async(list)( CodexPermissionRequest.objects.filter(user=user, status="pending").order_by( "-requested_at" )[:20] ) emit(f"pending={len(requests)}") for req in requests: emit(f" {req.approval_key}: {req.summary}") async def _approval_status(user, approval_key, emit): try: req = await sync_to_async(CodexPermissionRequest.objects.get)( user=user, approval_key=approval_key ) emit(f"status={req.status} key={req.approval_key}") except CodexPermissionRequest.DoesNotExist: emit(f"approval_key_not_found:{approval_key}") async def handle_approval_command(user, body, emit): command = str(body or "").strip() for prefix, expected_provider in APPROVAL_PROVIDER_COMMANDS.items(): if command.startswith(prefix + " ") or command == prefix: sub = command[len(prefix) :].strip() parts = sub.split() if len(parts) >= 2 and parts[0] in ("approve", "reject"): action, approval_key = parts[0], parts[1] try: req = await sync_to_async( CodexPermissionRequest.objects.select_related( "external_sync_event" ).get )(user=user, approval_key=approval_key) except CodexPermissionRequest.DoesNotExist: emit(f"approval_key_not_found:{approval_key}") return True provider = _resolve_request_provider(req) if not provider.startswith(expected_provider): emit( f"approval_key_not_for_provider:{approval_key} provider={provider}" ) return True await _apply_approval_decision(req, action) emit(f"{action}d: {approval_key}") return True emit(f"usage: {prefix} approve|reject <key>") return True if not command.startswith(".approval"): return False rest = command[len(".approval") :].strip() if rest.split() and rest.split()[0] in ("approve", "reject"): parts = rest.split() action = parts[0] approval_key = parts[1] if len(parts) > 1 else "" if not approval_key: emit("usage: .approval approve|reject <key>") return True try: req = await sync_to_async( CodexPermissionRequest.objects.select_related("external_sync_event").get )(user=user, approval_key=approval_key) except CodexPermissionRequest.DoesNotExist: emit(f"approval_key_not_found:{approval_key}") return True await _apply_approval_decision(req, action) emit(f"{action}d: {approval_key}") return True if rest.startswith("list-pending"): scope = rest[len("list-pending") :].strip() or "mine" await _approval_list_pending(user, scope, emit) return True if rest.startswith("status "): approval_key = rest[len("status ") :].strip() await _approval_status(user, approval_key, emit) return True emit( "approval: .approval approve|reject <key> | " ".approval list-pending [all] | " ".approval status <key>" ) return True def _parse_task_create(rest: str) -> tuple[str, str]: text = str(rest or "").strip() if not text.lower().startswith("add "): return "", "" payload = text[4:].strip() if "::" in payload: project_name, title = payload.split("::", 1) return str(project_name or "").strip(), str(title or "").strip() return "", "" async def handle_tasks_command( user, body, emit, *, service: str = "", channel_identifier: str = "", sender_identifier: str = "", ): command = str(body or "").strip() lower_command = command.lower() if not TASK_COMMAND_MATCH_RE.match(command): return False if lower_command.startswith(".tasks"): rest = command[len(".tasks") :].strip() elif lower_command.startswith(".list") or lower_command.startswith(".l"): rest = "list" else: rest = "list " + command[2:].strip() if len(command) > 2 else "list" if rest.startswith("list"): parts = rest.split() status_filter = parts[1] if len(parts) > 1 else "open" limit = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else 10 tasks = await sync_to_async(list)( DerivedTask.objects.filter( user=user, status_snapshot=status_filter ).order_by("-id")[:limit] ) if not tasks: emit(f"no {status_filter} tasks") else: for task in tasks: emit(f"#{task.reference_code} [{task.status_snapshot}] {task.title}") return True project_name, title = _parse_task_create(rest) if project_name or rest.startswith("add "): if not project_name or not title: emit("usage: .tasks add <project> :: <title>") return True project = await sync_to_async( lambda: TaskProject.objects.filter(user=user, name__iexact=project_name) .order_by("name") .first() )() if project is None: emit(f"project_not_found:{project_name}") return True task, _event = await create_task_record_and_sync( user=user, project=project, title=title, source_service=str(service or "web").strip().lower() or "web", source_channel=str(channel_identifier or "").strip(), actor_identifier=str(sender_identifier or "").strip(), immutable_payload={ "origin": "gateway.tasks.add", "channel_service": str(service or "").strip().lower(), "channel_identifier": str(channel_identifier or "").strip(), }, event_payload={ "command": ".tasks add", "via": "gateway_builtin", }, ) emit(f"created #{task.reference_code} [{project.name}] {task.title}") return True if rest.startswith("show "): ref = rest[len("show ") :].strip().lstrip("#") try: task = await sync_to_async(DerivedTask.objects.get)( user=user, reference_code=ref ) emit(f"#{task.reference_code} {task.title}") emit(f"status: {task.status_snapshot}") except DerivedTask.DoesNotExist: emit(f"task_not_found:#{ref}") return True if rest.startswith("complete "): ref = rest[len("complete ") :].strip().lstrip("#") try: task = await sync_to_async(DerivedTask.objects.select_related("project").get)( user=user, reference_code=ref ) await mark_task_completed_and_sync( task=task, actor_identifier=str(sender_identifier or "").strip(), payload={ "marker": ref, "command": ".tasks complete", "via": "gateway_builtin", }, ) emit(f"completed #{ref}") except DerivedTask.DoesNotExist: emit(f"task_not_found:#{ref}") return True if rest.startswith("undo "): ref = rest[len("undo ") :].strip().lstrip("#") try: task = await sync_to_async(DerivedTask.objects.get)( user=user, reference_code=ref ) await sync_to_async(task.delete)() emit(f"removed #{ref}") except DerivedTask.DoesNotExist: emit(f"task_not_found:#{ref}") return True emit( "tasks: .l | .tasks list [status] [limit] | " ".tasks add <project> :: <title> | " ".tasks show #<ref> | " ".tasks complete #<ref> | " ".tasks undo #<ref>" ) return True async def dispatch_builtin_gateway_command( *, user, command_text: str, service: str, channel_identifier: str, sender_identifier: str, source_message, message_meta: dict, payload: dict, emit, ) -> bool: text = str(command_text or "").strip() async def _contacts_handler(_ctx, out): persons = await sync_to_async(list)(Person.objects.filter(user=user).order_by("name")) if not persons: out("No contacts found.") return True out("Contacts: " + ", ".join([p.name for p in persons])) return True async def _help_handler(_ctx, out): for line in gateway_help_lines(): out(line) return True async def _whoami_handler(_ctx, out): out(str(user.__dict__)) return True async def _approval_handler(_ctx, out): return await handle_approval_command(user, text, out) async def _tasks_handler(_ctx, out): return await handle_tasks_command( user, text, out, service=service, channel_identifier=channel_identifier, sender_identifier=sender_identifier, ) routes = [ GatewayCommandRoute( name="contacts", scope_key="gateway.contacts", matcher=lambda value: str(value or "").strip().lower() == ".contacts", handler=_contacts_handler, ), GatewayCommandRoute( name="help", scope_key="gateway.help", matcher=lambda value: str(value or "").strip().lower() == ".help", handler=_help_handler, ), GatewayCommandRoute( name="whoami", scope_key="gateway.whoami", matcher=lambda value: str(value or "").strip().lower() == ".whoami", handler=_whoami_handler, ), GatewayCommandRoute( name="approval", scope_key="gateway.approval", matcher=lambda value: str(value or "").strip().lower().startswith(".approval") or any( str(value or "").strip().lower().startswith(prefix + " ") or str(value or "").strip().lower() == prefix for prefix in APPROVAL_PROVIDER_COMMANDS ), handler=_approval_handler, ), GatewayCommandRoute( name="tasks", scope_key="gateway.tasks", matcher=lambda value: bool(TASK_COMMAND_MATCH_RE.match(str(value or ""))), handler=_tasks_handler, ), ] handled = await dispatch_gateway_command( context=GatewayCommandContext( user=user, source_message=source_message, service=str(service or "xmpp"), channel_identifier=str(channel_identifier or ""), sender_identifier=str(sender_identifier or ""), message_text=text, message_meta=dict(message_meta or {}), payload=dict(payload or {}), ), routes=routes, emit=emit, ) if not handled and text.startswith("."): emit("No such command") return handled