# File: GIA/core/tests/test_xmpp_approval_commands.py
# Snapshot: 2026-03-08 22:08:55 +00:00 (195 lines, 6.6 KiB, Python)

from __future__ import annotations
from unittest.mock import MagicMock
from asgiref.sync import async_to_sync
from django.test import TestCase
from core.gateway.builtin import (
gateway_help_lines,
handle_approval_command,
handle_tasks_command,
)
from core.models import (
CodexPermissionRequest,
CodexRun,
DerivedTask,
ExternalSyncEvent,
TaskProject,
User,
)
class XMPPGatewayApprovalCommandTests(TestCase):
    """Tests for ``handle_approval_command`` driven by text commands.

    ``setUp`` builds one user with a project, a task, a ``waiting_approval``
    sync event and run, and a pending permission request keyed ``ak-xmpp-1``.
    """

    def setUp(self):
        """Create the user, task, waiting event, run, and pending request."""
        service = "xmpp"
        channel = "component.example.test"

        self.user = User.objects.create_user(
            "xmpp-approval-user", "xmpp-approval@example.com", "x"
        )
        self.project = TaskProject.objects.create(
            user=self.user,
            name="Approval Project",
        )
        self.task = DerivedTask.objects.create(
            user=self.user,
            project=self.project,
            epic=None,
            title="Approve me",
            source_service=service,
            source_channel=channel,
            reference_code="77",
            status_snapshot="open",
        )
        self.waiting_event = ExternalSyncEvent.objects.create(
            user=self.user,
            task=self.task,
            provider="codex_cli",
            status="waiting_approval",
            payload={},
            error="",
        )
        run_request = {
            "action": "append_update",
            "provider_payload": {"task_id": str(self.task.id)},
        }
        self.run = CodexRun.objects.create(
            user=self.user,
            task=self.task,
            project=self.project,
            source_service=service,
            source_channel=channel,
            status="waiting_approval",
            request_payload=run_request,
            result_payload={},
        )
        self.request = CodexPermissionRequest.objects.create(
            user=self.user,
            codex_run=self.run,
            external_sync_event=self.waiting_event,
            approval_key="ak-xmpp-1",
            summary="Need auth approval",
            requested_permissions={"items": ["workspace_write"]},
            resume_payload={},
            status="pending",
        )
        # Not referenced by any test visible in this class; kept as fixture state.
        self.probe = MagicMock()

    def _run_command(self, text: str) -> list[str]:
        """Pass ``text`` to ``handle_approval_command`` and return its replies.

        The handler is called synchronously with ``self.user``, the command
        text, and a sink that records each emitted value as a string. The
        helper asserts that the handler returned a truthy value and that at
        least one reply was recorded.
        """
        replies: list[str] = []
        invoke = async_to_sync(handle_approval_command)
        was_handled = invoke(
            self.user,
            text,
            lambda value: replies.append(str(value)),
        )
        self.assertTrue(was_handled)
        self.assertTrue(replies)
        return replies

    def test_approval_approve_command_resolves_request_and_queues_resume(self):
        """Approving by key updates request, run, and event, and adds a resume event."""
        reply_text = "\n".join(self._run_command(".approval approve ak-xmpp-1"))
        self.assertIn("approved", reply_text.lower())

        # Reload every fixture row so the assertions see the persisted state.
        for record in (self.request, self.run, self.waiting_event):
            record.refresh_from_db()
        self.assertEqual("approved", self.request.status)
        self.assertEqual("approved_waiting_resume", self.run.status)
        self.assertEqual("ok", self.waiting_event.status)

        resume_event = ExternalSyncEvent.objects.filter(
            idempotency_key="codex_approval:ak-xmpp-1:approved"
        ).first()
        self.assertIsNotNone(resume_event)
        self.assertEqual("pending", resume_event.status)

    def test_approval_list_pending_and_status(self):
        """``list-pending all`` shows the pending key; ``status`` reports pending."""
        listing = "\n".join(self._run_command(".approval list-pending all"))
        self.assertIn("pending=1", listing)
        self.assertIn("ak-xmpp-1", listing)

        status_text = "\n".join(self._run_command(".approval status ak-xmpp-1"))
        self.assertIn("status=pending", status_text)

    def test_provider_specific_command_rejects_mismatched_key(self):
        """``.claude approve`` on the ``codex_cli`` fixture key leaves it pending."""
        reply_text = "\n".join(self._run_command(".claude approve ak-xmpp-1"))
        self.assertIn("approval_key_not_for_provider", reply_text)

        self.request.refresh_from_db()
        self.assertEqual("pending", self.request.status)
class XMPPGatewayTasksCommandTests(TestCase):
    """Tests for ``handle_tasks_command`` and the gateway help text.

    ``setUp`` builds one user with a project named ``Task Project`` and a
    single open task whose reference code is ``12``.
    """

    def setUp(self):
        """Create the user, the project, and one open task."""
        self.user = User.objects.create_user(
            "xmpp-task-user", "xmpp-task@example.com", "x"
        )
        self.project = TaskProject.objects.create(
            user=self.user,
            name="Task Project",
        )
        self.task = DerivedTask.objects.create(
            user=self.user,
            project=self.project,
            epic=None,
            title="Ship CLI",
            source_service="xmpp",
            source_channel="component.example.test",
            reference_code="12",
            status_snapshot="open",
        )
        # Not referenced by any test visible in this class; kept as fixture state.
        self.probe = MagicMock()

    def _run_tasks(self, text: str) -> list[str]:
        """Pass ``text`` to ``handle_tasks_command`` and return its replies.

        The handler is called synchronously with ``self.user``, the command
        text, and a sink that records each emitted value as a string. The
        helper asserts that the handler returned a truthy value and that at
        least one reply was recorded.
        """
        replies: list[str] = []
        invoke = async_to_sync(handle_tasks_command)
        was_handled = invoke(
            self.user,
            text,
            lambda value: replies.append(str(value)),
        )
        self.assertTrue(was_handled)
        self.assertTrue(replies)
        return replies

    def test_help_contains_approval_and_tasks_sections(self):
        """The joined help lines mention the approval and tasks commands."""
        help_text = "\n".join(gateway_help_lines())
        expected_fragments = (
            ".approval list-pending",
            ".tasks list",
            ".tasks add",
            ".l",
        )
        for fragment in expected_fragments:
            self.assertIn(fragment, help_text)

    def test_tasks_list_show_complete_and_undo(self):
        """Walk task #12 through list, show, complete, and undo in order."""

        def reply_text(command: str) -> str:
            # Run one command and collapse its replies into a single string.
            return "\n".join(self._run_tasks(command))

        self.assertIn("#12", reply_text(".tasks list open 10"))
        self.assertIn("#12", reply_text(".l"))
        self.assertIn("status: open", reply_text(".tasks show #12"))

        self.assertIn("completed #12", reply_text(".tasks complete #12"))
        self.task.refresh_from_db()
        self.assertEqual("completed", self.task.status_snapshot)

        self.assertIn("removed #12", reply_text(".tasks undo #12"))
        still_present = DerivedTask.objects.filter(id=self.task.id).exists()
        self.assertFalse(still_present)

    def test_tasks_add_creates_task_in_named_project(self):
        """``.tasks add <project> :: <title>`` stores a task with the XMPP source."""
        replies: list[str] = []

        def record(value):
            replies.append(str(value))

        was_handled = async_to_sync(handle_tasks_command)(
            self.user,
            ".tasks add Task Project :: Wire XMPP manual task create",
            record,
            service="xmpp",
            channel_identifier="component.example.test",
            sender_identifier="operator@example.test",
        )
        self.assertTrue(was_handled)
        self.assertTrue(any("created #" in reply.lower() for reply in replies))

        matching_tasks = DerivedTask.objects.filter(
            user=self.user,
            project=self.project,
            title="Wire XMPP manual task create",
            source_service="xmpp",
            source_channel="component.example.test",
        )
        self.assertIsNotNone(matching_tasks.first())