Files
GIA/core/mcp/server.py

152 lines
4.5 KiB
Python

from __future__ import annotations
import json
import os
import sys
from typing import Any
import django
from core.mcp.tools import execute_tool, format_tool_content, tool_specs
from core.util import logs
log = logs.get_logger("mcp-server")
_compat_newline_mode = False
def _setup_django() -> None:
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings")
django.setup()
def _response(msg_id: Any, result: dict[str, Any]) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": msg_id, "result": result}
def _error(msg_id: Any, code: int, message: str) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": msg_id, "error": {"code": code, "message": message}}
def _read_message() -> dict[str, Any] | None:
global _compat_newline_mode
headers: dict[str, str] = {}
pending_body = b""
while True:
line = sys.stdin.buffer.readline()
if not line:
return None
if not headers and line.lstrip().startswith((b"{", b"[")):
_compat_newline_mode = True
return json.loads(line.decode("utf-8").strip())
sep = None
if b"\r\n\r\n" in line:
sep = b"\r\n\r\n"
elif b"\n\n" in line:
sep = b"\n\n"
if sep is not None:
header_line, tail = line.split(sep, 1)
pending_body = tail
else:
header_line = line
if header_line in (b"\r\n", b"\n"):
break
decoded = header_line.decode("utf-8").strip()
if ":" in decoded:
key, value = decoded.split(":", 1)
headers[key.strip().lower()] = value.strip()
if sep is not None:
break
length_raw = headers.get("content-length")
if not length_raw:
if not pending_body:
pending_body = sys.stdin.buffer.readline()
if not pending_body:
return None
_compat_newline_mode = True
return json.loads(pending_body.decode("utf-8").strip())
length = int(length_raw)
body = pending_body
if len(body) < length:
body += sys.stdin.buffer.read(length - len(body))
body = body[:length]
if not body:
return None
return json.loads(body.decode("utf-8"))
def _write_message(payload: dict[str, Any]) -> None:
raw_json = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
if _compat_newline_mode:
sys.stdout.buffer.write((raw_json + "\n").encode("utf-8"))
else:
raw = raw_json.encode("utf-8")
sys.stdout.buffer.write(f"Content-Length: {len(raw)}\r\n\r\n".encode("utf-8"))
sys.stdout.buffer.write(raw)
sys.stdout.buffer.flush()
def _handle_message(message: dict[str, Any]) -> dict[str, Any] | None:
msg_id = message.get("id")
method = str(message.get("method") or "")
params = message.get("params") or {}
if method == "notifications/initialized":
return None
if method == "initialize":
return _response(
msg_id,
{
"protocolVersion": "2025-06-18",
"serverInfo": {"name": "gia-manticore-mcp", "version": "0.1.0"},
"capabilities": {"tools": {}},
},
)
if method == "ping":
return _response(msg_id, {})
if method == "tools/list":
return _response(msg_id, {"tools": tool_specs()})
if method == "tools/call":
name = str(params.get("name") or "").strip()
arguments = params.get("arguments") or {}
try:
payload = execute_tool(name, arguments)
return _response(msg_id, format_tool_content(payload))
except Exception as exc:
log.warning("tool call failed name=%s err=%s", name, exc)
return _response(
msg_id,
{
"isError": True,
"content": [
{"type": "text", "text": json.dumps({"error": str(exc)})}
],
},
)
return _error(msg_id, -32601, f"Method not found: {method}")
def run_stdio_server() -> None:
_setup_django()
while True:
message = _read_message()
if message is None:
return
try:
response = _handle_message(message)
if response is not None:
_write_message(response)
except Exception as exc:
msg_id = message.get("id")
_write_message(_error(msg_id, -32000, str(exc)))
if __name__ == "__main__":
run_stdio_server()