from __future__ import annotations

import datetime
import re

from asgiref.sync import sync_to_async
from django.conf import settings

from core.clients.transport import send_message_raw
from core.memory.retrieval import retrieve_memories_for_prompt
from core.messaging import ai as ai_runner
from core.models import (
    AI,
    Chat,
    ChatTaskSource,
    CodexRun,
    DerivedTask,
    DerivedTaskEvent,
    ExternalSyncEvent,
    Message,
    TaskCompletionPattern,
    TaskEpic,
    TaskProviderConfig,
)
from core.security.command_policy import CommandSecurityContext, evaluate_command_policy
from core.tasks.chat_defaults import (
    ensure_default_source_for_chat,
    resolve_message_scope,
)
from core.tasks.codex_approval import queue_codex_event_with_pre_approval
from core.tasks.codex_support import resolve_external_chat_id
from core.tasks.providers import get_provider

# --- Free-text hint / completion patterns -----------------------------------
_TASK_HINT_RE = re.compile(r"\b(todo|task|action|need to|please)\b", re.IGNORECASE)
# Group 1 is the completion phrase, group 2 the task reference.
_COMPLETION_RE = re.compile(
    r"\b(done|completed|fixed)\s*#([A-Za-z0-9_-]+)\b", re.IGNORECASE
)
_BALANCED_HINT_RE = re.compile(r"\b(todo|task|action item|action)\b", re.IGNORECASE)
_BROAD_HINT_RE = re.compile(
    r"\b(todo|task|action|need to|please|reminder)\b", re.IGNORECASE
)
# Leading decoration characters stripped before looking for a task prefix.
_PREFIX_HEAD_TRIM = " \t\r\n`'\"([{<*#-–—_>.,:;!/?\\|"

# --- Chat command patterns ---------------------------------------------------
# The named groups below are read via .group("reference"/"name"/"title").
_LIST_TASKS_RE = re.compile(
    r"^\s*(?:\.l(?:\s+list(?:\s+tasks?)?)?|\.list(?:\s+tasks?)?)\s*$",
    re.IGNORECASE,
)
_UNDO_TASK_RE = re.compile(
    r"^\s*\.undo(?:\s+(?:#)?(?P<reference>[A-Za-z0-9_-]+))?\s*$",
    re.IGNORECASE,
)
_EPIC_CREATE_RE = re.compile(
    r"^\s*(?:\.epic\b|epic)\s*[:\-]?\s*(?P<name>.+?)\s*$",
    re.IGNORECASE | re.DOTALL,
)
_EPIC_TOKEN_RE = re.compile(r"\[\s*epic\s*:\s*([^\]]+?)\s*\]", re.IGNORECASE)
_LIST_TASKS_CMD_RE = re.compile(
    r"^\s*\.task\s+list\s*$",
    re.IGNORECASE,
)
_TASK_SHOW_RE = re.compile(
    r"^\s*\.task\s+show\s+#?(?P<reference>[A-Za-z0-9_-]+)\s*$",
    re.IGNORECASE,
)
_TASK_COMPLETE_CMD_RE = re.compile(
    r"^\s*\.task\s+(?:complete|done|close)\s+#?(?P<reference>[A-Za-z0-9_-]+)\s*$",
    re.IGNORECASE,
)
_TASK_ADD_CMD_RE = re.compile(
    r"^\s*\.task\s+(?:add|create|new)\s+(?P<title>.+?)\s*$",
    re.IGNORECASE | re.DOTALL,
)

# --- Due date / assignee extraction -----------------------------------------
_DUE_ISO_RE = re.compile(
    r"\b(?:due|by)\s+(\d{4}-\d{2}-\d{2})\b",
    re.IGNORECASE,
)
_DUE_RELATIVE_RE = re.compile(
    r"\b(?:due|by)\s+(?P<token>today|tomorrow|monday|tuesday|wednesday|thursday|friday|saturday|sunday)\b",
    re.IGNORECASE,
)
_ASSIGNEE_AT_RE = re.compile(r"@([A-Za-z0-9_.-]+)")
_ASSIGNEE_PHRASE_RE = re.compile(
    r"\b(?:assign(?:ed)?\s+to|for)\s+([A-Za-z0-9_.-]+)\b",
    re.IGNORECASE,
)
# Values follow datetime.date.weekday() numbering (Monday == 0).
_WEEKDAY_MAP = {
    "monday": 0,
    "tuesday": 1,
    "wednesday": 2,
    "thursday": 3,
    "friday": 4,
    "saturday": 5,
    "sunday": 6,
}


def _parse_due_date(text: str) -> datetime.date | None:
    """Extract a due date from ``text``.

    An ISO date after "due"/"by" wins; otherwise "today", "tomorrow" or a
    weekday name is resolved relative to ``datetime.date.today()``. A weekday
    equal to today's resolves to the same weekday next week. Returns ``None``
    when nothing usable is found.
    """
    body = str(text or "")
    m = _DUE_ISO_RE.search(body)
    if m:
        try:
            return datetime.date.fromisoformat(m.group(1))
        except ValueError:
            # Looks like a date but is not a valid one (e.g. month 13);
            # fall through to the relative forms.
            pass
    m = _DUE_RELATIVE_RE.search(body)
    if not m:
        return None
    token = m.group("token").strip().lower()
    today = datetime.date.today()
    if token == "today":
        return today
    if token == "tomorrow":
        return today + datetime.timedelta(days=1)
    target_weekday = _WEEKDAY_MAP.get(token)
    if target_weekday is None:
        return None
    days_ahead = (target_weekday - today.weekday()) % 7
    if days_ahead == 0:
        days_ahead = 7
    return today + datetime.timedelta(days=days_ahead)


def _parse_assignee(text: str) -> str:
    """Return the first ``@handle`` or "assign(ed) to X"/"for X" target, or ``""``."""
    body = str(text or "")
    m = _ASSIGNEE_AT_RE.search(body)
    if m:
        return str(m.group(1) or "").strip()
    m = _ASSIGNEE_PHRASE_RE.search(body)
    if m:
        return str(m.group(1) or "").strip()
    return ""


def _channel_variants(service: str, channel: str) -> list[str]:
    """Return ``channel`` plus service-specific alternative spellings.

    WhatsApp adds the bare id and the ``@s.whatsapp.net`` / ``@g.us`` forms;
    Signal adds the digits-only and ``+digits`` forms. The original value is
    always first; an empty channel yields an empty list.
    """
    value = str(channel or "").strip()
    if not value:
        return []
    variants = [value]
    service_key = str(service or "").strip().lower()
    if service_key == "whatsapp":
        bare = value.split("@", 1)[0].strip()
        if bare and bare not in variants:
            variants.append(bare)
        direct = f"{bare}@s.whatsapp.net" if bare else ""
        if direct and direct not in variants:
            variants.append(direct)
        group = f"{bare}@g.us" if bare else ""
        if group and group not in variants:
            variants.append(group)
    if service_key == "signal":
        digits = re.sub(r"[^0-9]", "", value)
        if digits and digits not in variants:
            variants.append(digits)
        if digits:
            plus = f"+{digits}"
            if plus not in variants:
                variants.append(plus)
    return variants


async def _resolve_source_mappings(message: Message) -> list[ChatTaskSource]:
    """Return the enabled ``ChatTaskSource`` rows matching the message's chat.

    The lookup uses every channel variant of ``message.source_chat_id``. When
    the message carries a session identifier, its service/identifier are folded
    in (and replace the lookup service for "web" messages bound to another
    service). For Signal, UUID <-> number companions found on ``Chat`` rows are
    added as well.
    """
    lookup_service = str(message.source_service or "").strip().lower()
    variants = _channel_variants(lookup_service, message.source_chat_id or "")
    session_identifier = getattr(getattr(message, "session", None), "identifier", None)
    canonical_service = (
        str(getattr(session_identifier, "service", "") or "").strip().lower()
    )
    canonical_identifier = str(
        getattr(session_identifier, "identifier", "") or ""
    ).strip()
    if lookup_service == "web" and canonical_service and canonical_service != "web":
        lookup_service = canonical_service
        variants = _channel_variants(lookup_service, message.source_chat_id or "")
        for expanded in _channel_variants(lookup_service, canonical_identifier):
            if expanded and expanded not in variants:
                variants.append(expanded)
    elif (
        canonical_service
        and canonical_identifier
        and canonical_service == lookup_service
    ):
        for expanded in _channel_variants(canonical_service, canonical_identifier):
            if expanded and expanded not in variants:
                variants.append(expanded)
    if lookup_service == "signal":
        companions: list[str] = []
        # Iterate a snapshot: ``variants`` is extended below.
        for value in list(variants):
            signal_value = str(value or "").strip()
            if not signal_value:
                continue
            companions += await sync_to_async(list)(
                Chat.objects.filter(source_uuid=signal_value).values_list(
                    "source_number", flat=True
                )
            )
            companions += await sync_to_async(list)(
                Chat.objects.filter(source_number=signal_value).values_list(
                    "source_uuid", flat=True
                )
            )
        for candidate in companions:
            for expanded in _channel_variants("signal", str(candidate or "").strip()):
                if expanded and expanded not in variants:
                    variants.append(expanded)
    if not variants:
        return []
    return await sync_to_async(list)(
        ChatTaskSource.objects.filter(
            user=message.user,
            enabled=True,
            service=lookup_service,
            channel_identifier__in=variants,
        ).select_related("project", "epic")
    )


def _to_bool(raw, default=False) -> bool:
    """Interpret common truthy/falsy spellings; unknown or ``None`` gives ``default``."""
    if raw is None:
        return bool(default)
    value = str(raw).strip().lower()
    if value in {"1", "true", "yes", "on", "y"}:
        return True
    if value in {"0", "false", "no", "off", "n"}:
        return False
    return bool(default)


def _coerce_min_chars(raw) -> int:
    """Return ``raw`` as an int >= 1; falsy or non-numeric values become 3.

    A malformed stored setting must not abort processing of the message.
    """
    try:
        return max(1, int(raw or 3))
    except (TypeError, ValueError, OverflowError):
        return 3


def _parse_prefixes(raw) -> list[str]:
    """Normalize a list or comma-separated string into unique lowercase prefixes.

    Falls back to ``["task:", "todo:", "action:"]`` when nothing remains.
    """
    if isinstance(raw, list):
        values = raw
    else:
        values = str(raw or "").split(",")
    rows = []
    for row in values:
        item = str(row or "").strip().lower()
        if item and item not in rows:
            rows.append(item)
    return rows or ["task:", "todo:", "action:"]


def _prefix_roots(prefixes: list[str]) -> list[str]:
    """Return the leading alphanumeric word of each prefix (e.g. "task:" -> "task")."""
    roots: list[str] = []
    for value in prefixes:
        token = str(value or "").strip().lower()
        if not token:
            continue
        token = token.lstrip(_PREFIX_HEAD_TRIM)
        match = re.match(r"([a-z0-9]+)", token)
        if not match:
            continue
        root = str(match.group(1) or "").strip()
        if root and root not in roots:
            roots.append(root)
    return roots


def _has_task_prefix(text: str, prefixes: list[str]) -> bool:
    """Return True when ``text`` starts with a prefix or with a prefix root word.

    The root check ignores leading decoration from ``_PREFIX_HEAD_TRIM`` and
    accepts punctuation or whitespace after the root.
    """
    body = str(text or "").strip().lower()
    if not body:
        return False
    if any(body.startswith(prefix) for prefix in prefixes):
        return True
    trimmed = body.lstrip(_PREFIX_HEAD_TRIM)
    roots = _prefix_roots(prefixes)
    if not trimmed or not roots:
        return False
    for root in roots:
        if re.match(rf"^{re.escape(root)}\b(?:\s*[:\-–—#>.,;!]*\s*|\s+)", trimmed):
            return True
    return False


def _strip_task_prefix(text: str, prefixes: list[str]) -> str:
    """Remove a leading prefix root and its punctuation from ``text``.

    Returns the stripped original text when no root matches or when stripping
    would leave nothing.
    """
    body = str(text or "").strip()
    if not body:
        return ""
    trimmed = body.lstrip(_PREFIX_HEAD_TRIM)
    roots = _prefix_roots(prefixes)
    if not trimmed or not roots:
        return body
    for root in roots:
        match = re.match(
            rf"^{re.escape(root)}\b(?:\s*[:\-–—#>.,;!]*\s*|\s+)(.+)$",
            trimmed,
            flags=re.IGNORECASE | re.DOTALL,
        )
        if match:
            cleaned = str(match.group(1) or "").strip()
            return cleaned or body
    return body


def _normalize_flags(raw: dict | None) -> dict:
    """Return the full derivation flag set, filling defaults for missing keys."""
    row = dict(raw or {})
    return {
        "derive_enabled": _to_bool(row.get("derive_enabled"), True),
        "match_mode": str(row.get("match_mode") or "balanced").strip().lower()
        or "balanced",
        "require_prefix": _to_bool(row.get("require_prefix"), False),
        "allowed_prefixes": _parse_prefixes(row.get("allowed_prefixes")),
        "completion_enabled": _to_bool(row.get("completion_enabled"), True),
        "ai_title_enabled": _to_bool(row.get("ai_title_enabled"), True),
        "announce_task_id": _to_bool(row.get("announce_task_id"), False),
        "min_chars": _coerce_min_chars(row.get("min_chars")),
    }


def _normalize_partial_flags(raw: dict | None) -> dict:
    """Normalize only the flags present in ``raw`` (used for per-source overrides)."""
    row = dict(raw or {})
    out = {}
    if "derive_enabled" in row:
        out["derive_enabled"] = _to_bool(row.get("derive_enabled"), True)
    if "match_mode" in row:
        out["match_mode"] = (
            str(row.get("match_mode") or "balanced").strip().lower() or "balanced"
        )
    if "require_prefix" in row:
        out["require_prefix"] = _to_bool(row.get("require_prefix"), False)
    if "allowed_prefixes" in row:
        out["allowed_prefixes"] = _parse_prefixes(row.get("allowed_prefixes"))
    if "completion_enabled" in row:
        out["completion_enabled"] = _to_bool(row.get("completion_enabled"), True)
    if "ai_title_enabled" in row:
        out["ai_title_enabled"] = _to_bool(row.get("ai_title_enabled"), True)
    if "announce_task_id" in row:
        out["announce_task_id"] = _to_bool(row.get("announce_task_id"), False)
    if "min_chars" in row:
        out["min_chars"] = _coerce_min_chars(row.get("min_chars"))
    return out


def _effective_flags(source: ChatTaskSource) -> dict:
    """Merge project-level flags with the source's own overrides (source wins)."""
    project_flags = _normalize_flags(
        getattr(getattr(source, "project", None), "settings", {}) or {}
    )
    source_flags = _normalize_partial_flags(getattr(source, "settings", {}) or {})
    merged = dict(project_flags)
    merged.update(source_flags)
    return merged


def _is_task_candidate(text: str, flags: dict) -> bool:
    """Decide whether ``text`` should become a task under ``flags``.

    Text shorter than ``min_chars`` is rejected, as is unprefixed text when
    ``require_prefix`` is set. Otherwise "strict" needs a prefix, "broad"
    accepts a prefix or a broad hint word, and any other mode accepts a prefix
    or a balanced hint word.
    """
    body = str(text or "").strip()
    if len(body) < int(flags.get("min_chars") or 1):
        return False
    body_lower = body.lower()
    prefixes = list(flags.get("allowed_prefixes") or [])
    has_prefix = _has_task_prefix(body_lower, prefixes)
    if bool(flags.get("require_prefix")) and not has_prefix:
        return False
    mode = str(flags.get("match_mode") or "balanced").strip().lower()
    if mode == "strict":
        return has_prefix
    if mode == "broad":
        return has_prefix or bool(_BROAD_HINT_RE.search(body))
    return has_prefix or bool(_BALANCED_HINT_RE.search(body))


def _next_reference(user, project) -> str:
    """Return the next reference code for ``user``/``project`` as a string.

    Uses the most recently created task's numeric code + 1; if that code is not
    numeric, falls back to the task count + 1. Returns "1" when no task with a
    reference code exists.
    """
    last = (
        DerivedTask.objects.filter(user=user, project=project)
        .exclude(reference_code="")
        .order_by("-created_at")
        .first()
    )
    if not last:
        return "1"
    try:
        return str(int(str(last.reference_code)) + 1)
    except (TypeError, ValueError):
        return str(DerivedTask.objects.filter(user=user, project=project).count() + 1)


def create_task_record(
    *,
    user,
    project,
    title: str,
    source_service: str,
    source_channel: str,
    origin_message: Message | None = None,
    actor_identifier: str = "",
    epic: TaskEpic | None = None,
    due_date: datetime.date | None = None,
    assignee_identifier: str = "",
    immutable_payload: dict | None = None,
    event_payload: dict | None = None,
    status_snapshot: str = "open",
) -> tuple[DerivedTask, DerivedTaskEvent]:
    """Create a ``DerivedTask`` and its "created" ``DerivedTaskEvent``.

    Synchronous (performs ORM writes). The title is stripped, truncated to 255
    characters and defaults to "Untitled task"; service defaults to "web" and
    status to "open". Returns ``(task, event)``.
    """
    reference = _next_reference(user, project)
    task = DerivedTask.objects.create(
        user=user,
        project=project,
        epic=epic,
        title=str(title or "").strip()[:255] or "Untitled task",
        source_service=str(source_service or "web").strip() or "web",
        source_channel=str(source_channel or "").strip(),
        origin_message=origin_message,
        reference_code=reference,
        status_snapshot=str(status_snapshot or "open").strip() or "open",
        due_date=due_date,
        assignee_identifier=str(assignee_identifier or "").strip(),
        immutable_payload=dict(immutable_payload or {}),
    )
    event = DerivedTaskEvent.objects.create(
        task=task,
        event_type="created",
        actor_identifier=str(actor_identifier or "").strip(),
        source_message=origin_message,
        payload=dict(event_payload or {}),
    )
    return task, event


async def create_task_record_and_sync(
    *,
    user,
    project,
    title: str,
    source_service: str,
    source_channel: str,
    origin_message: Message | None = None,
    actor_identifier: str = "",
    epic: TaskEpic | None = None,
    due_date: datetime.date | None = None,
    assignee_identifier: str = "",
    immutable_payload: dict | None = None,
    event_payload: dict | None = None,
    status_snapshot: str = "open",
) -> tuple[DerivedTask, DerivedTaskEvent]:
    """Async wrapper: create the task record, then emit a "create" sync event."""
    task, event = await sync_to_async(create_task_record)(
        user=user,
        project=project,
        title=title,
        source_service=source_service,
        source_channel=source_channel,
        origin_message=origin_message,
        actor_identifier=actor_identifier,
        epic=epic,
        due_date=due_date,
        assignee_identifier=assignee_identifier,
        immutable_payload=immutable_payload,
        event_payload=event_payload,
        status_snapshot=status_snapshot,
    )
    await _emit_sync_event(task, event, "create")
    return task, event


async def mark_task_completed_and_sync(
    *,
    task: DerivedTask,
    actor_identifier: str = "",
    source_message: Message | None = None,
    payload: dict | None = None,
) -> DerivedTaskEvent:
    """Set the task status to "completed", record the event and emit a "complete" sync."""
    task.status_snapshot = "completed"
    await sync_to_async(task.save)(update_fields=["status_snapshot"])
    event = await sync_to_async(DerivedTaskEvent.objects.create)(
        task=task,
        event_type="completion_marked",
        actor_identifier=str(actor_identifier or "").strip(),
        source_message=source_message,
        payload=dict(payload or {}),
    )
    await _emit_sync_event(task, event, "complete")
    return event


async def _derive_title(message: Message) -> str:
    """Return a task title (max 255 chars) for ``message``.

    Uses the user's first ``AI`` object to summarize the text when
    ``settings.TASK_DERIVATION_USE_AI`` is truthy (default True); falls back to
    the raw text when AI is disabled, unavailable, fails or returns nothing.
    """
    text = str(message.text or "").strip()
    if not text:
        return "Untitled task"
    if not bool(getattr(settings, "TASK_DERIVATION_USE_AI", True)):
        return text[:255]
    ai_obj = await sync_to_async(lambda: AI.objects.filter(user=message.user).first())()
    if not ai_obj:
        return text[:255]
    prompt = [
        {
            "role": "system",
            "content": "Extract one concise actionable task title from the message. Return plain text only.",
        },
        {"role": "user", "content": text[:2000]},
    ]
    try:
        title = str(
            await ai_runner.run_prompt(prompt, ai_obj, operation="task_derive_title")
            or ""
        ).strip()
    except Exception:
        # Title generation is best-effort; the raw text is used instead.
        title = ""
    return (title or text)[:255]


async def _derive_title_with_flags(message: Message, flags: dict) -> str:
    """Derive a title honoring ``ai_title_enabled`` and strip any task prefix from it."""
    prefixes = list(flags.get("allowed_prefixes") or [])
    if not bool(flags.get("ai_title_enabled", True)):
        text = _strip_task_prefix(str(message.text or "").strip(), prefixes)
        return (text or "Untitled task")[:255]
    title = await _derive_title(message)
    cleaned = _strip_task_prefix(str(title or "").strip(), prefixes)
    return (cleaned or title or "Untitled task")[:255]


async def _emit_sync_event(
    task: DerivedTask, event: DerivedTaskEvent, action: str
) -> None:
    """Record a ``CodexRun`` for ``event`` and push it to the user's task provider.

    The provider is the user's first enabled ``TaskProviderConfig`` (ordered by
    provider name), or "mock" when none exists. Providers flagged
    ``run_in_worker`` are only queued here; other providers are called inline
    and their result is stored on ``ExternalSyncEvent`` and the ``CodexRun``.
    ``action`` is "create", "complete", or anything else for an update.
    """
    cfg = await sync_to_async(
        lambda: TaskProviderConfig.objects.filter(user=task.user, enabled=True)
        .order_by("provider")
        .first()
    )()
    provider_name = str(getattr(cfg, "provider", "mock") or "mock")
    provider_settings = dict(getattr(cfg, "settings", {}) or {})
    provider = get_provider(provider_name)
    idempotency_key = f"{provider_name}:{task.id}:{event.id}"
    external_chat_id = await sync_to_async(resolve_external_chat_id)(
        user=task.user,
        provider=provider_name,
        service=str(task.source_service or ""),
        channel=str(task.source_channel or ""),
    )
    # Read related objects from the instance cache only, so no query is issued
    # from this async context; names stay empty when they are not loaded.
    cached_project = task._state.fields_cache.get("project")
    cached_epic = task._state.fields_cache.get("epic")
    project_name = str(getattr(cached_project, "name", "") or "")
    epic_name = str(getattr(cached_epic, "name", "") or "")
    memory_context: list = []
    try:
        memory_context = await sync_to_async(retrieve_memories_for_prompt)(
            user_id=int(task.user_id),
            query=str(task.title or ""),
            limit=10,
        )
    except Exception:
        # Memory context is optional enrichment; sync proceeds without it.
        pass
    request_payload = {
        "task_id": str(task.id),
        "reference_code": str(task.reference_code or ""),
        "title": str(task.title or ""),
        "external_key": str(task.external_key or ""),
        "due_date": task.due_date.isoformat() if task.due_date else "",
        "assignee_identifier": str(task.assignee_identifier or ""),
        "project_name": project_name,
        "epic_name": epic_name,
        "source_service": str(task.source_service or ""),
        "source_channel": str(task.source_channel or ""),
        "external_chat_id": external_chat_id,
        "origin_message_id": str(getattr(task, "origin_message_id", "") or ""),
        "trigger_message_id": str(
            getattr(event, "source_message_id", "")
            or getattr(task, "origin_message_id", "")
            or ""
        ),
        "mode": "default",
        "payload": event.payload,
        "memory_context": memory_context,
    }
    codex_run = await sync_to_async(CodexRun.objects.create)(
        user=task.user,
        task_id=task.id,
        derived_task_event_id=event.id,
        source_message_id=(event.source_message_id or task.origin_message_id),
        project_id=task.project_id,
        epic_id=task.epic_id,
        source_service=str(task.source_service or ""),
        source_channel=str(task.source_channel or ""),
        external_chat_id=external_chat_id,
        status="queued",
        request_payload={
            "action": action,
            "provider_payload": dict(request_payload),
            "idempotency_key": idempotency_key,
        },
        result_payload={},
        error="",
    )
    request_payload["codex_run_id"] = str(codex_run.id)

    # Worker-backed providers are queued and executed by `manage.py codex_worker`.
    if bool(getattr(provider, "run_in_worker", False)):
        if provider_name == "codex_cli":
            await sync_to_async(queue_codex_event_with_pre_approval)(
                user=task.user,
                run=codex_run,
                task=task,
                task_event=event,
                action=action,
                provider_payload=dict(request_payload),
                idempotency_key=idempotency_key,
            )
            return
        await sync_to_async(ExternalSyncEvent.objects.update_or_create)(
            idempotency_key=idempotency_key,
            defaults={
                "user": task.user,
                "task": task,
                "task_event": event,
                "provider": provider_name,
                "status": "pending",
                "payload": {
                    "action": action,
                    "provider_payload": dict(request_payload),
                },
                "error": "",
            },
        )
        return

    if action == "create":
        result = provider.create_task(provider_settings, dict(request_payload))
    elif action == "complete":
        result = provider.mark_complete(provider_settings, dict(request_payload))
    else:
        result = provider.append_update(provider_settings, dict(request_payload))

    status = "ok" if result.ok else "failed"
    await sync_to_async(ExternalSyncEvent.objects.update_or_create)(
        idempotency_key=idempotency_key,
        defaults={
            "user": task.user,
            "task": task,
            "task_event": event,
            "provider": provider_name,
            "status": status,
            "payload": dict(result.payload or {}),
            "error": str(result.error or ""),
        },
    )
    codex_run.status = status
    codex_run.result_payload = dict(result.payload or {})
    codex_run.error = str(result.error or "")
    await sync_to_async(codex_run.save)(
        update_fields=["status", "result_payload", "error", "updated_at"]
    )
    # Remember the provider's key the first time one is returned.
    if result.ok and result.external_key and not task.external_key:
        task.external_key = str(result.external_key)
        await sync_to_async(task.save)(update_fields=["external_key"])


async def _completion_regex(message: Message) -> re.Pattern:
    """Build the "<phrase> #<ref>" matcher from the user's enabled completion patterns.

    Falls back to "done", "completed", "fixed" when the user has no usable
    phrases. The only capture group is the task reference.
    """
    patterns = await sync_to_async(list)(
        TaskCompletionPattern.objects.filter(user=message.user, enabled=True).order_by(
            "position", "created_at"
        )
    )
    phrases = [
        str(row.phrase or "").strip()
        for row in patterns
        if str(row.phrase or "").strip()
    ]
    if not phrases:
        phrases = ["done", "completed", "fixed"]
    # Single backslashes: in a raw string "\\b" would match a literal
    # backslash followed by "b", so no ordinary message could ever match.
    return re.compile(
        r"\b(?:"
        + "|".join(re.escape(p) for p in phrases)
        + r")\s*#([A-Za-z0-9_-]+)\b",
        re.IGNORECASE,
    )


async def _send_scope_message(
    source: ChatTaskSource, message: Message, text: str
) -> None:
    """Send ``text`` to the source's chat, falling back to the message's own scope."""
    await send_message_raw(
        source.service or message.source_service or "web",
        source.channel_identifier or message.source_chat_id or "",
        text=text,
        attachments=[],
        metadata={"origin": "task_scope_command"},
    )


async def _find_scoped_task(
    message: Message, source: ChatTaskSource, reference: str = ""
) -> DerivedTask | None:
    """Return the newest task in the source's chat scope, optionally by reference code."""
    filters = {
        "user": message.user,
        "project": source.project,
        "source_service": source.service,
        "source_channel": source.channel_identifier,
    }
    if reference:
        filters["reference_code"] = reference
    return await sync_to_async(
        lambda: DerivedTask.objects.filter(**filters).order_by("-created_at").first()
    )()


async def _handle_scope_task_commands(
    message: Message, sources: list[ChatTaskSource], text: str
) -> bool:
    """Handle list / add / undo / show / complete chat commands.

    Commands act on the first entry of ``sources``. Returns True when ``text``
    was a recognized command (a reply has been sent), False otherwise.
    """
    if not sources:
        return False
    body = str(text or "").strip()
    source = sources[0]

    # --- list ---------------------------------------------------------------
    if _LIST_TASKS_RE.match(body) or _LIST_TASKS_CMD_RE.match(body):
        open_rows = await sync_to_async(list)(
            DerivedTask.objects.filter(
                user=message.user,
                project=source.project,
                source_service=source.service,
                source_channel=source.channel_identifier,
            )
            .exclude(status_snapshot="completed")
            .order_by("-created_at")[:20]
        )
        if not open_rows:
            await _send_scope_message(
                source, message, "[task] no open tasks in this chat."
            )
            return True
        lines = ["[task] open tasks:"]
        for row in open_rows:
            lines.append(f"- #{row.reference_code} {row.title}")
        await _send_scope_message(source, message, "\n".join(lines))
        return True

    # --- add ----------------------------------------------------------------
    create_match = _TASK_ADD_CMD_RE.match(body)
    if create_match:
        task_text = str(create_match.group("title") or "").strip()
        if not task_text:
            await _send_scope_message(
                source, message, "[task] title is required for .task add."
            )
            return True
        epic = source.epic
        epic_name = _extract_epic_name_from_text(task_text)
        if epic_name:
            epic, _ = await sync_to_async(TaskEpic.objects.get_or_create)(
                project=source.project,
                name=epic_name,
            )
        cleaned_task_text = _strip_epic_token(task_text)
        task, _event = await create_task_record_and_sync(
            user=message.user,
            project=source.project,
            epic=epic,
            title=cleaned_task_text[:255],
            source_service=source.service or message.source_service or "web",
            source_channel=source.channel_identifier or message.source_chat_id or "",
            origin_message=message,
            actor_identifier=str(message.sender_uuid or ""),
            due_date=_parse_due_date(cleaned_task_text),
            assignee_identifier=_parse_assignee(cleaned_task_text),
            immutable_payload={
                "origin_text": text,
                "task_text": cleaned_task_text,
                "source": "chat_manual_command",
            },
            event_payload={
                "origin_text": text,
                "command": ".task add",
                "via": "chat_command",
            },
        )
        await _send_scope_message(
            source, message, f"[task] created #{task.reference_code}: {task.title}"
        )
        return True

    # --- undo ---------------------------------------------------------------
    undo_match = _UNDO_TASK_RE.match(body)
    if undo_match:
        # Without a reference the newest task in this chat is removed.
        reference = str(undo_match.group("reference") or "").strip()
        task = await _find_scoped_task(message, source, reference)
        if task is None:
            await _send_scope_message(
                source, message, "[task] nothing to undo in this chat."
            )
            return True
        ref = str(task.reference_code or "")
        title = str(task.title or "")
        await sync_to_async(task.delete)()
        await _send_scope_message(source, message, f"[task] removed #{ref}: {title}")
        return True

    # --- show ---------------------------------------------------------------
    show_match = _TASK_SHOW_RE.match(body)
    if show_match:
        reference = str(show_match.group("reference") or "").strip()
        task = await _find_scoped_task(message, source, reference)
        if task is None:
            await _send_scope_message(
                source, message, f"[task] #{reference} not found."
            )
            return True
        due_str = f"\ndue: {task.due_date}" if task.due_date else ""
        assignee_str = (
            f"\nassignee: {task.assignee_identifier}"
            if task.assignee_identifier
            else ""
        )
        detail = (
            f"[task] #{task.reference_code}: {task.title}"
            f"\nstatus: {task.status_snapshot}"
            f"{due_str}"
            f"{assignee_str}"
        )
        await _send_scope_message(source, message, detail)
        return True

    # --- complete -----------------------------------------------------------
    complete_match = _TASK_COMPLETE_CMD_RE.match(body)
    if complete_match:
        reference = str(complete_match.group("reference") or "").strip()
        task = await _find_scoped_task(message, source, reference)
        if task is None:
            await _send_scope_message(
                source, message, f"[task] #{reference} not found."
            )
            return True
        await mark_task_completed_and_sync(
            task=task,
            actor_identifier=str(message.sender_uuid or ""),
            source_message=message,
            payload={
                "marker": reference,
                "command": ".task complete",
                "via": "chat_command",
            },
        )
        await _send_scope_message(
            source, message, f"[task] completed #{task.reference_code}: {task.title}"
        )
        return True

    return False


def _extract_epic_name_from_text(text: str) -> str:
    """Return the name inside the first ``[epic: Name]`` token, or ``""``."""
    body = str(text or "")
    match = _EPIC_TOKEN_RE.search(body)
    if not match:
        return ""
    return str(match.group(1) or "").strip()


def _strip_epic_token(text: str) -> str:
    """Remove every ``[epic: Name]`` token and collapse the whitespace left behind."""
    body = str(text or "")
    cleaned = _EPIC_TOKEN_RE.sub("", body)
    return re.sub(r"\s{2,}", " ", cleaned).strip()


async def _handle_epic_create_command(
    message: Message, sources: list[ChatTaskSource], text: str
) -> bool:
    """Handle "epic: <name>" / ".epic <name>" by get-or-creating the epic.

    Returns True when the text was an epic command, False otherwise. The epic
    is created in the project of the first source and a usage reply is sent.
    """
    match = _EPIC_CREATE_RE.match(str(text or ""))
    if not match or not sources:
        return False
    name = str(match.group("name") or "").strip()
    if not name:
        return True
    source = sources[0]
    epic, created = await sync_to_async(TaskEpic.objects.get_or_create)(
        project=source.project,
        name=name,
    )
    state = "created" if created else "already exists"
    await _send_scope_message(
        source,
        message,
        (
            f"[epic] {state}: {epic.name}\n"
            "WhatsApp usage:\n"
            "- create epic: epic: <Epic name> (or .epic <Epic name>)\n"
            "- add task to epic: task: <description> [epic:<Epic name>]\n"
            "- list tasks: .l list tasks\n"
            "- undo latest task: .undo"
        ),
    )
    return True


def _is_task_command_candidate(text: str) -> bool:
    """Return True for explicit commands and for text with a "task:"/"todo:" prefix."""
    body = str(text or "").strip()
    if not body:
        return False
    if _is_explicit_task_command(body):
        return True
    return _has_task_prefix(body.lower(), ["task:", "todo:"])


def _is_explicit_task_command(text: str) -> bool:
    """Return True when ``text`` matches one of the task/epic command patterns."""
    body = str(text or "").strip()
    if not body:
        return False
    return bool(
        _LIST_TASKS_RE.match(body)
        or _LIST_TASKS_CMD_RE.match(body)
        or _TASK_ADD_CMD_RE.match(body)
        or _TASK_SHOW_RE.match(body)
        or _TASK_COMPLETE_CMD_RE.match(body)
        or _UNDO_TASK_RE.match(body)
        or _EPIC_CREATE_RE.match(body)
    )


async def process_inbound_task_intelligence(message: Message) -> None:
    """Run task commands, completion markers and task derivation for one message.

    Order of processing:
      1. Skip messages carrying an ``origin_tag`` in their metadata or with no text.
      2. Explicit commands must pass the "tasks.commands" policy.
      3. Resolve mapped sources; for command-like text with no mapping, seed a
         default source for the chat.
      4. Dispatch task and epic commands.
      5. Everything else must pass the "tasks.submit" policy.
      6. A completion marker ("done #12") completes the matching task, or
         records a warning placeholder when no task has that reference.
      7. Otherwise derive one task per source whose flags accept the text.
    """
    if message is None:
        return
    if dict(message.message_meta or {}).get("origin_tag"):
        return
    text = str(message.text or "").strip()
    if not text:
        return
    security_context = CommandSecurityContext(
        service=str(message.source_service or "").strip().lower(),
        channel_identifier=str(message.source_chat_id or "").strip(),
        message_meta=dict(message.message_meta or {}),
        payload={},
    )
    if _is_explicit_task_command(text):
        command_decision = await sync_to_async(evaluate_command_policy)(
            user=message.user,
            scope_key="tasks.commands",
            context=security_context,
        )
        if not command_decision.allowed:
            return

    sources = await _resolve_source_mappings(message)
    if not sources:
        if not _is_task_command_candidate(text):
            return
        service, channel = resolve_message_scope(message)
        if not service or not channel:
            return
        seeded = await sync_to_async(ensure_default_source_for_chat)(
            user=message.user,
            service=service,
            channel_identifier=channel,
            message=message,
        )
        if seeded is None:
            return
        sources = [seeded]

    if await _handle_scope_task_commands(message, sources, text):
        return
    if await _handle_epic_create_command(message, sources, text):
        return

    submit_decision = await sync_to_async(evaluate_command_policy)(
        user=message.user,
        scope_key="tasks.submit",
        context=security_context,
    )
    if not submit_decision.allowed:
        return

    completion_allowed = any(
        bool(_effective_flags(source).get("completion_enabled")) for source in sources
    )
    completion_rx = await _completion_regex(message) if completion_allowed else None
    marker_match = (completion_rx.search(text) if completion_rx else None) or (
        _COMPLETION_RE.search(text) if completion_allowed else None
    )
    if marker_match:
        # The reference is the last group in both patterns (group 1 of the
        # user-pattern regex, group 2 of _COMPLETION_RE).
        ref_code = str(marker_match.group(marker_match.lastindex or 1) or "").strip()
        task = await sync_to_async(
            lambda: DerivedTask.objects.filter(
                user=message.user, reference_code=ref_code
            )
            .order_by("-created_at")
            .first()
        )()
        if not task:
            # parser warning event attached to a newly derived placeholder in mapped project
            source = sources[0]
            placeholder = await sync_to_async(DerivedTask.objects.create)(
                user=message.user,
                project=source.project,
                epic=source.epic,
                title=f"Unresolved completion marker #{ref_code}",
                source_service=message.source_service or "web",
                source_channel=message.source_chat_id or "",
                origin_message=message,
                reference_code=ref_code,
                status_snapshot="warning",
                immutable_payload={"warning": "completion_marker_unresolved"},
            )
            await sync_to_async(DerivedTaskEvent.objects.create)(
                task=placeholder,
                event_type="parse_warning",
                actor_identifier=str(message.sender_uuid or ""),
                source_message=message,
                payload={"reason": "completion_marker_unresolved", "marker": ref_code},
            )
            return
        await mark_task_completed_and_sync(
            task=task,
            actor_identifier=str(message.sender_uuid or ""),
            source_message=message,
            payload={"marker": ref_code},
        )
        return

    for source in sources:
        flags = _effective_flags(source)
        if not bool(flags.get("derive_enabled", True)):
            continue
        task_text = _strip_epic_token(text)
        if not _is_task_candidate(task_text, flags):
            continue
        epic = source.epic
        epic_name = _extract_epic_name_from_text(text)
        if epic_name:
            epic, _ = await sync_to_async(TaskEpic.objects.get_or_create)(
                project=source.project,
                name=epic_name,
            )
        # Title derivation should not see the [epic:...] token, so an unsaved
        # copy of the message carrying the cleaned text is used when it differs.
        cloned_message = message
        if task_text != text:
            cloned_message = Message(
                user=message.user,
                text=task_text,
                source_service=message.source_service,
                source_chat_id=message.source_chat_id,
            )
        title = await _derive_title_with_flags(cloned_message, flags)
        parsed_due_date = _parse_due_date(task_text)
        parsed_assignee = _parse_assignee(task_text)
        task, _event = await create_task_record_and_sync(
            user=message.user,
            project=source.project,
            epic=epic,
            title=title,
            source_service=source.service or message.source_service or "web",
            source_channel=source.channel_identifier or message.source_chat_id or "",
            origin_message=message,
            actor_identifier=str(message.sender_uuid or ""),
            due_date=parsed_due_date,
            assignee_identifier=parsed_assignee,
            immutable_payload={
                "origin_text": text,
                "task_text": task_text,
                "flags": flags,
            },
            event_payload={"origin_text": text},
        )
        if bool(flags.get("announce_task_id", False)):
            try:
                await send_message_raw(
                    source.service or message.source_service or "web",
                    source.channel_identifier or message.source_chat_id or "",
                    text=f"[task] Created #{task.reference_code}: {task.title}",
                    attachments=[],
                    metadata={"origin": "task_announce"},
                )
            except Exception:
                # Announcement is best-effort and should not block derivation.
                pass
        scope_count = await sync_to_async(
            lambda: DerivedTask.objects.filter(
                user=message.user,
                project=source.project,
                source_service=source.service,
                source_channel=source.channel_identifier,
            ).count()
        )()
        # Every tenth task in a chat scope triggers a usage reminder.
        if scope_count > 0 and scope_count % 10 == 0:
            try:
                await send_message_raw(
                    source.service or message.source_service or "web",
                    source.channel_identifier or message.source_chat_id or "",
                    text="[task] tip: use .l list tasks to review tasks. use .undo to uncreate the latest task.",
                    attachments=[],
                    metadata={"origin": "task_reminder"},
                )
            except Exception:
                # The reminder is best-effort, like the announcement above.
                pass