From 8c091b1e6d1c87c5bd75bacf0dcc6854e96199c9 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Fri, 6 Mar 2026 17:47:58 +0000 Subject: [PATCH] Increase platform abstraction cohesion --- INSTALL.md | 53 + Makefile | 3 + README.md | 1 + app/local_settings.py | 36 +- app/urls.py | 6 + artifacts/mcp/manticore-mcp-server.md | 71 + artifacts/plans/11-personal-ai-memory.md | 28 - .../12-mcp-server-for-tasks-and-knowledge.md | 60 - artifacts/plans/13-edit-delete.md | 16 - artifacts/plans/14-security-audit.md | 2 - .../15-simplify-task-settings-and-more.md | 19 - .../16-agent-knowledge-memory-foundation.md | 34 - .../plans/16-memory-backend-evaluation.md | 25 - .../plans/17-person-enrichment-without-llm.md | 95 ++ core/clients/signal.py | 189 ++- core/clients/transport.py | 8 + core/clients/xmpp.py | 92 +- .../commands/mcp_manticore_server.py | 11 + core/management/commands/memory_hygiene.py | 40 + .../commands/memory_suggest_from_messages.py | 46 + core/mcp/__init__.py | 3 + core/mcp/server.py | 149 ++ core/mcp/tools.py | 1220 +++++++++++++++++ core/memory/__init__.py | 3 +- core/memory/pipeline.py | 419 ++++++ core/memory/retrieval.py | 123 ++ core/memory/search_backend.py | 62 + core/messaging/history.py | 272 ++++ ...changerequest_knowledgearticle_and_more.py | 444 ++++++ core/models.py | 243 ++++ core/templates/base.html | 3 +- core/templates/pages/ai-execution-log.html | 287 ++-- core/templates/partials/compose-panel.html | 545 +++++++- core/templatetags/page_title.py | 18 + core/tests/test_compose_send_capabilities.py | 67 + core/tests/test_mcp_tools.py | 239 ++++ core/tests/test_memory_pipeline_commands.py | 97 ++ ...test_presence_query_and_compose_context.py | 29 +- core/tests/test_reaction_normalization.py | 63 + core/tests/test_signal_reply_send.py | 70 + core/tests/test_transport_capabilities.py | 10 + core/views/compose.py | 270 +++- core/views/prosody.py | 88 ++ core/views/workspace.py | 32 +- docker/uwsgi.ini | 2 +- rust/manticore-mcp-worker/Cargo.lock | 
634 +++++++++ rust/manticore-mcp-worker/Cargo.toml | 10 + rust/manticore-mcp-worker/README.md | 26 + rust/manticore-mcp-worker/src/main.rs | 387 ++++++ scripts/quadlet/manage.sh | 14 +- stack.env.example | 3 + utilities/prosody/auth_django.sh | 64 +- utilities/prosody/manage_prosody_container.sh | 122 +- utilities/prosody/modules/mod_auth_gia.lua | 84 ++ utilities/prosody/prosody.cfg.lua | 58 +- 55 files changed, 6555 insertions(+), 440 deletions(-) create mode 100644 artifacts/mcp/manticore-mcp-server.md delete mode 100644 artifacts/plans/11-personal-ai-memory.md delete mode 100644 artifacts/plans/12-mcp-server-for-tasks-and-knowledge.md delete mode 100644 artifacts/plans/13-edit-delete.md delete mode 100644 artifacts/plans/14-security-audit.md delete mode 100644 artifacts/plans/15-simplify-task-settings-and-more.md delete mode 100644 artifacts/plans/16-agent-knowledge-memory-foundation.md delete mode 100644 artifacts/plans/16-memory-backend-evaluation.md create mode 100644 artifacts/plans/17-person-enrichment-without-llm.md create mode 100644 core/management/commands/mcp_manticore_server.py create mode 100644 core/management/commands/memory_hygiene.py create mode 100644 core/management/commands/memory_suggest_from_messages.py create mode 100644 core/mcp/__init__.py create mode 100644 core/mcp/server.py create mode 100644 core/mcp/tools.py create mode 100644 core/memory/pipeline.py create mode 100644 core/memory/retrieval.py create mode 100644 core/migrations/0036_memoryitem_memorychangerequest_knowledgearticle_and_more.py create mode 100644 core/templatetags/page_title.py create mode 100644 core/tests/test_compose_send_capabilities.py create mode 100644 core/tests/test_mcp_tools.py create mode 100644 core/tests/test_memory_pipeline_commands.py create mode 100644 core/views/prosody.py create mode 100644 rust/manticore-mcp-worker/Cargo.lock create mode 100644 rust/manticore-mcp-worker/Cargo.toml create mode 100644 rust/manticore-mcp-worker/README.md create mode 
100644 rust/manticore-mcp-worker/src/main.rs create mode 100644 utilities/prosody/modules/mod_auth_gia.lua diff --git a/INSTALL.md b/INSTALL.md index fdb7b0b..aab81b0 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -200,8 +200,61 @@ Query memory backend: podman exec ur_gia /venv/bin/python manage.py memory_search_query --user-id 1 --query "reply style" ``` +Generate proposed memories from recent inbound messages: + +```bash +podman exec ur_gia /venv/bin/python manage.py memory_suggest_from_messages --user-id 1 --limit-messages 300 --max-items 30 +``` + +Run memory hygiene (expiry decay + contradiction queueing): + +```bash +podman exec ur_gia /venv/bin/python manage.py memory_hygiene --user-id 1 +``` + +Performance defaults now applied in GIA: + +- Batched Manticore reindex writes (`REPLACE ... VALUES (...)` in chunks) for lower ingest latency. +- Cached table-ensure checks to avoid `CREATE TABLE IF NOT EXISTS` overhead on every query. +- Runtime table maintenance available through MCP (`FLUSH RAMCHUNK`, `OPTIMIZE TABLE`) for steady query responsiveness. + +### F) MCP server for task + memory tooling (VS Code) + +The workspace includes an MCP config at `/code/xf/.vscode/mcp.json` for server `manticore`. + +It launches inside the running `ur_gia` container and forces: + +- `MEMORY_SEARCH_BACKEND=manticore` + +`MANTICORE_HTTP_URL` is read from the container environment (`stack.env` / app settings). + +Start requirements first: + +```bash +make run +./utilities/memory/manage_manticore_container.sh up +``` + +Then approve/enable the `manticore` MCP server in VS Code when prompted. + +Optional ultra-light Rust MCP worker: + +```bash +cd /code/xf/GIA +make mcp-rust-build +``` + +Then enable `manticore-rust-worker` in `/code/xf/.vscode/mcp.json`. +It is intentionally `disabled: true` by default so the existing Python MCP server remains the baseline. + ### C) Signal or WhatsApp send failures - Verify account/link status in service pages. - Verify `ur` service is running. 
- Inspect `ur` logs for transport-specific errors. + +### G) XMPP reconnect loop in logs + +- Confirm `XMPP_ADDRESS`, `XMPP_JID`, `XMPP_PORT`, and `XMPP_SECRET` are populated in `stack.env`. +- `XMPP_PORT` is parsed as an integer in settings; invalid values can cause repeated reconnect failures. +- The runtime now uses a single reconnect loop with exponential backoff to avoid overlapping reconnect churn. diff --git a/Makefile b/Makefile index 50a826e..f01b7ed 100644 --- a/Makefile +++ b/Makefile @@ -61,3 +61,6 @@ token: echo "Container '$(APP_CONTAINER)' is not running. Start the stack first with 'make run'." >&2; \ exit 125; \ fi + +mcp-rust-build: + cd rust/manticore-mcp-worker && cargo build --release diff --git a/README.md b/README.md index 9c09e72..77ad7b9 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,7 @@ Core components: - `core/clients/signal.py`, `core/clients/signalapi.py`: Signal event + REST transport handling. - `core/clients/whatsapp.py`: Neonize-backed runtime transport. - `core/clients/xmpp.py`: XMPP component bridge and media upload relay. +- `rust/manticore-mcp-worker`: optional ultra-light MCP frontend for direct Manticore status/query/maintenance. - `core/views/compose.py`: Manual compose UX, polling/ws, send pipeline, media blob endpoint. - `core/views/workspace.py`: AI workspace operations and insight surfaces. - `core/views/osint.py`: Search/workspace OSINT interactions. 
diff --git a/app/local_settings.py b/app/local_settings.py index 334aa56..e07639f 100644 --- a/app/local_settings.py +++ b/app/local_settings.py @@ -1,16 +1,39 @@ from os import getenv +from urllib.parse import urlparse trues = ("t", "true", "yes", "y", "1") + +def _csv_env(name: str, default: str) -> list[str]: + return [item.strip() for item in getenv(name, default).split(",") if item.strip()] + + # URLs DOMAIN = getenv("DOMAIN", "example.com") URL = getenv("URL", f"https://{DOMAIN}") +URL_HOST = urlparse(URL).hostname or "" +DEBUG = getenv("DEBUG", "false").lower() in trues # Access control -ALLOWED_HOSTS = getenv("ALLOWED_HOSTS", f"localhost,{DOMAIN}").split(",") +ALLOWED_HOSTS = _csv_env( + "ALLOWED_HOSTS", + ",".join( + item + for item in ( + "localhost", + "127.0.0.1", + DOMAIN, + URL_HOST, + ) + if item + ), +) +if DEBUG: + # Local/dev stack runs behind varying hostnames/tunnels. + ALLOWED_HOSTS = ["*"] # CSRF -CSRF_TRUSTED_ORIGINS = getenv("CSRF_TRUSTED_ORIGINS", URL).split(",") +CSRF_TRUSTED_ORIGINS = _csv_env("CSRF_TRUSTED_ORIGINS", URL) # Stripe BILLING_ENABLED = getenv("BILLING_ENABLED", "false").lower() in trues @@ -23,7 +46,10 @@ STRIPE_PUBLIC_API_KEY_PROD = getenv("STRIPE_PUBLIC_API_KEY_PROD", "") STRIPE_ENDPOINT_SECRET = getenv("STRIPE_ENDPOINT_SECRET", "") STATIC_ROOT = getenv("STATIC_ROOT", "") -SECRET_KEY = getenv("SECRET_KEY", "") +SECRET_KEY = (getenv("SECRET_KEY", "") or "").strip() +if not SECRET_KEY: + # Keep local developer stacks usable when stack.env is uninitialized. 
+ SECRET_KEY = "gia-dev-secret-key" STRIPE_ADMIN_COUPON = getenv("STRIPE_ADMIN_COUPON", "") @@ -33,7 +59,6 @@ LAGO_API_KEY = getenv("LAGO_API_KEY", "") LAGO_ORG_ID = getenv("LAGO_ORG_ID", "") LAGO_URL = getenv("LAGO_URL", "") -DEBUG = getenv("DEBUG", "false") in trues PROFILER = getenv("PROFILER", "false") in trues if DEBUG: @@ -62,7 +87,8 @@ INSTAGRAM_HTTP_URL = getenv("INSTAGRAM_HTTP_URL", "http://instagram:8080") XMPP_ADDRESS = getenv("XMPP_ADDRESS") XMPP_JID = getenv("XMPP_JID") -XMPP_PORT = getenv("XMPP_PORT") +XMPP_USER_DOMAIN = getenv("XMPP_USER_DOMAIN", "") +XMPP_PORT = int(getenv("XMPP_PORT", "8888") or 8888) XMPP_SECRET = getenv("XMPP_SECRET") EVENT_LEDGER_DUAL_WRITE = getenv("EVENT_LEDGER_DUAL_WRITE", "false").lower() in trues diff --git a/app/urls.py b/app/urls.py index 5390b03..f26942c 100644 --- a/app/urls.py +++ b/app/urls.py @@ -35,6 +35,7 @@ from core.views import ( osint, people, personas, + prosody, queues, sessions, signal, @@ -98,6 +99,11 @@ urlpatterns = [ system.MemorySearchQueryAPI.as_view(), name="system_memory_search_query", ), + path( + "internal/prosody/auth/", + prosody.ProsodyAuthBridge.as_view(), + name="prosody_auth_bridge", + ), path( "settings/command-routing/", automation.CommandRoutingSettings.as_view(), diff --git a/artifacts/mcp/manticore-mcp-server.md b/artifacts/mcp/manticore-mcp-server.md new file mode 100644 index 0000000..8bb53a2 --- /dev/null +++ b/artifacts/mcp/manticore-mcp-server.md @@ -0,0 +1,71 @@ +# Manticore MCP Server (GIA) + +This document describes the MCP server wired for task + memory operations in GIA. 
+ +## Server entrypoint + +- Django management command: `python manage.py mcp_manticore_server` +- Python module: `core.mcp.server` +- Tool handlers: `core.mcp.tools` + +## Rust worker frontend (optional) + +For low-overhead direct Manticore operations, a Rust stdio MCP worker is included: + +- Project: `rust/manticore-mcp-worker` +- Build: `make mcp-rust-build` +- Binary: `rust/manticore-mcp-worker/target/release/manticore-mcp-worker` +- VS Code server name: `manticore-rust-worker` (disabled by default in `/code/xf/.vscode/mcp.json`) + +This worker exposes fast table/status/query/maintenance operations and can be enabled when you want a minimal MCP process in front of Manticore. + +## VS Code wiring + +Workspace config is in `/code/xf/.vscode/mcp.json`: + +- Server name: `manticore` +- Launch method: `podman exec -i ur_gia /venv/bin/python manage.py mcp_manticore_server` +- Forced env: + - `MEMORY_SEARCH_BACKEND=manticore` + +`MANTICORE_HTTP_URL` is inherited from container environment so each deployment can set the correct reachable address. + +This allows MCP tool calls from VS Code to run against the live GIA container without requiring local Django dependencies. + +## Implemented MCP tools + +- `manticore.status` +- `manticore.query` +- `manticore.reindex` +- `memory.list` +- `memory.propose` +- `memory.pending` +- `memory.review` +- `memory.suggest_from_messages` +- `tasks.list` +- `tasks.search` +- `tasks.get` +- `tasks.events` +- `tasks.create_note` +- `tasks.link_artifact` +- `wiki.create_article` +- `wiki.update_article` +- `wiki.list` +- `wiki.get` +- `project.get_guidelines` +- `project.get_layout` +- `project.get_runbook` +- `docs.append_run_note` + +`docs.append_run_note` appends markdown notes to `/tmp/gia-mcp-run-notes.md` by default (or a project path you pass explicitly). + +All MCP tool invocations are audit-logged in `core_mcptoolauditlog` (`MCPToolAuditLog` model). + +## Runtime notes + +1. Ensure GIA services are running (`make run`). +2. 
Start Manticore container: + - `./utilities/memory/manage_manticore_container.sh up` +3. Optional initial index: + - `podman exec ur_gia /venv/bin/python manage.py memory_search_reindex --user-id 1 --statuses active` +4. In VS Code, approve/enable the workspace MCP server when prompted. diff --git a/artifacts/plans/11-personal-ai-memory.md b/artifacts/plans/11-personal-ai-memory.md deleted file mode 100644 index 9be95f2..0000000 --- a/artifacts/plans/11-personal-ai-memory.md +++ /dev/null @@ -1,28 +0,0 @@ -# Feature Plan: Personal AI Memory (Per Person) - -## Goal -Store and manage long-lived person-specific memory for better continuity and assistant quality. - -## Why This Fits GIA -- Person-centric data model already exists. -- Current approvals pattern can gate memory writes. - -## Scope -- Memory entries: preferences, commitments, facts, communication style. -- Confidence/expiry fields and provenance links. -- Approval-required writes with edit/delete controls. - -## Implementation -1. Add memory model linked to `Person` with source references. -2. Add extraction pipeline (suggested memory from messages). -3. Add approval queue for memory create/update/delete. -4. Add retrieval service for compose/AI workspace prompts. -5. Add memory hygiene jobs: stale decay, contradiction detection. - -## Acceptance Criteria -- Every memory has provenance and last-verified timestamp. -- Unapproved memory never influences generated output. -- Users can inspect, edit, and purge memory entries. - -## Out of Scope -- Cross-user shared memory graph. 
diff --git a/artifacts/plans/12-mcp-server-for-tasks-and-knowledge.md b/artifacts/plans/12-mcp-server-for-tasks-and-knowledge.md deleted file mode 100644 index 76e381d..0000000 --- a/artifacts/plans/12-mcp-server-for-tasks-and-knowledge.md +++ /dev/null @@ -1,60 +0,0 @@ -# Feature Plan: MCP Server for Tasks + Wiki/Knowledgebase - -## Goal -Create an MCP server that allows agents to: -- read/write task context, -- create/update knowledgebase/wiki artifacts during task execution, -- retrieve coding guidelines/project layout for continuity between runs. - -## Why This Fits GIA -- Tasks, approvals, and command-driven automation already exist. -- This provides durable agent memory and operator visibility of "what changed, why, and how to use it". - -## Scope -- MCP server with authenticated tools for: - - `tasks.list`, `tasks.get`, `tasks.search`, `tasks.events` - - `tasks.create_note`, `tasks.link_artifact` - - `wiki.create_article`, `wiki.update_article`, `wiki.list`, `wiki.get` - - `project.get_layout`, `project.get_guidelines`, `project.get_runbook` -- Permission model tied to user and chat/project scope. -- Audit log for all MCP tool calls. - -## Proposed Data Additions -- `KnowledgeArticle` (title, slug, markdown, tags, status, owner, related_task). -- `KnowledgeRevision` (article, revision, author/tool, diff, created_at). -- Optional `TaskArtifactLink` (task, kind, uri/path, summary). - -## Implementation -1. Build MCP server process (Python) with JSON-RPC transport and token auth. -2. Implement task read tools against existing task models/views. -3. Implement wiki CRUD tools with revision history. -4. Implement project context tools that read: - - `AGENTS.md`, - - coding standards docs, - - key architecture docs. -5. Add agent-run convention: - - on task start: fetch task + related wiki + guidelines, - - during run: append execution notes, - - on completion: publish "what was done / how to use" article and link to task. -6. 
Add web UI page for knowledge articles and task-linked docs. -7. Add approvals for destructive knowledge actions (delete/overwrite). - -## Acceptance Criteria -- Agent can fetch full task context in one MCP call sequence. -- Agent can publish/update wiki pages tied to tasks. -- Operators can open a task and see linked implementation notes + usage docs. -- MCP actions are fully auditable and scoped by user permissions. - -## Security and Guardrails -- Tool-level RBAC and per-user scoping. -- Redact secrets from returned context. -- Rate limits and request signing for external agent clients. - -## Rollout -1. Read-only task tools. -2. Wiki write tools with revisioning. -3. Task artifact linking + UI surfaces. -4. Agent workflow templates and docs. - -## Out of Scope -- Autonomous code execution from MCP itself. diff --git a/artifacts/plans/13-edit-delete.md b/artifacts/plans/13-edit-delete.md deleted file mode 100644 index b1ded99..0000000 --- a/artifacts/plans/13-edit-delete.md +++ /dev/null @@ -1,16 +0,0 @@ -Perfect, so it all works? - -the message saying "the recipient does the same" has been reacted to with a heart but it is not shown in web compose - -I also sent an erronrous message, a literal reply to a message that i said i would react to with a heart. 
the message contained a heart emoji, so it is a reply with a heart and not a reaction - -after some confusion I deleted this message - -can deleted messages be noted and collected for storage in a deleted message tab in compose that lists what each recipient deleted and when - -ensure message edit history is shown, and preserved if the message is deleted, seamlessly reusing the navigation code to preserve a unified interface - -work on implementing edit message tracking and delete message indications - -consider how to implement - diff --git a/artifacts/plans/14-security-audit.md b/artifacts/plans/14-security-audit.md deleted file mode 100644 index 9861cc6..0000000 --- a/artifacts/plans/14-security-audit.md +++ /dev/null @@ -1,2 +0,0 @@ -# 14) Run security audit using artifacts/1-initial.json. Generated using ship-safe. -https://github.com/asamassekou10/ship-safe \ No newline at end of file diff --git a/artifacts/plans/15-simplify-task-settings-and-more.md b/artifacts/plans/15-simplify-task-settings-and-more.md deleted file mode 100644 index c5c6cc6..0000000 --- a/artifacts/plans/15-simplify-task-settings-and-more.md +++ /dev/null @@ -1,19 +0,0 @@ - -No Tasks Yet - -This group has no derived tasks yet. To start populating this view: - - Open Task Settings and confirm this chat is mapped under Group Mapping. - Send task-like messages in this group, for example: task: ship v1, todo: write tests, please review PR. - Mark completion explicitly with a phrase + reference, for example: done #12, completed #12, fixed #12. - Refresh this page; new derived tasks and events should appear automatically. - - - -task settings sound complicated, make them simpler - --- - -# https://gia.zm.is/settings/system/ -assume the user cannot access the log -Use a trace id from the dropdown (recent traces), Event Ledger Smoke `sample[].trace_id`, or UR logs. 
\ No newline at end of file diff --git a/artifacts/plans/16-agent-knowledge-memory-foundation.md b/artifacts/plans/16-agent-knowledge-memory-foundation.md deleted file mode 100644 index 39a2db3..0000000 --- a/artifacts/plans/16-agent-knowledge-memory-foundation.md +++ /dev/null @@ -1,34 +0,0 @@ -# Feature Plan: Agent Knowledge Memory Foundation (Pre-11/12) - -## Goal -Establish a scalable, queryable memory substrate so wiki and MCP features can rely on fast retrieval instead of markdown-file scans. - -## Why This Comes Before 11/12 -- Plan 11 (personal memory) needs performant retrieval and indexing guarantees. -- Plan 12 (MCP wiki/tools) needs a stable backend abstraction independent of UI and tool transport. - -## Scope -- Pluggable memory search backend interface. -- Default Django backend for zero-infra operation. -- Optional Manticore backend for scalable full-text/vector-ready indexing. -- Reindex + query operational commands. -- System diagnostics endpoints for backend status and query inspection. - -## Implementation Slice -1. Add `core/memory/search_backend.py` abstraction and backends. -2. Add `memory_search_reindex` and `memory_search_query` management commands. -3. Add system APIs: - - backend status - - memory query -4. Add lightweight Podman utility script for Manticore runtime. -5. Add tests for diagnostics and query behavior. - -## Acceptance Criteria -- Memory retrieval works with `MEMORY_SEARCH_BACKEND=django` out of the box. -- Switching to `MEMORY_SEARCH_BACKEND=manticore` requires only env/config + container startup. -- Operators can verify backend health and query output from system settings. - -## Out of Scope -- Full wiki article model/UI. -- Full MCP server process/tooling. -- Embedding generation pipeline (next slice after backend foundation). 
diff --git a/artifacts/plans/16-memory-backend-evaluation.md b/artifacts/plans/16-memory-backend-evaluation.md deleted file mode 100644 index 8295d23..0000000 --- a/artifacts/plans/16-memory-backend-evaluation.md +++ /dev/null @@ -1,25 +0,0 @@ -# Memory Backend Evaluation: Manticore vs Alternatives - -## Decision Summary -- **Recommended now:** Manticore for indexed text retrieval and future vector layering. -- **Default fallback:** Django/ORM backend for zero-infra environments. -- **Revisit later:** dedicated vector DB only if recall quality or ANN latency requires it. - -## Why Manticore Fits This Stage -- Already present in adjacent infra and codebase history. -- Runs well as a small standalone container with low operational complexity. -- Supports SQL-like querying and fast full-text retrieval for agent memory/wiki content. -- Lets us keep one retrieval abstraction while deferring embedding complexity. - -## Tradeoff Notes -- Manticore-first gives immediate performance over markdown scans. -- For advanced ANN/vector-only workloads, Qdrant/pgvector/Weaviate may outperform with less custom shaping. -- A hybrid approach remains possible: - - Manticore for lexical + metadata filtering, - - optional vector store for semantic recall. - -## Practical Rollout -1. Start with `MEMORY_SEARCH_BACKEND=django` and verify API/command workflows. -2. Start Manticore container and switch to `MEMORY_SEARCH_BACKEND=manticore`. -3. Run reindex and validate query latency/quality on real agent workflows. -4. Add embedding pipeline only after baseline lexical retrieval is stable. 
diff --git a/artifacts/plans/17-person-enrichment-without-llm.md b/artifacts/plans/17-person-enrichment-without-llm.md new file mode 100644 index 0000000..0decae6 --- /dev/null +++ b/artifacts/plans/17-person-enrichment-without-llm.md @@ -0,0 +1,95 @@ +# Feature Plan: Person Model Enrichment (Non-LLM First) + +## Goal +Populate `Person` fields from existing message history without spending OpenAI tokens by default: +- `summary` +- `profile` +- `revealed` +- `likes` +- `dislikes` +- `sentiment` +- `timezone` +- `last_interaction` + +## Problem We Are Solving +- We have high-volume message data but limited durable person intelligence. +- LLM analysis is expensive for continuous/background processing. +- We need fast, deterministic extraction first, with optional semantic ranking. + +## Design Decisions +1. Config scope: + - global defaults + - optional group-level overrides + - per-user overrides +2. Resolution order: + - `user > group > global` +3. Global toggle: + - hard kill-switch (`PERSON_ENRICHMENT_ENABLED`) +4. Per-user/group controls: + - enable/disable enrichment + - write mode (`proposal_required` or `direct`) + - confidence threshold + - max messages scanned per run + - semantic-ranking toggle + +## Proposed Data Additions +- `PersonEnrichmentSettings`: + - scope fields (`user`, optional `group`) + - toggle/threshold/runtime limits +- `PersonSignal`: + - normalized extracted clue + - source references (message ids/events) + - confidence and detector name +- `PersonUpdateProposal`: + - pending/approved/rejected person field updates + - reason and provenance +- Optional `PersonFieldRevision`: + - before/after snapshots for auditability + +## Processing Flow +1. Select message window: + - recent inbound/outbound messages per person/service + - bounded by configurable caps +2. Fast extraction: + - deterministic rules/regex for: + - timezone cues + - explicit likes/dislikes + - self-revealed facts + - interaction-derived sentiment hints +3. 
Semantic ranking (optional): + - use Manticore-backed similarity search for classifier labels + - rank candidate signals; do not call OpenAI in default path +4. Signal aggregation: + - merge repeated evidence + - decay stale evidence + - detect contradictions +5. Apply update: + - `proposal_required`: create `PersonUpdateProposal` + - `direct`: write only above confidence threshold and with no conflict +6. Persist audit trail: + - record detector/classifier source and exact message provenance + +## Field-Specific Policy +- `summary/profile`: generated from stable high-confidence aggregates only. +- `revealed`: only explicit self-disclosures. +- `likes/dislikes`: require explicit statement or repeated pattern. +- `sentiment`: rolling value with recency decay; never absolute truth label. +- `timezone`: explicit declaration preferred; behavioral inference secondary. +- `last_interaction`: deterministic from most recent message timestamps. + +## Rollout +1. Schema and settings models. +2. Deterministic extractor pipeline and commands. +3. Proposal queue + review flow. +4. Optional Manticore semantic ranking layer. +5. Backfill job for existing persons with safe rate limits. + +## Acceptance Criteria +- Default enrichment path runs with zero OpenAI usage. +- Person updates are traceable to concrete message evidence. +- Config hierarchy behaves predictably (`user > group > global`). +- Operators can switch between proposal and direct write modes per scope. + +## Out of Scope +- Cross-user shared person graph. +- Autonomous LLM-generated profile writing as default. 
diff --git a/core/clients/signal.py b/core/clients/signal.py index d4ccc57..d1361db 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -193,6 +193,85 @@ def _extract_signal_reaction(envelope): } +def _extract_signal_edit(envelope): + paths = [ + ("dataMessage", "editMessage"), + ("syncMessage", "sentMessage", "editMessage"), + ("syncMessage", "editMessage"), + ] + node = None + for path in paths: + candidate = _get_nested(envelope, path) + if isinstance(candidate, dict): + node = candidate + break + if not isinstance(node, dict): + return None + + target_ts = node.get("targetSentTimestamp") + if target_ts is None: + target_ts = node.get("targetTimestamp") + if target_ts is None: + target_ts = node.get("targetTs") + try: + target_ts = int(target_ts) + except Exception: + target_ts = 0 + if target_ts <= 0: + return None + + data_message = node.get("dataMessage") or node.get("message") or {} + new_text = "" + if isinstance(data_message, dict): + for key in ("message", "text", "body", "caption"): + value = str(data_message.get(key) or "").strip() + if value: + new_text = value + break + if not new_text: + new_text = str(node.get("message") or "").strip() + if not new_text: + return None + + return { + "target_ts": target_ts, + "new_text": new_text, + "raw": dict(node), + } + + +def _extract_signal_delete(envelope): + paths = [ + ("dataMessage", "delete"), + ("dataMessage", "remoteDelete"), + ("syncMessage", "sentMessage", "delete"), + ("syncMessage", "delete"), + ] + node = None + for path in paths: + candidate = _get_nested(envelope, path) + if isinstance(candidate, dict): + node = candidate + break + if not isinstance(node, dict): + return None + target_ts = node.get("targetSentTimestamp") + if target_ts is None: + target_ts = node.get("targetTimestamp") + if target_ts is None: + target_ts = node.get("targetTs") + try: + target_ts = int(target_ts) + except Exception: + target_ts = 0 + if target_ts <= 0: + return None + return { + "target_ts": 
target_ts, + "raw": dict(node), + } + + def _extract_signal_text(raw_payload, default_text=""): text = str(default_text or "").strip() if text: @@ -1299,6 +1378,8 @@ class SignalClient(ClientBase): destination_number, ) reaction_payload = _extract_signal_reaction(envelope) + edit_payload = _extract_signal_edit(envelope) + delete_payload = _extract_signal_delete(envelope) if identifiers and isinstance(reaction_payload, dict): source_uuid = str( envelope.get("sourceUuid") or envelope.get("source") or "" @@ -1343,6 +1424,61 @@ class SignalClient(ClientBase): self.log.warning( "signal raw sync reaction relay to XMPP failed: %s", exc ) + if identifiers and isinstance(edit_payload, dict): + source_uuid = str( + envelope.get("sourceUuid") or envelope.get("source") or "" + ).strip() + source_number = str(envelope.get("sourceNumber") or "").strip() + for identifier in identifiers: + try: + await history.apply_message_edit( + identifier.user, + identifier, + target_message_id="", + target_ts=int(edit_payload.get("target_ts") or 0), + new_text=str(edit_payload.get("new_text") or ""), + source_service="signal", + actor=(source_uuid or source_number or ""), + payload=edit_payload.get("raw") or {}, + ) + except Exception as exc: + self.log.warning( + "signal raw sync edit history apply failed: %s", exc + ) + transport.update_runtime_state( + self.service, + last_inbound_ok_ts=int(time.time() * 1000), + last_inbound_exception_type="", + last_inbound_exception_message="", + ) + return + if identifiers and isinstance(delete_payload, dict): + source_uuid = str( + envelope.get("sourceUuid") or envelope.get("source") or "" + ).strip() + source_number = str(envelope.get("sourceNumber") or "").strip() + for identifier in identifiers: + try: + await history.apply_message_delete( + identifier.user, + identifier, + target_message_id="", + target_ts=int(delete_payload.get("target_ts") or 0), + source_service="signal", + actor=(source_uuid or source_number or ""), + 
payload=delete_payload.get("raw") or {}, + ) + except Exception as exc: + self.log.warning( + "signal raw sync delete history apply failed: %s", exc + ) + transport.update_runtime_state( + self.service, + last_inbound_ok_ts=int(time.time() * 1000), + last_inbound_exception_type="", + last_inbound_exception_message="", + ) + return if identifiers and text: ts_raw = ( sync_sent_message.get("timestamp") @@ -1427,8 +1563,14 @@ class SignalClient(ClientBase): identifiers = await self._resolve_signal_identifiers(source_uuid, source_number) reaction_payload = _extract_signal_reaction(envelope) - if (not identifiers) and isinstance(reaction_payload, dict): - # Sync reactions from our own linked device can arrive with source=our + edit_payload = _extract_signal_edit(envelope) + delete_payload = _extract_signal_delete(envelope) + if (not identifiers) and ( + isinstance(reaction_payload, dict) + or isinstance(edit_payload, dict) + or isinstance(delete_payload, dict) + ): + # Sync events from our own linked device can arrive with source=our # account and destination=. Resolve by destination as fallback. 
destination_uuid = str( envelope.get("destinationServiceId") @@ -1497,6 +1639,49 @@ class SignalClient(ClientBase): last_inbound_exception_message="", ) return + if isinstance(edit_payload, dict): + for identifier in identifiers: + try: + await history.apply_message_edit( + identifier.user, + identifier, + target_message_id="", + target_ts=int(edit_payload.get("target_ts") or 0), + new_text=str(edit_payload.get("new_text") or ""), + source_service="signal", + actor=(source_uuid or source_number or ""), + payload=edit_payload.get("raw") or {}, + ) + except Exception as exc: + self.log.warning("signal raw edit history apply failed: %s", exc) + transport.update_runtime_state( + self.service, + last_inbound_ok_ts=int(time.time() * 1000), + last_inbound_exception_type="", + last_inbound_exception_message="", + ) + return + if isinstance(delete_payload, dict): + for identifier in identifiers: + try: + await history.apply_message_delete( + identifier.user, + identifier, + target_message_id="", + target_ts=int(delete_payload.get("target_ts") or 0), + source_service="signal", + actor=(source_uuid or source_number or ""), + payload=delete_payload.get("raw") or {}, + ) + except Exception as exc: + self.log.warning("signal raw delete history apply failed: %s", exc) + transport.update_runtime_state( + self.service, + last_inbound_ok_ts=int(time.time() * 1000), + last_inbound_exception_type="", + last_inbound_exception_message="", + ) + return text = _extract_signal_text(payload, str(data_message.get("message") or "").strip()) if not text: diff --git a/core/clients/transport.py b/core/clients/transport.py index d2f6044..6eb4e6d 100644 --- a/core/clients/transport.py +++ b/core/clients/transport.py @@ -776,6 +776,14 @@ async def send_message_raw( Unified outbound send path used by models/views/UR. 
""" service_key = _service_key(service) + if _capability_checks_enabled() and not supports(service_key, "send"): + reason = unsupported_reason(service_key, "send") + log.warning( + "capability-check failed service=%s feature=send: %s", + service_key, + reason, + ) + return False if service_key == "signal": prepared_attachments = await prepare_outbound_attachments( service_key, attachments or [] diff --git a/core/clients/xmpp.py b/core/clients/xmpp.py index e538d6c..f7cd76b 100644 --- a/core/clients/xmpp.py +++ b/core/clients/xmpp.py @@ -141,10 +141,14 @@ class XMPPComponent(ComponentXMPP): self._reconnect_task = None self._reconnect_delay_seconds = 1.0 self._reconnect_delay_max_seconds = 30.0 + self._connect_inflight = False + self._session_live = False self.log = logs.get_logger("XMPP") super().__init__(jid, secret, server, port) + # Use one reconnect strategy (our backoff loop) to avoid reconnect churn. + self.auto_reconnect = False # Register chat state plugins register_stanza_plugin(Message, Active) register_stanza_plugin(Message, Composing) @@ -178,6 +182,21 @@ class XMPPComponent(ComponentXMPP): self.add_event_handler("chatstate_inactive", self.on_chatstate_inactive) self.add_event_handler("chatstate_gone", self.on_chatstate_gone) + def _user_xmpp_domain(self): + domain = str(getattr(settings, "XMPP_USER_DOMAIN", "") or "").strip() + if domain: + return domain + component_jid = str(getattr(settings, "XMPP_JID", "") or "").strip() + if "." 
in component_jid: + return component_jid.split(".", 1)[1] + configured_domain = str(getattr(settings, "DOMAIN", "") or "").strip() + if configured_domain: + return configured_domain + return str(getattr(settings, "XMPP_ADDRESS", "") or "").strip() + + def _user_jid(self, username): + return f"{username}@{self._user_xmpp_domain()}" + async def enable_carbons(self): """Enable XMPP Message Carbons (XEP-0280)""" try: @@ -827,25 +846,33 @@ class XMPPComponent(ComponentXMPP): async def session_start(self, *args): self.log.info("XMPP session started") + self._session_live = True + self._connect_inflight = False self._reconnect_delay_seconds = 1.0 if self._reconnect_task and not self._reconnect_task.done(): self._reconnect_task.cancel() self._reconnect_task = None - await self.enable_carbons() + # This client connects as an external component, not a user client; + # XEP-0280 (carbons) is client-scoped and not valid here. + self.log.debug("Skipping carbons enable for component session") async def _reconnect_loop(self): try: while True: delay = float(self._reconnect_delay_seconds) await asyncio.sleep(delay) + if self._session_live or self._connect_inflight: + return try: self.log.info("XMPP reconnect attempt delay_s=%.1f", delay) + self._connect_inflight = True connected = self.connect() if connected is False: raise RuntimeError("connect returned false") return except Exception as exc: self.log.warning("XMPP reconnect attempt failed: %s", exc) + self._connect_inflight = False self._reconnect_delay_seconds = min( self._reconnect_delay_max_seconds, max(1.0, float(self._reconnect_delay_seconds) * 2.0), @@ -853,6 +880,8 @@ class XMPPComponent(ComponentXMPP): except asyncio.CancelledError: return finally: + if not self._session_live: + self._connect_inflight = False self._reconnect_task = None def _schedule_reconnect(self): @@ -864,6 +893,8 @@ class XMPPComponent(ComponentXMPP): """ Handles XMPP disconnection and triggers a reconnect loop. 
""" + self._session_live = False + self._connect_inflight = False self.log.warning( "XMPP disconnected, scheduling reconnect attempt in %.1fs", float(self._reconnect_delay_seconds), @@ -1576,7 +1607,7 @@ class XMPPComponent(ComponentXMPP): f"{person_identifier.person.name.lower()}|" f"{person_identifier.service}@{settings.XMPP_JID}" ) - recipient_jid = f"{user.username}@{settings.XMPP_ADDRESS}" + recipient_jid = self._user_jid(user.username) await self.send_xmpp_reaction( recipient_jid, sender_jid, @@ -1625,7 +1656,7 @@ class XMPPComponent(ComponentXMPP): f"{person_identifier.person.name.lower()}|" f"{person_identifier.service}@{settings.XMPP_JID}" ) - recipient_jid = f"{user.username}@{settings.XMPP_ADDRESS}" + recipient_jid = self._user_jid(user.username) await self.send_chat_state(recipient_jid, sender_jid, started) async def send_from_external( @@ -1640,7 +1671,7 @@ class XMPPComponent(ComponentXMPP): """Handles sending XMPP messages with text and attachments.""" sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}" - recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}" + recipient_jid = self._user_jid(person_identifier.user.username) if is_outgoing_message: xmpp_id = await self.send_xmpp_message( recipient_jid, @@ -1767,22 +1798,45 @@ class XMPPComponent(ComponentXMPP): class XMPPClient(ClientBase): def __init__(self, ur, *args, **kwargs): super().__init__(ur, *args, **kwargs) - self.client = XMPPComponent( - ur, - jid=settings.XMPP_JID, - secret=settings.XMPP_SECRET, - server=settings.XMPP_ADDRESS, - port=settings.XMPP_PORT, - ) + self._enabled = True + self.client = None + jid = str(getattr(settings, "XMPP_JID", "") or "").strip() + secret = str(getattr(settings, "XMPP_SECRET", "") or "").strip() + server = str(getattr(settings, "XMPP_ADDRESS", "") or "").strip() + port = int(getattr(settings, "XMPP_PORT", 8888) or 8888) + missing = [] + if not jid: + missing.append("XMPP_JID") + if 
not secret: + missing.append("XMPP_SECRET") + if not server: + missing.append("XMPP_ADDRESS") + if missing: + self._enabled = False + self.log.warning( + "XMPP client disabled due to missing configuration: %s", + ", ".join(missing), + ) - self.client.register_plugin("xep_0030") # Service Discovery - self.client.register_plugin("xep_0004") # Data Forms - self.client.register_plugin("xep_0060") # PubSub - self.client.register_plugin("xep_0199") # XMPP Ping - self.client.register_plugin("xep_0085") # Chat State Notifications - self.client.register_plugin("xep_0363") # HTTP File Upload + if self._enabled: + self.client = XMPPComponent( + ur, + jid=jid, + secret=secret, + server=server, + port=port, + ) + + self.client.register_plugin("xep_0030") # Service Discovery + self.client.register_plugin("xep_0004") # Data Forms + self.client.register_plugin("xep_0060") # PubSub + self.client.register_plugin("xep_0199") # XMPP Ping + self.client.register_plugin("xep_0085") # Chat State Notifications + self.client.register_plugin("xep_0363") # HTTP File Upload def start(self): + if not self._enabled or self.client is None: + return self.log.info("XMPP client starting...") # ensure slixmpp uses the same asyncio loop as the router @@ -1791,7 +1845,11 @@ class XMPPClient(ClientBase): self.client.connect() async def start_typing_for_person(self, user, person_identifier): + if self.client is None: + return await self.client.send_typing_for_person(user, person_identifier, True) async def stop_typing_for_person(self, user, person_identifier): + if self.client is None: + return await self.client.send_typing_for_person(user, person_identifier, False) diff --git a/core/management/commands/mcp_manticore_server.py b/core/management/commands/mcp_manticore_server.py new file mode 100644 index 0000000..9d14a3a --- /dev/null +++ b/core/management/commands/mcp_manticore_server.py @@ -0,0 +1,11 @@ +from django.core.management.base import BaseCommand + +from core.mcp.server import run_stdio_server 
+ + +class Command(BaseCommand): + help = "Run GIA MCP stdio server with manticore/task/documentation tools." + + def handle(self, *args, **options): + _ = args, options + run_stdio_server() diff --git a/core/management/commands/memory_hygiene.py b/core/management/commands/memory_hygiene.py new file mode 100644 index 0000000..a925b4f --- /dev/null +++ b/core/management/commands/memory_hygiene.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import json + +from django.core.management.base import BaseCommand + +from core.memory.pipeline import run_memory_hygiene + + +class Command(BaseCommand): + help = "Run memory hygiene checks (stale decay + contradiction queueing)." + + def add_arguments(self, parser): + parser.add_argument("--user-id", default="") + parser.add_argument("--dry-run", action="store_true", default=False) + parser.add_argument("--json", action="store_true", default=False) + + def handle(self, *args, **options): + user_id_raw = str(options.get("user_id") or "").strip() + dry_run = bool(options.get("dry_run")) + as_json = bool(options.get("json")) + user_id = int(user_id_raw) if user_id_raw else None + + result = run_memory_hygiene(user_id=user_id, dry_run=dry_run) + payload = { + "user_id": user_id, + "dry_run": dry_run, + "result": result, + } + if as_json: + self.stdout.write(json.dumps(payload, indent=2, sort_keys=True)) + return + self.stdout.write( + "memory-hygiene " + f"user={user_id if user_id is not None else '-'} " + f"dry_run={'yes' if dry_run else 'no'} " + f"expired={int(result.get('expired') or 0)} " + f"contradictions={int(result.get('contradictions') or 0)} " + f"queued={int(result.get('queued_requests') or 0)}" + ) diff --git a/core/management/commands/memory_suggest_from_messages.py b/core/management/commands/memory_suggest_from_messages.py new file mode 100644 index 0000000..63ef03c --- /dev/null +++ b/core/management/commands/memory_suggest_from_messages.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import 
json + +from django.core.management.base import BaseCommand, CommandError + +from core.memory.pipeline import suggest_memories_from_recent_messages + + +class Command(BaseCommand): + help = "Suggest proposed MemoryItem rows from recent inbound message text." + + def add_arguments(self, parser): + parser.add_argument("--user-id", required=True) + parser.add_argument("--limit-messages", type=int, default=300) + parser.add_argument("--max-items", type=int, default=30) + parser.add_argument("--json", action="store_true", default=False) + + def handle(self, *args, **options): + user_id_raw = str(options.get("user_id") or "").strip() + if not user_id_raw: + raise CommandError("--user-id is required") + limit_messages = max(1, int(options.get("limit_messages") or 300)) + max_items = max(1, int(options.get("max_items") or 30)) + as_json = bool(options.get("json")) + + result = suggest_memories_from_recent_messages( + user_id=int(user_id_raw), + limit_messages=limit_messages, + max_items=max_items, + ) + payload = { + "user_id": int(user_id_raw), + "limit_messages": limit_messages, + "max_items": max_items, + "result": result, + } + if as_json: + self.stdout.write(json.dumps(payload, indent=2, sort_keys=True)) + return + self.stdout.write( + "memory-suggest-from-messages " + f"user={payload['user_id']} " + f"scanned={int(result.get('scanned') or 0)} " + f"queued={int(result.get('queued') or 0)}" + ) diff --git a/core/mcp/__init__.py b/core/mcp/__init__.py new file mode 100644 index 0000000..797ac60 --- /dev/null +++ b/core/mcp/__init__.py @@ -0,0 +1,3 @@ +from .server import run_stdio_server + +__all__ = ["run_stdio_server"] diff --git a/core/mcp/server.py b/core/mcp/server.py new file mode 100644 index 0000000..903198b --- /dev/null +++ b/core/mcp/server.py @@ -0,0 +1,149 @@ +from __future__ import annotations + +import json +import os +import sys +from typing import Any + +import django + +from core.mcp.tools import execute_tool, format_tool_content, tool_specs +from 
core.util import logs + +log = logs.get_logger("mcp-server") + +_compat_newline_mode = False + + +def _setup_django() -> None: + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings") + django.setup() + + +def _response(msg_id: Any, result: dict[str, Any]) -> dict[str, Any]: + return {"jsonrpc": "2.0", "id": msg_id, "result": result} + + +def _error(msg_id: Any, code: int, message: str) -> dict[str, Any]: + return {"jsonrpc": "2.0", "id": msg_id, "error": {"code": code, "message": message}} + + +def _read_message() -> dict[str, Any] | None: + global _compat_newline_mode + headers: dict[str, str] = {} + pending_body = b"" + while True: + line = sys.stdin.buffer.readline() + if not line: + return None + if not headers and line.lstrip().startswith((b"{", b"[")): + _compat_newline_mode = True + return json.loads(line.decode("utf-8").strip()) + + sep = None + if b"\r\n\r\n" in line: + sep = b"\r\n\r\n" + elif b"\n\n" in line: + sep = b"\n\n" + if sep is not None: + header_line, tail = line.split(sep, 1) + pending_body = tail + else: + header_line = line + + if header_line in (b"\r\n", b"\n"): + break + + decoded = header_line.decode("utf-8").strip() + if ":" in decoded: + key, value = decoded.split(":", 1) + headers[key.strip().lower()] = value.strip() + if sep is not None: + break + + length_raw = headers.get("content-length") + if not length_raw: + if not pending_body: + pending_body = sys.stdin.buffer.readline() + if not pending_body: + return None + _compat_newline_mode = True + return json.loads(pending_body.decode("utf-8").strip()) + + length = int(length_raw) + body = pending_body + if len(body) < length: + body += sys.stdin.buffer.read(length - len(body)) + body = body[:length] + if not body: + return None + return json.loads(body.decode("utf-8")) + + +def _write_message(payload: dict[str, Any]) -> None: + raw_json = json.dumps(payload, separators=(",", ":"), ensure_ascii=False) + if _compat_newline_mode: + sys.stdout.buffer.write((raw_json + 
"\n").encode("utf-8")) + else: + raw = raw_json.encode("utf-8") + sys.stdout.buffer.write(f"Content-Length: {len(raw)}\r\n\r\n".encode("utf-8")) + sys.stdout.buffer.write(raw) + sys.stdout.buffer.flush() + + +def _handle_message(message: dict[str, Any]) -> dict[str, Any] | None: + msg_id = message.get("id") + method = str(message.get("method") or "") + params = message.get("params") or {} + + if method == "notifications/initialized": + return None + if method == "initialize": + return _response( + msg_id, + { + "protocolVersion": "2025-06-18", + "serverInfo": {"name": "gia-manticore-mcp", "version": "0.1.0"}, + "capabilities": {"tools": {}}, + }, + ) + if method == "ping": + return _response(msg_id, {}) + if method == "tools/list": + return _response(msg_id, {"tools": tool_specs()}) + if method == "tools/call": + name = str(params.get("name") or "").strip() + arguments = params.get("arguments") or {} + try: + payload = execute_tool(name, arguments) + return _response(msg_id, format_tool_content(payload)) + except Exception as exc: + log.warning("tool call failed name=%s err=%s", name, exc) + return _response( + msg_id, + { + "isError": True, + "content": [{"type": "text", "text": json.dumps({"error": str(exc)})}], + }, + ) + + return _error(msg_id, -32601, f"Method not found: {method}") + + +def run_stdio_server() -> None: + _setup_django() + while True: + message = _read_message() + if message is None: + return + try: + response = _handle_message(message) + if response is not None: + _write_message(response) + except Exception as exc: + msg_id = message.get("id") + _write_message(_error(msg_id, -32000, str(exc))) + + +if __name__ == "__main__": + run_stdio_server() diff --git a/core/mcp/tools.py b/core/mcp/tools.py new file mode 100644 index 0000000..54973cf --- /dev/null +++ b/core/mcp/tools.py @@ -0,0 +1,1220 @@ +from __future__ import annotations + +import json +import time +from pathlib import Path +from typing import Any + +from django.conf import settings 
+from django.db.models import Q +from django.utils import timezone +from django.utils.dateparse import parse_datetime +from django.utils.text import slugify + +from core.memory.pipeline import ( + create_memory_change_request, + review_memory_change_request, + suggest_memories_from_recent_messages, +) +from core.memory.retrieval import retrieve_memories_for_prompt +from core.memory.search_backend import backend_status, get_memory_search_backend +from core.models import ( + DerivedTask, + DerivedTaskEvent, + KnowledgeArticle, + KnowledgeRevision, + MCPToolAuditLog, + MemoryChangeRequest, + MemoryItem, + TaskArtifactLink, + User, + WorkspaceConversation, +) +from core.util import logs + +log = logs.get_logger("mcp-tools") + + +def _safe_limit(value: Any, default: int, low: int, high: int) -> int: + try: + parsed = int(value) + except (TypeError, ValueError): + parsed = default + return max(low, min(high, parsed)) + + +def _coerce_statuses( + value: Any, + default: tuple[str, ...] = ("active",), +) -> tuple[str, ...]: + if isinstance(value, (list, tuple, set)): + statuses = [str(item or "").strip().lower() for item in value] + else: + statuses = [item.strip().lower() for item in str(value or "").split(",")] + cleaned = tuple(item for item in statuses if item) + return cleaned or default + + +def _coerce_tags(value: Any) -> list[str]: + if isinstance(value, (list, tuple, set)): + tags = [str(item or "").strip() for item in value] + else: + tags = [item.strip() for item in str(value or "").split(",")] + seen = set() + ordered: list[str] = [] + for tag in tags: + if not tag: + continue + key = tag.lower() + if key in seen: + continue + seen.add(key) + ordered.append(tag) + return ordered + + +def _as_iso(value: Any) -> str: + return value.isoformat() if value else "" + + +def _task_payload(task: DerivedTask) -> dict[str, Any]: + return { + "id": str(task.id), + "title": str(task.title or ""), + "status_snapshot": str(task.status_snapshot or ""), + "reference_code": 
str(task.reference_code or ""), + "external_key": str(task.external_key or ""), + "source_service": str(task.source_service or ""), + "source_channel": str(task.source_channel or ""), + "project_id": str(task.project_id or ""), + "project_name": str(getattr(task.project, "name", "") or ""), + "epic_id": str(task.epic_id or ""), + "epic_name": str(getattr(task.epic, "name", "") or ""), + "created_at": _as_iso(task.created_at), + "immutable_payload": task.immutable_payload or {}, + } + + +def _event_payload(event: DerivedTaskEvent) -> dict[str, Any]: + return { + "id": str(event.id), + "task_id": str(event.task_id), + "event_type": str(event.event_type or ""), + "actor_identifier": str(event.actor_identifier or ""), + "source_message_id": str(event.source_message_id or ""), + "payload": event.payload or {}, + "created_at": _as_iso(event.created_at), + } + + +def _artifact_payload(link: TaskArtifactLink) -> dict[str, Any]: + return { + "id": str(link.id), + "task_id": str(link.task_id), + "kind": str(link.kind or ""), + "uri": str(link.uri or ""), + "path": str(link.path or ""), + "summary": str(link.summary or ""), + "created_by_identifier": str(link.created_by_identifier or ""), + "created_at": _as_iso(link.created_at), + } + + +def _article_payload(article: KnowledgeArticle) -> dict[str, Any]: + return { + "id": str(article.id), + "user_id": int(article.user_id), + "related_task_id": str(article.related_task_id or ""), + "title": str(article.title or ""), + "slug": str(article.slug or ""), + "markdown": str(article.markdown or ""), + "tags": list(article.tags or []), + "status": str(article.status or ""), + "owner_identifier": str(article.owner_identifier or ""), + "created_at": _as_iso(article.created_at), + "updated_at": _as_iso(article.updated_at), + } + + +def _revision_payload(revision: KnowledgeRevision) -> dict[str, Any]: + return { + "id": str(revision.id), + "article_id": str(revision.article_id), + "revision": int(revision.revision), + "author_tool": 
str(revision.author_tool or ""), + "author_identifier": str(revision.author_identifier or ""), + "summary": str(revision.summary or ""), + "markdown": str(revision.markdown or ""), + "created_at": _as_iso(revision.created_at), + } + + +def _memory_change_payload(req: MemoryChangeRequest) -> dict[str, Any]: + return { + "id": str(req.id), + "user_id": int(req.user_id), + "memory_id": str(req.memory_id or ""), + "conversation_id": str(req.conversation_id or ""), + "person_id": str(req.person_id or ""), + "action": str(req.action or ""), + "status": str(req.status or ""), + "proposed_memory_kind": str(req.proposed_memory_kind or ""), + "proposed_content": req.proposed_content or {}, + "proposed_confidence_score": ( + float(req.proposed_confidence_score) + if req.proposed_confidence_score is not None + else None + ), + "proposed_expires_at": _as_iso(req.proposed_expires_at), + "reason": str(req.reason or ""), + "requested_by_identifier": str(req.requested_by_identifier or ""), + "reviewed_by_identifier": str(req.reviewed_by_identifier or ""), + "reviewed_at": _as_iso(req.reviewed_at), + "created_at": _as_iso(req.created_at), + "updated_at": _as_iso(req.updated_at), + } + + +def _parse_iso_datetime(value: Any) -> str: + raw = str(value or "").strip() + if not raw: + return "" + parsed = parse_datetime(raw) + if parsed is None: + raise ValueError("invalid ISO datetime") + if parsed.tzinfo is None: + parsed = timezone.make_aware(parsed, timezone.get_current_timezone()) + return parsed.isoformat() + + +def _resolve_task(arguments: dict[str, Any]) -> DerivedTask: + task_id = str(arguments.get("task_id") or "").strip() + if not task_id: + raise ValueError("task_id is required") + user_id_raw = str(arguments.get("user_id") or "").strip() + queryset = DerivedTask.objects.select_related("project", "epic") + if user_id_raw: + queryset = queryset.filter(user_id=int(user_id_raw)) + return queryset.get(id=task_id) + + +def _get_article_for_user(arguments: dict[str, Any]) -> 
KnowledgeArticle: + user_id = int(arguments.get("user_id")) + article_id = str(arguments.get("article_id") or "").strip() + slug = str(arguments.get("slug") or "").strip() + queryset = KnowledgeArticle.objects.filter(user_id=user_id) + if article_id: + return queryset.get(id=article_id) + if slug: + return queryset.get(slug=slug) + raise ValueError("article_id or slug is required") + + +def _next_unique_slug(*, user_id: int, requested_slug: str) -> str: + base = slugify(requested_slug)[:255].strip("-") + if not base: + raise ValueError("slug cannot be empty") + candidate = base + idx = 2 + while KnowledgeArticle.objects.filter(user_id=int(user_id), slug=candidate).exists(): + suffix = f"-{idx}" + candidate = f"{base[: max(1, 255 - len(suffix))]}{suffix}" + idx += 1 + return candidate + + +def _create_revision( + *, + article: KnowledgeArticle, + markdown: str, + author_tool: str, + author_identifier: str, + summary: str, +) -> KnowledgeRevision: + last = article.revisions.order_by("-revision").first() + revision_no = int(last.revision if last else 0) + 1 + return KnowledgeRevision.objects.create( + article=article, + revision=revision_no, + author_tool=str(author_tool or "mcp").strip(), + author_identifier=str(author_identifier or "").strip(), + summary=str(summary or "").strip(), + markdown=str(markdown or ""), + ) + + +def _preview_meta(payload: Any) -> dict[str, Any]: + if isinstance(payload, dict): + keys = list(payload.keys())[:24] + meta = {"keys": keys} + if "count" in payload: + meta["count"] = payload.get("count") + if "id" in payload: + meta["id"] = payload.get("id") + return meta + return {"preview": str(payload)[:500]} + + +def _audit_user_from_args(arguments: dict[str, Any]) -> User | None: + user_id_raw = str(arguments.get("user_id") or "").strip() + if not user_id_raw: + return None + try: + user_id = int(user_id_raw) + except ValueError: + return None + return User.objects.filter(id=user_id).first() + + +def tool_manticore_status(arguments: 
dict[str, Any]) -> dict[str, Any]: + _ = arguments + status = backend_status() + status["ts_ms"] = int(time.time() * 1000) + return status + + +def tool_manticore_query(arguments: dict[str, Any]) -> dict[str, Any]: + user_id = int(arguments.get("user_id")) + query = str(arguments.get("query") or "").strip() + conversation_id = str(arguments.get("conversation_id") or "").strip() + limit = _safe_limit(arguments.get("limit"), default=20, low=1, high=100) + statuses = _coerce_statuses(arguments.get("statuses"), default=("active",)) + if not query: + raise ValueError("query is required") + + backend = get_memory_search_backend() + hits = backend.search( + user_id=user_id, + query=query, + conversation_id=conversation_id, + limit=limit, + include_statuses=statuses, + ) + return { + "backend": getattr(backend, "name", "unknown"), + "query": query, + "user_id": user_id, + "conversation_id": conversation_id, + "statuses": list(statuses), + "count": len(hits), + "hits": [ + { + "memory_id": item.memory_id, + "score": item.score, + "summary": item.summary, + "payload": item.payload, + } + for item in hits + ], + } + + +def tool_manticore_reindex(arguments: dict[str, Any]) -> dict[str, Any]: + user_id_raw = str(arguments.get("user_id") or "").strip() + user_id = int(user_id_raw) if user_id_raw else None + limit = _safe_limit(arguments.get("limit"), default=2000, low=1, high=20000) + statuses = _coerce_statuses(arguments.get("statuses"), default=("active",)) + backend = get_memory_search_backend() + result = backend.reindex(user_id=user_id, include_statuses=statuses, limit=limit) + return { + "backend": getattr(backend, "name", "unknown"), + "user_id": user_id, + "statuses": list(statuses), + "limit": limit, + "result": result, + } + + +def tool_memory_list(arguments: dict[str, Any]) -> dict[str, Any]: + user_id = int(arguments.get("user_id")) + query = str(arguments.get("query") or "").strip() + person_id = str(arguments.get("person_id") or "").strip() + conversation_id = 
str(arguments.get("conversation_id") or "").strip() + statuses = _coerce_statuses(arguments.get("statuses"), default=("active",)) + limit = _safe_limit(arguments.get("limit"), default=30, low=1, high=200) + rows = retrieve_memories_for_prompt( + user_id=user_id, + query=query, + person_id=person_id, + conversation_id=conversation_id, + statuses=statuses, + limit=limit, + ) + return { + "user_id": user_id, + "query": query, + "person_id": person_id, + "conversation_id": conversation_id, + "statuses": list(statuses), + "count": len(rows), + "items": rows, + } + + +def tool_memory_propose(arguments: dict[str, Any]) -> dict[str, Any]: + user_id = int(arguments.get("user_id")) + conversation_id = str(arguments.get("conversation_id") or "").strip() + person_id = str(arguments.get("person_id") or "").strip() + memory_kind = str(arguments.get("memory_kind") or "fact").strip().lower() + content_raw = arguments.get("content") + if isinstance(content_raw, dict): + content = dict(content_raw) + else: + content = {"text": str(content_raw or "").strip()} + if not content: + raise ValueError("content is required") + + confidence_score = float(arguments.get("confidence_score") or 0.5) + expires_at = _parse_iso_datetime(arguments.get("expires_at")) + reason = str(arguments.get("reason") or "").strip() + requested_by = str(arguments.get("requested_by_identifier") or "").strip() + + conversation = WorkspaceConversation.objects.filter( + user_id=user_id, + id=conversation_id, + ).first() + if conversation is None: + raise ValueError("conversation_id is required and must exist") + + item = MemoryItem.objects.create( + user_id=user_id, + conversation=conversation, + person_id=person_id or None, + memory_kind=memory_kind, + status="proposed", + content=content, + provenance={"source": "mcp.memory.propose"}, + confidence_score=confidence_score, + expires_at=parse_datetime(expires_at) if expires_at else None, + ) + req = create_memory_change_request( + user_id=user_id, + action="create", + 
conversation_id=conversation_id, + person_id=person_id, + memory_id=str(item.id), + memory_kind=memory_kind, + content=content, + confidence_score=confidence_score, + expires_at=expires_at, + reason=reason, + requested_by_identifier=requested_by, + ) + return { + "ok": True, + "memory_id": str(item.id), + "request": _memory_change_payload(req), + } + + +def tool_memory_pending(arguments: dict[str, Any]) -> dict[str, Any]: + user_id = int(arguments.get("user_id")) + limit = _safe_limit(arguments.get("limit"), default=50, low=1, high=500) + rows = ( + MemoryChangeRequest.objects.filter(user_id=user_id, status="pending") + .select_related("memory") + .order_by("created_at")[:limit] + ) + items = [_memory_change_payload(item) for item in rows] + return {"count": len(items), "items": items} + + +def tool_memory_review(arguments: dict[str, Any]) -> dict[str, Any]: + user_id = int(arguments.get("user_id")) + request_id = str(arguments.get("request_id") or "").strip() + decision = str(arguments.get("decision") or "").strip().lower() + reviewer_identifier = str(arguments.get("reviewer_identifier") or "").strip() + note = str(arguments.get("note") or "").strip() + if not request_id: + raise ValueError("request_id is required") + req = review_memory_change_request( + user_id=user_id, + request_id=request_id, + decision=decision, + reviewer_identifier=reviewer_identifier, + note=note, + ) + memory = req.memory + return { + "request": _memory_change_payload(req), + "memory": ( + { + "id": str(memory.id), + "status": str(memory.status), + "memory_kind": str(memory.memory_kind), + "content": memory.content or {}, + "updated_at": _as_iso(memory.updated_at), + } + if memory is not None + else None + ), + } + + +def tool_memory_suggest(arguments: dict[str, Any]) -> dict[str, Any]: + user_id = int(arguments.get("user_id")) + limit_messages = _safe_limit( + arguments.get("limit_messages"), + default=300, + low=1, + high=2000, + ) + max_items = _safe_limit(arguments.get("max_items"), 
default=30, low=1, high=500) + result = suggest_memories_from_recent_messages( + user_id=user_id, + limit_messages=limit_messages, + max_items=max_items, + ) + return { + "user_id": user_id, + "limit_messages": limit_messages, + "max_items": max_items, + "result": result, + } + + +def tool_tasks_list(arguments: dict[str, Any]) -> dict[str, Any]: + user_id = int(arguments.get("user_id")) + status = str(arguments.get("status") or "").strip().lower() + project_id = str(arguments.get("project_id") or "").strip() + query = str(arguments.get("query") or "").strip() + limit = _safe_limit(arguments.get("limit"), default=30, low=1, high=200) + + queryset = ( + DerivedTask.objects.filter(user_id=user_id) + .select_related("project", "epic") + .order_by("-created_at") + ) + if status: + queryset = queryset.filter(status_snapshot__iexact=status) + if project_id: + queryset = queryset.filter(project_id=project_id) + if query: + queryset = queryset.filter( + Q(title__icontains=query) + | Q(reference_code__icontains=query) + | Q(external_key__icontains=query) + ) + rows = [_task_payload(item) for item in queryset[:limit]] + return {"count": len(rows), "items": rows} + + +def tool_tasks_search(arguments: dict[str, Any]) -> dict[str, Any]: + query = str(arguments.get("query") or "").strip() + if not query: + raise ValueError("query is required") + return tool_tasks_list(arguments) + + +def tool_tasks_get(arguments: dict[str, Any]) -> dict[str, Any]: + task = _resolve_task(arguments) + payload = _task_payload(task) + payload["artifact_links"] = [ + _artifact_payload(item) + for item in task.artifact_links.order_by("-created_at")[:40] + ] + payload["knowledge_articles"] = [ + { + "id": str(article.id), + "slug": str(article.slug or ""), + "title": str(article.title or ""), + "status": str(article.status or ""), + "updated_at": _as_iso(article.updated_at), + } + for article in task.knowledge_articles.order_by("-updated_at")[:40] + ] + return payload + + +def 
def tool_tasks_create_note(arguments: dict[str, Any]) -> dict[str, Any]:
    """Append a free-text progress note to a task as a DerivedTaskEvent.

    Expects a task reference (resolved via ``_resolve_task``) and a
    non-empty ``note``; ``actor_identifier`` is optional attribution.

    Raises:
        ValueError: when ``note`` is blank.
    """
    task = _resolve_task(arguments)
    note = str(arguments.get("note") or "").strip()
    actor_identifier = str(arguments.get("actor_identifier") or "").strip()
    if not note:
        raise ValueError("note is required")
    event = DerivedTaskEvent.objects.create(
        task=task,
        event_type="progress",
        actor_identifier=actor_identifier,
        # "source" tags the note as created through the MCP tool surface.
        payload={"note": note, "source": "mcp.tasks.create_note"},
    )
    return {"task": _task_payload(task), "event": _event_payload(event)}


def tool_tasks_link_artifact(arguments: dict[str, Any]) -> dict[str, Any]:
    """Attach an artifact reference (URI and/or filesystem path) to a task.

    ``kind`` defaults to ``"note"``.  At least one of ``uri`` / ``path``
    must be supplied.

    Raises:
        ValueError: when both ``uri`` and ``path`` are empty.
    """
    task = _resolve_task(arguments)
    kind = str(arguments.get("kind") or "").strip() or "note"
    uri = str(arguments.get("uri") or "").strip()
    path = str(arguments.get("path") or "").strip()
    summary = str(arguments.get("summary") or "").strip()
    created_by_identifier = str(arguments.get("created_by_identifier") or "").strip()
    if not uri and not path:
        raise ValueError("uri or path is required")
    artifact = TaskArtifactLink.objects.create(
        task=task,
        kind=kind,
        uri=uri,
        path=path,
        summary=summary,
        created_by_identifier=created_by_identifier,
    )
    return {"task_id": str(task.id), "artifact": _artifact_payload(artifact)}


def tool_wiki_create_article(arguments: dict[str, Any]) -> dict[str, Any]:
    """Create a KnowledgeArticle plus its initial revision for one user.

    Requires ``user_id`` and a non-empty ``title``.  ``status`` must be one
    of draft/published/archived (default ``draft``).  The slug is derived
    from ``slug`` or, failing that, ``title`` and made unique per user via
    ``_next_unique_slug``.  A ``related_task_id``, when given, must resolve
    to a DerivedTask owned by the same user.

    Raises:
        ValueError: on missing title, invalid status, or an unresolvable
            ``related_task_id``.
    """
    user_id = int(arguments.get("user_id"))
    title = str(arguments.get("title") or "").strip()
    markdown = str(arguments.get("markdown") or "")
    related_task_id = str(arguments.get("related_task_id") or "").strip()
    status = str(arguments.get("status") or "draft").strip().lower()
    tags = _coerce_tags(arguments.get("tags"))
    owner_identifier = str(arguments.get("owner_identifier") or "").strip()
    author_identifier = str(arguments.get("author_identifier") or "").strip()
    summary = str(arguments.get("summary") or "Initial revision.").strip()

    if not title:
        raise ValueError("title is required")
    if status not in {"draft", "published", "archived"}:
        raise ValueError("status must be draft/published/archived")

    # Fall back to the title when no explicit slug was requested.
    requested_slug = str(arguments.get("slug") or "").strip() or title
    slug = _next_unique_slug(user_id=user_id, requested_slug=requested_slug)

    related_task = None
    if related_task_id:
        related_task = DerivedTask.objects.filter(
            user_id=user_id,
            id=related_task_id,
        ).first()
        if related_task is None:
            raise ValueError("related_task_id not found")

    article = KnowledgeArticle.objects.create(
        user_id=user_id,
        related_task=related_task,
        title=title,
        slug=slug,
        markdown=markdown,
        tags=tags,
        status=status,
        owner_identifier=owner_identifier,
    )
    # Every create also records a revision so later edits stay auditable.
    revision = _create_revision(
        article=article,
        markdown=markdown,
        author_tool="mcp",
        author_identifier=author_identifier,
        summary=summary,
    )
    return {
        "article": _article_payload(article),
        "revision": _revision_payload(revision),
    }
def tool_wiki_list(arguments: dict[str, Any]) -> dict[str, Any]:
    """List a user's wiki articles, newest-updated first, with filters.

    Optional filters: ``status`` (case-insensitive exact), ``tag``
    (substring match against the stored tags — NOTE(review): ``icontains``
    on the tags field is a raw substring test, so "ops" also matches
    "devops"; confirm this is intended), ``related_task_id`` and a free-text
    ``query`` matched against title or slug.  ``limit`` is clamped to 1-500.
    """
    user_id = int(arguments.get("user_id"))
    status = str(arguments.get("status") or "").strip().lower()
    tag = str(arguments.get("tag") or "").strip().lower()
    related_task_id = str(arguments.get("related_task_id") or "").strip()
    query = str(arguments.get("query") or "").strip()
    limit = _safe_limit(arguments.get("limit"), default=50, low=1, high=500)
    queryset = KnowledgeArticle.objects.filter(user_id=user_id).order_by("-updated_at")
    if status:
        queryset = queryset.filter(status__iexact=status)
    if related_task_id:
        queryset = queryset.filter(related_task_id=related_task_id)
    if tag:
        queryset = queryset.filter(tags__icontains=tag)
    if query:
        queryset = queryset.filter(Q(title__icontains=query) | Q(slug__icontains=query))
    rows = [_article_payload(item) for item in queryset[:limit]]
    return {"count": len(rows), "items": rows}


def tool_wiki_get(arguments: dict[str, Any]) -> dict[str, Any]:
    """Fetch one article (by id/slug via ``_get_article_for_user``).

    When ``include_revisions`` is truthy, the newest revisions (capped by
    ``revision_limit``, clamped to 1-200) are included in the payload.
    """
    article = _get_article_for_user(arguments)
    include_revisions = bool(arguments.get("include_revisions"))
    revision_limit = _safe_limit(arguments.get("revision_limit"), default=20, low=1, high=200)
    payload = {"article": _article_payload(article)}
    if include_revisions:
        revisions = article.revisions.order_by("-revision")[:revision_limit]
        payload["revisions"] = [_revision_payload(item) for item in revisions]
    return payload


def tool_project_get_guidelines(arguments: dict[str, Any]) -> dict[str, Any]:
    """Load project guideline documents under a shared character budget.

    Files are read in the fixed priority order below; once ``max_chars``
    (clamped to 500-50000) is spent, remaining files are skipped and the
    last file read may be truncated mid-document.  Missing files are
    silently skipped.
    """
    max_chars = _safe_limit(arguments.get("max_chars"), default=16000, low=500, high=50000)
    base = Path(settings.BASE_DIR).resolve()
    file_names = ["AGENTS.md", "LLM_CODING_STANDARDS.md", "INSTALL.md", "README.md"]
    payload = []
    total = 0
    for name in file_names:
        path = (base / name).resolve()
        if not path.exists():
            continue
        text = path.read_text(encoding="utf-8")
        # Shared budget across all files: stop once max_chars is consumed.
        remaining = max_chars - total
        if remaining <= 0:
            break
        selected = text[:remaining]
        total += len(selected)
        payload.append({"path": str(path), "content": selected})
    return {"files": payload, "truncated": total >= max_chars}
def tool_project_get_runbook(arguments: dict[str, Any]) -> dict[str, Any]:
    """Load operational/runbook documents under a shared character budget.

    Files are read in the fixed priority order below; once ``max_chars``
    (clamped to 500-50000) is spent, remaining files are skipped.  Missing
    files are silently ignored.

    NOTE(review): artifacts/plans/11 and 12 appear to be deleted elsewhere
    in this change; the ``exists()`` guard keeps this safe, but the list
    may be stale — confirm and prune.
    """
    max_chars = _safe_limit(arguments.get("max_chars"), default=16000, low=500, high=50000)
    base = Path(settings.BASE_DIR).resolve()
    file_names = [
        "INSTALL.md",
        "README.md",
        "artifacts/mcp/manticore-mcp-server.md",
        "artifacts/plans/11-personal-ai-memory.md",
        "artifacts/plans/12-mcp-server-for-tasks-and-knowledge.md",
    ]
    payload = []
    total = 0
    for name in file_names:
        path = (base / name).resolve()
        if not path.exists():
            continue
        text = path.read_text(encoding="utf-8")
        # Shared budget across all files: stop once max_chars is consumed.
        remaining = max_chars - total
        if remaining <= 0:
            break
        selected = text[:remaining]
        total += len(selected)
        payload.append({"path": str(path), "content": selected})
    return {"files": payload, "truncated": total >= max_chars}


def tool_docs_append_run_note(arguments: dict[str, Any]) -> dict[str, Any]:
    """Append a timestamped markdown note to a run-notes file.

    ``content`` is required; ``title`` defaults to "MCP Run Note".  The
    target ``path`` defaults to /tmp/gia-mcp-run-notes.md; a relative path
    is resolved against the project root.  The resolved path must stay
    inside the project root or /tmp.

    Raises:
        ValueError: on empty content or a path outside the allowed roots.
    """
    content = str(arguments.get("content") or "").strip()
    title = str(arguments.get("title") or "").strip() or "MCP Run Note"
    task_id = str(arguments.get("task_id") or "").strip()
    raw_path = str(arguments.get("path") or "").strip()
    if not content:
        raise ValueError("content is required")

    base = Path(settings.BASE_DIR).resolve()
    if not raw_path:
        path = Path("/tmp/gia-mcp-run-notes.md")
    else:
        candidate = Path(raw_path)
        path = candidate.resolve() if candidate.is_absolute() else (base / candidate).resolve()
    allowed_roots = [base, Path("/tmp").resolve()]
    # Use Path.is_relative_to instead of a string-prefix test: startswith()
    # would also accept sibling directories such as "/tmp-evil" or
    # "<base>-backup", which is a path-containment bypass.
    if not any(path.is_relative_to(root) for root in allowed_roots):
        raise ValueError("path must be within project root or /tmp")
    path.parent.mkdir(parents=True, exist_ok=True)

    ts = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime())
    lines = [f"## {title}", "", f"- Timestamp: {ts}"]
    if task_id:
        lines.append(f"- Task ID: `{task_id}`")
    lines.extend(["", content, "", "---", ""])
    text = "\n".join(lines)
    # Append-only: concurrent writers interleave whole entries, never lose them.
    with path.open("a", encoding="utf-8") as handle:
        handle.write(text)
    return {"ok": True, "path": str(path), "bytes_written": len(text)}
pending memory proposal requiring review.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "integer"}, + "conversation_id": {"type": "string"}, + "person_id": {"type": "string"}, + "memory_kind": {"type": "string"}, + "content": {"anyOf": [{"type": "object"}, {"type": "string"}]}, + "confidence_score": {"type": "number"}, + "expires_at": {"type": "string"}, + "reason": {"type": "string"}, + "requested_by_identifier": {"type": "string"}, + }, + "required": ["user_id", "conversation_id", "content"], + "additionalProperties": False, + }, + "handler": tool_memory_propose, + }, + "memory.pending": { + "description": "List pending memory change requests.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "integer"}, + "limit": {"type": "integer"}, + }, + "required": ["user_id"], + "additionalProperties": False, + }, + "handler": tool_memory_pending, + }, + "memory.review": { + "description": "Approve or reject a pending memory request.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "integer"}, + "request_id": {"type": "string"}, + "decision": {"type": "string"}, + "reviewer_identifier": {"type": "string"}, + "note": {"type": "string"}, + }, + "required": ["user_id", "request_id", "decision"], + "additionalProperties": False, + }, + "handler": tool_memory_review, + }, + "memory.suggest_from_messages": { + "description": "Extract memory proposals from recent inbound messages.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "integer"}, + "limit_messages": {"type": "integer"}, + "max_items": {"type": "integer"}, + }, + "required": ["user_id"], + "additionalProperties": False, + }, + "handler": tool_memory_suggest, + }, + "tasks.list": { + "description": "List derived tasks for a user.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "integer"}, + "status": {"type": "string"}, + "project_id": {"type": "string"}, + 
"query": {"type": "string"}, + "limit": {"type": "integer"}, + }, + "required": ["user_id"], + "additionalProperties": False, + }, + "handler": tool_tasks_list, + }, + "tasks.search": { + "description": "Search derived tasks by free text for a user.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "integer"}, + "query": {"type": "string"}, + "status": {"type": "string"}, + "project_id": {"type": "string"}, + "limit": {"type": "integer"}, + }, + "required": ["user_id", "query"], + "additionalProperties": False, + }, + "handler": tool_tasks_search, + }, + "tasks.get": { + "description": "Get one derived task by ID, including links.", + "inputSchema": { + "type": "object", + "properties": { + "task_id": {"type": "string"}, + "user_id": {"type": "integer"}, + }, + "required": ["task_id"], + "additionalProperties": False, + }, + "handler": tool_tasks_get, + }, + "tasks.events": { + "description": "List events for one derived task.", + "inputSchema": { + "type": "object", + "properties": { + "task_id": {"type": "string"}, + "user_id": {"type": "integer"}, + "limit": {"type": "integer"}, + }, + "required": ["task_id"], + "additionalProperties": False, + }, + "handler": tool_tasks_events, + }, + "tasks.create_note": { + "description": "Append an implementation/progress note to a task.", + "inputSchema": { + "type": "object", + "properties": { + "task_id": {"type": "string"}, + "user_id": {"type": "integer"}, + "note": {"type": "string"}, + "actor_identifier": {"type": "string"}, + }, + "required": ["task_id", "note"], + "additionalProperties": False, + }, + "handler": tool_tasks_create_note, + }, + "tasks.link_artifact": { + "description": "Link an artifact (URI/path) to a task.", + "inputSchema": { + "type": "object", + "properties": { + "task_id": {"type": "string"}, + "user_id": {"type": "integer"}, + "kind": {"type": "string"}, + "uri": {"type": "string"}, + "path": {"type": "string"}, + "summary": {"type": "string"}, + 
"created_by_identifier": {"type": "string"}, + }, + "required": ["task_id"], + "additionalProperties": False, + }, + "handler": tool_tasks_link_artifact, + }, + "wiki.create_article": { + "description": "Create a wiki article with initial revision.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "integer"}, + "title": {"type": "string"}, + "slug": {"type": "string"}, + "markdown": {"type": "string"}, + "tags": { + "anyOf": [ + {"type": "string"}, + {"type": "array", "items": {"type": "string"}}, + ] + }, + "status": {"type": "string"}, + "related_task_id": {"type": "string"}, + "owner_identifier": {"type": "string"}, + "author_identifier": {"type": "string"}, + "summary": {"type": "string"}, + }, + "required": ["user_id", "title"], + "additionalProperties": False, + }, + "handler": tool_wiki_create_article, + }, + "wiki.update_article": { + "description": "Update wiki article and append a revision entry.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "integer"}, + "article_id": {"type": "string"}, + "slug": {"type": "string"}, + "title": {"type": "string"}, + "markdown": {"type": "string"}, + "tags": { + "anyOf": [ + {"type": "string"}, + {"type": "array", "items": {"type": "string"}}, + ] + }, + "status": {"type": "string"}, + "related_task_id": {"type": "string"}, + "summary": {"type": "string"}, + "author_identifier": {"type": "string"}, + "approve_overwrite": {"type": "boolean"}, + "approve_archive": {"type": "boolean"}, + }, + "required": ["user_id"], + "additionalProperties": False, + }, + "handler": tool_wiki_update_article, + }, + "wiki.list": { + "description": "List wiki articles for a user with filters.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "integer"}, + "status": {"type": "string"}, + "tag": {"type": "string"}, + "related_task_id": {"type": "string"}, + "query": {"type": "string"}, + "limit": {"type": "integer"}, + }, + "required": ["user_id"], 
+ "additionalProperties": False, + }, + "handler": tool_wiki_list, + }, + "wiki.get": { + "description": "Get one wiki article by id/slug.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "integer"}, + "article_id": {"type": "string"}, + "slug": {"type": "string"}, + "include_revisions": {"type": "boolean"}, + "revision_limit": {"type": "integer"}, + }, + "required": ["user_id"], + "additionalProperties": False, + }, + "handler": tool_wiki_get, + }, + "project.get_guidelines": { + "description": "Load key project guideline documents.", + "inputSchema": { + "type": "object", + "properties": {"max_chars": {"type": "integer"}}, + "additionalProperties": False, + }, + "handler": tool_project_get_guidelines, + }, + "project.get_layout": { + "description": "List major project files/directories for orientation.", + "inputSchema": { + "type": "object", + "properties": {"max_entries": {"type": "integer"}}, + "additionalProperties": False, + }, + "handler": tool_project_get_layout, + }, + "project.get_runbook": { + "description": "Load operational runbook docs for agent continuity.", + "inputSchema": { + "type": "object", + "properties": {"max_chars": {"type": "integer"}}, + "additionalProperties": False, + }, + "handler": tool_project_get_runbook, + }, + "docs.append_run_note": { + "description": "Append an implementation note markdown entry.", + "inputSchema": { + "type": "object", + "properties": { + "title": {"type": "string"}, + "content": {"type": "string"}, + "task_id": {"type": "string"}, + "path": {"type": "string"}, + }, + "required": ["content"], + "additionalProperties": False, + }, + "handler": tool_docs_append_run_note, + }, +} + + +def tool_specs() -> list[dict[str, Any]]: + return [ + { + "name": name, + "description": definition["description"], + "inputSchema": definition["inputSchema"], + } + for name, definition in TOOL_DEFS.items() + ] + + +def execute_tool(name: str, arguments: dict[str, Any] | None = None) -> dict[str, 
Any]: + entry = TOOL_DEFS.get(str(name or "").strip()) + if not entry: + raise ValueError(f"Unknown tool: {name}") + + args = arguments or {} + handler = entry["handler"] + started = time.time() + audit_user = _audit_user_from_args(args) + + try: + payload = handler(args) + duration_ms = int((time.time() - started) * 1000) + try: + MCPToolAuditLog.objects.create( + tool_name=str(name), + user=audit_user, + request_args=args, + response_meta=_preview_meta(payload), + ok=True, + duration_ms=max(0, duration_ms), + ) + except Exception as exc: + log.warning("failed writing MCP success audit log: %s", exc) + return payload + except Exception as exc: + duration_ms = int((time.time() - started) * 1000) + try: + MCPToolAuditLog.objects.create( + tool_name=str(name), + user=audit_user, + request_args=args, + response_meta={}, + ok=False, + error=str(exc), + duration_ms=max(0, duration_ms), + ) + except Exception as audit_exc: + log.warning("failed writing MCP error audit log: %s", audit_exc) + raise + + +def format_tool_content(payload: dict[str, Any]) -> dict[str, Any]: + return {"content": [{"type": "text", "text": json.dumps(payload, indent=2)}]} diff --git a/core/memory/__init__.py b/core/memory/__init__.py index c86cbcb..e4f2781 100644 --- a/core/memory/__init__.py +++ b/core/memory/__init__.py @@ -1,3 +1,4 @@ from .search_backend import get_memory_search_backend +from .retrieval import retrieve_memories_for_prompt -__all__ = ["get_memory_search_backend"] +__all__ = ["get_memory_search_backend", "retrieve_memories_for_prompt"] diff --git a/core/memory/pipeline.py b/core/memory/pipeline.py new file mode 100644 index 0000000..b0b2c3b --- /dev/null +++ b/core/memory/pipeline.py @@ -0,0 +1,419 @@ +from __future__ import annotations + +import re +from datetime import timezone as dt_timezone +from typing import Any + +from django.db import transaction +from django.utils import timezone +from django.utils.dateparse import parse_datetime + +from core.models import ( + 
MemoryChangeRequest, + MemoryItem, + MemorySourceReference, + MessageEvent, + WorkspaceConversation, +) +from core.util import logs + +log = logs.get_logger("memory-pipeline") + +_LIKE_RE = re.compile( + r"\b(?:i (?:like|love|prefer)|my favorite)\s+(?P[^.!?]{2,120})", + re.IGNORECASE, +) +_DISLIKE_RE = re.compile( + r"\b(?:i (?:dislike|hate|avoid)|i don't like)\s+(?P[^.!?]{2,120})", + re.IGNORECASE, +) +_STYLE_RE = re.compile( + r"\b(?:please|pls)\s+(?P[^.!?]{3,120})", + re.IGNORECASE, +) + + +def _clean_value(value: str) -> str: + return " ".join(str(value or "").strip().split()) + + +def extract_memory_candidates(text: str) -> list[dict[str, Any]]: + source = str(text or "").strip() + if not source: + return [] + + candidates: list[dict[str, Any]] = [] + for regex, field, kind, confidence in ( + (_LIKE_RE, "likes", "fact", 0.68), + (_DISLIKE_RE, "dislikes", "fact", 0.68), + (_STYLE_RE, "communication_style", "state", 0.52), + ): + for match in regex.finditer(source): + value = _clean_value(match.group("value")) + if len(value) < 3: + continue + candidates.append( + { + "memory_kind": kind, + "field": field, + "text": value, + "confidence_score": confidence, + } + ) + return candidates + + +def _existing_fingerprints(user_id: int) -> set[tuple[str, str, str, str]]: + items = MemoryItem.objects.filter(user_id=int(user_id)).only( + "memory_kind", + "conversation_id", + "person_id", + "content", + ) + fingerprints = set() + for item in items: + content = item.content or {} + field = str(content.get("field") or "").strip().lower() + text = _clean_value(str(content.get("text") or "")).lower() + fingerprints.add( + ( + str(item.memory_kind or "").strip().lower(), + str(item.conversation_id or "").strip(), + str(item.person_id or "").strip(), + f"{field}:{text}", + ) + ) + return fingerprints + + +def _infer_single_person_id(conversation: WorkspaceConversation) -> str: + participant_ids = list(conversation.participants.values_list("id", flat=True)[:2]) + if 
@transaction.atomic
def suggest_memories_from_recent_messages(
    *,
    user_id: int,
    limit_messages: int = 300,
    max_items: int = 30,
) -> dict[str, int]:
    """Mine recent inbound messages for memory candidates and queue reviews.

    Scans up to ``limit_messages`` (clamped to 1-2000) of the user's newest
    inbound MessageEvents, extracts candidates via regex rules, dedupes
    against existing memory fingerprints, and for each new candidate
    creates a proposed MemoryItem, a MemorySourceReference back to the
    message, and a pending MemoryChangeRequest for human review.  Stops
    once ``max_items`` (clamped to 1-500) proposals are queued.

    Runs in one transaction: either all writes land or none do.
    Returns ``{"scanned": ..., "queued": ...}`` counters.
    """
    safe_limit_messages = max(1, min(2000, int(limit_messages or 300)))
    safe_max_items = max(1, min(500, int(max_items or 30)))
    existing = _existing_fingerprints(int(user_id))

    scanned = 0
    queued = 0
    rows = (
        MessageEvent.objects.filter(user_id=int(user_id), direction="in")
        .select_related("conversation")
        .order_by("-ts")[:safe_limit_messages]
    )
    for event in rows:
        scanned += 1
        # Attribute the memory to a person only for 1:1 conversations.
        person_id = _infer_single_person_id(event.conversation)
        for candidate in extract_memory_candidates(event.text or ""):
            field = str(candidate.get("field") or "").strip().lower()
            text = _clean_value(str(candidate.get("text") or ""))
            if not text:
                continue
            # Must mirror _existing_fingerprints' shape exactly for dedup.
            fingerprint = (
                str(candidate.get("memory_kind") or "fact").strip().lower(),
                str(event.conversation_id or "").strip(),
                person_id,
                f"{field}:{text.lower()}",
            )
            if fingerprint in existing:
                continue

            item = MemoryItem.objects.create(
                user_id=int(user_id),
                conversation=event.conversation,
                person_id=person_id or None,
                memory_kind=str(candidate.get("memory_kind") or "fact"),
                status="proposed",
                content={"field": field, "text": text},
                provenance={
                    "pipeline": "message_regex",
                    "message_event_id": str(event.id),
                },
                confidence_score=float(candidate.get("confidence_score") or 0.5),
            )
            MemorySourceReference.objects.create(
                memory=item,
                message_event=event,
                source_label="message_event",
            )
            MemoryChangeRequest.objects.create(
                user_id=int(user_id),
                memory=item,
                conversation=event.conversation,
                person_id=person_id or None,
                action="create",
                status="pending",
                proposed_memory_kind=item.memory_kind,
                proposed_content=item.content,
                proposed_confidence_score=item.confidence_score,
                reason="Auto-suggested from recent inbound messages.",
                requested_by_identifier="memory-pipeline",
            )
            # Also dedupe within this run, not just against stored items.
            existing.add(fingerprint)
            queued += 1
            if queued >= safe_max_items:
                return {"scanned": scanned, "queued": queued}
    return {"scanned": scanned, "queued": queued}


def _coerce_expires_at(value: Any):
    """Parse an optional expiry into an aware datetime (or None when blank).

    Naive datetimes are assumed UTC and made aware.

    Raises:
        ValueError: when the value is non-empty but not ISO-parseable.
    """
    raw = str(value or "").strip()
    if not raw:
        return None
    parsed = parse_datetime(raw)
    if parsed is None:
        raise ValueError("expires_at must be an ISO datetime")
    if parsed.tzinfo is None:
        return timezone.make_aware(parsed, dt_timezone.utc)
    return parsed
@transaction.atomic
def review_memory_change_request(
    *,
    user_id: int,
    request_id: str,
    decision: str,
    reviewer_identifier: str = "",
    note: str = "",
) -> MemoryChangeRequest:
    """Approve or reject a pending memory change request, applying it on approve.

    State machine: pending -> rejected, or pending -> approved -> applied
    (the approve is persisted before the memory mutation so the audit trail
    records the decision even mid-application).  On approve:

    * create: materialize a new active MemoryItem (or re-activate the
      linked one when it already exists);
    * update: copy the proposed kind/content/confidence onto the memory and
      re-activate it — note the proposed ``expires_at`` is applied
      unconditionally, so an empty proposal clears any existing expiry;
    * delete: mark the memory deprecated (soft delete).

    Raises:
        MemoryChangeRequest.DoesNotExist: unknown request for this user.
        ValueError: non-pending request, bad decision, or an approved
            request missing its required memory/conversation.
    """
    req = MemoryChangeRequest.objects.select_related("memory", "conversation").get(
        id=request_id,
        user_id=int(user_id),
    )
    if req.status != "pending":
        raise ValueError("request is not pending")

    now = timezone.now()
    normalized_decision = str(decision or "").strip().lower()
    if normalized_decision not in {"approve", "reject"}:
        raise ValueError("decision must be approve/reject")

    req.reviewed_by_identifier = str(reviewer_identifier or "").strip()
    req.reviewed_at = now
    if note:
        # Append the reviewer note to the existing reason text.
        req.reason = f"{req.reason}\n\nReview note: {str(note).strip()}".strip()

    if normalized_decision == "reject":
        req.status = "rejected"
        req.save(
            update_fields=[
                "status",
                "reviewed_by_identifier",
                "reviewed_at",
                "reason",
                "updated_at",
            ]
        )
        return req

    req.status = "approved"
    req.save(
        update_fields=[
            "status",
            "reviewed_by_identifier",
            "reviewed_at",
            "reason",
            "updated_at",
        ]
    )

    memory = req.memory
    if req.action == "create":
        if memory is None:
            if req.conversation is None:
                raise ValueError("approved create request missing conversation")
            memory = MemoryItem.objects.create(
                user_id=int(user_id),
                conversation=req.conversation,
                person_id=req.person_id,
                memory_kind=req.proposed_memory_kind or "fact",
                status="active",
                content=req.proposed_content or {},
                confidence_score=float(req.proposed_confidence_score or 0.5),
                expires_at=req.proposed_expires_at,
                last_verified_at=now,
                provenance={"approved_request_id": str(req.id)},
            )
            req.memory = memory
        else:
            # Memory was pre-created (e.g. by the suggestion pipeline):
            # just flip it active and stamp verification.
            memory.status = "active"
            memory.last_verified_at = now
            memory.save(update_fields=["status", "last_verified_at", "updated_at"])
    elif req.action == "update":
        if memory is None:
            raise ValueError("approved update request missing memory")
        if req.proposed_memory_kind:
            memory.memory_kind = req.proposed_memory_kind
        if req.proposed_content:
            memory.content = req.proposed_content
        if req.proposed_confidence_score is not None:
            memory.confidence_score = float(req.proposed_confidence_score)
        memory.expires_at = req.proposed_expires_at
        memory.last_verified_at = now
        memory.status = "active"
        memory.save()
    else:
        # action == "delete": soft delete so provenance is preserved.
        if memory is None:
            raise ValueError("approved delete request missing memory")
        memory.status = "deprecated"
        memory.last_verified_at = now
        memory.save(update_fields=["status", "last_verified_at", "updated_at"])

    req.status = "applied"
    req.save(update_fields=["status", "memory", "updated_at"])
    return req


@transaction.atomic
def run_memory_hygiene(*, user_id: int | None = None, dry_run: bool = False) -> dict[str, int]:
    """Expire stale memories and flag contradictory ones for review.

    Pass 1: active memories with an elapsed ``expires_at`` are bulk-marked
    deprecated (skipped when ``dry_run``).

    Pass 2: active memories are grouped by (user, person, conversation,
    kind, field); a group holding two or more distinct normalized text
    values is treated as a contradiction, and each member gets a pending
    "update" MemoryChangeRequest (unless one flagged for contradiction is
    already pending for it).

    Returns counters: expired, contradictions, queued_requests.
    """
    now = timezone.now()
    queryset = MemoryItem.objects.filter(status="active")
    if user_id is not None:
        queryset = queryset.filter(user_id=int(user_id))

    expired_ids = list(
        queryset.filter(expires_at__isnull=False, expires_at__lte=now).values_list(
            "id",
            flat=True,
        )
    )
    expired = len(expired_ids)
    if expired and not dry_run:
        MemoryItem.objects.filter(id__in=expired_ids).update(status="deprecated")

    contradictions = 0
    queued = 0
    # scope key -> normalized text -> items carrying that text
    grouped: dict[tuple[int, str, str, str, str], dict[str, list[MemoryItem]]] = {}
    for item in queryset.select_related("conversation", "person"):
        content = item.content or {}
        # Accept both content schemas: field/text and key/value.
        field = str(content.get("field") or content.get("key") or "").strip().lower()
        text = _clean_value(str(content.get("text") or content.get("value") or "")).lower()
        if not field or not text:
            continue
        scope = (
            int(item.user_id),
            str(item.person_id or ""),
            str(item.conversation_id or ""),
            str(item.memory_kind or ""),
            field,
        )
        grouped.setdefault(scope, {})
        grouped[scope].setdefault(text, [])
        grouped[scope][text].append(item)

    for values in grouped.values():
        # A single distinct value per scope means no contradiction.
        if len(values.keys()) <= 1:
            continue
        flat = [item for subset in values.values() for item in subset]
        contradictions += len(flat)
        if dry_run:
            continue
        for item in flat:
            # The reason-text match is what review detection keys on below.
            already_pending = MemoryChangeRequest.objects.filter(
                user_id=item.user_id,
                memory=item,
                action="update",
                status="pending",
                reason__icontains="contradiction",
            ).exists()
            if already_pending:
                continue
            MemoryChangeRequest.objects.create(
                user_id=item.user_id,
                memory=item,
                conversation=item.conversation,
                person=item.person,
                action="update",
                status="pending",
                proposed_memory_kind=item.memory_kind,
                proposed_content=item.content,
                proposed_confidence_score=item.confidence_score,
                proposed_expires_at=item.expires_at,
                reason="Contradiction detected by hygiene job.",
                requested_by_identifier="memory-hygiene",
            )
            queued += 1

    log.info(
        "memory hygiene user=%s dry_run=%s expired=%s contradictions=%s queued=%s",
        user_id if user_id is not None else "-",
        dry_run,
        expired,
        contradictions,
        queued,
    )
    return {
        "expired": expired,
        "contradictions": contradictions,
        "queued_requests": queued,
    }
+def _coerce_statuses(value: Any, default: tuple[str, ...]) -> tuple[str, ...]: + if isinstance(value, (list, tuple, set)): + items = [str(item or "").strip().lower() for item in value] + else: + items = [item.strip().lower() for item in str(value or "").split(",")] + cleaned = tuple(item for item in items if item) + return cleaned or default + + +def _base_queryset( + *, + user_id: int, + person_id: str = "", + conversation_id: str = "", + statuses: tuple[str, ...] = ("active",), +): + now = timezone.now() + queryset = MemoryItem.objects.filter(user_id=int(user_id)) + if statuses: + queryset = queryset.filter(status__in=list(statuses)) + queryset = queryset.filter(Q(expires_at__isnull=True) | Q(expires_at__gt=now)) + if person_id: + queryset = queryset.filter(person_id=person_id) + if conversation_id: + queryset = queryset.filter(conversation_id=conversation_id) + return queryset + + +def retrieve_memories_for_prompt( + *, + user_id: int, + query: str = "", + person_id: str = "", + conversation_id: str = "", + statuses: tuple[str, ...] 
= ("active",), + limit: int = 20, +) -> list[dict[str, Any]]: + statuses = _coerce_statuses(statuses, ("active",)) + safe_limit = max(1, min(200, int(limit or 20))) + search_text = str(query or "").strip() + + if search_text: + backend = get_memory_search_backend() + hits = backend.search( + user_id=int(user_id), + query=search_text, + conversation_id=conversation_id, + limit=safe_limit, + include_statuses=statuses, + ) + ids = [str(hit.memory_id or "").strip() for hit in hits if str(hit.memory_id or "").strip()] + scoped = _base_queryset( + user_id=int(user_id), + person_id=person_id, + conversation_id=conversation_id, + statuses=statuses, + ).filter(id__in=ids) + by_id = {str(item.id): item for item in scoped} + rows = [] + for hit in hits: + item = by_id.get(str(hit.memory_id)) + if not item: + continue + rows.append( + { + "id": str(item.id), + "memory_kind": str(item.memory_kind or ""), + "status": str(item.status or ""), + "person_id": str(item.person_id or ""), + "conversation_id": str(item.conversation_id or ""), + "content": item.content or {}, + "provenance": item.provenance or {}, + "confidence_score": float(item.confidence_score or 0.0), + "expires_at": item.expires_at.isoformat() if item.expires_at else "", + "last_verified_at": ( + item.last_verified_at.isoformat() if item.last_verified_at else "" + ), + "updated_at": item.updated_at.isoformat() if item.updated_at else "", + "search_score": float(hit.score or 0.0), + "search_summary": str(hit.summary or ""), + } + ) + return rows + + queryset = _base_queryset( + user_id=int(user_id), + person_id=person_id, + conversation_id=conversation_id, + statuses=statuses, + ).order_by("-last_verified_at", "-updated_at") + rows = [] + for item in queryset[:safe_limit]: + rows.append( + { + "id": str(item.id), + "memory_kind": str(item.memory_kind or ""), + "status": str(item.status or ""), + "person_id": str(item.person_id or ""), + "conversation_id": str(item.conversation_id or ""), + "content": item.content or 
{}, + "provenance": item.provenance or {}, + "confidence_score": float(item.confidence_score or 0.0), + "expires_at": item.expires_at.isoformat() if item.expires_at else "", + "last_verified_at": ( + item.last_verified_at.isoformat() if item.last_verified_at else "" + ), + "updated_at": item.updated_at.isoformat() if item.updated_at else "", + "search_score": 0.0, + "search_summary": "", + } + ) + return rows diff --git a/core/memory/search_backend.py b/core/memory/search_backend.py index 16707ea..5f34ac1 100644 --- a/core/memory/search_backend.py +++ b/core/memory/search_backend.py @@ -137,6 +137,8 @@ class DjangoMemorySearchBackend(BaseMemorySearchBackend): class ManticoreMemorySearchBackend(BaseMemorySearchBackend): name = "manticore" + _table_ready_cache: dict[str, float] = {} + _table_ready_ttl_seconds = 30.0 def __init__(self): self.base_url = str( @@ -146,6 +148,7 @@ class ManticoreMemorySearchBackend(BaseMemorySearchBackend): getattr(settings, "MANTICORE_MEMORY_TABLE", "gia_memory_items") ).strip() or "gia_memory_items" self.timeout_seconds = int(getattr(settings, "MANTICORE_HTTP_TIMEOUT", 5) or 5) + self._table_cache_key = f"{self.base_url}|{self.table}" def _sql(self, query: str) -> dict[str, Any]: response = requests.post( @@ -160,6 +163,9 @@ class ManticoreMemorySearchBackend(BaseMemorySearchBackend): return dict(payload or {}) def ensure_table(self) -> None: + last_ready = float(self._table_ready_cache.get(self._table_cache_key, 0.0) or 0.0) + if (time.time() - last_ready) <= float(self._table_ready_ttl_seconds): + return self._sql( ( f"CREATE TABLE IF NOT EXISTS {self.table} (" @@ -175,6 +181,7 @@ class ManticoreMemorySearchBackend(BaseMemorySearchBackend): ")" ) ) + self._table_ready_cache[self._table_cache_key] = time.time() def _doc_id(self, memory_id: str) -> int: digest = hashlib.blake2b( @@ -206,11 +213,66 @@ class ManticoreMemorySearchBackend(BaseMemorySearchBackend): ) self._sql(query) + def _build_upsert_values_clause(self, item: MemoryItem) 
-> str: + memory_id = str(item.id) + doc_id = self._doc_id(memory_id) + summary = _flatten_to_text(item.content)[:280] + body = _flatten_to_text(item.content) + updated_ts = int(item.updated_at.timestamp() * 1000) + return ( + f"({doc_id},'{self._escape(memory_id)}',{int(item.user_id)}," + f"'{self._escape(item.conversation_id)}','{self._escape(item.memory_kind)}'," + f"'{self._escape(item.status)}',{updated_ts}," + f"'{self._escape(summary)}','{self._escape(body)}')" + ) + def delete(self, memory_id: str) -> None: self.ensure_table() doc_id = self._doc_id(memory_id) self._sql(f"DELETE FROM {self.table} WHERE id={doc_id}") + def reindex( + self, + *, + user_id: int | None = None, + include_statuses: tuple[str, ...] = ("active",), + limit: int = 2000, + ) -> dict[str, int]: + self.ensure_table() + queryset = MemoryItem.objects.all().order_by("-updated_at") + if user_id is not None: + queryset = queryset.filter(user_id=int(user_id)) + if include_statuses: + queryset = queryset.filter(status__in=list(include_statuses)) + + scanned = 0 + indexed = 0 + batch_size = 100 + values: list[str] = [] + for item in queryset[: max(1, int(limit))]: + scanned += 1 + try: + values.append(self._build_upsert_values_clause(item)) + except Exception as exc: + log.warning("memory-search upsert build failed id=%s err=%s", item.id, exc) + continue + if len(values) >= batch_size: + self._sql( + f"REPLACE INTO {self.table} " + "(id,memory_uuid,user_id,conversation_id,memory_kind,status,updated_ts,summary,body) " + f"VALUES {','.join(values)}" + ) + indexed += len(values) + values = [] + if values: + self._sql( + f"REPLACE INTO {self.table} " + "(id,memory_uuid,user_id,conversation_id,memory_kind,status,updated_ts,summary,body) " + f"VALUES {','.join(values)}" + ) + indexed += len(values) + return {"scanned": scanned, "indexed": indexed} + def search( self, *, diff --git a/core/messaging/history.py b/core/messaging/history.py index 845541d..e1906e3 100644 --- a/core/messaging/history.py +++ 
b/core/messaging/history.py @@ -1,5 +1,6 @@ from asgiref.sync import sync_to_async from django.conf import settings +import time import uuid from core.events.ledger import append_event @@ -628,6 +629,277 @@ async def apply_reaction( return target +async def _resolve_message_target( + user, + identifier, + *, + target_message_id="", + target_ts=0, + target_author="", +): + queryset = Message.objects.filter( + user=user, + session__identifier=identifier, + ).select_related("session") + + target = None + match_strategy = "none" + target_author_value = str(target_author or "").strip() + target_uuid = str(target_message_id or "").strip() + if target_uuid: + is_uuid = True + try: + uuid.UUID(str(target_uuid)) + except Exception: + is_uuid = False + if is_uuid: + target = await sync_to_async( + lambda: queryset.filter(id=target_uuid).order_by("-ts").first() + )() + if target is not None: + match_strategy = "local_message_id" + if target is None: + target = await sync_to_async( + lambda: queryset.filter(source_message_id=target_uuid) + .order_by("-ts") + .first() + )() + if target is not None: + match_strategy = "source_message_id" + + if target is None: + try: + ts_value = int(target_ts or 0) + except Exception: + ts_value = 0 + if ts_value > 0: + exact_candidates = await sync_to_async(list)( + queryset.filter(source_message_id=str(ts_value)).order_by("-ts")[:20] + ) + if target_author_value and exact_candidates: + filtered = [ + row + for row in exact_candidates + if str(row.sender_uuid or "").strip() == target_author_value + ] + if filtered: + exact_candidates = filtered + if exact_candidates: + target = exact_candidates[0] + match_strategy = "exact_source_message_id_ts" + + if target is None and ts_value > 0: + strict_ts_rows = await sync_to_async(list)( + queryset.filter(ts=ts_value).order_by("-id")[:20] + ) + if target_author_value and strict_ts_rows: + filtered = [ + row + for row in strict_ts_rows + if str(row.sender_uuid or "").strip() == target_author_value + ] + 
if filtered: + strict_ts_rows = filtered + if strict_ts_rows: + target = strict_ts_rows[0] + match_strategy = "strict_ts_match" + + if target is None and ts_value > 0: + lower = ts_value - 10_000 + upper = ts_value + 10_000 + window_rows = await sync_to_async(list)( + queryset.filter(ts__gte=lower, ts__lte=upper).order_by("ts")[:200] + ) + if target_author_value and window_rows: + author_rows = [ + row + for row in window_rows + if str(row.sender_uuid or "").strip() == target_author_value + ] + if author_rows: + window_rows = author_rows + if window_rows: + target = min( + window_rows, + key=lambda row: ( + abs(int(row.ts or 0) - ts_value), + -int(row.ts or 0), + ), + ) + match_strategy = "nearest_ts_window" + + return target, match_strategy + + +async def apply_message_edit( + user, + identifier, + *, + target_message_id="", + target_ts=0, + new_text="", + source_service="", + actor="", + payload=None, + trace_id="", + target_author="", +): + target, match_strategy = await _resolve_message_target( + user, + identifier, + target_message_id=target_message_id, + target_ts=target_ts, + target_author=target_author, + ) + if target is None: + log.warning( + "edit-sync history-apply miss user=%s person_identifier=%s target_message_id=%s target_ts=%s", + getattr(user, "id", "-"), + getattr(identifier, "id", "-"), + str(target_message_id or "") or "-", + int(target_ts or 0), + ) + return None + + old_text = str(target.text or "") + updated_text = str(new_text or "") + event_ts = int(target_ts or target.ts or int(time.time() * 1000)) + receipt_payload = dict(target.receipt_payload or {}) + edit_history = list(receipt_payload.get("edit_history") or []) + edit_history.append( + { + "edited_ts": int(event_ts), + "source_service": str(source_service or "").strip().lower(), + "actor": str(actor or "").strip(), + "previous_text": old_text, + "new_text": updated_text, + "match_strategy": str(match_strategy or ""), + "payload": dict(payload or {}), + } + ) + if len(edit_history) > 
200: + edit_history = edit_history[-200:] + receipt_payload["edit_history"] = edit_history + receipt_payload["last_edited_ts"] = int(event_ts) + receipt_payload["edit_count"] = len(edit_history) + target.receipt_payload = receipt_payload + + update_fields = ["receipt_payload"] + if old_text != updated_text: + target.text = updated_text + update_fields.append("text") + await sync_to_async(target.save)(update_fields=update_fields) + try: + await append_event( + user=user, + session=target.session, + ts=int(event_ts), + event_type="message_edited", + direction="system", + actor_identifier=str(actor or ""), + origin_transport=str(source_service or ""), + origin_message_id=str(target.source_message_id or target.id), + origin_chat_id=str(target.source_chat_id or ""), + payload={ + "message_id": str(target.id), + "target_message_id": str(target_message_id or target.id), + "target_ts": int(target_ts or target.ts or 0), + "old_text": old_text, + "new_text": updated_text, + "source_service": str(source_service or "").strip().lower(), + "actor": str(actor or ""), + "match_strategy": str(match_strategy or ""), + }, + raw_payload=dict(payload or {}), + trace_id=ensure_trace_id(trace_id, payload or {}), + ) + except Exception as exc: + log.warning( + "Event ledger append failed for message edit message=%s: %s", + target.id, + exc, + ) + return target + + +async def apply_message_delete( + user, + identifier, + *, + target_message_id="", + target_ts=0, + source_service="", + actor="", + payload=None, + trace_id="", + target_author="", +): + target, match_strategy = await _resolve_message_target( + user, + identifier, + target_message_id=target_message_id, + target_ts=target_ts, + target_author=target_author, + ) + if target is None: + log.warning( + "delete-sync history-apply miss user=%s person_identifier=%s target_message_id=%s target_ts=%s", + getattr(user, "id", "-"), + getattr(identifier, "id", "-"), + str(target_message_id or "") or "-", + int(target_ts or 0), + ) + return 
None + + event_ts = int(target_ts or target.ts or int(time.time() * 1000)) + deleted_row = { + "deleted_ts": int(event_ts), + "source_service": str(source_service or "").strip().lower(), + "actor": str(actor or "").strip(), + "match_strategy": str(match_strategy or ""), + "payload": dict(payload or {}), + } + receipt_payload = dict(target.receipt_payload or {}) + delete_events = list(receipt_payload.get("delete_events") or []) + delete_events.append(dict(deleted_row)) + if len(delete_events) > 200: + delete_events = delete_events[-200:] + receipt_payload["delete_events"] = delete_events + receipt_payload["deleted"] = deleted_row + receipt_payload["is_deleted"] = True + target.receipt_payload = receipt_payload + await sync_to_async(target.save)(update_fields=["receipt_payload"]) + try: + await append_event( + user=user, + session=target.session, + ts=int(event_ts), + event_type="message_deleted", + direction="system", + actor_identifier=str(actor or ""), + origin_transport=str(source_service or ""), + origin_message_id=str(target.source_message_id or target.id), + origin_chat_id=str(target.source_chat_id or ""), + payload={ + "message_id": str(target.id), + "target_message_id": str(target_message_id or target.id), + "target_ts": int(target_ts or target.ts or 0), + "source_service": str(source_service or "").strip().lower(), + "actor": str(actor or ""), + "match_strategy": str(match_strategy or ""), + }, + raw_payload=dict(payload or {}), + trace_id=ensure_trace_id(trace_id, payload or {}), + ) + except Exception as exc: + log.warning( + "Event ledger append failed for message delete message=%s: %s", + target.id, + exc, + ) + return target + + def _iter_bridge_refs(receipt_payload, source_service): payload = dict(receipt_payload or {}) refs = payload.get("bridge_refs") or {} diff --git a/core/migrations/0036_memoryitem_memorychangerequest_knowledgearticle_and_more.py b/core/migrations/0036_memoryitem_memorychangerequest_knowledgearticle_and_more.py new file mode 
100644 index 0000000..4f6654b --- /dev/null +++ b/core/migrations/0036_memoryitem_memorychangerequest_knowledgearticle_and_more.py @@ -0,0 +1,444 @@ +# Generated by ChatGPT on 2026-03-05 + +import uuid + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0035_conversationevent_adapterhealthevent"), + ] + + operations = [ + migrations.AddField( + model_name="memoryitem", + name="confidence_score", + field=models.FloatField( + default=0.5, + help_text="Confidence score for this memory (0.0-1.0).", + ), + ), + migrations.AddField( + model_name="memoryitem", + name="expires_at", + field=models.DateTimeField( + blank=True, + help_text="Optional expiry timestamp for stale memory decay.", + null=True, + ), + ), + migrations.AddField( + model_name="memoryitem", + name="last_verified_at", + field=models.DateTimeField( + blank=True, + help_text="Last operator verification timestamp.", + null=True, + ), + ), + migrations.AddField( + model_name="memoryitem", + name="person", + field=models.ForeignKey( + blank=True, + help_text=( + "Optional person this memory is about for person-centric recall." + ), + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="memory_items", + to="core.person", + ), + ), + migrations.AddField( + model_name="memoryitem", + name="provenance", + field=models.JSONField( + blank=True, + default=dict, + help_text=( + "Source metadata for this memory (agent/tool/message references)." 
+ ), + ), + ), + migrations.CreateModel( + name="KnowledgeArticle", + fields=[ + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + ), + ), + ("title", models.CharField(max_length=255)), + ("slug", models.SlugField(max_length=255)), + ("markdown", models.TextField(blank=True, default="")), + ("tags", models.JSONField(blank=True, default=list)), + ( + "status", + models.CharField( + choices=[ + ("draft", "Draft"), + ("published", "Published"), + ("archived", "Archived"), + ], + default="draft", + max_length=16, + ), + ), + ("owner_identifier", models.CharField(blank=True, default="", max_length=255)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ( + "related_task", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="knowledge_articles", + to="core.derivedtask", + ), + ), + ( + "user", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="knowledge_articles", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "indexes": [ + models.Index( + fields=["user", "status", "updated_at"], + name="core_knowl_user_id_331625_idx", + ), + models.Index( + fields=["related_task", "updated_at"], + name="core_knowl_related_cf6071_idx", + ), + ], + "constraints": [ + models.UniqueConstraint( + fields=("user", "slug"), + name="unique_knowledge_article_slug_per_user", + ) + ], + }, + ), + migrations.CreateModel( + name="MCPToolAuditLog", + fields=[ + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + ), + ), + ("tool_name", models.CharField(max_length=255)), + ("request_args", models.JSONField(blank=True, default=dict)), + ("response_meta", models.JSONField(blank=True, default=dict)), + ("ok", models.BooleanField(default=True)), + ("error", models.TextField(blank=True, default="")), + ("duration_ms", 
models.PositiveIntegerField(default=0)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "user", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="mcp_tool_audit_logs", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "indexes": [ + models.Index( + fields=["tool_name", "created_at"], + name="core_mcpau_tool_na_2db9d7_idx", + ), + models.Index( + fields=["user", "created_at"], + name="core_mcpau_user_id_4a55f1_idx", + ), + models.Index( + fields=["ok", "created_at"], + name="core_mcpau_ok_1f5c91_idx", + ), + ] + }, + ), + migrations.CreateModel( + name="MemoryChangeRequest", + fields=[ + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + ), + ), + ( + "action", + models.CharField( + choices=[ + ("create", "Create"), + ("update", "Update"), + ("delete", "Delete"), + ], + max_length=16, + ), + ), + ( + "status", + models.CharField( + choices=[ + ("pending", "Pending"), + ("approved", "Approved"), + ("rejected", "Rejected"), + ("applied", "Applied"), + ], + default="pending", + max_length=16, + ), + ), + ("proposed_memory_kind", models.CharField(blank=True, default="", max_length=16)), + ("proposed_content", models.JSONField(blank=True, default=dict)), + ("proposed_confidence_score", models.FloatField(blank=True, null=True)), + ("proposed_expires_at", models.DateTimeField(blank=True, null=True)), + ("reason", models.TextField(blank=True, default="")), + ("requested_by_identifier", models.CharField(blank=True, default="", max_length=255)), + ("reviewed_by_identifier", models.CharField(blank=True, default="", max_length=255)), + ("reviewed_at", models.DateTimeField(blank=True, null=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ( + "conversation", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + 
related_name="memory_change_requests", + to="core.workspaceconversation", + ), + ), + ( + "memory", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="change_requests", + to="core.memoryitem", + ), + ), + ( + "person", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="memory_change_requests", + to="core.person", + ), + ), + ( + "user", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="memory_change_requests", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "indexes": [ + models.Index( + fields=["user", "status", "created_at"], + name="core_memor_user_id_31963a_idx", + ), + models.Index( + fields=["memory", "created_at"], + name="core_memor_memory__1b9d7e_idx", + ), + ] + }, + ), + migrations.CreateModel( + name="MemorySourceReference", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("source_label", models.CharField(blank=True, default="", max_length=255)), + ("source_uri", models.CharField(blank=True, default="", max_length=1024)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "memory", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="source_references", + to="core.memoryitem", + ), + ), + ( + "message", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="memory_source_references", + to="core.message", + ), + ), + ( + "message_event", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="memory_source_references", + to="core.messageevent", + ), + ), + ( + "source_request", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="memory_source_references", + to="core.airequest", + ), + 
), + ], + options={ + "indexes": [ + models.Index( + fields=["memory", "created_at"], + name="core_memor_memory__92752b_idx", + ), + models.Index(fields=["source_uri"], name="core_memor_source__5bb587_idx"), + ] + }, + ), + migrations.CreateModel( + name="TaskArtifactLink", + fields=[ + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + ), + ), + ("kind", models.CharField(default="note", max_length=64)), + ("uri", models.CharField(blank=True, default="", max_length=1024)), + ("path", models.CharField(blank=True, default="", max_length=1024)), + ("summary", models.TextField(blank=True, default="")), + ("created_by_identifier", models.CharField(blank=True, default="", max_length=255)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "task", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="artifact_links", + to="core.derivedtask", + ), + ), + ], + options={ + "indexes": [ + models.Index( + fields=["task", "created_at"], + name="core_taskar_task_id_cf5572_idx", + ), + models.Index( + fields=["kind", "created_at"], + name="core_taskar_kind_5dbab7_idx", + ), + ] + }, + ), + migrations.CreateModel( + name="KnowledgeRevision", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("revision", models.PositiveIntegerField()), + ("author_tool", models.CharField(blank=True, default="", max_length=255)), + ("author_identifier", models.CharField(blank=True, default="", max_length=255)), + ("summary", models.TextField(blank=True, default="")), + ("markdown", models.TextField(blank=True, default="")), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "article", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="revisions", + to="core.knowledgearticle", + ), + ), + ], + options={ + "ordering": ["article", "revision"], + "constraints": [ + 
models.UniqueConstraint( + fields=("article", "revision"), + name="unique_knowledge_revision_per_article", + ) + ], + }, + ), + migrations.AddIndex( + model_name="memoryitem", + index=models.Index( + fields=["user", "status", "updated_at"], + name="core_mem_user_stat_upd_idx", + ), + ), + migrations.AddIndex( + model_name="memoryitem", + index=models.Index( + fields=["user", "person", "status", "updated_at"], + name="core_mem_user_pers_stat_idx", + ), + ), + migrations.AddIndex( + model_name="memoryitem", + index=models.Index( + fields=["user", "conversation", "status", "updated_at"], + name="core_mem_user_conv_stat_idx", + ), + ), + ] diff --git a/core/models.py b/core/models.py index b7f97c8..f2a908c 100644 --- a/core/models.py +++ b/core/models.py @@ -1129,6 +1129,14 @@ class MemoryItem(models.Model): related_name="memory_items", help_text="Conversation scope this memory item belongs to.", ) + person = models.ForeignKey( + Person, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="memory_items", + help_text="Optional person this memory is about for person-centric recall.", + ) memory_kind = models.CharField( max_length=16, choices=MEMORY_KIND_CHOICES, @@ -1145,6 +1153,25 @@ class MemoryItem(models.Model): blank=True, help_text="Structured memory payload (schema can evolve by type).", ) + provenance = models.JSONField( + default=dict, + blank=True, + help_text="Source metadata for this memory (agent/tool/message references).", + ) + confidence_score = models.FloatField( + default=0.5, + help_text="Confidence score for this memory (0.0-1.0).", + ) + expires_at = models.DateTimeField( + null=True, + blank=True, + help_text="Optional expiry timestamp for stale memory decay.", + ) + last_verified_at = models.DateTimeField( + null=True, + blank=True, + help_text="Last operator verification timestamp.", + ) source_request = models.ForeignKey( AIRequest, on_delete=models.SET_NULL, @@ -1161,6 +1188,111 @@ class MemoryItem(models.Model): 
help_text="Last update timestamp.", ) + class Meta: + indexes = [ + models.Index(fields=["user", "status", "updated_at"]), + models.Index(fields=["user", "person", "status", "updated_at"]), + models.Index(fields=["user", "conversation", "status", "updated_at"]), + ] + + +class MemorySourceReference(models.Model): + memory = models.ForeignKey( + MemoryItem, + on_delete=models.CASCADE, + related_name="source_references", + ) + message_event = models.ForeignKey( + "MessageEvent", + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="memory_source_references", + ) + message = models.ForeignKey( + Message, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="memory_source_references", + ) + source_request = models.ForeignKey( + AIRequest, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="memory_source_references", + ) + source_label = models.CharField(max_length=255, blank=True, default="") + source_uri = models.CharField(max_length=1024, blank=True, default="") + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + indexes = [ + models.Index(fields=["memory", "created_at"]), + models.Index(fields=["source_uri"]), + ] + + +class MemoryChangeRequest(models.Model): + ACTION_CHOICES = ( + ("create", "Create"), + ("update", "Update"), + ("delete", "Delete"), + ) + STATUS_CHOICES = ( + ("pending", "Pending"), + ("approved", "Approved"), + ("rejected", "Rejected"), + ("applied", "Applied"), + ) + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + user = models.ForeignKey( + User, + on_delete=models.CASCADE, + related_name="memory_change_requests", + ) + memory = models.ForeignKey( + MemoryItem, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="change_requests", + ) + conversation = models.ForeignKey( + WorkspaceConversation, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="memory_change_requests", + ) + person = 
models.ForeignKey( + Person, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="memory_change_requests", + ) + action = models.CharField(max_length=16, choices=ACTION_CHOICES) + status = models.CharField(max_length=16, choices=STATUS_CHOICES, default="pending") + proposed_memory_kind = models.CharField(max_length=16, blank=True, default="") + proposed_content = models.JSONField(default=dict, blank=True) + proposed_confidence_score = models.FloatField(null=True, blank=True) + proposed_expires_at = models.DateTimeField(null=True, blank=True) + reason = models.TextField(blank=True, default="") + requested_by_identifier = models.CharField(max_length=255, blank=True, default="") + reviewed_by_identifier = models.CharField(max_length=255, blank=True, default="") + reviewed_at = models.DateTimeField(null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + indexes = [ + models.Index(fields=["user", "status", "created_at"]), + models.Index(fields=["memory", "created_at"]), + ] + class AIResultSignal(models.Model): """ @@ -2249,6 +2381,117 @@ class DerivedTaskEvent(models.Model): ] +class TaskArtifactLink(models.Model): + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + task = models.ForeignKey( + DerivedTask, + on_delete=models.CASCADE, + related_name="artifact_links", + ) + kind = models.CharField(max_length=64, default="note") + uri = models.CharField(max_length=1024, blank=True, default="") + path = models.CharField(max_length=1024, blank=True, default="") + summary = models.TextField(blank=True, default="") + created_by_identifier = models.CharField(max_length=255, blank=True, default="") + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + indexes = [ + models.Index(fields=["task", "created_at"]), + models.Index(fields=["kind", "created_at"]), + ] + + +class KnowledgeArticle(models.Model): + STATUS_CHOICES = ( + 
("draft", "Draft"), + ("published", "Published"), + ("archived", "Archived"), + ) + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + user = models.ForeignKey( + User, + on_delete=models.CASCADE, + related_name="knowledge_articles", + ) + related_task = models.ForeignKey( + DerivedTask, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="knowledge_articles", + ) + title = models.CharField(max_length=255) + slug = models.SlugField(max_length=255) + markdown = models.TextField(blank=True, default="") + tags = models.JSONField(default=list, blank=True) + status = models.CharField(max_length=16, choices=STATUS_CHOICES, default="draft") + owner_identifier = models.CharField(max_length=255, blank=True, default="") + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + constraints = [ + models.UniqueConstraint( + fields=["user", "slug"], + name="unique_knowledge_article_slug_per_user", + ), + ] + indexes = [ + models.Index(fields=["user", "status", "updated_at"]), + models.Index(fields=["related_task", "updated_at"]), + ] + + +class KnowledgeRevision(models.Model): + article = models.ForeignKey( + KnowledgeArticle, + on_delete=models.CASCADE, + related_name="revisions", + ) + revision = models.PositiveIntegerField() + author_tool = models.CharField(max_length=255, blank=True, default="") + author_identifier = models.CharField(max_length=255, blank=True, default="") + summary = models.TextField(blank=True, default="") + markdown = models.TextField(blank=True, default="") + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + constraints = [ + models.UniqueConstraint( + fields=["article", "revision"], + name="unique_knowledge_revision_per_article", + ) + ] + ordering = ["article", "revision"] + + +class MCPToolAuditLog(models.Model): + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + tool_name = 
models.CharField(max_length=255) + user = models.ForeignKey( + User, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="mcp_tool_audit_logs", + ) + request_args = models.JSONField(default=dict, blank=True) + response_meta = models.JSONField(default=dict, blank=True) + ok = models.BooleanField(default=True) + error = models.TextField(blank=True, default="") + duration_ms = models.PositiveIntegerField(default=0) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + indexes = [ + models.Index(fields=["tool_name", "created_at"]), + models.Index(fields=["user", "created_at"]), + models.Index(fields=["ok", "created_at"]), + ] + + class ExternalSyncEvent(models.Model): STATUS_CHOICES = ( ("pending", "Pending"), diff --git a/core/templates/base.html b/core/templates/base.html index 162ea4c..3ce17ac 100644 --- a/core/templates/base.html +++ b/core/templates/base.html @@ -1,12 +1,13 @@ {% load static %} {% load cache %} +{% load page_title %} - {% block browser_title %}{{ request.resolver_match.url_name|default:request.path_info|cut:"_"|cut:"/"|cut:"-"|upper|slice:":3" }}{% endblock %} + {% block browser_title %}{% firstof page_browser_title page_title as explicit_title %}{% if explicit_title %}{{ explicit_title }} · GIA{% else %}{% with route_value=request.resolver_match.url_name|default:request.path_info|humanize_route %}{% if route_value %}{{ route_value }} · GIA{% else %}GIA{% endif %}{% endwith %}{% endif %}{% endblock %} diff --git a/core/templates/pages/ai-execution-log.html b/core/templates/pages/ai-execution-log.html index c24654a..83e29ca 100644 --- a/core/templates/pages/ai-execution-log.html +++ b/core/templates/pages/ai-execution-log.html @@ -1,117 +1,208 @@ {% extends "base.html" %} {% block content %} -
-

AI Execution Log

-

Tracked model calls and usage metrics for this account.

- -
-
-

Total Runs

{{ stats.total_runs }}

-

OK

{{ stats.total_ok }}

-

Failed

{{ stats.total_failed }}

-

Success Rate

{{ stats.success_rate }}%

-

24h Runs

{{ stats.last_24h_runs }}

-

24h Failed

{{ stats.last_24h_failed }}

-

7d Runs

{{ stats.last_7d_runs }}

-

Avg Duration

{{ stats.avg_duration_ms }}ms

-

Prompt Chars

{{ stats.total_prompt_chars }}

-

Response Chars

{{ stats.total_response_chars }}

-

Avg Prompt

{{ stats.avg_prompt_chars }}

-

Avg Response

{{ stats.avg_response_chars }}

+
+
+
+
+

AI Execution Log

+

Tracked model calls and usage metrics for this account.

+
+
+
+
+ {% if stats.total_runs %} + Tracking Active + {% else %} + No Runs Yet + {% endif %} +
+
+
+ +
+

Execution health at a glance

+
+ Total {{ stats.total_runs }} + OK {{ stats.total_ok }} + Failed {{ stats.total_failed }} + 24h {{ stats.last_24h_runs }} + 24h Failed {{ stats.last_24h_failed }} + 7d {{ stats.last_7d_runs }} +
+

Success Rate

+ {{ stats.success_rate }}%
-
-
-
-

By Operation

- - - - - - {% for row in operation_breakdown %} - - - - - - - {% empty %} - - {% endfor %} - -
OperationTotalOKFailed
{{ row.operation|default:"(none)" }}{{ row.total }}{{ row.ok }}{{ row.failed }}
No runs yet.
+
+
+
+
+

Reliability

+
+
+ + + + + + + +
Total Runs{{ stats.total_runs }}
OK{{ stats.total_ok }}
Failed{{ stats.total_failed }}
Success Rate{{ stats.success_rate }}%
+
-
-
-

By Model

- - - - - - {% for row in model_breakdown %} - - - - - - - {% empty %} - - {% endfor %} - -
ModelTotalOKFailed
{{ row.model|default:"(none)" }}{{ row.total }}{{ row.ok }}{{ row.failed }}
No runs yet.
+
+
+
+

Throughput

+
+
+ + + + + + + +
Runs (24h){{ stats.last_24h_runs }}
Failed (24h){{ stats.last_24h_failed }}
Runs (7d){{ stats.last_7d_runs }}
Avg Duration{{ stats.avg_duration_ms }}ms
+
+
+
+
+
+
+

Token Proxy (Chars)

+
+
+ + + + + + + +
Total Prompt{{ stats.total_prompt_chars }}
Total Response{{ stats.total_response_chars }}
Avg Prompt{{ stats.avg_prompt_chars }}
Avg Response{{ stats.avg_response_chars }}
+
-
-

Recent Runs

-
- - - - - - - - - - - - - - - - {% for run in runs %} +
+
+
+
+

By Operation

+
+
+
+
StartedStatusOperationModelMessagesPromptResponseDurationError
+ + + + + {% for row in operation_breakdown %} + + + + + + + {% empty %} + + {% endfor %} + +
OperationTotalOKFailed
{{ row.operation|default:"(none)" }}{{ row.total }}{{ row.ok }}{{ row.failed }}
No runs yet.
+
+
+
+
+
+
+
+

By Model

+
+
+
+ + + + + + {% for row in model_breakdown %} + + + + + + + {% empty %} + + {% endfor %} + +
ModelTotalOKFailed
{{ row.model|default:"(none)" }}{{ row.total }}{{ row.ok }}{{ row.failed }}
No runs yet.
+
+
+
+
+
+ +
+
+

Recent Runs

+
+
+
+ + - - - - - - - - - + + + + + + + + + - {% empty %} - - {% endfor %} - -
{{ run.started_at }}{{ run.status }}{{ run.operation|default:"-" }}{{ run.model|default:"-" }}{{ run.message_count }}{{ run.prompt_chars }}{{ run.response_chars }}{% if run.duration_ms %}{{ run.duration_ms }}ms{% else %}-{% endif %}{{ run.error|default:"-" }}StartedStatusOperationModelMessagesPromptResponseDurationError
No runs yet.
+ + + {% for run in runs %} + + {{ run.started_at }} + + {% if run.status == "ok" %} + ok + {% elif run.status == "failed" %} + failed + {% else %} + {{ run.status }} + {% endif %} + + {{ run.operation|default:"-" }} + {{ run.model|default:"-" }} + {{ run.message_count }} + {{ run.prompt_chars }} + {{ run.response_chars }} + {% if run.duration_ms %}{{ run.duration_ms }}ms{% else %}-{% endif %} + + {% if run.error %} + {{ run.error|truncatechars:120 }} + {% else %} + - + {% endif %} + + + {% empty %} + No runs yet. + {% endfor %} + + +
diff --git a/core/templates/partials/compose-panel.html b/core/templates/partials/compose-panel.html index 58b0bfb..1142d65 100644 --- a/core/templates/partials/compose-panel.html +++ b/core/templates/partials/compose-panel.html @@ -381,6 +381,47 @@ {% endfor %}
+
+ {% if availability_summary %} + + {{ availability_summary.state_label }} + + {{ availability_summary.service|upper|default:"-" }} + {% if availability_summary.ts_label %} + Updated {{ availability_summary.ts_label }} + {% endif %} + {% if availability_summary.is_cross_service %} + Cross-service fallback + {% endif %} + {% endif %} +
+ +
+ + +
+ +
{% for msg in serialized_messages %} -
+
{% if msg.gap_fragments %} {% with gap=msg.gap_fragments.0 %} {% endif %} - {% if service == "signal" or service == "whatsapp" %} + {% if msg.edit_count %} +
+ Edited {{ msg.edit_count }} time{% if msg.edit_count != 1 %}s{% endif %} +
    + {% for edit in msg.edit_history %} +
  • + {% if edit.edited_display %}{{ edit.edited_display }}{% else %}Unknown time{% endif %} + {% if edit.actor %} · {{ edit.actor }}{% endif %} + {% if edit.source_service %} · {{ edit.source_service|upper }}{% endif %} +
    + {{ edit.previous_text|default:"(empty)" }} + + {{ edit.new_text|default:"(empty)" }} +
    +
  • + {% endfor %} +
+
+ {% endif %} + {% if capability_reactions %}
@@ -495,6 +557,12 @@ {% endif %}

{{ msg.display_ts }}{% if msg.author %} · {{ msg.author }}{% endif %} + {% if msg.is_edited %} + edited + {% endif %} + {% if msg.is_deleted %} + deleted + {% endif %} {% if msg.read_ts %}

+ {% if not capability_send %} +

Send disabled: {{ capability_send_reason }}

+ {% endif %}