Files
GIA/core/tests/test_codex_cli_provider.py

61 lines
2.2 KiB
Python

from __future__ import annotations
from subprocess import CompletedProcess, TimeoutExpired
from unittest.mock import patch
from django.test import SimpleTestCase
from core.tasks.providers.codex_cli import CodexCLITaskProvider
class CodexCLITaskProviderTests(SimpleTestCase):
    """Unit tests for ``CodexCLITaskProvider`` with ``subprocess.run`` mocked.

    Every test patches ``core.tasks.providers.codex_cli.subprocess.run``, so
    the provider's result objects are checked against a canned
    ``CompletedProcess`` (or a raised ``TimeoutExpired``) rather than the
    output of a real process.
    """

    def setUp(self):
        """Create a fresh provider instance for each test."""
        self.provider = CodexCLITaskProvider()

    @patch("core.tasks.providers.codex_cli.subprocess.run")
    def test_healthcheck_success(self, run_mock):
        """A zero-exit run gives an ok result whose stdout mentions codex."""
        run_mock.return_value = CompletedProcess(
            args=["codex", "--version"],
            returncode=0,
            stdout="codex 1.2.3\n",
            stderr="",
        )

        result = self.provider.healthcheck(
            {"command": "codex", "timeout_seconds": 5}
        )

        self.assertTrue(result.ok)
        # Coerce to str so a missing/None stdout fails the assertion cleanly
        # instead of raising TypeError inside assertIn.
        self.assertIn("codex", str(result.payload.get("stdout") or ""))

    @patch("core.tasks.providers.codex_cli.subprocess.run")
    def test_create_task_builds_task_sync_command(self, run_mock):
        """create_task invokes ``codex task-sync --op create`` with its flags."""
        run_mock.return_value = CompletedProcess(
            args=[],
            returncode=0,
            stdout='{"external_key":"cx-123"}',
            stderr="",
        )

        result = self.provider.create_task(
            {
                "command": "codex",
                "workspace_root": "/tmp/work",
                "default_profile": "default",
                "timeout_seconds": 30,
            },
            {
                "task_id": "t1",
                "title": "hello",
                "reference_code": "42",
            },
        )

        self.assertTrue(result.ok)
        self.assertEqual("cx-123", result.external_key)

        # First positional argument of the mocked subprocess.run call is the
        # argv list the provider built.
        args = run_mock.call_args.args[0]
        self.assertEqual(["codex", "task-sync", "--op", "create"], args[:4])
        self.assertIn("--workspace", args)
        self.assertIn("--payload-json", args)

    @patch("core.tasks.providers.codex_cli.subprocess.run")
    def test_timeout_maps_to_failed_result(self, run_mock):
        """A ``TimeoutExpired`` from the subprocess becomes a failed result."""
        run_mock.side_effect = TimeoutExpired(cmd=["codex"], timeout=10)

        result = self.provider.append_update(
            {"command": "codex", "timeout_seconds": 10},
            {"task_id": "t1"},
        )

        self.assertFalse(result.ok)
        # Same defensive coercion as the stdout check above: a None or
        # non-string error now produces an assertion failure, not TypeError.
        self.assertIn("timeout", str(result.error or ""))