"""Tests for the built-in gateway approval and tasks command handlers."""

from __future__ import annotations

from unittest.mock import MagicMock

from asgiref.sync import async_to_sync
from django.test import TestCase

from core.gateway.builtin import (
    gateway_help_lines,
    handle_approval_command,
    handle_tasks_command,
)
from core.models import (
    CodexPermissionRequest,
    CodexRun,
    DerivedTask,
    ExternalSyncEvent,
    TaskProject,
    User,
)


class XMPPGatewayApprovalCommandTests(TestCase):
    """Exercise ``handle_approval_command`` against one pending request."""

    def setUp(self):
        """Create a user, project, task, waiting event, run and request."""
        self.user = User.objects.create_user(
            "xmpp-approval-user", "xmpp-approval@example.com", "x"
        )
        self.project = TaskProject.objects.create(
            user=self.user,
            name="Approval Project",
        )
        self.task = DerivedTask.objects.create(
            user=self.user,
            project=self.project,
            epic=None,
            title="Approve me",
            source_service="xmpp",
            source_channel="component.example.test",
            reference_code="77",
            status_snapshot="open",
        )
        self.waiting_event = ExternalSyncEvent.objects.create(
            user=self.user,
            task=self.task,
            provider="codex_cli",
            status="waiting_approval",
            payload={},
            error="",
        )
        self.run = CodexRun.objects.create(
            user=self.user,
            task=self.task,
            project=self.project,
            source_service="xmpp",
            source_channel="component.example.test",
            status="waiting_approval",
            request_payload={
                "action": "append_update",
                "provider_payload": {"task_id": str(self.task.id)},
            },
            result_payload={},
        )
        self.request = CodexPermissionRequest.objects.create(
            user=self.user,
            codex_run=self.run,
            external_sync_event=self.waiting_event,
            approval_key="ak-xmpp-1",
            summary="Need auth approval",
            requested_permissions={"items": ["workspace_write"]},
            resume_payload={},
            status="pending",
        )
        self.probe = MagicMock()

    def _run_command(self, text: str) -> list[str]:
        """Run an approval command and return the messages it emitted.

        Asserts that the handler reported the command as handled and that
        at least one message was produced.
        """
        collected: list[str] = []
        run_sync = async_to_sync(handle_approval_command)
        was_handled = run_sync(
            self.user,
            text,
            lambda value: collected.append(str(value)),
        )
        self.assertTrue(was_handled)
        self.assertTrue(collected)
        return collected

    def test_approval_approve_command_resolves_request_and_queues_resume(self):
        """Approving by key updates request, run and event, and adds a resume event."""
        reply = "\n".join(self._run_command(".approval approve ak-xmpp-1"))
        self.assertIn("approved", reply.lower())

        # Reload every row the command is expected to have touched.
        for record in (self.request, self.run, self.waiting_event):
            record.refresh_from_db()

        self.assertEqual("approved", self.request.status)
        self.assertEqual("approved_waiting_resume", self.run.status)
        self.assertEqual("ok", self.waiting_event.status)

        resume_events = ExternalSyncEvent.objects.filter(
            idempotency_key="codex_approval:ak-xmpp-1:approved"
        )
        resume = resume_events.first()
        self.assertIsNotNone(resume)
        self.assertEqual("pending", resume.status)

    def test_approval_list_pending_and_status(self):
        """Listing shows the pending key; status reports it as pending."""
        listing = "\n".join(self._run_command(".approval list-pending all"))
        self.assertIn("pending=1", listing)
        self.assertIn("ak-xmpp-1", listing)

        status_text = "\n".join(self._run_command(".approval status ak-xmpp-1"))
        self.assertIn("status=pending", status_text)

    def test_provider_specific_command_rejects_mismatched_key(self):
        """A ``.claude`` approve for this key is rejected and leaves it pending."""
        reply = "\n".join(self._run_command(".claude approve ak-xmpp-1"))
        self.assertIn("approval_key_not_for_provider", reply)

        self.request.refresh_from_db()
        self.assertEqual("pending", self.request.status)


class XMPPGatewayTasksCommandTests(TestCase):
    """Exercise ``handle_tasks_command`` and the gateway help text."""

    def setUp(self):
        """Create a user with one project holding one open task (#12)."""
        self.user = User.objects.create_user(
            "xmpp-task-user", "xmpp-task@example.com", "x"
        )
        self.project = TaskProject.objects.create(
            user=self.user,
            name="Task Project",
        )
        self.task = DerivedTask.objects.create(
            user=self.user,
            project=self.project,
            epic=None,
            title="Ship CLI",
            source_service="xmpp",
            source_channel="component.example.test",
            reference_code="12",
            status_snapshot="open",
        )
        self.probe = MagicMock()

    def _run_tasks(self, text: str) -> list[str]:
        """Run a tasks command and return the messages it emitted.

        Asserts that the handler reported the command as handled and that
        at least one message was produced.
        """
        collected: list[str] = []
        run_sync = async_to_sync(handle_tasks_command)
        was_handled = run_sync(
            self.user,
            text,
            lambda value: collected.append(str(value)),
        )
        self.assertTrue(was_handled)
        self.assertTrue(collected)
        return collected

    def test_help_contains_approval_and_tasks_sections(self):
        """Help output mentions the approval and tasks commands."""
        help_text = "\n".join(gateway_help_lines())
        expected_fragments = (
            ".approval list-pending",
            ".tasks list",
            ".tasks add",
            ".l",
        )
        for fragment in expected_fragments:
            self.assertIn(fragment, help_text)

    def test_tasks_list_show_complete_and_undo(self):
        """List, show, complete and undo all act on task #12 in sequence."""
        # Order matters: "complete" must run last so earlier steps see "open".
        steps = (
            (".tasks list open 10", "#12"),
            (".l", "#12"),
            (".tasks show #12", "status: open"),
            (".tasks complete #12", "completed #12"),
        )
        for command, expected in steps:
            self.assertIn(expected, "\n".join(self._run_tasks(command)))

        self.task.refresh_from_db()
        self.assertEqual("completed", self.task.status_snapshot)

        undo_reply = "\n".join(self._run_tasks(".tasks undo #12"))
        self.assertIn("removed #12", undo_reply)
        still_there = DerivedTask.objects.filter(id=self.task.id).exists()
        self.assertFalse(still_there)

    def test_tasks_add_creates_task_in_named_project(self):
        """``.tasks add <project> :: <title>`` creates a task in that project."""
        replies = []

        def _collect(value):
            replies.append(str(value))

        was_handled = async_to_sync(handle_tasks_command)(
            self.user,
            ".tasks add Task Project :: Wire XMPP manual task create",
            _collect,
            service="xmpp",
            channel_identifier="component.example.test",
            sender_identifier="operator@example.test",
        )
        self.assertTrue(was_handled)
        self.assertTrue(any("created #" in reply.lower() for reply in replies))

        matching = DerivedTask.objects.filter(
            user=self.user,
            project=self.project,
            title="Wire XMPP manual task create",
            source_service="xmpp",
            source_channel="component.example.test",
        )
        created = matching.first()
        self.assertIsNotNone(created)