from __future__ import annotations import json import os import sys from typing import Any import django from core.mcp.tools import execute_tool, format_tool_content, tool_specs from core.util import logs log = logs.get_logger("mcp-server") _compat_newline_mode = False def _setup_django() -> None: os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings") django.setup() def _response(msg_id: Any, result: dict[str, Any]) -> dict[str, Any]: return {"jsonrpc": "2.0", "id": msg_id, "result": result} def _error(msg_id: Any, code: int, message: str) -> dict[str, Any]: return {"jsonrpc": "2.0", "id": msg_id, "error": {"code": code, "message": message}} def _read_message() -> dict[str, Any] | None: global _compat_newline_mode headers: dict[str, str] = {} pending_body = b"" while True: line = sys.stdin.buffer.readline() if not line: return None if not headers and line.lstrip().startswith((b"{", b"[")): _compat_newline_mode = True return json.loads(line.decode("utf-8").strip()) sep = None if b"\r\n\r\n" in line: sep = b"\r\n\r\n" elif b"\n\n" in line: sep = b"\n\n" if sep is not None: header_line, tail = line.split(sep, 1) pending_body = tail else: header_line = line if header_line in (b"\r\n", b"\n"): break decoded = header_line.decode("utf-8").strip() if ":" in decoded: key, value = decoded.split(":", 1) headers[key.strip().lower()] = value.strip() if sep is not None: break length_raw = headers.get("content-length") if not length_raw: if not pending_body: pending_body = sys.stdin.buffer.readline() if not pending_body: return None _compat_newline_mode = True return json.loads(pending_body.decode("utf-8").strip()) length = int(length_raw) body = pending_body if len(body) < length: body += sys.stdin.buffer.read(length - len(body)) body = body[:length] if not body: return None return json.loads(body.decode("utf-8")) def _write_message(payload: dict[str, Any]) -> None: raw_json = json.dumps(payload, separators=(",", ":"), ensure_ascii=False) if _compat_newline_mode: sys.stdout.buffer.write((raw_json + "\n").encode("utf-8")) else: raw = raw_json.encode("utf-8") sys.stdout.buffer.write(f"Content-Length: {len(raw)}\r\n\r\n".encode("utf-8")) sys.stdout.buffer.write(raw) sys.stdout.buffer.flush() def _handle_message(message: dict[str, Any]) -> dict[str, Any] | None: msg_id = message.get("id") method = str(message.get("method") or "") params = message.get("params") or {} if method == "notifications/initialized": return None if method == "initialize": return _response( msg_id, { "protocolVersion": "2025-06-18", "serverInfo": {"name": "gia-manticore-mcp", "version": "0.1.0"}, "capabilities": {"tools": {}}, }, ) if method == "ping": return _response(msg_id, {}) if method == "tools/list": return _response(msg_id, {"tools": tool_specs()}) if method == "tools/call": name = str(params.get("name") or "").strip() arguments = params.get("arguments") or {} try: payload = execute_tool(name, arguments) return _response(msg_id, format_tool_content(payload)) except Exception as exc: log.warning("tool call failed name=%s err=%s", name, exc) return _response( msg_id, { "isError": True, "content": [{"type": "text", "text": json.dumps({"error": str(exc)})}], }, ) return _error(msg_id, -32601, f"Method not found: {method}") def run_stdio_server() -> None: _setup_django() while True: message = _read_message() if message is None: return try: response = _handle_message(message) if response is not None: _write_message(response) except Exception as exc: msg_id = message.get("id") _write_message(_error(msg_id, -32000, str(exc))) if __name__ == "__main__": run_stdio_server()