1104 lines
38 KiB
Python
1104 lines
38 KiB
Python
from __future__ import annotations
|
||
|
||
import datetime
|
||
import re
|
||
|
||
from asgiref.sync import sync_to_async
|
||
from django.conf import settings
|
||
|
||
from core.clients.transport import send_message_raw
|
||
from core.memory.retrieval import retrieve_memories_for_prompt
|
||
from core.messaging import ai as ai_runner
|
||
from core.models import (
|
||
AI,
|
||
Chat,
|
||
ChatTaskSource,
|
||
CodexRun,
|
||
DerivedTask,
|
||
DerivedTaskEvent,
|
||
ExternalSyncEvent,
|
||
Message,
|
||
TaskCompletionPattern,
|
||
TaskEpic,
|
||
TaskProviderConfig,
|
||
)
|
||
from core.security.command_policy import CommandSecurityContext, evaluate_command_policy
|
||
from core.tasks.chat_defaults import (
|
||
ensure_default_source_for_chat,
|
||
resolve_message_scope,
|
||
)
|
||
from core.tasks.codex_approval import queue_codex_event_with_pre_approval
|
||
from core.tasks.codex_support import resolve_external_chat_id
|
||
from core.tasks.providers import get_provider
|
||
|
||
# Keyword hint pattern (todo / task / action / need to / please).
_TASK_HINT_RE = re.compile(r"\b(todo|task|action|need to|please)\b", re.IGNORECASE)
# Built-in completion marker: "<done|completed|fixed> #<reference>".
# Group 1 is the phrase, group 2 is the reference.
_COMPLETION_RE = re.compile(
    r"\b(done|completed|fixed)\s*#([A-Za-z0-9_-]+)\b", re.IGNORECASE
)
# Keyword hints consulted by _is_task_candidate in "balanced" match mode.
_BALANCED_HINT_RE = re.compile(r"\b(todo|task|action item|action)\b", re.IGNORECASE)
# Keyword hints consulted by _is_task_candidate in "broad" match mode.
_BROAD_HINT_RE = re.compile(
    r"\b(todo|task|action|need to|please|reminder)\b", re.IGNORECASE
)
# Characters stripped from the head of a message/prefix (via str.lstrip)
# before looking for a prefix root word: whitespace, quotes, brackets,
# dashes and common punctuation.
_PREFIX_HEAD_TRIM = " \t\r\n`'\"([{<*#-–—_>.,:;!/?\\|"
# ".l", ".l list", ".l list task(s)", ".list", ".list task(s)".
_LIST_TASKS_RE = re.compile(
    r"^\s*(?:\.l(?:\s+list(?:\s+tasks?)?)?|\.list(?:\s+tasks?)?)\s*$",
    re.IGNORECASE,
)
# ".undo" with an optional "#reference" / "reference" argument.
_UNDO_TASK_RE = re.compile(
    r"^\s*\.undo(?:\s+(?:#)?(?P<reference>[A-Za-z0-9_-]+))?\s*$",
    re.IGNORECASE,
)
# ".epic <name>" or "epic[:|-] <name>".
# NOTE(review): the bare "epic" alternative has no word boundary, so any
# message that merely starts with the letters "epic" matches — confirm intended.
_EPIC_CREATE_RE = re.compile(
    r"^\s*(?:\.epic\b|epic)\s*[:\-]?\s*(?P<name>.+?)\s*$",
    re.IGNORECASE | re.DOTALL,
)
# Inline "[epic: <name>]" token; group 1 is the epic name.
_EPIC_TOKEN_RE = re.compile(r"\[\s*epic\s*:\s*([^\]]+?)\s*\]", re.IGNORECASE)
# ".task list".
_LIST_TASKS_CMD_RE = re.compile(
    r"^\s*\.task\s+list\s*$",
    re.IGNORECASE,
)
# ".task show <reference>" (leading "#" optional).
_TASK_SHOW_RE = re.compile(
    r"^\s*\.task\s+show\s+#?(?P<reference>[A-Za-z0-9_-]+)\s*$",
    re.IGNORECASE,
)
# ".task complete|done|close <reference>" (leading "#" optional).
_TASK_COMPLETE_CMD_RE = re.compile(
    r"^\s*\.task\s+(?:complete|done|close)\s+#?(?P<reference>[A-Za-z0-9_-]+)\s*$",
    re.IGNORECASE,
)
# ".task add|create|new <title>"; DOTALL lets the title span several lines.
_TASK_ADD_CMD_RE = re.compile(
    r"^\s*\.task\s+(?:add|create|new)\s+(?P<title>.+?)\s*$",
    re.IGNORECASE | re.DOTALL,
)
# "due YYYY-MM-DD" / "by YYYY-MM-DD"; group 1 is the date text.
_DUE_ISO_RE = re.compile(
    r"\b(?:due|by)\s+(\d{4}-\d{2}-\d{2})\b",
    re.IGNORECASE,
)
# "due <word>" / "by <word>" where <word> is today, tomorrow or a weekday name.
_DUE_RELATIVE_RE = re.compile(
    r"\b(?:due|by)\s+(?P<token>today|tomorrow|monday|tuesday|wednesday|thursday|friday|saturday|sunday)\b",
    re.IGNORECASE,
)
# "@handle" mention; group 1 is the handle without the "@".
_ASSIGNEE_AT_RE = re.compile(r"@([A-Za-z0-9_.-]+)")
# "assign to X", "assigned to X" or "for X"; group 1 is X.
_ASSIGNEE_PHRASE_RE = re.compile(
    r"\b(?:assign(?:ed)?\s+to|for)\s+([A-Za-z0-9_.-]+)\b",
    re.IGNORECASE,
)

# Weekday name -> index using the same numbering as datetime.date.weekday()
# (Monday is 0).
_WEEKDAY_MAP = {
    "monday": 0,
    "tuesday": 1,
    "wednesday": 2,
    "thursday": 3,
    "friday": 4,
    "saturday": 5,
    "sunday": 6,
}
|
||
|
||
|
||
def _parse_due_date(text: str) -> datetime.date | None:
    """Pull a due date out of free text, or return ``None`` when there is none.

    A ``due|by YYYY-MM-DD`` date is preferred when it is a real calendar
    date. Otherwise ``due|by today|tomorrow|<weekday>`` is resolved against
    ``datetime.date.today()``; a weekday always resolves to a date one to
    seven days ahead, never to today itself.
    """
    haystack = str(text or "")

    iso_hit = _DUE_ISO_RE.search(haystack)
    if iso_hit is not None:
        try:
            return datetime.date.fromisoformat(iso_hit.group(1))
        except ValueError:
            # Date-shaped but not a valid date; fall through to the relative form.
            pass

    relative_hit = _DUE_RELATIVE_RE.search(haystack)
    if relative_hit is None:
        return None

    word = relative_hit.group("token").strip().lower()
    base = datetime.date.today()
    if word in ("today", "tomorrow"):
        return base + datetime.timedelta(days=0 if word == "today" else 1)

    weekday_index = _WEEKDAY_MAP.get(word)
    if weekday_index is None:
        return None

    # Maps the weekday distance onto 1..7 so "same weekday" means next week.
    offset = (weekday_index - base.weekday() - 1) % 7 + 1
    return base + datetime.timedelta(days=offset)
|
||
|
||
|
||
def _parse_assignee(text: str) -> str:
    """Return the assignee named in *text*, or ``""`` when none is found.

    An ``@handle`` mention is tried first; the ``assign(ed) to X`` / ``for X``
    phrase is only consulted when no mention is present.
    """
    haystack = str(text or "")
    for pattern in (_ASSIGNEE_AT_RE, _ASSIGNEE_PHRASE_RE):
        hit = pattern.search(haystack)
        if hit:
            return str(hit.group(1) or "").strip()
    return ""
|
||
|
||
|
||
def _channel_variants(service: str, channel: str) -> list[str]:
    """List the spellings under which *channel* may be stored for *service*.

    The trimmed channel itself always comes first. For ``whatsapp`` the part
    before ``@`` is added bare and with the ``@s.whatsapp.net`` and ``@g.us``
    suffixes. For ``signal`` the digits of the value are added bare and with
    a leading ``+``. An empty channel yields an empty list; the result never
    contains duplicates.
    """
    primary = str(channel or "").strip()
    if not primary:
        return []

    found = [primary]

    def remember(candidate: str) -> None:
        # Keep insertion order while skipping blanks and repeats.
        if candidate and candidate not in found:
            found.append(candidate)

    kind = str(service or "").strip().lower()
    if kind == "whatsapp":
        local_part = primary.split("@", 1)[0].strip()
        if local_part:
            for candidate in (
                local_part,
                f"{local_part}@s.whatsapp.net",
                f"{local_part}@g.us",
            ):
                remember(candidate)
    elif kind == "signal":
        only_digits = re.sub(r"[^0-9]", "", primary)
        if only_digits:
            remember(only_digits)
            remember(f"+{only_digits}")
    return found
|
||
|
||
|
||
async def _resolve_source_mappings(message: Message) -> list[ChatTaskSource]:
    """Find the enabled ``ChatTaskSource`` rows that cover *message*'s chat.

    The lookup key is the message's service plus every spelling of its chat
    id produced by ``_channel_variants``. Two extensions are applied:

    * If the message arrived via ``web`` but its session identifier belongs
      to another service, the lookup switches to that service and the
      identifier's spellings are added. If the session identifier is on the
      same service as the message, its spellings are simply added.
    * For ``signal``, ``Chat`` rows are used to translate between
      ``source_uuid`` and ``source_number`` so either form matches.

    Returns an empty list when no channel spelling could be derived.
    """

    def merge(target: list[str], extra: list[str]) -> None:
        # Append new, non-empty spellings while preserving order.
        for candidate in extra:
            if candidate and candidate not in target:
                target.append(candidate)

    lookup_service = str(message.source_service or "").strip().lower()
    variants = _channel_variants(lookup_service, message.source_chat_id or "")

    session_identifier = getattr(getattr(message, "session", None), "identifier", None)
    canonical_service = (
        str(getattr(session_identifier, "service", "") or "").strip().lower()
    )
    canonical_identifier = str(
        getattr(session_identifier, "identifier", "") or ""
    ).strip()

    if lookup_service == "web" and canonical_service and canonical_service != "web":
        lookup_service = canonical_service
        variants = _channel_variants(lookup_service, message.source_chat_id or "")
        merge(variants, _channel_variants(lookup_service, canonical_identifier))
    elif (
        canonical_service
        and canonical_identifier
        and canonical_service == lookup_service
    ):
        merge(variants, _channel_variants(canonical_service, canonical_identifier))

    if lookup_service == "signal":
        signal_values = [
            cleaned
            for cleaned in (str(value or "").strip() for value in variants)
            if cleaned
        ]
        if signal_values:

            def load_companions() -> list:
                # Two batched queries instead of two per variant: the uuid
                # side yields numbers and the number side yields uuids.
                numbers = Chat.objects.filter(
                    source_uuid__in=signal_values
                ).values_list("source_number", flat=True)
                uuids = Chat.objects.filter(
                    source_number__in=signal_values
                ).values_list("source_uuid", flat=True)
                return list(numbers) + list(uuids)

            for candidate in await sync_to_async(load_companions)():
                merge(
                    variants,
                    _channel_variants("signal", str(candidate or "").strip()),
                )

    if not variants:
        return []

    return await sync_to_async(list)(
        ChatTaskSource.objects.filter(
            user=message.user,
            enabled=True,
            service=lookup_service,
            channel_identifier__in=variants,
        ).select_related("project", "epic")
    )
|
||
|
||
|
||
def _to_bool(raw, default=False) -> bool:
    """Interpret a loosely typed setting as a boolean.

    ``None`` and unrecognised values yield ``bool(default)``. Recognised
    spellings are compared case-insensitively after trimming whitespace.
    """
    fallback = bool(default)
    if raw is None:
        return fallback
    spellings = {
        "1": True,
        "true": True,
        "yes": True,
        "on": True,
        "y": True,
        "0": False,
        "false": False,
        "no": False,
        "off": False,
        "n": False,
    }
    return spellings.get(str(raw).strip().lower(), fallback)
|
||
|
||
|
||
def _parse_prefixes(raw) -> list[str]:
    """Normalise a prefix setting into a de-duplicated, lower-cased list.

    *raw* may be a list or tuple of prefixes, or a comma-separated string.
    Blank entries are dropped and first-seen order is kept. When nothing
    usable remains the defaults ``["task:", "todo:", "action:"]`` are returned.
    """
    # Tuples are accepted like lists; previously a tuple was stringified and
    # split on commas, which produced entries such as "('task:'".
    if isinstance(raw, (list, tuple)):
        values = raw
    else:
        values = str(raw or "").split(",")

    rows: list[str] = []
    for row in values:
        item = str(row or "").strip().lower()
        if item and item not in rows:
            rows.append(item)
    return rows or ["task:", "todo:", "action:"]
|
||
|
||
|
||
def _prefix_roots(prefixes: list[str]) -> list[str]:
    """Reduce each prefix to its leading ``[a-z0-9]+`` word.

    Leading decoration listed in ``_PREFIX_HEAD_TRIM`` is ignored, so
    ``"task:"`` and ``"- task"`` both yield ``"task"``. Prefixes without
    such a word are skipped; the result keeps first-seen order without
    duplicates.
    """
    collected: list[str] = []
    for raw_prefix in prefixes:
        lowered = str(raw_prefix or "").strip().lower()
        if not lowered:
            continue
        head = re.match(r"([a-z0-9]+)", lowered.lstrip(_PREFIX_HEAD_TRIM))
        if head is None:
            continue
        word = str(head.group(1) or "").strip()
        if word and word not in collected:
            collected.append(word)
    return collected
|
||
|
||
|
||
def _has_task_prefix(text: str, prefixes: list[str]) -> bool:
    """Tell whether *text* opens with one of the task *prefixes*.

    The text is trimmed and lower-cased first. A literal ``startswith``
    match on any prefix succeeds immediately. Otherwise leading decoration
    is stripped and the text must begin with one of the prefix root words
    (see ``_prefix_roots``) as a whole word.
    """
    lowered = str(text or "").strip().lower()
    if not lowered:
        return False

    for prefix in prefixes:
        if lowered.startswith(prefix):
            return True

    stripped = lowered.lstrip(_PREFIX_HEAD_TRIM)
    root_words = _prefix_roots(prefixes)
    if not (stripped and root_words):
        return False

    return any(
        re.match(rf"^{re.escape(word)}\b(?:\s*[:\-–—#>.,;!]*\s*|\s+)", stripped)
        for word in root_words
    )
|
||
|
||
|
||
def _strip_task_prefix(text: str, prefixes: list[str]) -> str:
    """Remove a leading task prefix from *text* when one is present.

    Returns the trimmed remainder after the first matching prefix root word
    and its separator. When no root matches, or the remainder would be
    empty, the trimmed original text is returned unchanged. Blank input
    yields ``""``.
    """
    original = str(text or "").strip()
    if not original:
        return ""

    stripped = original.lstrip(_PREFIX_HEAD_TRIM)
    root_words = _prefix_roots(prefixes)
    if stripped and root_words:
        for word in root_words:
            hit = re.match(
                rf"^{re.escape(word)}\b(?:\s*[:\-–—#>.,;!]*\s*|\s+)(.+)$",
                stripped,
                flags=re.IGNORECASE | re.DOTALL,
            )
            if hit is None:
                continue
            remainder = str(hit.group(1) or "").strip()
            return remainder if remainder else original
    return original
|
||
|
||
|
||
def _normalize_flags(raw: dict | None) -> dict:
    """Return a complete, type-normalised flag dict from raw settings.

    Every known flag is present in the result; missing or unparseable values
    fall back to their defaults (derivation, completion and AI titles on;
    prefix requirement and id announcement off; ``match_mode`` "balanced";
    ``min_chars`` 3, never below 1).
    """
    row = dict(raw or {})

    # A malformed stored value (e.g. "abc" or a list) must not abort message
    # processing; treat it like a missing setting.
    try:
        min_chars = max(1, int(row.get("min_chars") or 3))
    except (TypeError, ValueError, OverflowError):
        min_chars = 3

    match_mode = (
        str(row.get("match_mode") or "balanced").strip().lower() or "balanced"
    )
    return {
        "derive_enabled": _to_bool(row.get("derive_enabled"), True),
        "match_mode": match_mode,
        "require_prefix": _to_bool(row.get("require_prefix"), False),
        "allowed_prefixes": _parse_prefixes(row.get("allowed_prefixes")),
        "completion_enabled": _to_bool(row.get("completion_enabled"), True),
        "ai_title_enabled": _to_bool(row.get("ai_title_enabled"), True),
        "announce_task_id": _to_bool(row.get("announce_task_id"), False),
        "min_chars": min_chars,
    }
|
||
|
||
|
||
def _normalize_partial_flags(raw: dict | None) -> dict:
    """Normalise only the flags that *raw* actually supplies.

    Each supplied flag is normalised exactly as ``_normalize_flags`` would
    normalise it; flags absent from *raw* are left out of the result so they
    do not override values from another layer.
    """
    supplied = dict(raw or {})
    # _normalize_flags uses the same defaults for every key, so normalise
    # everything once and keep only the keys the caller provided.
    complete = _normalize_flags(supplied)
    return {key: value for key, value in complete.items() if key in supplied}
|
||
|
||
|
||
def _effective_flags(source: ChatTaskSource) -> dict:
    """Combine project-level flags with the source's own overrides.

    The project's ``settings`` supply a full flag set; any flag present in
    the source's ``settings`` replaces the project value.
    """
    project = getattr(source, "project", None)
    defaults = _normalize_flags(getattr(project, "settings", {}) or {})
    overrides = _normalize_partial_flags(getattr(source, "settings", {}) or {})
    return {**defaults, **overrides}
|
||
|
||
|
||
def _is_task_candidate(text: str, flags: dict) -> bool:
    """Decide whether *text* should be turned into a task under *flags*.

    Text shorter than ``min_chars`` never qualifies. Text carrying an allowed
    prefix always qualifies. Without a prefix, the text qualifies only when
    prefixes are not required, the mode is not ``strict``, and the mode's
    keyword pattern (broad or balanced) is found in it.
    """
    candidate = str(text or "").strip()
    if len(candidate) < int(flags.get("min_chars") or 1):
        return False

    allowed = list(flags.get("allowed_prefixes") or [])
    if _has_task_prefix(candidate.lower(), allowed):
        return True

    # From here on the text has no recognised prefix.
    if bool(flags.get("require_prefix")):
        return False

    mode = str(flags.get("match_mode") or "balanced").strip().lower()
    if mode == "strict":
        return False

    hint_pattern = _BROAD_HINT_RE if mode == "broad" else _BALANCED_HINT_RE
    return bool(hint_pattern.search(candidate))
|
||
|
||
|
||
def _next_reference(user, project) -> str:
    """Compute the reference code for the next task of *user* in *project*.

    Takes the most recently created task that has a reference code and adds
    one to it. Returns ``"1"`` when there is no such task. If the latest
    code is not an integer, falls back to the task count plus one.

    NOTE(review): nothing here reserves the value, so two concurrent callers
    may compute the same reference — confirm whether uniqueness is enforced
    elsewhere.
    """
    scoped = DerivedTask.objects.filter(user=user, project=project)
    latest = scoped.exclude(reference_code="").order_by("-created_at").first()
    if not latest:
        return "1"
    try:
        return str(int(str(latest.reference_code)) + 1)
    except ValueError:
        # Only a non-numeric code is expected here; other errors should surface.
        return str(scoped.count() + 1)
|
||
|
||
|
||
def create_task_record(
    *,
    user,
    project,
    title: str,
    source_service: str,
    source_channel: str,
    origin_message: Message | None = None,
    actor_identifier: str = "",
    epic: TaskEpic | None = None,
    due_date: datetime.date | None = None,
    assignee_identifier: str = "",
    immutable_payload: dict | None = None,
    event_payload: dict | None = None,
    status_snapshot: str = "open",
) -> tuple[DerivedTask, DerivedTaskEvent]:
    """Create a task and its ``created`` event, returning both.

    String inputs are trimmed. A blank title becomes ``"Untitled task"``
    (titles are cut to 255 characters), a blank service becomes ``"web"``
    and a blank status becomes ``"open"``. The reference code comes from
    ``_next_reference``. Payload dicts are copied before being stored.
    """
    reference_code = _next_reference(user, project)

    task_fields = {
        "user": user,
        "project": project,
        "epic": epic,
        "title": str(title or "").strip()[:255] or "Untitled task",
        "source_service": str(source_service or "web").strip() or "web",
        "source_channel": str(source_channel or "").strip(),
        "origin_message": origin_message,
        "reference_code": reference_code,
        "status_snapshot": str(status_snapshot or "open").strip() or "open",
        "due_date": due_date,
        "assignee_identifier": str(assignee_identifier or "").strip(),
        "immutable_payload": dict(immutable_payload or {}),
    }
    new_task = DerivedTask.objects.create(**task_fields)

    event_fields = {
        "task": new_task,
        "event_type": "created",
        "actor_identifier": str(actor_identifier or "").strip(),
        "source_message": origin_message,
        "payload": dict(event_payload or {}),
    }
    new_event = DerivedTaskEvent.objects.create(**event_fields)

    return new_task, new_event
|
||
|
||
|
||
async def create_task_record_and_sync(
    *,
    user,
    project,
    title: str,
    source_service: str,
    source_channel: str,
    origin_message: Message | None = None,
    actor_identifier: str = "",
    epic: TaskEpic | None = None,
    due_date: datetime.date | None = None,
    assignee_identifier: str = "",
    immutable_payload: dict | None = None,
    event_payload: dict | None = None,
    status_snapshot: str = "open",
) -> tuple[DerivedTask, DerivedTaskEvent]:
    """Async wrapper: create the task record, then emit its ``create`` sync event.

    All arguments are forwarded unchanged to ``create_task_record``, which is
    run through ``sync_to_async``.
    """
    record_kwargs = {
        "user": user,
        "project": project,
        "title": title,
        "source_service": source_service,
        "source_channel": source_channel,
        "origin_message": origin_message,
        "actor_identifier": actor_identifier,
        "epic": epic,
        "due_date": due_date,
        "assignee_identifier": assignee_identifier,
        "immutable_payload": immutable_payload,
        "event_payload": event_payload,
        "status_snapshot": status_snapshot,
    }
    created_task, created_event = await sync_to_async(create_task_record)(
        **record_kwargs
    )
    await _emit_sync_event(created_task, created_event, "create")
    return created_task, created_event
|
||
|
||
|
||
async def mark_task_completed_and_sync(
    *,
    task: DerivedTask,
    actor_identifier: str = "",
    source_message: Message | None = None,
    payload: dict | None = None,
) -> DerivedTaskEvent:
    """Mark *task* completed, record the event and emit a ``complete`` sync.

    Only ``status_snapshot`` is saved on the task. Returns the newly created
    ``completion_marked`` event.
    """
    task.status_snapshot = "completed"
    await sync_to_async(task.save)(update_fields=["status_snapshot"])

    event_fields = {
        "task": task,
        "event_type": "completion_marked",
        "actor_identifier": str(actor_identifier or "").strip(),
        "source_message": source_message,
        "payload": dict(payload or {}),
    }
    completion_event = await sync_to_async(DerivedTaskEvent.objects.create)(
        **event_fields
    )

    await _emit_sync_event(task, completion_event, "complete")
    return completion_event
|
||
|
||
|
||
async def _derive_title(message: Message) -> str:
    """Produce a task title (at most 255 characters) for *message*.

    Blank text yields ``"Untitled task"``. The message text itself is used
    when the ``TASK_DERIVATION_USE_AI`` setting is off, when the user has no
    ``AI`` row, or when the AI call fails or returns nothing. Otherwise the
    AI's reply is used.
    """
    raw_text = str(message.text or "").strip()
    if not raw_text:
        return "Untitled task"

    fallback = raw_text[:255]
    if not bool(getattr(settings, "TASK_DERIVATION_USE_AI", True)):
        return fallback

    def first_ai_for_user():
        return AI.objects.filter(user=message.user).first()

    ai_obj = await sync_to_async(first_ai_for_user)()
    if not ai_obj:
        return fallback

    conversation = [
        {
            "role": "system",
            "content": "Extract one concise actionable task title from the message. Return plain text only.",
        },
        {"role": "user", "content": raw_text[:2000]},
    ]
    try:
        reply = await ai_runner.run_prompt(
            conversation, ai_obj, operation="task_derive_title"
        )
        suggested = str(reply or "").strip()
    except Exception:
        # Title generation is best-effort; any failure falls back to the text.
        suggested = ""

    return suggested[:255] if suggested else fallback
|
||
|
||
|
||
async def _derive_title_with_flags(message: Message, flags: dict) -> str:
    """Derive a title for *message*, honouring the ``ai_title_enabled`` flag.

    With AI titles enabled the title from ``_derive_title`` is used; otherwise
    the message text is. In both cases a leading task prefix is stripped and
    the result is limited to 255 characters, defaulting to ``"Untitled task"``.
    """
    allowed = list(flags.get("allowed_prefixes") or [])

    if bool(flags.get("ai_title_enabled", True)):
        ai_title = await _derive_title(message)
        without_prefix = _strip_task_prefix(str(ai_title or "").strip(), allowed)
        return (without_prefix or ai_title or "Untitled task")[:255]

    plain = _strip_task_prefix(str(message.text or "").strip(), allowed)
    return (plain or "Untitled task")[:255]
|
||
|
||
|
||
async def _emit_sync_event(
    task: DerivedTask, event: DerivedTaskEvent, action: str
) -> None:
    """Push a task event to the user's task provider and record the outcome.

    The first enabled ``TaskProviderConfig`` (ordered by provider name) picks
    the provider; ``"mock"`` is used when none exists. A ``CodexRun`` row is
    always created with status ``queued``.

    * Providers flagged ``run_in_worker`` are not called here. ``codex_cli``
      goes through ``queue_codex_event_with_pre_approval``; other worker
      providers get a ``pending`` ``ExternalSyncEvent``.
    * Other providers are called immediately: ``create_task`` for
      ``"create"``, ``mark_complete`` for ``"complete"`` and
      ``append_update`` for anything else. The result is stored on an
      ``ExternalSyncEvent`` and the ``CodexRun``, and a returned external key
      is saved on the task if it has none yet.
    """
    cfg = await sync_to_async(
        lambda: TaskProviderConfig.objects.filter(user=task.user, enabled=True)
        .order_by("provider")
        .first()
    )()
    provider_name = str(getattr(cfg, "provider", "mock") or "mock")
    provider_settings = dict(getattr(cfg, "settings", {}) or {})
    provider = get_provider(provider_name)
    idempotency_key = f"{provider_name}:{task.id}:{event.id}"
    external_chat_id = await sync_to_async(resolve_external_chat_id)(
        user=task.user,
        provider=provider_name,
        service=str(task.source_service or ""),
        channel=str(task.source_channel or ""),
    )

    # Read related objects from the field cache only, so an unloaded relation
    # yields an empty name instead of triggering a query here.
    cached_project = task._state.fields_cache.get("project")
    cached_epic = task._state.fields_cache.get("epic")
    project_name = str(getattr(cached_project, "name", "") or "")
    epic_name = str(getattr(cached_epic, "name", "") or "")

    memory_context: list = []
    try:
        memory_context = await sync_to_async(retrieve_memories_for_prompt)(
            user_id=int(task.user_id),
            query=str(task.title or ""),
            limit=10,
        )
    except Exception:
        # Memory context is optional enrichment; sync proceeds without it.
        pass

    request_payload = {
        "task_id": str(task.id),
        "reference_code": str(task.reference_code or ""),
        "title": str(task.title or ""),
        "external_key": str(task.external_key or ""),
        "due_date": task.due_date.isoformat() if task.due_date else "",
        "assignee_identifier": str(task.assignee_identifier or ""),
        "project_name": project_name,
        "epic_name": epic_name,
        "source_service": str(task.source_service or ""),
        "source_channel": str(task.source_channel or ""),
        "external_chat_id": external_chat_id,
        "origin_message_id": str(getattr(task, "origin_message_id", "") or ""),
        "trigger_message_id": str(
            getattr(event, "source_message_id", "")
            or getattr(task, "origin_message_id", "")
            or ""
        ),
        "mode": "default",
        "payload": event.payload,
        "memory_context": memory_context,
    }
    codex_run = await sync_to_async(CodexRun.objects.create)(
        user=task.user,
        task_id=task.id,
        derived_task_event_id=event.id,
        source_message_id=(event.source_message_id or task.origin_message_id),
        project_id=task.project_id,
        epic_id=task.epic_id,
        source_service=str(task.source_service or ""),
        source_channel=str(task.source_channel or ""),
        external_chat_id=external_chat_id,
        status="queued",
        request_payload={
            "action": action,
            "provider_payload": dict(request_payload),
            "idempotency_key": idempotency_key,
        },
        result_payload={},
        error="",
    )
    # Added after the run is stored, so the stored request payload above does
    # not contain its own run id but every later copy does.
    request_payload["codex_run_id"] = str(codex_run.id)

    # Worker-backed providers are queued and executed by `manage.py codex_worker`.
    if bool(getattr(provider, "run_in_worker", False)):
        if provider_name == "codex_cli":
            await sync_to_async(queue_codex_event_with_pre_approval)(
                user=task.user,
                run=codex_run,
                task=task,
                task_event=event,
                action=action,
                provider_payload=dict(request_payload),
                idempotency_key=idempotency_key,
            )
            return
        await sync_to_async(ExternalSyncEvent.objects.update_or_create)(
            idempotency_key=idempotency_key,
            defaults={
                "user": task.user,
                "task": task,
                "task_event": event,
                "provider": provider_name,
                "status": "pending",
                "payload": {
                    "action": action,
                    "provider_payload": dict(request_payload),
                },
                "error": "",
            },
        )
        return

    if action == "create":
        provider_call = provider.create_task
    elif action == "complete":
        provider_call = provider.mark_complete
    else:
        provider_call = provider.append_update

    # The provider methods are synchronous; run them off the event loop like
    # every other blocking call in this coroutine.
    result = await sync_to_async(provider_call)(
        provider_settings, dict(request_payload)
    )

    status = "ok" if result.ok else "failed"
    await sync_to_async(ExternalSyncEvent.objects.update_or_create)(
        idempotency_key=idempotency_key,
        defaults={
            "user": task.user,
            "task": task,
            "task_event": event,
            "provider": provider_name,
            "status": status,
            "payload": dict(result.payload or {}),
            "error": str(result.error or ""),
        },
    )
    codex_run.status = status
    codex_run.result_payload = dict(result.payload or {})
    codex_run.error = str(result.error or "")
    await sync_to_async(codex_run.save)(
        update_fields=["status", "result_payload", "error", "updated_at"]
    )
    if result.ok and result.external_key and not task.external_key:
        task.external_key = str(result.external_key)
        await sync_to_async(task.save)(update_fields=["external_key"])
|
||
|
||
|
||
def _compile_completion_pattern(phrases: list[str]) -> re.Pattern:
    """Build the case-insensitive ``<phrase> #<reference>`` matcher.

    Phrases are escaped, so they match literally. Group 1 of a match is the
    task reference.
    """
    alternatives = "|".join(re.escape(phrase) for phrase in phrases)
    # Single backslashes: in a raw string "\\b" would demand a literal
    # backslash in the message and the pattern could never match normal text.
    return re.compile(
        r"\b(?:" + alternatives + r")\s*#([A-Za-z0-9_-]+)\b",
        re.IGNORECASE,
    )


async def _completion_regex(message: Message) -> re.Pattern:
    """Return the completion-marker pattern for the message's user.

    Uses the user's enabled ``TaskCompletionPattern`` phrases, ordered by
    position then creation time, and falls back to ``done`` / ``completed`` /
    ``fixed`` when none are configured.

    NOTE(review): the reference is group 1 here but group 2 in the module's
    ``_COMPLETION_RE``; callers handling matches from both must not assume a
    shared group index.
    """
    patterns = await sync_to_async(list)(
        TaskCompletionPattern.objects.filter(user=message.user, enabled=True).order_by(
            "position", "created_at"
        )
    )
    phrases = [
        cleaned
        for cleaned in (str(row.phrase or "").strip() for row in patterns)
        if cleaned
    ]
    if not phrases:
        phrases = ["done", "completed", "fixed"]
    return _compile_completion_pattern(phrases)
|
||
|
||
|
||
async def _send_scope_message(
    source: ChatTaskSource, message: Message, text: str
) -> None:
    """Send *text* back to the chat that the task source is bound to.

    The source's service and channel are preferred; the message's own
    service / chat id (and finally ``"web"`` / ``""``) are fallbacks.
    """
    target_service = source.service or message.source_service or "web"
    target_channel = source.channel_identifier or message.source_chat_id or ""
    await send_message_raw(
        target_service,
        target_channel,
        text=text,
        attachments=[],
        metadata={"origin": "task_scope_command"},
    )
|
||
|
||
|
||
def _find_scoped_task(message: Message, source: ChatTaskSource, reference: str = ""):
    """Return the newest task in *source*'s chat scope, or ``None``.

    The scope is the message's user plus the source's project, service and
    channel. A non-empty *reference* additionally restricts the lookup to
    that reference code.
    """
    rows = DerivedTask.objects.filter(
        user=message.user,
        project=source.project,
        source_service=source.service,
        source_channel=source.channel_identifier,
    )
    if reference:
        rows = rows.filter(reference_code=reference)
    return rows.order_by("-created_at").first()


async def _handle_scope_task_commands(
    message: Message, sources: list[ChatTaskSource], text: str
) -> bool:
    """Run an explicit task command found in *text*, if any.

    Handles list (``.l`` / ``.list`` / ``.task list``), ``.task add``,
    ``.undo``, ``.task show`` and ``.task complete``, always against the
    first entry of *sources*, and replies in that chat. Returns ``True``
    when a command was recognised (even if it failed, e.g. unknown
    reference) and ``False`` when *text* is not one of these commands or
    *sources* is empty.
    """
    if not sources:
        return False
    body = str(text or "").strip()
    source = sources[0]

    # --- list open tasks -------------------------------------------------
    if _LIST_TASKS_RE.match(body) or _LIST_TASKS_CMD_RE.match(body):
        open_rows = await sync_to_async(list)(
            DerivedTask.objects.filter(
                user=message.user,
                project=source.project,
                source_service=source.service,
                source_channel=source.channel_identifier,
            )
            .exclude(status_snapshot="completed")
            .order_by("-created_at")[:20]
        )
        if not open_rows:
            await _send_scope_message(
                source, message, "[task] no open tasks in this chat."
            )
            return True
        lines = ["[task] open tasks:"]
        lines.extend(f"- #{row.reference_code} {row.title}" for row in open_rows)
        await _send_scope_message(source, message, "\n".join(lines))
        return True

    # --- add a task ------------------------------------------------------
    create_match = _TASK_ADD_CMD_RE.match(body)
    if create_match:
        task_text = str(create_match.group("title") or "").strip()
        if not task_text:
            await _send_scope_message(
                source, message, "[task] title is required for .task add."
            )
            return True
        # An inline "[epic: name]" token overrides the source's default epic.
        epic = source.epic
        epic_name = _extract_epic_name_from_text(task_text)
        if epic_name:
            epic, _ = await sync_to_async(TaskEpic.objects.get_or_create)(
                project=source.project,
                name=epic_name,
            )
        cleaned_task_text = _strip_epic_token(task_text)
        task, _event = await create_task_record_and_sync(
            user=message.user,
            project=source.project,
            epic=epic,
            title=cleaned_task_text[:255],
            source_service=source.service or message.source_service or "web",
            source_channel=source.channel_identifier or message.source_chat_id or "",
            origin_message=message,
            actor_identifier=str(message.sender_uuid or ""),
            due_date=_parse_due_date(cleaned_task_text),
            assignee_identifier=_parse_assignee(cleaned_task_text),
            immutable_payload={
                "origin_text": text,
                "task_text": cleaned_task_text,
                "source": "chat_manual_command",
            },
            event_payload={
                "origin_text": text,
                "command": ".task add",
                "via": "chat_command",
            },
        )
        await _send_scope_message(
            source, message, f"[task] created #{task.reference_code}: {task.title}"
        )
        return True

    # --- undo (delete) a task --------------------------------------------
    undo_match = _UNDO_TASK_RE.match(body)
    if undo_match:
        # Without a reference the newest task in this chat is removed.
        reference = str(undo_match.group("reference") or "").strip()
        task = await sync_to_async(_find_scoped_task)(message, source, reference)
        if task is None:
            await _send_scope_message(
                source, message, "[task] nothing to undo in this chat."
            )
            return True
        # Capture display values before the row is deleted.
        ref = str(task.reference_code or "")
        title = str(task.title or "")
        await sync_to_async(task.delete)()
        await _send_scope_message(source, message, f"[task] removed #{ref}: {title}")
        return True

    # --- show a task -----------------------------------------------------
    show_match = _TASK_SHOW_RE.match(body)
    if show_match:
        reference = str(show_match.group("reference") or "").strip()
        task = await sync_to_async(_find_scoped_task)(message, source, reference)
        if task is None:
            await _send_scope_message(
                source, message, f"[task] #{reference} not found."
            )
            return True
        due_str = f"\ndue: {task.due_date}" if task.due_date else ""
        assignee_str = (
            f"\nassignee: {task.assignee_identifier}"
            if task.assignee_identifier
            else ""
        )
        detail = (
            f"[task] #{task.reference_code}: {task.title}"
            f"\nstatus: {task.status_snapshot}"
            f"{due_str}"
            f"{assignee_str}"
        )
        await _send_scope_message(source, message, detail)
        return True

    # --- complete a task -------------------------------------------------
    complete_match = _TASK_COMPLETE_CMD_RE.match(body)
    if complete_match:
        reference = str(complete_match.group("reference") or "").strip()
        task = await sync_to_async(_find_scoped_task)(message, source, reference)
        if task is None:
            await _send_scope_message(
                source, message, f"[task] #{reference} not found."
            )
            return True
        await mark_task_completed_and_sync(
            task=task,
            actor_identifier=str(message.sender_uuid or ""),
            source_message=message,
            payload={
                "marker": reference,
                "command": ".task complete",
                "via": "chat_command",
            },
        )
        await _send_scope_message(
            source, message, f"[task] completed #{task.reference_code}: {task.title}"
        )
        return True

    return False
|
||
|
||
|
||
def _extract_epic_name_from_text(text: str) -> str:
    """Return the name inside the first ``[epic: ...]`` token, or ``""``."""
    hit = _EPIC_TOKEN_RE.search(str(text or ""))
    return str(hit.group(1) or "").strip() if hit else ""
|
||
|
||
|
||
def _strip_epic_token(text: str) -> str:
    """Remove every ``[epic: ...]`` token from *text* and tidy the whitespace.

    Runs of two or more whitespace characters left behind are collapsed to a
    single space and the result is trimmed.
    """
    without_tokens = _EPIC_TOKEN_RE.sub("", str(text or ""))
    collapsed = re.sub(r"\s{2,}", " ", without_tokens)
    return collapsed.strip()
|
||
|
||
|
||
async def _handle_epic_create_command(
    message: Message, sources: list[ChatTaskSource], text: str
) -> bool:
    """Create (or find) an epic when *text* is an epic command.

    Returns ``False`` when *text* is not an epic command or *sources* is
    empty. Otherwise the epic is looked up or created in the first source's
    project, a confirmation with usage hints is sent to the chat, and
    ``True`` is returned.
    """
    command = _EPIC_CREATE_RE.match(str(text or ""))
    if not (command and sources):
        return False

    epic_name = str(command.group("name") or "").strip()
    if not epic_name:
        # Recognised as an epic command, but there is nothing to create.
        return True

    source = sources[0]
    epic, was_created = await sync_to_async(TaskEpic.objects.get_or_create)(
        project=source.project,
        name=epic_name,
    )
    if was_created:
        state = "created"
    else:
        state = "already exists"

    reply = (
        f"[epic] {state}: {epic.name}\n"
        "WhatsApp usage:\n"
        "- create epic: epic: <Epic name> (or .epic <Epic name>)\n"
        "- add task to epic: task: <description> [epic:<Epic name>]\n"
        "- list tasks: .l list tasks\n"
        "- undo latest task: .undo"
    )
    await _send_scope_message(source, message, reply)
    return True
|
||
|
||
|
||
def _is_task_command_candidate(text: str) -> bool:
    """Tell whether *text* looks like something the task system should handle.

    True for any explicit command (see ``_is_explicit_task_command``) and for
    text that starts with a ``task:`` / ``todo:`` style prefix.
    """
    body = str(text or "").strip()
    if not body:
        return False
    if _is_explicit_task_command(body):
        return True
    return _has_task_prefix(body.lower(), ["task:", "todo:"])
|
||
|
||
|
||
def _is_explicit_task_command(text: str) -> bool:
    """Tell whether *text* is one of the explicit task/epic commands.

    Covers the list, add, show, complete, undo and epic-create command
    patterns; blank text is never a command.
    """
    body = str(text or "").strip()
    if not body:
        return False
    command_patterns = (
        _LIST_TASKS_RE,
        _LIST_TASKS_CMD_RE,
        _TASK_ADD_CMD_RE,
        _TASK_SHOW_RE,
        _TASK_COMPLETE_CMD_RE,
        _UNDO_TASK_RE,
        _EPIC_CREATE_RE,
    )
    return any(pattern.match(body) for pattern in command_patterns)
|
||
|
||
|
||
async def _send_task_notice_best_effort(
    service: str, channel: str, text: str, origin: str
) -> None:
    """Send a task notice to a chat without ever raising.

    Args:
        service: Transport service name handed to ``send_message_raw``.
        channel: Channel identifier handed to ``send_message_raw``.
        text: Notice body.
        origin: Value stored under ``metadata["origin"]`` on the outgoing
            message.

    Delivery is best-effort: a failure is logged with its traceback and then
    dropped so that task derivation is never blocked by a transport error.
    """
    # Local import: this block cannot see whether the module already defines
    # a logger, so it brings its own stdlib dependency into scope.
    import logging

    try:
        await send_message_raw(
            service,
            channel,
            text=text,
            attachments=[],
            metadata={"origin": origin},
        )
    except Exception:
        logging.getLogger(__name__).warning(
            "best-effort task notice failed (origin=%s)", origin, exc_info=True
        )


async def process_inbound_task_intelligence(message: Message) -> None:
    """Run the task pipeline for one inbound chat message.

    Steps, in order (each may end processing early):

    1. Ignore ``None`` messages, messages whose ``message_meta`` carries an
       ``origin_tag``, and messages with blank text.
    2. If the text is an explicit task command, require the
       ``tasks.commands`` policy scope to allow it.
    3. Resolve the task sources mapped to the message; when none exist and
       the text is a task-command candidate, seed a default source for the
       chat.
    4. Let the scope command handler, then the epic-create handler, consume
       the message.
    5. Require the ``tasks.submit`` policy scope.
    6. If completion is enabled on any source and a completion marker is
       found, complete the referenced task, or record a warning placeholder
       when the reference cannot be resolved.
    7. Otherwise derive one task per source that has derivation enabled and
       accepts the text as a task candidate, optionally announcing the new
       task id and periodically sending a usage tip.

    Args:
        message: The inbound message; ``None`` is tolerated and ignored.
    """
    if message is None:
        return
    message_meta = dict(message.message_meta or {})
    # NOTE(review): origin_tag presumably marks messages emitted by this
    # system itself, so they must not re-enter the pipeline; confirm.
    if message_meta.get("origin_tag"):
        return
    text = str(message.text or "").strip()
    if not text:
        return

    security_context = CommandSecurityContext(
        service=str(message.source_service or "").strip().lower(),
        channel_identifier=str(message.source_chat_id or "").strip(),
        message_meta=message_meta,
        payload={},
    )
    if _is_explicit_task_command(text):
        command_decision = await sync_to_async(evaluate_command_policy)(
            user=message.user,
            scope_key="tasks.commands",
            context=security_context,
        )
        if not command_decision.allowed:
            return

    sources = await _resolve_source_mappings(message)
    if not sources:
        # Unmapped chat: only bootstrap a default source when the text
        # clearly asks for task handling.
        if not _is_task_command_candidate(text):
            return
        service, channel = resolve_message_scope(message)
        if not service or not channel:
            return
        seeded = await sync_to_async(ensure_default_source_for_chat)(
            user=message.user,
            service=service,
            channel_identifier=channel,
            message=message,
        )
        if seeded is None:
            return
        sources = [seeded]

    if await _handle_scope_task_commands(message, sources, text):
        return
    if await _handle_epic_create_command(message, sources, text):
        return

    submit_decision = await sync_to_async(evaluate_command_policy)(
        user=message.user,
        scope_key="tasks.submit",
        context=security_context,
    )
    if not submit_decision.allowed:
        return

    # --- Completion markers -------------------------------------------------
    completion_allowed = any(
        bool(_effective_flags(source).get("completion_enabled")) for source in sources
    )
    completion_rx = await _completion_regex(message) if completion_allowed else None
    # The regex from _completion_regex takes precedence; the built-in
    # "done|completed|fixed #ref" pattern is the fallback.
    marker_match = (completion_rx.search(text) if completion_rx else None) or (
        _COMPLETION_RE.search(text) if completion_allowed else None
    )
    if marker_match:
        # The reference code is taken from the last group that participated
        # in the match (group 1 when the match reports no groups).
        ref_code = str(marker_match.group(marker_match.lastindex or 1) or "").strip()
        task = await sync_to_async(
            lambda: DerivedTask.objects.filter(
                user=message.user, reference_code=ref_code
            )
            .order_by("-created_at")
            .first()
        )()
        if not task:
            # Parser warning event attached to a newly derived placeholder
            # in the first mapped project.
            source = sources[0]
            placeholder = await sync_to_async(DerivedTask.objects.create)(
                user=message.user,
                project=source.project,
                epic=source.epic,
                title=f"Unresolved completion marker #{ref_code}",
                source_service=message.source_service or "web",
                source_channel=message.source_chat_id or "",
                origin_message=message,
                reference_code=ref_code,
                status_snapshot="warning",
                immutable_payload={"warning": "completion_marker_unresolved"},
            )
            await sync_to_async(DerivedTaskEvent.objects.create)(
                task=placeholder,
                event_type="parse_warning",
                actor_identifier=str(message.sender_uuid or ""),
                source_message=message,
                payload={"reason": "completion_marker_unresolved", "marker": ref_code},
            )
            return

        await mark_task_completed_and_sync(
            task=task,
            actor_identifier=str(message.sender_uuid or ""),
            source_message=message,
            payload={"marker": ref_code},
        )
        return

    # --- Task derivation ----------------------------------------------------
    # The epic token is stripped once: the result depends only on the text,
    # not on the source being processed.
    task_text = _strip_epic_token(text)
    for source in sources:
        flags = _effective_flags(source)
        if not flags.get("derive_enabled", True):
            continue
        if not _is_task_candidate(task_text, flags):
            continue

        epic = source.epic
        epic_name = _extract_epic_name_from_text(text)
        if epic_name:
            # An explicit [epic:<name>] token overrides the source's epic.
            epic, _ = await sync_to_async(TaskEpic.objects.get_or_create)(
                project=source.project,
                name=epic_name,
            )

        cloned_message = message
        if task_text != text:
            # Unsaved stand-in so the title is derived from the text without
            # the epic token; the real message remains the task's origin.
            cloned_message = Message(
                user=message.user,
                text=task_text,
                source_service=message.source_service,
                source_channel_id=message.source_chat_id,
            ) if False else Message(
                user=message.user,
                text=task_text,
                source_service=message.source_service,
                source_chat_id=message.source_chat_id,
            )
        title = await _derive_title_with_flags(cloned_message, flags)
        parsed_due_date = _parse_due_date(task_text)
        parsed_assignee = _parse_assignee(task_text)

        # Resolve the delivery scope once so the created task, both notices
        # and the tip counter all agree on the same service/channel values.
        target_service = source.service or message.source_service or "web"
        target_channel = source.channel_identifier or message.source_chat_id or ""

        task, _event = await create_task_record_and_sync(
            user=message.user,
            project=source.project,
            epic=epic,
            title=title,
            source_service=target_service,
            source_channel=target_channel,
            origin_message=message,
            actor_identifier=str(message.sender_uuid or ""),
            due_date=parsed_due_date,
            assignee_identifier=parsed_assignee,
            immutable_payload={
                "origin_text": text,
                "task_text": task_text,
                "flags": flags,
            },
            event_payload={"origin_text": text},
        )

        if flags.get("announce_task_id", False):
            # Announcement is best-effort and should not block derivation.
            await _send_task_notice_best_effort(
                target_service,
                target_channel,
                f"[task] Created #{task.reference_code}: {task.title}",
                "task_announce",
            )

        # NOTE(review): counts with the resolved scope values (not the raw
        # source fields) on the assumption that create_task_record_and_sync
        # stores source_service/source_channel as given; confirm.
        scope_count = await sync_to_async(
            lambda: DerivedTask.objects.filter(
                user=message.user,
                project=source.project,
                source_service=target_service,
                source_channel=target_channel,
            ).count()
        )()
        # Every tenth task in this scope triggers a short usage tip.
        if scope_count > 0 and scope_count % 10 == 0:
            await _send_task_notice_best_effort(
                target_service,
                target_channel,
                "[task] tip: use .l list tasks to review tasks. use .undo to uncreate the latest task.",
                "task_reminder",
            )
|