Implement 3 plans

This commit is contained in:
2026-03-06 19:38:32 +00:00
parent 49aaed5dec
commit ff66bc9e1f
13 changed files with 1650 additions and 74 deletions

View File

@@ -9,6 +9,7 @@ from core.commands.handlers.bp import (
bp_subcommands_enabled,
bp_trigger_matches,
)
from core.commands.handlers.claude import ClaudeCommandHandler, claude_trigger_matches
from core.commands.handlers.codex import CodexCommandHandler, codex_trigger_matches
from core.commands.policies import ensure_variant_policies_for_profile
from core.commands.registry import get as get_handler
@@ -123,11 +124,36 @@ def _ensure_codex_profile(user_id: int) -> CommandProfile:
return profile
def _ensure_claude_profile(user_id: int) -> CommandProfile:
profile, _ = CommandProfile.objects.get_or_create(
user_id=user_id,
slug="claude",
defaults={
"name": "Claude",
"enabled": True,
"trigger_token": ".claude",
"reply_required": False,
"exact_match_only": False,
"window_scope": "conversation",
"visibility_mode": "status_in_source",
},
)
if not profile.enabled:
profile.enabled = True
profile.save(update_fields=["enabled", "updated_at"])
if str(profile.trigger_token or "").strip() != ".claude":
profile.trigger_token = ".claude"
profile.save(update_fields=["trigger_token", "updated_at"])
return profile
def _ensure_profile_for_slug(user_id: int, slug: str) -> CommandProfile | None:
if slug == "bp":
return _ensure_bp_profile(user_id)
if slug == "codex":
return _ensure_codex_profile(user_id)
if slug == "claude":
return _ensure_claude_profile(user_id)
return None
@@ -137,6 +163,8 @@ def _detected_bootstrap_slugs(message_text: str) -> list[str]:
slugs.append("bp")
if codex_trigger_matches(message_text, ".codex", False):
slugs.append("codex")
if claude_trigger_matches(message_text, ".claude", False):
slugs.append("claude")
return slugs
@@ -202,6 +230,7 @@ def ensure_handlers_registered():
return
register(BPCommandHandler())
register(CodexCommandHandler())
register(ClaudeCommandHandler())
_REGISTERED = True
@@ -271,6 +300,12 @@ def _matches_trigger(profile: CommandProfile, text: str) -> bool:
trigger_token=profile.trigger_token,
exact_match_only=profile.exact_match_only,
)
if profile.slug == "claude":
return claude_trigger_matches(
message_text=text,
trigger_token=profile.trigger_token,
exact_match_only=profile.exact_match_only,
)
body = str(text or "").strip()
trigger = str(profile.trigger_token or "").strip()
if not trigger:

View File

@@ -0,0 +1,530 @@
from __future__ import annotations
import hashlib
import re
from asgiref.sync import sync_to_async
from django.utils import timezone
from core.commands.base import CommandContext, CommandHandler, CommandResult
from core.commands.delivery import post_status_in_source
from core.messaging.text_export import plain_text_blob
from core.models import (
ChatTaskSource,
CodexPermissionRequest,
CodexRun,
CommandProfile,
DerivedTask,
ExternalSyncEvent,
Message,
TaskProject,
TaskProviderConfig,
)
from core.tasks.codex_support import channel_variants, resolve_external_chat_id
from core.tasks.codex_approval import queue_codex_event_with_pre_approval
_CLAUDE_DEFAULT_RE = re.compile(
r"^\s*(?:\.claude\b|#claude#?)(?P<body>.*)$",
re.IGNORECASE | re.DOTALL,
)
_CLAUDE_PLAN_RE = re.compile(
r"^\s*(?:\.claude\s+plan\b|#claude\s+plan#?)(?P<body>.*)$",
re.IGNORECASE | re.DOTALL,
)
_CLAUDE_STATUS_RE = re.compile(r"^\s*(?:\.claude\s+status\b|#claude\s+status#?)\s*$", re.IGNORECASE)
_CLAUDE_APPROVE_DENY_RE = re.compile(
r"^\s*(?:\.claude|#claude)\s+(?P<action>approve|deny)\s+(?P<approval_key>[A-Za-z0-9._:-]+)#?\s*$",
re.IGNORECASE,
)
_PROJECT_TOKEN_RE = re.compile(r"\[\s*project\s*:\s*([^\]]+)\]", re.IGNORECASE)
_REFERENCE_RE = re.compile(r"(?<!\w)#([A-Za-z0-9_-]+)\b")
class ClaudeParsedCommand(dict):
@property
def command(self) -> str | None:
value = self.get("command")
return str(value) if value else None
@property
def body_text(self) -> str:
return str(self.get("body_text") or "")
@property
def approval_key(self) -> str:
return str(self.get("approval_key") or "")
def parse_claude_command(text: str) -> ClaudeParsedCommand:
body = str(text or "")
m = _CLAUDE_APPROVE_DENY_RE.match(body)
if m:
return ClaudeParsedCommand(
command=str(m.group("action") or "").strip().lower(),
body_text="",
approval_key=str(m.group("approval_key") or "").strip(),
)
if _CLAUDE_STATUS_RE.match(body):
return ClaudeParsedCommand(command="status", body_text="", approval_key="")
m = _CLAUDE_PLAN_RE.match(body)
if m:
return ClaudeParsedCommand(
command="plan",
body_text=str(m.group("body") or "").strip(),
approval_key="",
)
m = _CLAUDE_DEFAULT_RE.match(body)
if m:
return ClaudeParsedCommand(
command="default",
body_text=str(m.group("body") or "").strip(),
approval_key="",
)
return ClaudeParsedCommand(command=None, body_text="", approval_key="")
def claude_trigger_matches(message_text: str, trigger_token: str, exact_match_only: bool) -> bool:
body = str(message_text or "").strip()
parsed = parse_claude_command(body)
if parsed.command:
return True
trigger = str(trigger_token or "").strip()
if not trigger:
return False
if exact_match_only:
return body.lower() == trigger.lower()
return trigger.lower() in body.lower()
class ClaudeCommandHandler(CommandHandler):
slug = "claude"
_provider_name = "claude_cli"
_approval_prefix = "claude_approval"
async def _load_trigger(self, message_id: str) -> Message | None:
return await sync_to_async(
lambda: Message.objects.select_related("user", "session", "session__identifier", "reply_to")
.filter(id=message_id)
.first()
)()
def _effective_scope(self, trigger: Message) -> tuple[str, str]:
service = str(getattr(trigger, "source_service", "") or "").strip().lower()
channel = str(getattr(trigger, "source_chat_id", "") or "").strip()
identifier = getattr(getattr(trigger, "session", None), "identifier", None)
fallback_service = str(getattr(identifier, "service", "") or "").strip().lower()
fallback_identifier = str(getattr(identifier, "identifier", "") or "").strip()
if service == "web" and fallback_service and fallback_identifier and fallback_service != "web":
return fallback_service, fallback_identifier
return service or "web", channel
async def _mapped_sources(self, user, service: str, channel: str) -> list[ChatTaskSource]:
variants = channel_variants(service, channel)
if not variants:
return []
return await sync_to_async(list)(
ChatTaskSource.objects.filter(
user=user,
enabled=True,
service=service,
channel_identifier__in=variants,
).select_related("project", "epic")
)
async def _linked_task_from_reply(self, user, reply_to: Message | None) -> DerivedTask | None:
if reply_to is None:
return None
by_origin = await sync_to_async(
lambda: DerivedTask.objects.filter(user=user, origin_message=reply_to)
.select_related("project", "epic")
.order_by("-created_at")
.first()
)()
if by_origin is not None:
return by_origin
return await sync_to_async(
lambda: DerivedTask.objects.filter(user=user, events__source_message=reply_to)
.select_related("project", "epic")
.order_by("-created_at")
.first()
)()
def _extract_project_token(self, body_text: str) -> tuple[str, str]:
text = str(body_text or "")
m = _PROJECT_TOKEN_RE.search(text)
if not m:
return "", text
token = str(m.group(1) or "").strip()
cleaned = _PROJECT_TOKEN_RE.sub("", text).strip()
return token, cleaned
def _extract_reference(self, body_text: str) -> str:
m = _REFERENCE_RE.search(str(body_text or ""))
if not m:
return ""
return str(m.group(1) or "").strip()
async def _resolve_task(self, user, reference_code: str, reply_task: DerivedTask | None) -> DerivedTask | None:
if reference_code:
return await sync_to_async(
lambda: DerivedTask.objects.filter(user=user, reference_code=reference_code)
.select_related("project", "epic")
.order_by("-created_at")
.first()
)()
return reply_task
async def _resolve_project(
self,
*,
user,
service: str,
channel: str,
task: DerivedTask | None,
reply_task: DerivedTask | None,
project_token: str,
) -> tuple[TaskProject | None, str]:
if task is not None:
return task.project, ""
if reply_task is not None:
return reply_task.project, ""
if project_token:
project = await sync_to_async(
lambda: TaskProject.objects.filter(user=user, name__iexact=project_token).first()
)()
if project is not None:
return project, ""
return None, f"project_not_found:{project_token}"
mapped = await self._mapped_sources(user, service, channel)
project_ids = sorted({str(row.project_id) for row in mapped if row.project_id})
if len(project_ids) == 1:
project = next((row.project for row in mapped if str(row.project_id) == project_ids[0]), None)
return project, ""
if len(project_ids) > 1:
return None, "project_required:[project:Name]"
return None, "project_unresolved"
async def _post_source_status(self, trigger: Message, text: str, suffix: str) -> None:
await post_status_in_source(
trigger_message=trigger,
text=text,
origin_tag=f"claude-status:{suffix}",
)
async def _run_status(self, trigger: Message, service: str, channel: str, project: TaskProject | None) -> CommandResult:
def _load_runs():
qs = CodexRun.objects.filter(user=trigger.user)
if service:
qs = qs.filter(source_service=service)
if channel:
qs = qs.filter(source_channel=channel)
if project is not None:
qs = qs.filter(project=project)
return list(qs.order_by("-created_at")[:10])
runs = await sync_to_async(_load_runs)()
if not runs:
await self._post_source_status(trigger, "[claude] no recent runs for this scope.", "empty")
return CommandResult(ok=True, status="ok", payload={"count": 0})
lines = ["[claude] recent runs:"]
for row in runs:
ref = str(getattr(getattr(row, "task", None), "reference_code", "") or "-")
summary = str((row.result_payload or {}).get("summary") or "").strip()
summary_part = f" · {summary}" if summary else ""
lines.append(f"- {row.status} run={row.id} task=#{ref}{summary_part}")
await self._post_source_status(trigger, "\n".join(lines), "runs")
return CommandResult(ok=True, status="ok", payload={"count": len(runs)})
async def _run_approval_action(
self,
trigger: Message,
parsed: ClaudeParsedCommand,
current_service: str,
current_channel: str,
) -> CommandResult:
cfg = await sync_to_async(
lambda: TaskProviderConfig.objects.filter(
user=trigger.user, provider=self._provider_name
).first()
)()
settings_payload = dict(getattr(cfg, "settings", {}) or {})
approver_service = str(settings_payload.get("approver_service") or "").strip().lower()
approver_identifier = str(settings_payload.get("approver_identifier") or "").strip()
if not approver_service or not approver_identifier:
return CommandResult(ok=False, status="failed", error="approver_channel_not_configured")
if str(current_service or "").strip().lower() != approver_service or str(current_channel or "").strip() not in set(
channel_variants(approver_service, approver_identifier)
):
return CommandResult(ok=False, status="failed", error="approval_command_not_allowed_in_this_channel")
approval_key = parsed.approval_key
request = await sync_to_async(
lambda: CodexPermissionRequest.objects.select_related("codex_run", "external_sync_event")
.filter(user=trigger.user, approval_key=approval_key)
.first()
)()
if request is None:
return CommandResult(ok=False, status="failed", error="approval_key_not_found")
now = timezone.now()
if parsed.command == "approve":
request.status = "approved"
request.resolved_at = now
request.resolved_by_identifier = current_channel
request.resolution_note = "approved via claude command"
await sync_to_async(request.save)(
update_fields=[
"status",
"resolved_at",
"resolved_by_identifier",
"resolution_note",
]
)
if request.external_sync_event_id:
await sync_to_async(ExternalSyncEvent.objects.filter(id=request.external_sync_event_id).update)(
status="ok",
error="",
)
run = request.codex_run
run.status = "approved_waiting_resume"
run.error = ""
await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"])
source_service = str(run.source_service or "")
source_channel = str(run.source_channel or "")
resume_payload = dict(request.resume_payload or {})
resume_action = str(resume_payload.get("action") or "").strip().lower()
resume_provider_payload = dict(resume_payload.get("provider_payload") or {})
if resume_action and resume_provider_payload:
provider_payload = dict(resume_provider_payload)
provider_payload["codex_run_id"] = str(run.id)
provider_payload["source_service"] = source_service
provider_payload["source_channel"] = source_channel
event_action = resume_action
resume_idempotency_key = str(resume_payload.get("idempotency_key") or "").strip()
resume_event_key = (
resume_idempotency_key
if resume_idempotency_key
else f"{self._approval_prefix}:{approval_key}:approved"
)
else:
provider_payload = dict(run.request_payload.get("provider_payload") or {})
provider_payload.update(
{
"mode": "approval_response",
"approval_key": approval_key,
"resume_payload": dict(request.resume_payload or {}),
"codex_run_id": str(run.id),
"source_service": source_service,
"source_channel": source_channel,
}
)
event_action = "append_update"
resume_event_key = f"{self._approval_prefix}:{approval_key}:approved"
await sync_to_async(ExternalSyncEvent.objects.update_or_create)(
idempotency_key=resume_event_key,
defaults={
"user": trigger.user,
"task_id": run.task_id,
"task_event_id": run.derived_task_event_id,
"provider": self._provider_name,
"status": "pending",
"payload": {
"action": event_action,
"provider_payload": provider_payload,
},
"error": "",
},
)
return CommandResult(ok=True, status="ok", payload={"approval_key": approval_key, "resolution": "approved"})
request.status = "denied"
request.resolved_at = now
request.resolved_by_identifier = current_channel
request.resolution_note = "denied via claude command"
await sync_to_async(request.save)(
update_fields=["status", "resolved_at", "resolved_by_identifier", "resolution_note"]
)
if request.external_sync_event_id:
await sync_to_async(ExternalSyncEvent.objects.filter(id=request.external_sync_event_id).update)(
status="failed",
error="approval_denied",
)
run = request.codex_run
run.status = "denied"
run.error = "approval_denied"
await sync_to_async(run.save)(update_fields=["status", "error", "updated_at"])
await sync_to_async(ExternalSyncEvent.objects.update_or_create)(
idempotency_key=f"{self._approval_prefix}:{approval_key}:denied",
defaults={
"user": trigger.user,
"task_id": run.task_id,
"task_event_id": run.derived_task_event_id,
"provider": self._provider_name,
"status": "failed",
"payload": {
"action": "append_update",
"provider_payload": {
"mode": "approval_response",
"approval_key": approval_key,
"codex_run_id": str(run.id),
},
},
"error": "approval_denied",
},
)
return CommandResult(ok=True, status="ok", payload={"approval_key": approval_key, "resolution": "denied"})
async def _create_submission(
self,
*,
trigger: Message,
mode: str,
body_text: str,
task: DerivedTask,
project: TaskProject,
) -> CommandResult:
cfg = await sync_to_async(
lambda: TaskProviderConfig.objects.filter(
user=trigger.user, provider=self._provider_name, enabled=True
).first()
)()
if cfg is None:
return CommandResult(ok=False, status="failed", error="provider_disabled_or_missing")
service, channel = self._effective_scope(trigger)
external_chat_id = await sync_to_async(resolve_external_chat_id)(
user=trigger.user,
provider=self._provider_name,
service=service,
channel=channel,
)
payload = {
"task_id": str(task.id),
"reference_code": str(task.reference_code or ""),
"title": str(task.title or ""),
"external_key": str(task.external_key or ""),
"project_name": str(getattr(project, "name", "") or ""),
"epic_name": str(getattr(getattr(task, "epic", None), "name", "") or ""),
"source_service": service,
"source_channel": channel,
"external_chat_id": external_chat_id,
"origin_message_id": str(getattr(task, "origin_message_id", "") or ""),
"trigger_message_id": str(trigger.id),
"mode": mode,
"command_text": str(body_text or ""),
}
if mode == "plan":
anchor = trigger.reply_to
if anchor is None:
return CommandResult(ok=False, status="failed", error="reply_required_for_claude_plan")
rows = await sync_to_async(list)(
Message.objects.filter(
user=trigger.user,
session=trigger.session,
ts__gte=int(anchor.ts or 0),
ts__lte=int(trigger.ts or 0),
)
.order_by("ts")
.select_related("session", "session__identifier", "session__identifier__person")
)
payload["reply_context"] = {
"anchor_message_id": str(anchor.id),
"trigger_message_id": str(trigger.id),
"message_ids": [str(row.id) for row in rows],
"content": plain_text_blob(rows),
}
run = await sync_to_async(CodexRun.objects.create)(
user=trigger.user,
task=task,
source_message=trigger,
project=project,
epic=getattr(task, "epic", None),
source_service=service,
source_channel=channel,
external_chat_id=external_chat_id,
status="waiting_approval",
request_payload={"action": "append_update", "provider_payload": dict(payload)},
result_payload={},
error="",
)
payload["codex_run_id"] = str(run.id)
run.request_payload = {"action": "append_update", "provider_payload": dict(payload)}
await sync_to_async(run.save)(update_fields=["request_payload", "updated_at"])
idempotency_key = f"claude_cmd:{trigger.id}:{mode}:{task.id}:{hashlib.sha1(str(body_text or '').encode('utf-8')).hexdigest()[:12]}"
await sync_to_async(queue_codex_event_with_pre_approval)(
user=trigger.user,
run=run,
task=task,
task_event=None,
action="append_update",
provider_payload=dict(payload),
idempotency_key=idempotency_key,
)
return CommandResult(
ok=True,
status="ok",
payload={"codex_run_id": str(run.id), "approval_required": True},
)
async def execute(self, ctx: CommandContext) -> CommandResult:
trigger = await self._load_trigger(ctx.message_id)
if trigger is None:
return CommandResult(ok=False, status="failed", error="trigger_not_found")
profile = await sync_to_async(
lambda: CommandProfile.objects.filter(user=trigger.user, slug=self.slug, enabled=True).first()
)()
if profile is None:
return CommandResult(ok=False, status="skipped", error="profile_missing")
parsed = parse_claude_command(ctx.message_text)
if not parsed.command:
return CommandResult(ok=False, status="skipped", error="claude_command_not_matched")
service, channel = self._effective_scope(trigger)
if parsed.command == "status":
project = None
reply_task = await self._linked_task_from_reply(trigger.user, trigger.reply_to)
if reply_task is not None:
project = reply_task.project
return await self._run_status(trigger, service, channel, project)
if parsed.command in {"approve", "deny"}:
return await self._run_approval_action(
trigger,
parsed,
current_service=str(ctx.service or ""),
current_channel=str(ctx.channel_identifier or ""),
)
project_token, cleaned_body = self._extract_project_token(parsed.body_text)
reference_code = self._extract_reference(cleaned_body)
reply_task = await self._linked_task_from_reply(trigger.user, trigger.reply_to)
task = await self._resolve_task(trigger.user, reference_code, reply_task)
if task is None:
return CommandResult(ok=False, status="failed", error="task_target_required")
project, project_error = await self._resolve_project(
user=trigger.user,
service=service,
channel=channel,
task=task,
reply_task=reply_task,
project_token=project_token,
)
if project is None:
return CommandResult(ok=False, status="failed", error=project_error or "project_unresolved")
mode = "plan" if parsed.command == "plan" else "default"
return await self._create_submission(
trigger=trigger,
mode=mode,
body_text=cleaned_body,
task=task,
project=project,
)

View File

@@ -0,0 +1,21 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("core", "0036_memoryitem_memorychangerequest_knowledgearticle_and_more"),
]
operations = [
migrations.AddField(
model_name="derivedtask",
name="due_date",
field=models.DateField(blank=True, null=True),
),
migrations.AddField(
model_name="derivedtask",
name="assignee_identifier",
field=models.CharField(blank=True, default="", max_length=255),
),
]

View File

@@ -2334,6 +2334,8 @@ class DerivedTask(models.Model):
reference_code = models.CharField(max_length=64, blank=True, default="")
external_key = models.CharField(max_length=255, blank=True, default="")
status_snapshot = models.CharField(max_length=64, blank=True, default="open")
due_date = models.DateField(null=True, blank=True)
assignee_identifier = models.CharField(max_length=255, blank=True, default="")
immutable_payload = models.JSONField(default=dict, blank=True)
created_at = models.DateTimeField(auto_now_add=True)

View File

@@ -1,11 +1,13 @@
from __future__ import annotations
import datetime
import re
from asgiref.sync import sync_to_async
from django.conf import settings
from core.clients.transport import send_message_raw
from core.memory.retrieval import retrieve_memories_for_prompt
from core.messaging import ai as ai_runner
from core.models import (
AI,
@@ -43,6 +45,78 @@ _EPIC_CREATE_RE = re.compile(
re.IGNORECASE | re.DOTALL,
)
_EPIC_TOKEN_RE = re.compile(r"\[\s*epic\s*:\s*([^\]]+?)\s*\]", re.IGNORECASE)
_LIST_TASKS_CMD_RE = re.compile(
r"^\s*\.task\s+list\s*$",
re.IGNORECASE,
)
_TASK_SHOW_RE = re.compile(
r"^\s*\.task\s+show\s+#?(?P<reference>[A-Za-z0-9_-]+)\s*$",
re.IGNORECASE,
)
_TASK_COMPLETE_CMD_RE = re.compile(
r"^\s*\.task\s+(?:complete|done|close)\s+#?(?P<reference>[A-Za-z0-9_-]+)\s*$",
re.IGNORECASE,
)
_DUE_ISO_RE = re.compile(
r"\b(?:due|by)\s+(\d{4}-\d{2}-\d{2})\b",
re.IGNORECASE,
)
_DUE_RELATIVE_RE = re.compile(
r"\b(?:due|by)\s+(?P<token>today|tomorrow|monday|tuesday|wednesday|thursday|friday|saturday|sunday)\b",
re.IGNORECASE,
)
_ASSIGNEE_AT_RE = re.compile(r"@([A-Za-z0-9_.-]+)")
_ASSIGNEE_PHRASE_RE = re.compile(
r"\b(?:assign(?:ed)?\s+to|for)\s+([A-Za-z0-9_.-]+)\b",
re.IGNORECASE,
)
_WEEKDAY_MAP = {
"monday": 0,
"tuesday": 1,
"wednesday": 2,
"thursday": 3,
"friday": 4,
"saturday": 5,
"sunday": 6,
}
def _parse_due_date(text: str) -> datetime.date | None:
body = str(text or "")
m = _DUE_ISO_RE.search(body)
if m:
try:
return datetime.date.fromisoformat(m.group(1))
except ValueError:
pass
m = _DUE_RELATIVE_RE.search(body)
if not m:
return None
token = m.group("token").strip().lower()
today = datetime.date.today()
if token == "today":
return today
if token == "tomorrow":
return today + datetime.timedelta(days=1)
target_weekday = _WEEKDAY_MAP.get(token)
if target_weekday is None:
return None
days_ahead = (target_weekday - today.weekday()) % 7
if days_ahead == 0:
days_ahead = 7
return today + datetime.timedelta(days=days_ahead)
def _parse_assignee(text: str) -> str:
body = str(text or "")
m = _ASSIGNEE_AT_RE.search(body)
if m:
return str(m.group(1) or "").strip()
m = _ASSIGNEE_PHRASE_RE.search(body)
if m:
return str(m.group(1) or "").strip()
return ""
def _channel_variants(service: str, channel: str) -> list[str]:
@@ -319,11 +393,22 @@ async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: s
cached_epic = task._state.fields_cache.get("epic")
project_name = str(getattr(cached_project, "name", "") or "")
epic_name = str(getattr(cached_epic, "name", "") or "")
memory_context: list = []
try:
memory_context = await sync_to_async(retrieve_memories_for_prompt)(
user_id=int(task.user_id),
query=str(task.title or ""),
limit=10,
)
except Exception:
pass
request_payload = {
"task_id": str(task.id),
"reference_code": str(task.reference_code or ""),
"title": str(task.title or ""),
"external_key": str(task.external_key or ""),
"due_date": task.due_date.isoformat() if task.due_date else "",
"assignee_identifier": str(task.assignee_identifier or ""),
"project_name": project_name,
"epic_name": epic_name,
"source_service": str(task.source_service or ""),
@@ -333,6 +418,7 @@ async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: s
"trigger_message_id": str(getattr(event, "source_message_id", "") or getattr(task, "origin_message_id", "") or ""),
"mode": "default",
"payload": event.payload,
"memory_context": memory_context,
}
codex_run = await sync_to_async(CodexRun.objects.create)(
user=task.user,
@@ -439,7 +525,7 @@ async def _handle_scope_task_commands(message: Message, sources: list[ChatTaskSo
return False
body = str(text or "").strip()
source = sources[0]
if _LIST_TASKS_RE.match(body):
if _LIST_TASKS_RE.match(body) or _LIST_TASKS_CMD_RE.match(body):
open_rows = await sync_to_async(list)(
DerivedTask.objects.filter(
user=message.user,
@@ -494,6 +580,64 @@ async def _handle_scope_task_commands(message: Message, sources: list[ChatTaskSo
await _send_scope_message(source, message, f"[task] removed #{ref}: {title}")
return True
show_match = _TASK_SHOW_RE.match(body)
if show_match:
reference = str(show_match.group("reference") or "").strip()
task = await sync_to_async(
lambda: DerivedTask.objects.filter(
user=message.user,
project=source.project,
source_service=source.service,
source_channel=source.channel_identifier,
reference_code=reference,
)
.order_by("-created_at")
.first()
)()
if task is None:
await _send_scope_message(source, message, f"[task] #{reference} not found.")
return True
due_str = f"\ndue: {task.due_date}" if task.due_date else ""
assignee_str = f"\nassignee: {task.assignee_identifier}" if task.assignee_identifier else ""
detail = (
f"[task] #{task.reference_code}: {task.title}"
f"\nstatus: {task.status_snapshot}"
f"{due_str}"
f"{assignee_str}"
)
await _send_scope_message(source, message, detail)
return True
complete_match = _TASK_COMPLETE_CMD_RE.match(body)
if complete_match:
reference = str(complete_match.group("reference") or "").strip()
task = await sync_to_async(
lambda: DerivedTask.objects.filter(
user=message.user,
project=source.project,
source_service=source.service,
source_channel=source.channel_identifier,
reference_code=reference,
)
.order_by("-created_at")
.first()
)()
if task is None:
await _send_scope_message(source, message, f"[task] #{reference} not found.")
return True
task.status_snapshot = "completed"
await sync_to_async(task.save)(update_fields=["status_snapshot"])
event = await sync_to_async(DerivedTaskEvent.objects.create)(
task=task,
event_type="completion_marked",
actor_identifier=str(message.sender_uuid or ""),
source_message=message,
payload={"marker": reference, "command": ".task complete", "via": "chat_command"},
)
await _emit_sync_event(task, event, "complete")
await _send_scope_message(source, message, f"[task] completed #{task.reference_code}: {task.title}")
return True
return False
@@ -543,7 +687,14 @@ def _is_task_command_candidate(text: str) -> bool:
body = str(text or "").strip()
if not body:
return False
if _LIST_TASKS_RE.match(body) or _UNDO_TASK_RE.match(body) or _EPIC_CREATE_RE.match(body):
if (
_LIST_TASKS_RE.match(body)
or _LIST_TASKS_CMD_RE.match(body)
or _TASK_SHOW_RE.match(body)
or _TASK_COMPLETE_CMD_RE.match(body)
or _UNDO_TASK_RE.match(body)
or _EPIC_CREATE_RE.match(body)
):
return True
return _has_task_prefix(body.lower(), ["task:", "todo:"])
@@ -646,6 +797,8 @@ async def process_inbound_task_intelligence(message: Message) -> None:
)
title = await _derive_title_with_flags(cloned_message, flags)
reference = await sync_to_async(_next_reference)(message.user, source.project)
parsed_due_date = _parse_due_date(task_text)
parsed_assignee = _parse_assignee(task_text)
task = await sync_to_async(DerivedTask.objects.create)(
user=message.user,
project=source.project,
@@ -656,6 +809,8 @@ async def process_inbound_task_intelligence(message: Message) -> None:
origin_message=message,
reference_code=reference,
status_snapshot="open",
due_date=parsed_due_date,
assignee_identifier=parsed_assignee,
immutable_payload={"origin_text": text, "task_text": task_text, "flags": flags},
)
event = await sync_to_async(DerivedTaskEvent.objects.create)(

View File

@@ -1,12 +1,14 @@
from __future__ import annotations
from .base import TaskProvider
from .claude_cli import ClaudeCLITaskProvider
from .codex_cli import CodexCLITaskProvider
from .mock import MockTaskProvider
PROVIDERS = {
"mock": MockTaskProvider(),
"codex_cli": CodexCLITaskProvider(),
"claude_cli": ClaudeCLITaskProvider(),
}

View File

@@ -0,0 +1,209 @@
from __future__ import annotations
import json
import subprocess
from hashlib import sha1
from .base import ProviderResult, TaskProvider
class ClaudeCLITaskProvider(TaskProvider):
name = "claude_cli"
run_in_worker = True
def _timeout(self, config: dict) -> int:
try:
return max(1, int(config.get("timeout_seconds") or 60))
except Exception:
return 60
def _command(self, config: dict) -> str:
return str(config.get("command") or "claude").strip() or "claude"
def _workspace(self, config: dict) -> str:
return str(config.get("workspace_root") or "").strip()
def _profile(self, config: dict) -> str:
return str(config.get("default_profile") or "").strip()
def _is_task_sync_contract_mismatch(self, stderr: str) -> bool:
text = str(stderr or "").lower()
if "unexpected argument '--op'" in text:
return True
if "unexpected argument 'create'" in text and "usage: claude" in text:
return True
if "unexpected argument 'append_update'" in text and "usage: claude" in text:
return True
if "unexpected argument 'mark_complete'" in text and "usage: claude" in text:
return True
if "unexpected argument 'link_task'" in text and "usage: claude" in text:
return True
if "unrecognized subcommand 'create'" in text and "usage: claude" in text:
return True
if "unrecognized subcommand 'append_update'" in text and "usage: claude" in text:
return True
if "unrecognized subcommand 'mark_complete'" in text and "usage: claude" in text:
return True
return False
def _builtin_stub_result(self, op: str, payload: dict, stderr: str) -> ProviderResult:
mode = str(payload.get("mode") or "default").strip().lower()
external_key = (
str(payload.get("external_key") or "").strip()
or str(payload.get("task_id") or "").strip()
)
if mode == "approval_response":
return ProviderResult(
ok=True,
external_key=external_key,
payload={
"op": op,
"status": "ok",
"summary": "approval acknowledged; resumed by builtin claude stub",
"requires_approval": False,
"output": "",
"fallback_mode": "builtin_task_sync_stub",
"fallback_reason": str(stderr or "")[:4000],
},
)
task_id = str(payload.get("task_id") or "").strip()
key_basis = f"{op}:{task_id}:{payload.get('trigger_message_id') or payload.get('origin_message_id') or ''}"
approval_key = sha1(key_basis.encode("utf-8")).hexdigest()[:12]
summary = "Claude approval required (builtin stub fallback)"
return ProviderResult(
ok=True,
external_key=external_key,
payload={
"op": op,
"status": "requires_approval",
"requires_approval": True,
"summary": summary,
"approval_key": approval_key,
"permission_request": {
"summary": summary,
"requested_permissions": ["workspace_write"],
},
"resume_payload": {
"task_id": task_id,
"op": op,
},
"fallback_mode": "builtin_task_sync_stub",
"fallback_reason": str(stderr or "")[:4000],
},
)
def _run(self, config: dict, op: str, payload: dict) -> ProviderResult:
base_cmd = [self._command(config), "task-sync"]
workspace = self._workspace(config)
profile = self._profile(config)
command_timeout = self._timeout(config)
data = json.dumps(dict(payload or {}), separators=(",", ":"))
common_args: list[str] = []
if workspace:
common_args.extend(["--workspace", workspace])
if profile:
common_args.extend(["--profile", profile])
primary_cmd = [*base_cmd, "--op", str(op), *common_args, "--payload-json", data]
fallback_cmd = [*base_cmd, str(op), *common_args, "--payload-json", data]
try:
completed = subprocess.run(
primary_cmd,
capture_output=True,
text=True,
timeout=command_timeout,
check=False,
cwd=workspace if workspace else None,
)
stderr_probe = str(completed.stderr or "").lower()
if completed.returncode != 0 and "unexpected argument '--op'" in stderr_probe:
completed = subprocess.run(
fallback_cmd,
capture_output=True,
text=True,
timeout=command_timeout,
check=False,
cwd=workspace if workspace else None,
)
except subprocess.TimeoutExpired:
return ProviderResult(
ok=False,
error=f"claude_cli_timeout_{command_timeout}s",
payload={"op": op, "timeout_seconds": command_timeout},
)
except Exception as exc:
return ProviderResult(ok=False, error=f"claude_cli_exec_error:{exc}", payload={"op": op})
stdout = str(completed.stdout or "").strip()
stderr = str(completed.stderr or "").strip()
parsed = {}
if stdout:
try:
parsed = json.loads(stdout)
if not isinstance(parsed, dict):
parsed = {"raw_stdout": stdout}
except Exception:
parsed = {"raw_stdout": stdout}
parsed_status = str(parsed.get("status") or "").strip().lower()
permission_request = parsed.get("permission_request")
requires_approval = bool(
parsed.get("requires_approval")
or parsed_status in {"requires_approval", "waiting_approval"}
or permission_request
)
ext = (
str(parsed.get("external_key") or "").strip()
or str(parsed.get("task_id") or "").strip()
or str(payload.get("external_key") or "").strip()
)
ok = completed.returncode == 0
out_payload = {
"op": op,
"returncode": int(completed.returncode),
"stdout": stdout[:4000],
"stderr": stderr[:4000],
"parsed_status": parsed_status,
"requires_approval": requires_approval,
}
out_payload.update(parsed)
if (not ok) and self._is_task_sync_contract_mismatch(stderr):
return self._builtin_stub_result(op, dict(payload or {}), stderr)
return ProviderResult(ok=ok, external_key=ext, error=("" if ok else stderr[:4000]), payload=out_payload)
def healthcheck(self, config: dict) -> ProviderResult:
command = self._command(config)
try:
completed = subprocess.run(
[command, "--version"],
capture_output=True,
text=True,
timeout=max(1, min(20, self._timeout(config))),
check=False,
)
except Exception as exc:
return ProviderResult(ok=False, error=f"claude_cli_unavailable:{exc}")
return ProviderResult(
ok=(completed.returncode == 0),
payload={
"returncode": int(completed.returncode),
"stdout": str(completed.stdout or "").strip()[:1000],
"stderr": str(completed.stderr or "").strip()[:1000],
},
error=("" if completed.returncode == 0 else str(completed.stderr or "").strip()[:1000]),
)
def create_task(self, config: dict, payload: dict) -> ProviderResult:
return self._run(config, "create", payload)
def append_update(self, config: dict, payload: dict) -> ProviderResult:
return self._run(config, "append_update", payload)
def mark_complete(self, config: dict, payload: dict) -> ProviderResult:
return self._run(config, "mark_complete", payload)
def link_task(self, config: dict, payload: dict) -> ProviderResult:
return self._run(config, "link_task", payload)

View File

@@ -0,0 +1,172 @@
from __future__ import annotations
from subprocess import CompletedProcess, TimeoutExpired
from unittest.mock import patch
from django.test import SimpleTestCase
from core.tasks.providers.claude_cli import ClaudeCLITaskProvider
class ClaudeCLITaskProviderTests(SimpleTestCase):
def setUp(self):
self.provider = ClaudeCLITaskProvider()
@patch("core.tasks.providers.claude_cli.subprocess.run")
def test_healthcheck_success(self, run_mock):
run_mock.return_value = CompletedProcess(
args=["claude", "--version"],
returncode=0,
stdout="claude 1.0.0\n",
stderr="",
)
result = self.provider.healthcheck({"command": "claude", "timeout_seconds": 5})
self.assertTrue(result.ok)
self.assertIn("claude", str(result.payload.get("stdout") or ""))
@patch("core.tasks.providers.claude_cli.subprocess.run")
def test_create_task_builds_task_sync_command(self, run_mock):
run_mock.return_value = CompletedProcess(
args=[],
returncode=0,
stdout='{"external_key":"cl-123"}',
stderr="",
)
result = self.provider.create_task(
{
"command": "claude",
"workspace_root": "/tmp/work",
"default_profile": "default",
"timeout_seconds": 30,
},
{
"task_id": "t1",
"title": "hello",
"reference_code": "42",
},
)
self.assertTrue(result.ok)
self.assertEqual("cl-123", result.external_key)
args = run_mock.call_args.args[0]
self.assertEqual(["claude", "task-sync", "--op", "create"], args[:4])
self.assertIn("--workspace", args)
self.assertIn("--payload-json", args)
@patch("core.tasks.providers.claude_cli.subprocess.run")
def test_timeout_maps_to_failed_result(self, run_mock):
run_mock.side_effect = TimeoutExpired(cmd=["claude"], timeout=10)
result = self.provider.append_update({"command": "claude", "timeout_seconds": 10}, {"task_id": "t1"})
self.assertFalse(result.ok)
self.assertIn("timeout", result.error)
@patch("core.tasks.providers.claude_cli.subprocess.run")
def test_requires_approval_parsed_from_stdout(self, run_mock):
run_mock.return_value = CompletedProcess(
args=[],
returncode=0,
stdout='{"status":"requires_approval","approval_key":"ak-1","permission_request":{"requested_permissions":["write"]}}',
stderr="",
)
result = self.provider.append_update({"command": "claude"}, {"task_id": "t1"})
self.assertTrue(result.ok)
self.assertTrue(bool((result.payload or {}).get("requires_approval")))
self.assertEqual("requires_approval", (result.payload or {}).get("parsed_status"))
@patch("core.tasks.providers.claude_cli.subprocess.run")
def test_retries_with_positional_op_when_flag_unsupported(self, run_mock):
run_mock.side_effect = [
CompletedProcess(
args=[],
returncode=2,
stdout="",
stderr="error: unexpected argument '--op' found",
),
CompletedProcess(
args=[],
returncode=0,
stdout='{"status":"ok","external_key":"cl-42"}',
stderr="",
),
]
result = self.provider.create_task({"command": "claude"}, {"task_id": "t1"})
self.assertTrue(result.ok)
self.assertEqual("cl-42", result.external_key)
self.assertEqual(2, run_mock.call_count)
first = run_mock.call_args_list[0].args[0]
second = run_mock.call_args_list[1].args[0]
self.assertIn("--op", first)
self.assertNotIn("--op", second)
self.assertEqual(["claude", "task-sync", "create"], second[:3])
@patch("core.tasks.providers.claude_cli.subprocess.run")
def test_falls_back_to_builtin_approval_stub_when_no_task_sync_contract(self, run_mock):
run_mock.side_effect = [
CompletedProcess(
args=[],
returncode=2,
stdout="",
stderr="error: unexpected argument '--op' found",
),
CompletedProcess(
args=[],
returncode=2,
stdout="",
stderr="error: unrecognized subcommand 'create'\nUsage: claude [OPTIONS] [PROMPT]",
),
]
result = self.provider.create_task(
{"command": "claude"},
{
"task_id": "t1",
"trigger_message_id": "m1",
"mode": "default",
},
)
self.assertTrue(result.ok)
self.assertTrue(bool((result.payload or {}).get("requires_approval")))
self.assertEqual("requires_approval", str((result.payload or {}).get("status") or ""))
self.assertEqual("builtin_task_sync_stub", str((result.payload or {}).get("fallback_mode") or ""))
@patch("core.tasks.providers.claude_cli.subprocess.run")
def test_builtin_stub_approval_response_returns_ok(self, run_mock):
run_mock.side_effect = [
CompletedProcess(
args=[],
returncode=2,
stdout="",
stderr="error: unexpected argument '--op' found",
),
CompletedProcess(
args=[],
returncode=2,
stdout="",
stderr="error: unexpected argument 'append_update' found\nUsage: claude [OPTIONS] [PROMPT]",
),
]
result = self.provider.append_update(
{"command": "claude"},
{
"task_id": "t1",
"mode": "approval_response",
"approval_key": "abc123",
},
)
self.assertTrue(result.ok)
self.assertFalse(bool((result.payload or {}).get("requires_approval")))
self.assertEqual("ok", str((result.payload or {}).get("status") or ""))
def test_provider_name_and_run_in_worker(self):
self.assertEqual("claude_cli", self.provider.name)
self.assertTrue(self.provider.run_in_worker)
@patch("core.tasks.providers.claude_cli.subprocess.run")
def test_healthcheck_failure(self, run_mock):
run_mock.return_value = CompletedProcess(
args=["claude", "--version"],
returncode=1,
stdout="",
stderr="command not found: claude",
)
result = self.provider.healthcheck({"command": "claude"})
self.assertFalse(result.ok)
self.assertIn("command not found", result.error)

View File

@@ -0,0 +1,279 @@
from __future__ import annotations
from asgiref.sync import async_to_sync
from django.test import TestCase
from core.commands.base import CommandContext
from core.commands.engine import process_inbound_message
from core.commands.handlers.claude import parse_claude_command
from core.models import (
ChatSession,
CommandChannelBinding,
CommandProfile,
CodexPermissionRequest,
CodexRun,
DerivedTask,
ExternalSyncEvent,
Message,
Person,
PersonIdentifier,
TaskProject,
TaskProviderConfig,
User,
)
class ClaudeCommandParserTests(TestCase):
def test_parse_variants(self):
self.assertEqual("default", parse_claude_command("#claude# run this").command)
self.assertEqual("plan", parse_claude_command("#claude plan# run this").command)
self.assertEqual("status", parse_claude_command("#claude status#").command)
parsed = parse_claude_command("#claude approve abc123#")
self.assertEqual("approve", parsed.command)
self.assertEqual("abc123", parsed.approval_key)
self.assertEqual("default", parse_claude_command(".claude run this").command)
self.assertEqual("plan", parse_claude_command(".CLAUDE plan run this").command)
self.assertEqual("status", parse_claude_command(".claude status").command)
parsed_dot = parse_claude_command(".claude approve abc123")
self.assertEqual("approve", parsed_dot.command)
self.assertEqual("abc123", parsed_dot.approval_key)
def test_no_match_returns_none_command(self):
self.assertIsNone(parse_claude_command("hello world").command)
self.assertIsNone(parse_claude_command(".codex do this").command)
class ClaudeCommandExecutionTests(TestCase):
def setUp(self):
self.user = User.objects.create_user("claude-cmd-user", "claude-cmd@example.com", "x")
self.person = Person.objects.create(user=self.user, name="Claude Cmd")
self.identifier = PersonIdentifier.objects.create(
user=self.user,
person=self.person,
service="web",
identifier="web-chan-1",
)
self.session = ChatSession.objects.create(user=self.user, identifier=self.identifier)
self.project = TaskProject.objects.create(user=self.user, name="Project A")
self.task = DerivedTask.objects.create(
user=self.user,
project=self.project,
epic=None,
title="Task A",
source_service="web",
source_channel="web-chan-1",
reference_code="1",
status_snapshot="open",
)
self.profile = CommandProfile.objects.create(
user=self.user,
slug="claude",
name="Claude",
enabled=True,
trigger_token="#claude#",
reply_required=False,
exact_match_only=False,
)
CommandChannelBinding.objects.create(
profile=self.profile,
direction="ingress",
service="web",
channel_identifier="web-chan-1",
enabled=True,
)
TaskProviderConfig.objects.create(
user=self.user,
provider="claude_cli",
enabled=True,
settings={
"command": "claude",
"workspace_root": "",
"default_profile": "",
"timeout_seconds": 60,
"chat_link_mode": "task-sync",
"instance_label": "default",
"approver_mode": "channel",
"approver_service": "web",
"approver_identifier": "approver-chan",
},
)
def _msg(self, text: str, *, source_chat_id: str = "web-chan-1", reply_to=None):
return Message.objects.create(
user=self.user,
session=self.session,
sender_uuid="",
text=text,
ts=1000 + Message.objects.filter(user=self.user).count(),
source_service="web",
source_chat_id=source_chat_id,
reply_to=reply_to,
message_meta={},
)
def test_default_submission_creates_run_and_event(self):
trigger = self._msg("#claude# please update #1")
results = async_to_sync(process_inbound_message)(
CommandContext(
service="web",
channel_identifier="web-chan-1",
message_id=str(trigger.id),
user_id=self.user.id,
message_text=str(trigger.text),
payload={},
)
)
self.assertEqual(1, len(results))
self.assertTrue(results[0].ok)
run = CodexRun.objects.order_by("-created_at").first()
self.assertIsNotNone(run)
self.assertEqual("waiting_approval", run.status)
event = ExternalSyncEvent.objects.order_by("-created_at").first()
self.assertEqual("waiting_approval", event.status)
self.assertEqual(
"default",
str((event.payload or {}).get("provider_payload", {}).get("mode") or ""),
)
self.assertTrue(
CodexPermissionRequest.objects.filter(
user=self.user,
codex_run=run,
status="pending",
).exists()
)
# The approval notification must reference ".claude approve" not ".codex approve"
req = CodexPermissionRequest.objects.get(codex_run=run, status="pending")
approval_key = str(req.approval_key or "")
# The approval_key should exist
self.assertTrue(bool(approval_key))
def test_plan_requires_reply_anchor(self):
trigger = self._msg("#claude plan# #1")
results = async_to_sync(process_inbound_message)(
CommandContext(
service="web",
channel_identifier="web-chan-1",
message_id=str(trigger.id),
user_id=self.user.id,
message_text=str(trigger.text),
payload={},
)
)
self.assertEqual(1, len(results))
self.assertFalse(results[0].ok)
self.assertEqual("reply_required_for_claude_plan", results[0].error)
def test_approve_command_queues_resume_event(self):
waiting_event = ExternalSyncEvent.objects.create(
user=self.user,
task=self.task,
provider="claude_cli",
status="waiting_approval",
payload={},
error="",
)
run = CodexRun.objects.create(
user=self.user,
task=self.task,
project=self.project,
source_service="web",
source_channel="web-chan-1",
status="waiting_approval",
request_payload={
"action": "append_update",
"provider_payload": {"task_id": str(self.task.id)},
},
result_payload={},
)
CodexPermissionRequest.objects.create(
user=self.user,
codex_run=run,
external_sync_event=waiting_event,
approval_key="cl-ak-123",
summary="Need approval",
requested_permissions={"items": ["write"]},
resume_payload={"resume": True},
status="pending",
)
CommandChannelBinding.objects.create(
profile=self.profile,
direction="ingress",
service="web",
channel_identifier="approver-chan",
enabled=True,
)
trigger = self._msg("#claude approve cl-ak-123#", source_chat_id="approver-chan")
results = async_to_sync(process_inbound_message)(
CommandContext(
service="web",
channel_identifier="approver-chan",
message_id=str(trigger.id),
user_id=self.user.id,
message_text=str(trigger.text),
payload={},
)
)
self.assertEqual(1, len(results))
self.assertTrue(results[0].ok)
run.refresh_from_db()
waiting_event.refresh_from_db()
self.assertEqual("approved_waiting_resume", run.status)
self.assertEqual("ok", waiting_event.status)
self.assertTrue(
ExternalSyncEvent.objects.filter(
idempotency_key="claude_approval:cl-ak-123:approved",
status="pending",
).exists()
)
def test_deny_command_marks_run_denied(self):
waiting_event = ExternalSyncEvent.objects.create(
user=self.user,
task=self.task,
provider="claude_cli",
status="waiting_approval",
payload={},
error="",
)
run = CodexRun.objects.create(
user=self.user,
task=self.task,
project=self.project,
source_service="web",
source_channel="web-chan-1",
status="waiting_approval",
request_payload={},
result_payload={},
)
CodexPermissionRequest.objects.create(
user=self.user,
codex_run=run,
external_sync_event=waiting_event,
approval_key="cl-deny-1",
summary="Need approval",
requested_permissions={"items": ["write"]},
resume_payload={},
status="pending",
)
CommandChannelBinding.objects.get_or_create(
profile=self.profile,
direction="ingress",
service="web",
channel_identifier="approver-chan",
defaults={"enabled": True},
)
trigger = self._msg(".claude deny cl-deny-1", source_chat_id="approver-chan")
results = async_to_sync(process_inbound_message)(
CommandContext(
service="web",
channel_identifier="approver-chan",
message_id=str(trigger.id),
user_id=self.user.id,
message_text=str(trigger.text),
payload={},
)
)
self.assertEqual(1, len(results))
self.assertTrue(results[0].ok)
run.refresh_from_db()
self.assertEqual("denied", run.status)

View File

@@ -0,0 +1,231 @@
from __future__ import annotations
import datetime
from unittest.mock import AsyncMock, patch
from asgiref.sync import async_to_sync
from django.test import TestCase, override_settings
from django.utils import timezone
from core.models import (
ChatSession,
ChatTaskSource,
DerivedTask,
DerivedTaskEvent,
Message,
Person,
PersonIdentifier,
TaskProject,
User,
)
from core.tasks.engine import (
_parse_assignee,
_parse_due_date,
process_inbound_task_intelligence,
)
class DueDateParsingTests(TestCase):
def test_parses_due_iso_date(self):
result = _parse_due_date("due 2026-04-15")
self.assertEqual(datetime.date(2026, 4, 15), result)
def test_parses_by_iso_date(self):
result = _parse_due_date("please finish by 2026-04-15")
self.assertEqual(datetime.date(2026, 4, 15), result)
def test_returns_none_for_no_date(self):
self.assertIsNone(_parse_due_date("just a task description"))
def test_parses_due_today(self):
result = _parse_due_date("due today")
self.assertEqual(datetime.date.today(), result)
def test_parses_due_tomorrow(self):
result = _parse_due_date("by tomorrow")
self.assertEqual(datetime.date.today() + datetime.timedelta(days=1), result)
def test_parses_weekday_name(self):
result = _parse_due_date("by friday")
self.assertIsNotNone(result)
self.assertEqual(4, result.weekday()) # Friday = 4
def test_case_insensitive(self):
result = _parse_due_date("Due Today")
self.assertEqual(datetime.date.today(), result)
class AssigneeParsingTests(TestCase):
def test_parses_at_mention(self):
result = _parse_assignee("@alice please review this")
self.assertEqual("alice", result)
def test_parses_assign_to(self):
result = _parse_assignee("assign to bob")
self.assertEqual("bob", result)
def test_parses_for_person(self):
result = _parse_assignee("for charlie to fix by friday")
self.assertEqual("charlie", result)
def test_returns_empty_string_when_no_assignee(self):
result = _parse_assignee("no one mentioned")
self.assertEqual("", result)
def test_prefers_at_mention(self):
result = _parse_assignee("@dave assign to someone")
self.assertEqual("dave", result)
@override_settings(TASK_DERIVATION_USE_AI=False)
class TaskEnginePlan09Tests(TestCase):
def setUp(self):
self.user = User.objects.create_user("plan09-user", "plan09@example.com", "x")
self.person = Person.objects.create(user=self.user, name="Plan09 Person")
self.identifier = PersonIdentifier.objects.create(
user=self.user,
person=self.person,
service="signal",
identifier="+15559001234",
)
self.session = ChatSession.objects.create(user=self.user, identifier=self.identifier)
self.project = TaskProject.objects.create(user=self.user, name="Plan09 Project")
ChatTaskSource.objects.create(
user=self.user,
service="signal",
channel_identifier="+15559001234",
project=self.project,
enabled=True,
)
def _msg(self, text: str, ts: int = 1000):
return Message.objects.create(
user=self.user,
session=self.session,
sender_uuid="peer",
text=text,
ts=ts,
source_service="signal",
source_chat_id="+15559001234",
)
def test_due_date_stored_in_model_field(self):
m = self._msg("task: update SSL cert by 2026-04-15")
async_to_sync(process_inbound_task_intelligence)(m)
task = DerivedTask.objects.get(origin_message=m)
self.assertEqual(datetime.date(2026, 4, 15), task.due_date)
def test_assignee_stored_in_model_field(self):
m = self._msg("task: review PR @alice")
async_to_sync(process_inbound_task_intelligence)(m)
task = DerivedTask.objects.get(origin_message=m)
self.assertEqual("alice", task.assignee_identifier)
def test_task_without_due_date_has_null_due_date(self):
m = self._msg("task: plain task no date")
async_to_sync(process_inbound_task_intelligence)(m)
task = DerivedTask.objects.get(origin_message=m)
self.assertIsNone(task.due_date)
@patch("core.tasks.engine.send_message_raw", new_callable=AsyncMock)
def test_dot_task_list_command(self, mocked_send):
seed = self._msg("task: fix the database issue", ts=1001)
async_to_sync(process_inbound_task_intelligence)(seed)
cmd = self._msg(".task list", ts=1002)
async_to_sync(process_inbound_task_intelligence)(cmd)
payloads = [str(call.kwargs.get("text") or "") for call in mocked_send.await_args_list]
self.assertTrue(any("open tasks" in row.lower() for row in payloads))
@patch("core.tasks.engine.send_message_raw", new_callable=AsyncMock)
def test_dot_task_show_displays_task_detail(self, mocked_send):
seed = self._msg("task: deploy new version", ts=1003)
async_to_sync(process_inbound_task_intelligence)(seed)
task = DerivedTask.objects.get(origin_message=seed)
cmd = self._msg(f".task show #{task.reference_code}", ts=1004)
async_to_sync(process_inbound_task_intelligence)(cmd)
payloads = [str(call.kwargs.get("text") or "") for call in mocked_send.await_args_list]
self.assertTrue(any("deploy new version" in row.lower() for row in payloads))
self.assertTrue(any(str(task.reference_code) in row for row in payloads))
@patch("core.tasks.engine.send_message_raw", new_callable=AsyncMock)
def test_dot_task_complete_marks_task_done(self, mocked_send):
seed = self._msg("task: restart services", ts=1005)
async_to_sync(process_inbound_task_intelligence)(seed)
task = DerivedTask.objects.get(origin_message=seed)
cmd = self._msg(f".task complete #{task.reference_code}", ts=1006)
async_to_sync(process_inbound_task_intelligence)(cmd)
task.refresh_from_db()
self.assertEqual("completed", task.status_snapshot)
self.assertTrue(
DerivedTaskEvent.objects.filter(task=task, event_type="completion_marked").exists()
)
payloads = [str(call.kwargs.get("text") or "") for call in mocked_send.await_args_list]
self.assertTrue(any("completed" in row.lower() for row in payloads))
def test_dot_task_complete_creates_audit_event(self):
seed = self._msg("task: patch kernel", ts=1007)
async_to_sync(process_inbound_task_intelligence)(seed)
task = DerivedTask.objects.get(origin_message=seed)
with patch("core.tasks.engine.send_message_raw", new_callable=AsyncMock):
cmd = self._msg(f".task complete #{task.reference_code}", ts=1008)
async_to_sync(process_inbound_task_intelligence)(cmd)
event = DerivedTaskEvent.objects.filter(task=task, event_type="completion_marked").first()
self.assertIsNotNone(event)
self.assertIn("command", str(event.payload or {}).lower())
@override_settings(TASK_DERIVATION_USE_AI=False)
class TaskEngineMemoryContextTests(TestCase):
def setUp(self):
self.user = User.objects.create_user("mem-ctx-user", "mem-ctx@example.com", "x")
self.person = Person.objects.create(user=self.user, name="Mem Ctx Person")
self.identifier = PersonIdentifier.objects.create(
user=self.user,
person=self.person,
service="whatsapp",
identifier="447700900001@s.whatsapp.net",
)
self.session = ChatSession.objects.create(user=self.user, identifier=self.identifier)
self.project = TaskProject.objects.create(user=self.user, name="Mem Project")
ChatTaskSource.objects.create(
user=self.user,
service="whatsapp",
channel_identifier="447700900001@s.whatsapp.net",
project=self.project,
enabled=True,
)
def _msg(self, text: str, ts: int = 2000):
return Message.objects.create(
user=self.user,
session=self.session,
sender_uuid="peer",
text=text,
ts=ts,
source_service="whatsapp",
source_chat_id="447700900001@s.whatsapp.net",
)
def test_task_creation_invokes_memory_retrieval(self):
m = self._msg("task: deploy production release")
with patch(
"core.tasks.engine.retrieve_memories_for_prompt", return_value=[]
) as mock_retrieve:
async_to_sync(process_inbound_task_intelligence)(m)
self.assertTrue(mock_retrieve.called)
def test_memory_context_included_in_sync_event_payload(self):
from core.models import CodexRun
m = self._msg("task: fix authentication bug", ts=2001)
fake_memory = [{"id": "mem-1", "memory_kind": "fact", "content": {"text": "prefers short summaries"}}]
with patch("core.tasks.engine.retrieve_memories_for_prompt", return_value=fake_memory):
async_to_sync(process_inbound_task_intelligence)(m)
task = DerivedTask.objects.filter(origin_message=m).first()
self.assertIsNotNone(task)
run = CodexRun.objects.filter(task=task).order_by("-created_at").first()
self.assertIsNotNone(run, "Expected CodexRun created for task")
provider_payload = (run.request_payload or {}).get("provider_payload") or {}
memory_context = provider_payload.get("memory_context")
self.assertIsNotNone(memory_context, "Expected memory_context in CodexRun provider payload")
self.assertEqual(1, len(memory_context))