Reimplement compose and add tiling windows

This commit is contained in:
2026-03-12 22:03:30 +00:00
parent 79766d279d
commit 6ceff63b71
126 changed files with 5111 additions and 10796 deletions

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import datetime
import hashlib
import json
from urllib.parse import urlencode
@@ -13,7 +12,6 @@ from django.db.models import Count
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils import timezone
from django.views import View
from core.clients.transport import send_message_raw
@@ -21,11 +19,7 @@ from core.models import (
AnswerSuggestionEvent,
Chat,
ChatTaskSource,
CodexPermissionRequest,
CodexRun,
DerivedTask,
ExternalChatLink,
ExternalSyncEvent,
Person,
PersonIdentifier,
PlatformChatLink,
@@ -39,10 +33,7 @@ from core.tasks.chat_defaults import (
ensure_default_source_for_chat,
normalize_channel_identifier,
)
from core.tasks.codex_approval import queue_codex_event_with_pre_approval
from core.tasks.codex_support import resolve_external_chat_id
from core.tasks.engine import create_task_record_and_sync
from core.tasks.providers import get_provider
def _upsert_task_source(
@@ -436,115 +427,6 @@ def _provider_row_map(user):
}
def _codex_settings_with_defaults(raw: dict | None) -> dict:
row = dict(raw or {})
timeout_raw = str(row.get("timeout_seconds") or "60").strip()
try:
timeout_seconds = max(1, int(timeout_raw))
except Exception:
timeout_seconds = 60
return {
"command": str(row.get("command") or "codex").strip() or "codex",
"workspace_root": str(row.get("workspace_root") or "").strip(),
"default_profile": str(row.get("default_profile") or "").strip(),
"timeout_seconds": timeout_seconds,
"chat_link_mode": "task-sync",
"instance_label": str(row.get("instance_label") or "default").strip()
or "default",
"approver_service": str(row.get("approver_service") or "").strip().lower(),
"approver_identifier": str(row.get("approver_identifier") or "").strip(),
"approver_mode": "channel",
}
def _claude_settings_with_defaults(raw: dict | None) -> dict:
row = dict(raw or {})
timeout_raw = str(row.get("timeout_seconds") or "60").strip()
try:
timeout_seconds = max(1, int(timeout_raw))
except Exception:
timeout_seconds = 60
return {
"command": str(row.get("command") or "claude").strip() or "claude",
"workspace_root": str(row.get("workspace_root") or "").strip(),
"default_profile": str(row.get("default_profile") or "").strip(),
"timeout_seconds": timeout_seconds,
"approver_service": str(row.get("approver_service") or "").strip().lower(),
"approver_identifier": str(row.get("approver_identifier") or "").strip(),
}
def _enqueue_codex_task_submission(
*,
user,
task: DerivedTask,
source_service: str,
source_channel: str,
mode: str = "default",
command_text: str = "",
source_message=None,
provider: str = "codex_cli",
) -> CodexRun:
provider = str(provider or "codex_cli").strip() or "codex_cli"
external_chat_id = resolve_external_chat_id(
user=user,
provider=provider,
service=source_service,
channel=source_channel,
)
provider_payload = {
"task_id": str(task.id),
"reference_code": str(task.reference_code or ""),
"title": str(task.title or ""),
"external_key": str(task.external_key or ""),
"project_name": str(getattr(task.project, "name", "") or ""),
"epic_name": str(getattr(task.epic, "name", "") or ""),
"source_service": str(source_service or ""),
"source_channel": str(source_channel or ""),
"external_chat_id": external_chat_id,
"origin_message_id": str(getattr(task, "origin_message_id", "") or ""),
"trigger_message_id": str(getattr(source_message, "id", "") or ""),
"mode": str(mode or "default"),
}
if command_text:
provider_payload["command_text"] = str(command_text)
run = CodexRun.objects.create(
user=user,
task=task,
source_message=source_message,
project=task.project,
epic=task.epic,
source_service=str(source_service or ""),
source_channel=str(source_channel or ""),
external_chat_id=external_chat_id,
status="waiting_approval",
request_payload={
"action": "append_update",
"provider_payload": dict(provider_payload),
},
result_payload={},
error="",
)
provider_payload["codex_run_id"] = str(run.id)
run.request_payload = {
"action": "append_update",
"provider_payload": dict(provider_payload),
}
run.save(update_fields=["request_payload", "updated_at"])
idempotency_key = f"codex_submit:{task.id}:{mode}:{hashlib.sha1(str(command_text or '').encode('utf-8')).hexdigest()[:10]}:{run.id}"
queue_codex_event_with_pre_approval(
user=user,
run=run,
task=task,
task_event=None,
action="append_update",
provider_payload=dict(provider_payload),
idempotency_key=idempotency_key,
provider=provider,
)
return run
def _upsert_group_source(
*, user, service: str, channel_identifier: str, project, epic=None
):
@@ -665,22 +547,6 @@ def _person_identifier_scope_variants(service: str, identifier: str) -> list[str
return [row for row in variants if row]
def _scoped_person_identifier_rows(user, service: str, identifier: str):
service_key = str(service or "").strip().lower()
variants = _person_identifier_scope_variants(service_key, identifier)
if not service_key or not variants:
return PersonIdentifier.objects.none()
return (
PersonIdentifier.objects.filter(
user=user,
service=service_key,
identifier__in=variants,
)
.select_related("person")
.order_by("person__name", "service", "identifier")
)
def _resolve_channel_display(user, service: str, identifier: str) -> dict:
service_key = str(service or "").strip().lower()
raw_identifier = str(identifier or "").strip()
@@ -865,12 +731,6 @@ class TasksHub(LoginRequiredMixin, View):
"mapped": mapped,
}
)
enabled_providers = list(
TaskProviderConfig.objects.filter(user=request.user, enabled=True)
.exclude(provider="mock")
.values_list("provider", flat=True)
.order_by("provider")
)
return {
"projects": projects,
"project_choices": all_projects,
@@ -882,7 +742,6 @@ class TasksHub(LoginRequiredMixin, View):
"person_identifier_rows": person_identifier_rows,
"selected_project": selected_project,
"show_empty_projects": show_empty,
"enabled_providers": enabled_providers,
}
def get(self, request):
@@ -1362,24 +1221,12 @@ class TaskDetail(LoginRequiredMixin, View):
str(getattr(task, "source_service", "") or "").strip().lower(),
getattr(task, "origin_message", None),
)
sync_events = task.external_sync_events.order_by("-created_at")
codex_runs = task.codex_runs.select_related("source_message").order_by(
"-created_at"
)
permission_requests = (
CodexPermissionRequest.objects.filter(codex_run__task=task)
.select_related("codex_run", "external_sync_event")
.order_by("-requested_at")
)
return render(
request,
self.template_name,
{
"task": task,
"events": events,
"sync_events": sync_events,
"codex_runs": codex_runs,
"permission_requests": permission_requests,
},
)
@@ -1409,90 +1256,7 @@ class TaskSettings(LoginRequiredMixin, View):
row.settings_effective["allowed_prefixes"]
)
provider_map = _provider_row_map(request.user)
codex_cfg = provider_map.get("codex_cli")
codex_settings = _codex_settings_with_defaults(
dict(getattr(codex_cfg, "settings", {}) or {})
)
claude_cfg = provider_map.get("claude_cli")
claude_settings = _claude_settings_with_defaults(
dict(getattr(claude_cfg, "settings", {}) or {})
)
mock_cfg = provider_map.get("mock")
codex_provider = get_provider("codex_cli")
claude_provider = get_provider("claude_cli")
codex_healthcheck = (
codex_provider.healthcheck(codex_settings) if codex_cfg else None
)
claude_healthcheck = (
claude_provider.healthcheck(claude_settings) if claude_cfg else None
)
codex_queue_counts = {
"pending": ExternalSyncEvent.objects.filter(
user=request.user, provider="codex_cli", status="pending"
).count(),
"waiting_approval": ExternalSyncEvent.objects.filter(
user=request.user, provider="codex_cli", status="waiting_approval"
).count(),
"failed": ExternalSyncEvent.objects.filter(
user=request.user, provider="codex_cli", status="failed"
).count(),
"ok": ExternalSyncEvent.objects.filter(
user=request.user, provider="codex_cli", status="ok"
).count(),
}
claude_queue_counts = {
"pending": ExternalSyncEvent.objects.filter(
user=request.user, provider="claude_cli", status="pending"
).count(),
"waiting_approval": ExternalSyncEvent.objects.filter(
user=request.user, provider="claude_cli", status="waiting_approval"
).count(),
"failed": ExternalSyncEvent.objects.filter(
user=request.user, provider="claude_cli", status="failed"
).count(),
"ok": ExternalSyncEvent.objects.filter(
user=request.user, provider="claude_cli", status="ok"
).count(),
}
codex_recent_runs = CodexRun.objects.filter(user=request.user).order_by(
"-created_at"
)[:10]
latest_worker_event = (
ExternalSyncEvent.objects.filter(
user=request.user,
provider__in=["codex_cli", "claude_cli"],
)
.filter(status__in=["ok", "failed", "waiting_approval", "retrying"])
.order_by("-updated_at")
.first()
)
worker_heartbeat_at = getattr(latest_worker_event, "updated_at", None)
worker_heartbeat_age = ""
if worker_heartbeat_at is not None:
delta_seconds = max(
0, int((timezone.now() - worker_heartbeat_at).total_seconds())
)
worker_heartbeat_age = f"{delta_seconds}s ago"
external_chat_links = list(
ExternalChatLink.objects.filter(user=request.user)
.select_related("person", "person_identifier")
.order_by("-updated_at")[:200]
)
person_identifiers = (
PersonIdentifier.objects.filter(user=request.user)
.select_related("person")
.order_by("person__name", "service", "identifier")[:600]
)
external_link_scoped = bool(prefill_service and prefill_identifier)
external_link_scope_label = ""
external_link_person_identifiers = person_identifiers
if external_link_scoped:
external_link_scope_label = f"{prefill_service} · {prefill_identifier}"
external_link_person_identifiers = _scoped_person_identifier_rows(
request.user,
prefill_service,
prefill_identifier,
)
return {
"projects": projects,
@@ -1503,66 +1267,7 @@ class TaskSettings(LoginRequiredMixin, View):
"patterns": TaskCompletionPattern.objects.filter(
user=request.user
).order_by("position", "created_at"),
"provider_configs": list(provider_map.values()),
"mock_provider_config": mock_cfg,
"codex_provider_config": codex_cfg,
"codex_provider_settings": {
"command": str(codex_settings.get("command") or "codex"),
"workspace_root": str(codex_settings.get("workspace_root") or ""),
"default_profile": str(codex_settings.get("default_profile") or ""),
"timeout_seconds": int(codex_settings.get("timeout_seconds") or 60),
"chat_link_mode": str(
codex_settings.get("chat_link_mode") or "task-sync"
),
"instance_label": str(
codex_settings.get("instance_label") or "default"
),
"approver_service": str(codex_settings.get("approver_service") or ""),
"approver_identifier": str(
codex_settings.get("approver_identifier") or ""
),
"approver_mode": "channel",
},
"codex_compact_summary": {
"healthcheck_ok": bool(getattr(codex_healthcheck, "ok", False)),
"healthcheck_error": str(getattr(codex_healthcheck, "error", "") or ""),
"healthcheck_payload": dict(
getattr(codex_healthcheck, "payload", {}) or {}
),
"worker_heartbeat_at": worker_heartbeat_at,
"worker_heartbeat_age": worker_heartbeat_age,
"queue_counts": codex_queue_counts,
"recent_runs": codex_recent_runs,
},
"claude_provider_config": claude_cfg,
"claude_provider_settings": {
"command": str(claude_settings.get("command") or "claude"),
"workspace_root": str(claude_settings.get("workspace_root") or ""),
"default_profile": str(claude_settings.get("default_profile") or ""),
"timeout_seconds": int(claude_settings.get("timeout_seconds") or 60),
"approver_service": str(claude_settings.get("approver_service") or ""),
"approver_identifier": str(
claude_settings.get("approver_identifier") or ""
),
},
"claude_compact_summary": {
"healthcheck_ok": bool(getattr(claude_healthcheck, "ok", False)),
"healthcheck_error": str(
getattr(claude_healthcheck, "error", "") or ""
),
"healthcheck_payload": dict(
getattr(claude_healthcheck, "payload", {}) or {}
),
"queue_counts": claude_queue_counts,
},
"person_identifiers": person_identifiers,
"external_link_person_identifiers": external_link_person_identifiers,
"external_link_scoped": external_link_scoped,
"external_link_scope_label": external_link_scope_label,
"external_chat_links": external_chat_links,
"sync_events": ExternalSyncEvent.objects.filter(user=request.user).order_by(
"-updated_at"
)[:100],
"prefill_service": prefill_service,
"prefill_identifier": prefill_identifier,
}
@@ -1699,384 +1404,22 @@ class TaskSettings(LoginRequiredMixin, View):
if action == "provider_update":
provider = str(request.POST.get("provider") or "mock").strip() or "mock"
if provider != "mock":
messages.error(request, "Only the mock task provider is available.")
return _settings_redirect(request)
row, _ = TaskProviderConfig.objects.get_or_create(
user=request.user,
provider=provider,
defaults={"enabled": False, "settings": {}},
)
row.enabled = bool(request.POST.get("enabled"))
settings_payload = dict(row.settings or {})
if provider == "codex_cli":
settings_payload = _codex_settings_with_defaults(
{
"command": request.POST.get("command"),
"workspace_root": request.POST.get("workspace_root"),
"default_profile": request.POST.get("default_profile"),
"timeout_seconds": request.POST.get("timeout_seconds"),
"instance_label": request.POST.get("instance_label"),
"approver_service": request.POST.get("approver_service"),
"approver_identifier": request.POST.get("approver_identifier"),
"approver_mode": "channel",
}
)
elif provider == "claude_cli":
settings_payload = _claude_settings_with_defaults(
{
"command": request.POST.get("command"),
"workspace_root": request.POST.get("workspace_root"),
"default_profile": request.POST.get("default_profile"),
"timeout_seconds": request.POST.get("timeout_seconds"),
"approver_service": request.POST.get("approver_service"),
"approver_identifier": request.POST.get("approver_identifier"),
}
)
row.settings = settings_payload
row.settings = dict(row.settings or {})
row.save(update_fields=["enabled", "settings", "updated_at"])
return _settings_redirect(request)
if action == "external_chat_link_upsert":
provider = (
str(request.POST.get("provider") or "codex_cli").strip().lower()
or "codex_cli"
)
external_chat_id = str(request.POST.get("external_chat_id") or "").strip()
person_identifier_id = str(
request.POST.get("person_identifier_id") or ""
).strip()
prefill_service = (
str(
request.POST.get("prefill_service")
or request.GET.get("service")
or ""
)
.strip()
.lower()
)
prefill_identifier = str(
request.POST.get("prefill_identifier")
or request.GET.get("identifier")
or ""
).strip()
if not external_chat_id:
messages.error(request, "External chat ID is required.")
return _settings_redirect(request)
identifier = None
if person_identifier_id:
identifier = get_object_or_404(
PersonIdentifier,
user=request.user,
id=person_identifier_id,
)
if prefill_service and prefill_identifier:
allowed_ids = set(
_scoped_person_identifier_rows(
request.user, prefill_service, prefill_identifier
).values_list("id", flat=True)
)
if identifier.id not in allowed_ids:
messages.error(
request,
"Selected contact is outside the current scoped chat.",
)
return _settings_redirect(request)
row, _ = ExternalChatLink.objects.update_or_create(
user=request.user,
provider=provider,
external_chat_id=external_chat_id,
defaults={
"person": getattr(identifier, "person", None),
"person_identifier": identifier,
"enabled": bool(request.POST.get("enabled")),
"metadata": {
"chat_link_mode": "task-sync",
"notes": str(request.POST.get("metadata_notes") or "").strip(),
},
},
)
if identifier and row.person_id != identifier.person_id:
row.person = identifier.person
row.save(update_fields=["person", "updated_at"])
return _settings_redirect(request)
if action == "external_chat_link_delete":
row = get_object_or_404(
ExternalChatLink,
id=request.POST.get("external_link_id"),
user=request.user,
)
row.delete()
return _settings_redirect(request)
if action == "sync_retry":
event = get_object_or_404(
ExternalSyncEvent, id=request.POST.get("event_id"), user=request.user
)
provider = get_provider(event.provider)
if bool(getattr(provider, "run_in_worker", False)):
event.status = "pending"
event.error = ""
event.payload = dict(event.payload or {}, retried=True)
event.save(update_fields=["status", "error", "payload", "updated_at"])
else:
payload = dict(event.payload or {})
result = provider.append_update({}, payload)
event.status = "ok" if result.ok else "failed"
event.error = str(result.error or "")
event.payload = dict(payload, retried=True)
event.save(update_fields=["status", "error", "payload", "updated_at"])
return _settings_redirect(request)
return _settings_redirect(request)
_ALLOWED_SUBMIT_PROVIDERS = {"codex_cli", "claude_cli"}
class TaskCodexSubmit(LoginRequiredMixin, View):
def post(self, request):
task_id = str(request.POST.get("task_id") or "").strip()
next_url = str(request.POST.get("next") or reverse("tasks_hub")).strip()
provider = str(request.POST.get("provider") or "codex_cli").strip().lower()
if provider not in _ALLOWED_SUBMIT_PROVIDERS:
provider = "codex_cli"
task = get_object_or_404(
DerivedTask.objects.select_related("project", "epic", "origin_message"),
id=task_id,
user=request.user,
)
cfg = TaskProviderConfig.objects.filter(
user=request.user,
provider=provider,
enabled=True,
).first()
provider_label = "Claude" if provider == "claude_cli" else "Codex"
if cfg is None:
messages.error(
request,
f"{provider_label} provider is disabled. Enable it in Task Automation first.",
)
return redirect(next_url)
run = _enqueue_codex_task_submission(
user=request.user,
task=task,
source_service=str(task.source_service or ""),
source_channel=str(task.source_channel or ""),
mode="default",
source_message=getattr(task, "origin_message", None),
provider=provider,
)
messages.success(
request,
f"Queued approval for task #{task.reference_code} before {provider_label} run {run.id}.",
)
return redirect(next_url)
class CodexSettingsPage(LoginRequiredMixin, View):
template_name = "pages/codex-settings.html"
def _context(self, request):
cfg = TaskProviderConfig.objects.filter(
user=request.user, provider="codex_cli"
).first()
settings_payload = _codex_settings_with_defaults(
dict(getattr(cfg, "settings", {}) or {})
)
provider = get_provider("codex_cli")
health = provider.healthcheck(settings_payload) if cfg else None
status_filter = str(request.GET.get("status") or "").strip().lower()
service_filter = str(request.GET.get("service") or "").strip().lower()
channel_filter = str(request.GET.get("channel") or "").strip()
project_filter = str(request.GET.get("project") or "").strip()
date_from = str(request.GET.get("date_from") or "").strip()
runs = (
CodexRun.objects.filter(user=request.user)
.select_related("task", "project", "epic")
.order_by("-created_at")
)
if status_filter:
runs = runs.filter(status=status_filter)
if service_filter:
runs = runs.filter(source_service=service_filter)
if channel_filter:
runs = runs.filter(source_channel=channel_filter)
if project_filter:
runs = runs.filter(project_id=project_filter)
if date_from:
runs = runs.filter(created_at__date__gte=date_from)
runs = runs[:200]
queue_counts = {
"pending": ExternalSyncEvent.objects.filter(
user=request.user, provider="codex_cli", status="pending"
).count(),
"waiting_approval": ExternalSyncEvent.objects.filter(
user=request.user, provider="codex_cli", status="waiting_approval"
).count(),
"failed": ExternalSyncEvent.objects.filter(
user=request.user, provider="codex_cli", status="failed"
).count(),
"ok": ExternalSyncEvent.objects.filter(
user=request.user, provider="codex_cli", status="ok"
).count(),
}
permission_requests = (
CodexPermissionRequest.objects.filter(user=request.user)
.select_related("codex_run", "codex_run__task", "external_sync_event")
.order_by("-requested_at")[:200]
)
return {
"provider_config": cfg,
"provider_settings": settings_payload,
"health": health,
"runs": runs,
"queue_counts": queue_counts,
"permission_requests": permission_requests,
"projects": TaskProject.objects.filter(user=request.user).order_by("name"),
"filters": {
"status": status_filter,
"service": service_filter,
"channel": channel_filter,
"project": project_filter,
"date_from": date_from,
},
}
def get(self, request):
return render(request, self.template_name, self._context(request))
class CodexApprovalAction(LoginRequiredMixin, View):
def post(self, request):
request_id = str(request.POST.get("request_id") or "").strip()
decision = str(request.POST.get("decision") or "").strip().lower()
row = get_object_or_404(
CodexPermissionRequest.objects.select_related(
"codex_run", "external_sync_event"
),
id=request_id,
user=request.user,
)
if row.status != "pending":
return redirect("codex_settings")
now = timezone.now()
if decision == "approve":
row.status = "approved"
row.resolved_at = now
row.resolved_by_identifier = "settings_ui"
row.resolution_note = "approved via settings ui"
row.save(
update_fields=[
"status",
"resolved_at",
"resolved_by_identifier",
"resolution_note",
]
)
if row.external_sync_event_id:
ExternalSyncEvent.objects.filter(id=row.external_sync_event_id).update(
status="ok",
error="",
)
run = row.codex_run
run.status = "approved_waiting_resume"
run.error = ""
run.save(update_fields=["status", "error", "updated_at"])
resume_payload = dict(row.resume_payload or {})
resume_action = str(resume_payload.get("action") or "").strip().lower()
resume_provider_payload = dict(resume_payload.get("provider_payload") or {})
if resume_action and resume_provider_payload:
provider_payload = dict(resume_provider_payload)
provider_payload["codex_run_id"] = str(run.id)
event_action = resume_action
resume_idempotency_key = str(
resume_payload.get("idempotency_key") or ""
).strip()
resume_event_key = (
resume_idempotency_key
if resume_idempotency_key
else f"codex_approval:{row.approval_key}:approved"
)
else:
provider_payload = dict(
run.request_payload.get("provider_payload") or {}
)
provider_payload.update(
{
"mode": "approval_response",
"approval_key": row.approval_key,
"resume_payload": dict(row.resume_payload or {}),
"codex_run_id": str(run.id),
}
)
event_action = "append_update"
resume_event_key = f"codex_approval:{row.approval_key}:approved"
ExternalSyncEvent.objects.update_or_create(
idempotency_key=resume_event_key,
defaults={
"user": request.user,
"task": run.task,
"task_event": run.derived_task_event,
"provider": "codex_cli",
"status": "pending",
"payload": {
"action": event_action,
"provider_payload": provider_payload,
},
"error": "",
},
)
messages.success(
request, f"Approved {row.approval_key}. Resume event queued."
)
return redirect("codex_settings")
row.status = "denied"
row.resolved_at = now
row.resolved_by_identifier = "settings_ui"
row.resolution_note = "denied via settings ui"
row.save(
update_fields=[
"status",
"resolved_at",
"resolved_by_identifier",
"resolution_note",
]
)
run = row.codex_run
run.status = "denied"
run.error = "approval_denied"
run.save(update_fields=["status", "error", "updated_at"])
ExternalSyncEvent.objects.update_or_create(
idempotency_key=f"codex_approval:{row.approval_key}:denied",
defaults={
"user": request.user,
"task": run.task,
"task_event": run.derived_task_event,
"provider": "codex_cli",
"status": "failed",
"payload": {
"action": "append_update",
"provider_payload": {
"mode": "approval_response",
"approval_key": row.approval_key,
"codex_run_id": str(run.id),
},
},
"error": "approval_denied",
},
)
if row.external_sync_event_id:
ExternalSyncEvent.objects.filter(id=row.external_sync_event_id).update(
status="failed",
error="approval_denied",
)
messages.warning(request, f"Denied {row.approval_key}.")
return redirect("codex_settings")
class AnswerSuggestionSend(LoginRequiredMixin, View):
def post(self, request):
event = get_object_or_404(