Files
GIA/core/tasks/engine.py

342 lines
13 KiB
Python

from __future__ import annotations
import re
from asgiref.sync import sync_to_async
from django.conf import settings
from core.clients.transport import send_message_raw
from core.messaging import ai as ai_runner
from core.models import (
AI,
ChatTaskSource,
DerivedTask,
DerivedTaskEvent,
ExternalSyncEvent,
Message,
TaskCompletionPattern,
TaskProviderConfig,
)
from core.tasks.providers.mock import get_provider
_TASK_HINT_RE = re.compile(r"\b(todo|task|action|need to|please)\b", re.IGNORECASE)
_COMPLETION_RE = re.compile(r"\b(done|completed|fixed)\s*#([A-Za-z0-9_-]+)\b", re.IGNORECASE)
_BALANCED_HINT_RE = re.compile(r"\b(todo|task|action item|action)\b", re.IGNORECASE)
_BROAD_HINT_RE = re.compile(r"\b(todo|task|action|need to|please|reminder)\b", re.IGNORECASE)
def _channel_variants(service: str, channel: str) -> list[str]:
value = str(channel or "").strip()
if not value:
return []
variants = [value]
if str(service or "").strip().lower() == "whatsapp":
bare = value.split("@", 1)[0].strip()
if bare and bare not in variants:
variants.append(bare)
group = f"{bare}@g.us" if bare else ""
if group and group not in variants:
variants.append(group)
return variants
async def _resolve_source_mappings(message: Message) -> list[ChatTaskSource]:
variants = _channel_variants(message.source_service or "", message.source_chat_id or "")
if not variants:
return []
return await sync_to_async(list)(
ChatTaskSource.objects.filter(
user=message.user,
enabled=True,
service=message.source_service,
channel_identifier__in=variants,
).select_related("project", "epic")
)
def _to_bool(raw, default=False) -> bool:
if raw is None:
return bool(default)
value = str(raw).strip().lower()
if value in {"1", "true", "yes", "on", "y"}:
return True
if value in {"0", "false", "no", "off", "n"}:
return False
return bool(default)
def _parse_prefixes(raw) -> list[str]:
if isinstance(raw, list):
values = raw
else:
values = str(raw or "").split(",")
rows = []
for row in values:
item = str(row or "").strip().lower()
if item and item not in rows:
rows.append(item)
return rows or ["task:", "todo:", "action:"]
def _normalize_flags(raw: dict | None) -> dict:
row = dict(raw or {})
return {
"derive_enabled": _to_bool(row.get("derive_enabled"), True),
"match_mode": str(row.get("match_mode") or "balanced").strip().lower() or "balanced",
"require_prefix": _to_bool(row.get("require_prefix"), False),
"allowed_prefixes": _parse_prefixes(row.get("allowed_prefixes")),
"completion_enabled": _to_bool(row.get("completion_enabled"), True),
"ai_title_enabled": _to_bool(row.get("ai_title_enabled"), True),
"announce_task_id": _to_bool(row.get("announce_task_id"), False),
"min_chars": max(1, int(row.get("min_chars") or 3)),
}
def _normalize_partial_flags(raw: dict | None) -> dict:
row = dict(raw or {})
out = {}
if "derive_enabled" in row:
out["derive_enabled"] = _to_bool(row.get("derive_enabled"), True)
if "match_mode" in row:
out["match_mode"] = str(row.get("match_mode") or "balanced").strip().lower() or "balanced"
if "require_prefix" in row:
out["require_prefix"] = _to_bool(row.get("require_prefix"), False)
if "allowed_prefixes" in row:
out["allowed_prefixes"] = _parse_prefixes(row.get("allowed_prefixes"))
if "completion_enabled" in row:
out["completion_enabled"] = _to_bool(row.get("completion_enabled"), True)
if "ai_title_enabled" in row:
out["ai_title_enabled"] = _to_bool(row.get("ai_title_enabled"), True)
if "announce_task_id" in row:
out["announce_task_id"] = _to_bool(row.get("announce_task_id"), False)
if "min_chars" in row:
out["min_chars"] = max(1, int(row.get("min_chars") or 3))
return out
def _effective_flags(source: ChatTaskSource) -> dict:
project_flags = _normalize_flags(getattr(getattr(source, "project", None), "settings", {}) or {})
source_flags = _normalize_partial_flags(getattr(source, "settings", {}) or {})
merged = dict(project_flags)
merged.update(source_flags)
return merged
def _is_task_candidate(text: str, flags: dict) -> bool:
body = str(text or "").strip()
if len(body) < int(flags.get("min_chars") or 1):
return False
body_lower = body.lower()
prefixes = list(flags.get("allowed_prefixes") or [])
has_prefix = any(body_lower.startswith(prefix) for prefix in prefixes)
if bool(flags.get("require_prefix")) and not has_prefix:
return False
mode = str(flags.get("match_mode") or "balanced").strip().lower()
if mode == "strict":
return has_prefix
if mode == "broad":
return has_prefix or bool(_BROAD_HINT_RE.search(body))
return has_prefix or bool(_BALANCED_HINT_RE.search(body))
def _next_reference(user, project) -> str:
last = (
DerivedTask.objects.filter(user=user, project=project)
.exclude(reference_code="")
.order_by("-created_at")
.first()
)
if not last:
return "1"
try:
return str(int(str(last.reference_code)) + 1)
except Exception:
return str(DerivedTask.objects.filter(user=user, project=project).count() + 1)
async def _derive_title(message: Message) -> str:
text = str(message.text or "").strip()
if not text:
return "Untitled task"
if not bool(getattr(settings, "TASK_DERIVATION_USE_AI", True)):
return text[:255]
ai_obj = await sync_to_async(lambda: AI.objects.filter(user=message.user).first())()
if not ai_obj:
return text[:255]
prompt = [
{
"role": "system",
"content": "Extract one concise actionable task title from the message. Return plain text only.",
},
{"role": "user", "content": text[:2000]},
]
try:
title = str(await ai_runner.run_prompt(prompt, ai_obj, operation="task_derive_title") or "").strip()
except Exception:
title = ""
return (title or text)[:255]
async def _derive_title_with_flags(message: Message, flags: dict) -> str:
if not bool(flags.get("ai_title_enabled", True)):
text = str(message.text or "").strip()
return (text or "Untitled task")[:255]
return await _derive_title(message)
async def _emit_sync_event(task: DerivedTask, event: DerivedTaskEvent, action: str) -> None:
cfg = await sync_to_async(
lambda: TaskProviderConfig.objects.filter(user=task.user, enabled=True).order_by("provider").first()
)()
provider_name = str(getattr(cfg, "provider", "mock") or "mock")
provider_settings = dict(getattr(cfg, "settings", {}) or {})
provider = get_provider(provider_name)
idempotency_key = f"{provider_name}:{task.id}:{event.id}"
if action == "create":
result = provider.create_task(provider_settings, {
"task_id": str(task.id),
"title": task.title,
"external_key": task.external_key,
"reference_code": task.reference_code,
})
elif action == "complete":
result = provider.mark_complete(provider_settings, {
"task_id": str(task.id),
"external_key": task.external_key,
"reference_code": task.reference_code,
})
else:
result = provider.append_update(provider_settings, {
"task_id": str(task.id),
"external_key": task.external_key,
"reference_code": task.reference_code,
"payload": event.payload,
})
status = "ok" if result.ok else "failed"
await sync_to_async(ExternalSyncEvent.objects.update_or_create)(
idempotency_key=idempotency_key,
defaults={
"user": task.user,
"task": task,
"task_event": event,
"provider": provider_name,
"status": status,
"payload": dict(result.payload or {}),
"error": str(result.error or ""),
},
)
if result.ok and result.external_key and not task.external_key:
task.external_key = str(result.external_key)
await sync_to_async(task.save)(update_fields=["external_key"])
async def _completion_regex(message: Message) -> re.Pattern:
patterns = await sync_to_async(list)(
TaskCompletionPattern.objects.filter(user=message.user, enabled=True).order_by("position", "created_at")
)
phrases = [str(row.phrase or "").strip() for row in patterns if str(row.phrase or "").strip()]
if not phrases:
phrases = ["done", "completed", "fixed"]
return re.compile(r"\\b(?:" + "|".join(re.escape(p) for p in phrases) + r")\\s*#([A-Za-z0-9_-]+)\\b", re.IGNORECASE)
async def process_inbound_task_intelligence(message: Message) -> None:
if message is None:
return
if dict(message.message_meta or {}).get("origin_tag"):
return
text = str(message.text or "").strip()
if not text:
return
sources = await _resolve_source_mappings(message)
if not sources:
return
completion_allowed = any(bool(_effective_flags(source).get("completion_enabled")) for source in sources)
completion_rx = await _completion_regex(message) if completion_allowed else None
marker_match = (completion_rx.search(text) if completion_rx else None) or (_COMPLETION_RE.search(text) if completion_allowed else None)
if marker_match:
ref_code = str(marker_match.group(marker_match.lastindex or 1) or "").strip()
task = await sync_to_async(
lambda: DerivedTask.objects.filter(user=message.user, reference_code=ref_code).order_by("-created_at").first()
)()
if not task:
# parser warning event attached to a newly derived placeholder in mapped project
source = sources[0]
placeholder = await sync_to_async(DerivedTask.objects.create)(
user=message.user,
project=source.project,
epic=source.epic,
title=f"Unresolved completion marker #{ref_code}",
source_service=message.source_service or "web",
source_channel=message.source_chat_id or "",
origin_message=message,
reference_code=ref_code,
status_snapshot="warning",
immutable_payload={"warning": "completion_marker_unresolved"},
)
await sync_to_async(DerivedTaskEvent.objects.create)(
task=placeholder,
event_type="parse_warning",
actor_identifier=str(message.sender_uuid or ""),
source_message=message,
payload={"reason": "completion_marker_unresolved", "marker": ref_code},
)
return
task.status_snapshot = "completed"
await sync_to_async(task.save)(update_fields=["status_snapshot"])
event = await sync_to_async(DerivedTaskEvent.objects.create)(
task=task,
event_type="completion_marked",
actor_identifier=str(message.sender_uuid or ""),
source_message=message,
payload={"marker": ref_code},
)
await _emit_sync_event(task, event, "complete")
return
for source in sources:
flags = _effective_flags(source)
if not bool(flags.get("derive_enabled", True)):
continue
if not _is_task_candidate(text, flags):
continue
title = await _derive_title_with_flags(message, flags)
reference = await sync_to_async(_next_reference)(message.user, source.project)
task = await sync_to_async(DerivedTask.objects.create)(
user=message.user,
project=source.project,
epic=source.epic,
title=title,
source_service=message.source_service or "web",
source_channel=message.source_chat_id or "",
origin_message=message,
reference_code=reference,
status_snapshot="open",
immutable_payload={"origin_text": text, "flags": flags},
)
event = await sync_to_async(DerivedTaskEvent.objects.create)(
task=task,
event_type="created",
actor_identifier=str(message.sender_uuid or ""),
source_message=message,
payload={"origin_text": text},
)
await _emit_sync_event(task, event, "create")
if bool(flags.get("announce_task_id", False)):
try:
await send_message_raw(
message.source_service or "web",
message.source_chat_id or "",
text=f"[task] Created #{task.reference_code}: {task.title}",
attachments=[],
metadata={"origin": "task_announce"},
)
except Exception:
# Announcement is best-effort and should not block derivation.
pass