from __future__ import annotations import re from asgiref.sync import sync_to_async from django.conf import settings from django.db.models import Q from core.clients.transport import send_message_raw from core.messaging import ai as ai_runner from core.models import ( AI, Chat, ChatTaskSource, DerivedTask, DerivedTaskEvent, ExternalSyncEvent, ExternalChatLink, Message, PersonIdentifier, TaskCompletionPattern, TaskProviderConfig, ) from core.tasks.providers import get_provider _TASK_HINT_RE = re.compile(r"\b(todo|task|action|need to|please)\b", re.IGNORECASE) _COMPLETION_RE = re.compile(r"\b(done|completed|fixed)\s*#([A-Za-z0-9_-]+)\b", re.IGNORECASE) _BALANCED_HINT_RE = re.compile(r"\b(todo|task|action item|action)\b", re.IGNORECASE) _BROAD_HINT_RE = re.compile(r"\b(todo|task|action|need to|please|reminder)\b", re.IGNORECASE) def _channel_variants(service: str, channel: str) -> list[str]: value = str(channel or "").strip() if not value: return [] variants = [value] service_key = str(service or "").strip().lower() if service_key == "whatsapp": bare = value.split("@", 1)[0].strip() if bare and bare not in variants: variants.append(bare) direct = f"{bare}@s.whatsapp.net" if bare else "" if direct and direct not in variants: variants.append(direct) group = f"{bare}@g.us" if bare else "" if group and group not in variants: variants.append(group) if service_key == "signal": digits = re.sub(r"[^0-9]", "", value) if digits and digits not in variants: variants.append(digits) if digits: plus = f"+{digits}" if plus not in variants: variants.append(plus) return variants async def _resolve_source_mappings(message: Message) -> list[ChatTaskSource]: variants = _channel_variants(message.source_service or "", message.source_chat_id or "") if str(message.source_service or "").strip().lower() == "signal": signal_value = str(message.source_chat_id or "").strip() if signal_value: companions = await sync_to_async(list)( Chat.objects.filter(source_uuid=signal_value).values_list("source_number", flat=True) ) companions += await sync_to_async(list)( Chat.objects.filter(source_number=signal_value).values_list("source_uuid", flat=True) ) for candidate in companions: for expanded in _channel_variants("signal", str(candidate or "").strip()): if expanded and expanded not in variants: variants.append(expanded) if not variants: return [] return await sync_to_async(list)( ChatTaskSource.objects.filter( user=message.user, enabled=True, service=message.source_service, channel_identifier__in=variants, ).select_related("project", "epic") ) def _to_bool(raw, default=False) -> bool: if raw is None: return bool(default) value = str(raw).strip().lower() if value in {"1", "true", "yes", "on", "y"}: return True if value in {"0", "false", "no", "off", "n"}: return False return bool(default) def _parse_prefixes(raw) -> list[str]: if isinstance(raw, list): values = raw else: values = str(raw or "").split(",") rows = [] for row in values: item = str(row or "").strip().lower() if item and item not in rows: rows.append(item) return rows or ["task:", "todo:", "action:"] def _normalize_flags(raw: dict | None) -> dict: row = dict(raw or {}) return { "derive_enabled": _to_bool(row.get("derive_enabled"), True), "match_mode": str(row.get("match_mode") or "balanced").strip().lower() or "balanced", "require_prefix": _to_bool(row.get("require_prefix"), False), "allowed_prefixes": _parse_prefixes(row.get("allowed_prefixes")), "completion_enabled": _to_bool(row.get("completion_enabled"), True), "ai_title_enabled": _to_bool(row.get("ai_title_enabled"), True), "announce_task_id": _to_bool(row.get("announce_task_id"), False), "min_chars": max(1, int(row.get("min_chars") or 3)), } def _normalize_partial_flags(raw: dict | None) -> dict: row = dict(raw or {}) out = {} if "derive_enabled" in row: out["derive_enabled"] = _to_bool(row.get("derive_enabled"), True) if "match_mode" in row: out["match_mode"] = str(row.get("match_mode") or "balanced").strip().lower() or "balanced" if "require_prefix" in row: out["require_prefix"] = _to_bool(row.get("require_prefix"), False) if "allowed_prefixes" in row: out["allowed_prefixes"] = _parse_prefixes(row.get("allowed_prefixes")) if "completion_enabled" in row: out["completion_enabled"] = _to_bool(row.get("completion_enabled"), True) if "ai_title_enabled" in row: out["ai_title_enabled"] = _to_bool(row.get("ai_title_enabled"), True) if "announce_task_id" in row: out["announce_task_id"] = _to_bool(row.get("announce_task_id"), False) if "min_chars" in row: out["min_chars"] = max(1, int(row.get("min_chars") or 3)) return out def _effective_flags(source: ChatTaskSource) -> dict: project_flags = _normalize_flags(getattr(getattr(source, "project", None), "settings", {}) or {}) source_flags = _normalize_partial_flags(getattr(source, "settings", {}) or {}) merged = dict(project_flags) merged.update(source_flags) return merged def _is_task_candidate(text: str, flags: dict) -> bool: body = str(text or "").strip() if len(body) < int(flags.get("min_chars") or 1): return False body_lower = body.lower() prefixes = list(flags.get("allowed_prefixes") or []) has_prefix = any(body_lower.startswith(prefix) for prefix in prefixes) if bool(flags.get("require_prefix")) and not has_prefix: return False mode = str(flags.get("match_mode") or "balanced").strip().lower() if mode == "strict": return has_prefix if mode == "broad": return has_prefix or bool(_BROAD_HINT_RE.search(body)) return has_prefix or bool(_BALANCED_HINT_RE.search(body)) def _next_reference(user, project) -> str: last = ( DerivedTask.objects.filter(user=user, project=project) .exclude(reference_code="") .order_by("-created_at") .first() ) if not last: return "1" try: return str(int(str(last.reference_code)) + 1) except Exception: return str(DerivedTask.objects.filter(user=user, project=project).count() + 1) async def _derive_title(message: Message) -> str: text = str(message.text or "").strip() if not text: return "Untitled task" if not bool(getattr(settings, "TASK_DERIVATION_USE_AI", True)): return text[:255] ai_obj = await sync_to_async(lambda: AI.objects.filter(user=message.user).first())() if not ai_obj: return text[:255] prompt = [ { "role": "system", "content": "Extract one concise actionable task title from the message. Return plain text only.", }, {"role": "user", "content": text[:2000]}, ] try: title = str(await ai_runner.run_prompt(prompt, ai_obj, operation="task_derive_title") or "").strip() except Exception: title = "" return (title or text)[:255] async def _derive_title_with_flags(message: Message, flags: dict) -> str: if not bool(flags.get("ai_title_enabled", True)): text = str(message.text or "").strip() return (text or "Untitled task")[:255] return await _derive_title(message) async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: str) -> None: cfg = await sync_to_async( lambda: TaskProviderConfig.objects.filter(user=task.user, enabled=True).order_by("provider").first() )() provider_name = str(getattr(cfg, "provider", "mock") or "mock") provider_settings = dict(getattr(cfg, "settings", {}) or {}) provider = get_provider(provider_name) idempotency_key = f"{provider_name}:{task.id}:{event.id}" variants = _channel_variants(task.source_service or "", task.source_channel or "") person_identifier = None if variants: person_identifier = await sync_to_async( lambda: PersonIdentifier.objects.filter( user=task.user, service=task.source_service, identifier__in=variants, ) .select_related("person") .order_by("-id") .first() )() external_chat_id = "" if person_identifier is not None: link = await sync_to_async( lambda: ExternalChatLink.objects.filter( user=task.user, provider=provider_name, enabled=True, ) .filter( Q(person_identifier=person_identifier) | Q(person=person_identifier.person) ) .order_by("-updated_at", "-id") .first() )() if link is not None: external_chat_id = str(link.external_chat_id or "").strip() # Worker-backed providers are queued and executed by `manage.py codex_worker`. if bool(getattr(provider, "run_in_worker", False)): await sync_to_async(ExternalSyncEvent.objects.update_or_create)( idempotency_key=idempotency_key, defaults={ "user": task.user, "task": task, "task_event": event, "provider": provider_name, "status": "pending", "payload": { "action": action, "provider_payload": { "task_id": str(task.id), "title": task.title, "external_key": task.external_key, "reference_code": task.reference_code, "source_service": str(task.source_service or ""), "source_channel": str(task.source_channel or ""), "external_chat_id": external_chat_id, "payload": event.payload, }, }, "error": "", }, ) return if action == "create": result = provider.create_task(provider_settings, { "task_id": str(task.id), "title": task.title, "external_key": task.external_key, "reference_code": task.reference_code, "source_service": str(task.source_service or ""), "source_channel": str(task.source_channel or ""), "external_chat_id": external_chat_id, }) elif action == "complete": result = provider.mark_complete(provider_settings, { "task_id": str(task.id), "external_key": task.external_key, "reference_code": task.reference_code, "source_service": str(task.source_service or ""), "source_channel": str(task.source_channel or ""), "external_chat_id": external_chat_id, }) else: result = provider.append_update(provider_settings, { "task_id": str(task.id), "external_key": task.external_key, "reference_code": task.reference_code, "source_service": str(task.source_service or ""), "source_channel": str(task.source_channel or ""), "external_chat_id": external_chat_id, "payload": event.payload, }) status = "ok" if result.ok else "failed" await sync_to_async(ExternalSyncEvent.objects.update_or_create)( idempotency_key=idempotency_key, defaults={ "user": task.user, "task": task, "task_event": event, "provider": provider_name, "status": status, "payload": dict(result.payload or {}), "error": str(result.error or ""), }, ) if result.ok and result.external_key and not task.external_key: task.external_key = str(result.external_key) await sync_to_async(task.save)(update_fields=["external_key"]) async def _completion_regex(message: Message) -> re.Pattern: patterns = await sync_to_async(list)( TaskCompletionPattern.objects.filter(user=message.user, enabled=True).order_by("position", "created_at") ) phrases = [str(row.phrase or "").strip() for row in patterns if str(row.phrase or "").strip()] if not phrases: phrases = ["done", "completed", "fixed"] return re.compile(r"\\b(?:" + "|".join(re.escape(p) for p in phrases) + r")\\s*#([A-Za-z0-9_-]+)\\b", re.IGNORECASE) async def process_inbound_task_intelligence(message: Message) -> None: if message is None: return if dict(message.message_meta or {}).get("origin_tag"): return text = str(message.text or "").strip() if not text: return sources = await _resolve_source_mappings(message) if not sources: return completion_allowed = any(bool(_effective_flags(source).get("completion_enabled")) for source in sources) completion_rx = await _completion_regex(message) if completion_allowed else None marker_match = (completion_rx.search(text) if completion_rx else None) or (_COMPLETION_RE.search(text) if completion_allowed else None) if marker_match: ref_code = str(marker_match.group(marker_match.lastindex or 1) or "").strip() task = await sync_to_async( lambda: DerivedTask.objects.filter(user=message.user, reference_code=ref_code).order_by("-created_at").first() )() if not task: # parser warning event attached to a newly derived placeholder in mapped project source = sources[0] placeholder = await sync_to_async(DerivedTask.objects.create)( user=message.user, project=source.project, epic=source.epic, title=f"Unresolved completion marker #{ref_code}", source_service=message.source_service or "web", source_channel=message.source_chat_id or "", origin_message=message, reference_code=ref_code, status_snapshot="warning", immutable_payload={"warning": "completion_marker_unresolved"}, ) await sync_to_async(DerivedTaskEvent.objects.create)( task=placeholder, event_type="parse_warning", actor_identifier=str(message.sender_uuid or ""), source_message=message, payload={"reason": "completion_marker_unresolved", "marker": ref_code}, ) return task.status_snapshot = "completed" await sync_to_async(task.save)(update_fields=["status_snapshot"]) event = await sync_to_async(DerivedTaskEvent.objects.create)( task=task, event_type="completion_marked", actor_identifier=str(message.sender_uuid or ""), source_message=message, payload={"marker": ref_code}, ) await _emit_sync_event(task, event, "complete") return for source in sources: flags = _effective_flags(source) if not bool(flags.get("derive_enabled", True)): continue if not _is_task_candidate(text, flags): continue title = await _derive_title_with_flags(message, flags) reference = await sync_to_async(_next_reference)(message.user, source.project) task = await sync_to_async(DerivedTask.objects.create)( user=message.user, project=source.project, epic=source.epic, title=title, source_service=message.source_service or "web", source_channel=message.source_chat_id or "", origin_message=message, reference_code=reference, status_snapshot="open", immutable_payload={"origin_text": text, "flags": flags}, ) event = await sync_to_async(DerivedTaskEvent.objects.create)( task=task, event_type="created", actor_identifier=str(message.sender_uuid or ""), source_message=message, payload={"origin_text": text}, ) await _emit_sync_event(task, event, "create") if bool(flags.get("announce_task_id", False)): try: await send_message_raw( message.source_service or "web", message.source_chat_id or "", text=f"[task] Created #{task.reference_code}: {task.title}", attachments=[], metadata={"origin": "task_announce"}, ) except Exception: # Announcement is best-effort and should not block derivation. pass