from __future__ import annotations import json import subprocess from .base import ProviderResult, TaskProvider class CodexCLITaskProvider(TaskProvider): name = "codex_cli" run_in_worker = True def _timeout(self, config: dict) -> int: try: return max(1, int(config.get("timeout_seconds") or 60)) except Exception: return 60 def _command(self, config: dict) -> str: return str(config.get("command") or "codex").strip() or "codex" def _workspace(self, config: dict) -> str: return str(config.get("workspace_root") or "").strip() def _profile(self, config: dict) -> str: return str(config.get("default_profile") or "").strip() def _run(self, config: dict, op: str, payload: dict) -> ProviderResult: cmd = [self._command(config), "task-sync", "--op", str(op)] workspace = self._workspace(config) if workspace: cmd.extend(["--workspace", workspace]) profile = self._profile(config) if profile: cmd.extend(["--profile", profile]) command_timeout = self._timeout(config) data = json.dumps(dict(payload or {}), separators=(",", ":")) cmd.extend(["--payload-json", data]) try: completed = subprocess.run( cmd, capture_output=True, text=True, timeout=command_timeout, check=False, cwd=workspace if workspace else None, ) except subprocess.TimeoutExpired: return ProviderResult( ok=False, error=f"codex_cli_timeout_{command_timeout}s", payload={"op": op, "timeout_seconds": command_timeout}, ) except Exception as exc: return ProviderResult(ok=False, error=f"codex_cli_exec_error:{exc}", payload={"op": op}) stdout = str(completed.stdout or "").strip() stderr = str(completed.stderr or "").strip() parsed = {} if stdout: try: parsed = json.loads(stdout) if not isinstance(parsed, dict): parsed = {"raw_stdout": stdout} except Exception: parsed = {"raw_stdout": stdout} ext = ( str(parsed.get("external_key") or "").strip() or str(parsed.get("task_id") or "").strip() or str(payload.get("external_key") or "").strip() ) ok = completed.returncode == 0 out_payload = { "op": op, "returncode": int(completed.returncode), "stdout": stdout[:4000], "stderr": stderr[:4000], } out_payload.update(parsed) return ProviderResult(ok=ok, external_key=ext, error=("" if ok else stderr[:4000]), payload=out_payload) def healthcheck(self, config: dict) -> ProviderResult: command = self._command(config) try: completed = subprocess.run( [command, "--version"], capture_output=True, text=True, timeout=max(1, min(20, self._timeout(config))), check=False, ) except Exception as exc: return ProviderResult(ok=False, error=f"codex_cli_unavailable:{exc}") return ProviderResult( ok=(completed.returncode == 0), payload={ "returncode": int(completed.returncode), "stdout": str(completed.stdout or "").strip()[:1000], "stderr": str(completed.stderr or "").strip()[:1000], }, error=("" if completed.returncode == 0 else str(completed.stderr or "").strip()[:1000]), ) def create_task(self, config: dict, payload: dict) -> ProviderResult: return self._run(config, "create", payload) def append_update(self, config: dict, payload: dict) -> ProviderResult: return self._run(config, "append_update", payload) def mark_complete(self, config: dict, payload: dict) -> ProviderResult: return self._run(config, "mark_complete", payload) def link_task(self, config: dict, payload: dict) -> ProviderResult: return self._run(config, "link_task", payload)