Coverage for src \ truenex_memory \ mcp \ server.py: 69%
162 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""MCP stdio server for Truenex Memory.
3The stdio transport uses newline-delimited JSON-RPC messages. This module keeps
4the implementation dependency-free so local tests can exercise the MCP contract
5without installing the optional Python MCP SDK.
6"""
8from __future__ import annotations
10import json
11from pathlib import Path
12import sys
13from typing import Any
15from truenex_memory import __version__
16from truenex_memory.mcp.tools import (
17 global_project_context, global_status, memory_add, memory_search,
18 task_open, task_step_add, task_close,
19)
# MCP protocol revisions this server accepts, listed newest first.
SUPPORTED_PROTOCOL_VERSIONS = ("2025-11-25", "2025-06-18", "2025-03-26", "2024-11-05")

# Revision used as the default; it is always the first (newest) supported entry.
DEFAULT_PROTOCOL_VERSION = SUPPORTED_PROTOCOL_VERSIONS[0]

# JSON-RPC version tag required on incoming messages and stamped on every reply.
JSONRPC_VERSION = "2.0"
def run_stdio_server(project_root: Path | str = ".") -> None:
    """Serve MCP over stdio until standard input is exhausted.

    Every non-blank line read from stdin is handed to ``handle_jsonrpc_line``.
    When that call produces a response it is written to stdout as one compact,
    key-sorted JSON line and flushed immediately.

    Args:
        project_root: Project directory; resolved to an absolute path once,
            before the read loop starts.
    """
    resolved_root = Path(project_root).resolve()
    for raw_line in sys.stdin:
        payload = raw_line.strip()
        if payload:
            reply = handle_jsonrpc_line(payload, project_root=resolved_root)
            # Notifications (and batches made only of them) yield no reply.
            if reply is not None:
                encoded = json.dumps(reply, separators=(",", ":"), sort_keys=True)
                print(encoded, flush=True)
def handle_jsonrpc_line(line: str, *, project_root: Path | str = ".") -> dict[str, Any] | list[dict[str, Any]] | None:
    """Handle one newline-delimited JSON-RPC message or batch.

    Args:
        line: Raw JSON text of a single message or of a batch (JSON array).
        project_root: Project directory forwarded to the message handler.

    Returns:
        * a single response object for a request, a parse error, or an empty
          batch;
        * a list of response objects for a batch containing at least one
          message that produced a response;
        * ``None`` when nothing must be sent back (a notification, or a batch
          whose members all produced no response).
    """
    try:
        message = json.loads(line)
    except json.JSONDecodeError as exc:
        return _error_response(None, -32700, "Parse error", str(exc))

    if not isinstance(message, list):
        return handle_jsonrpc_message(message, project_root=project_root)

    if not message:
        # JSON-RPC 2.0: an empty batch array is itself an invalid request and
        # must be answered with a single error object, not with silence.
        return _error_response(None, -32600, "Invalid Request", "batch must not be empty")

    responses = [
        response
        for item in message
        if (response := handle_jsonrpc_message(item, project_root=project_root)) is not None
    ]
    # A batch consisting solely of notifications produces no output at all.
    return responses or None
def handle_jsonrpc_message(message: object, *, project_root: Path | str = ".") -> dict[str, Any] | None:
    """Process a single decoded JSON-RPC request or notification.

    Args:
        message: Decoded JSON value; anything but an object is rejected.
        project_root: Project directory passed on to the dispatcher.

    Returns:
        A JSON-RPC response object, or ``None`` when the message carries no
        ``id`` key (a notification) and is otherwise well-formed. Malformed
        envelopes always get an ``Invalid Request`` error back.
    """
    if not isinstance(message, dict):
        return _error_response(None, -32600, "Invalid Request", "message must be an object")

    request_id = message.get("id")
    method = message.get("method")
    well_formed = message.get("jsonrpc") == JSONRPC_VERSION and isinstance(method, str)
    if not well_formed:
        return _error_response(request_id, -32600, "Invalid Request", "expected JSON-RPC 2.0 request")

    # Presence of the key, not its value, decides whether a reply is expected.
    expects_reply = "id" in message

    def fail(code: int, text: str, cause: BaseException | None = None) -> dict[str, Any] | None:
        # Notifications never receive a reply, not even for errors.
        if not expects_reply:
            return None
        if cause is None:
            return _error_response(request_id, code, text)
        return _error_response(request_id, code, text, str(cause))

    params = message.get("params")
    if params is None:
        params = {}
    elif not isinstance(params, dict):
        return fail(-32602, "Invalid params")

    try:
        result = _dispatch(method, params, project_root=Path(project_root))
    except KeyError as exc:
        return fail(-32601, "Method not found", exc)
    except ValueError as exc:
        return fail(-32602, "Invalid params", exc)
    except Exception as exc:  # pragma: no cover - defensive protocol boundary
        return fail(-32603, "Internal error", exc)

    if not expects_reply:
        return None
    return {"jsonrpc": JSONRPC_VERSION, "id": request_id, "result": result}
def _dispatch(method: str, params: dict[str, Any], *, project_root: Path) -> dict[str, Any]:
    """Route a JSON-RPC method name to its handler and return the result payload.

    Args:
        method: JSON-RPC method name.
        params: Already-validated params object.
        project_root: Project directory handed to ``tools/call``.

    Raises:
        KeyError: ``method`` is not implemented by this server.
    """
    if method == "initialize":
        return _initialize(params)
    if method == "tools/list":
        return {"tools": _tool_definitions()}
    if method == "tools/call":
        return _call_tool(params, project_root=project_root)
    # ``ping`` and every ``notifications/*`` method yield an empty result.
    if method == "ping" or method.startswith("notifications/"):
        return {}
    raise KeyError(method)
def _initialize(params: dict[str, Any]) -> dict[str, Any]:
    """Build the ``initialize`` result.

    The client's ``protocolVersion`` is echoed when it is one of
    ``SUPPORTED_PROTOCOL_VERSIONS``; otherwise ``DEFAULT_PROTOCOL_VERSION``
    is reported instead.
    """
    negotiated = params.get("protocolVersion")
    if negotiated not in SUPPORTED_PROTOCOL_VERSIONS:
        negotiated = DEFAULT_PROTOCOL_VERSION

    tool_capabilities = {"listChanged": False}
    server_info = {"name": "truenex-memory", "version": __version__}
    return {
        "protocolVersion": negotiated,
        "capabilities": {"tools": tool_capabilities},
        "serverInfo": server_info,
    }
def _tool_definitions() -> list[dict[str, Any]]:
    """Return the tool descriptors advertised through ``tools/list``.

    Seven tools are listed: two memory tools, two read-only global-store
    reports and three task-tracking tools. Each descriptor carries ``name``,
    ``description`` and a JSON Schema ``inputSchema`` that rejects unknown
    arguments. A fresh structure is built on every call, so callers may
    mutate the result freely.
    """

    def prop(kind: str, description: str | None = None, **extra: Any) -> dict[str, Any]:
        # One property schema: type first, then description, then constraints.
        schema: dict[str, Any] = {"type": kind}
        if description is not None:
            schema["description"] = description
        schema.update(extra)
        return schema

    def tool(
        name: str,
        description: str,
        properties: dict[str, Any],
        required: list[str] | None = None,
    ) -> dict[str, Any]:
        # One tool descriptor; ``required`` is omitted entirely when not given.
        input_schema: dict[str, Any] = {"type": "object", "properties": properties}
        if required is not None:
            input_schema["required"] = required
        input_schema["additionalProperties"] = False
        return {"name": name, "description": description, "inputSchema": input_schema}

    def store_overrides() -> dict[str, Any]:
        # Path overrides shared by both global-store tools (new dicts each call).
        return {
            "home": prop("string", "Override the global home directory (default: user home)."),
            "catalog": prop("string", "Override path to sources.json catalog."),
            "db": prop("string", "Override path to SQLite global database."),
        }

    return [
        tool(
            "memory_search",
            "Search local Truenex Memory for project context and source-backed decisions.",
            {
                "query": prop("string", "Natural-language query for project memory."),
                "top_k": prop(
                    "integer",
                    "Maximum number of results to return.",
                    minimum=1,
                    maximum=50,
                    default=5,
                ),
            },
            ["query"],
        ),
        tool(
            "memory_add",
            "Add a local memory note or decision for the current project.",
            {
                "content": prop("string", "Memory content to store locally."),
                "memory_type": prop(
                    "string",
                    "Memory type, such as note or decision.",
                    default="note",
                ),
            },
            ["content"],
        ),
        tool(
            "global_status",
            "Read-only report of the Truenex Memory global store status (catalog, database, ledger, indexed, problems).",
            store_overrides(),
        ),
        tool(
            "global_project_context",
            "Read-only project context report from the Truenex Memory global store (catalog roots, ledger, indexed documents/chunks). Server aliases are hints only; no SSH/network/DB execution.",
            {
                "project": prop("string", "Project name, path, or alias to resolve."),
                **store_overrides(),
                "limit": prop(
                    "integer",
                    "Maximum indexed documents/chunks to return.",
                    minimum=1,
                    maximum=100,
                    default=20,
                ),
            },
            ["project"],
        ),
        tool(
            "task_open",
            "Open a new adaptive task record. Call at the start of every agent session to enable outcome tracking.",
            {
                "title": prop("string", "Short description of the task."),
                "task_type": prop(
                    "string",
                    enum=["bugfix", "feature", "refactor", "review", "query"],
                    default="feature",
                ),
                "project": prop("string", "Project name (e.g. AI_Agent)."),
                "agent_session_id": prop("string", "Current agent session ID."),
            },
            ["title"],
        ),
        tool(
            "task_step_add",
            "Record a step taken within the current task (prompt, output, brain judgment, token usage).",
            {
                "task_id": prop("string", "Task ID returned by task_open."),
                "prompt_used": prop("string"),
                "output": prop("string"),
                "brain_judgment": prop("string", enum=["ok", "needs_revision", "rejected"]),
                "tokens_used": prop("integer"),
                "duration_s": prop("number"),
                "model_used": prop("string"),
            },
            ["task_id"],
        ),
        tool(
            "task_close",
            "Close the current task. Ask the human for a quality judgment (1=positive, 0=partial, -1=negative) before calling. Omit human_outcome if the human did not respond.",
            {
                "task_id": prop("string"),
                "human_outcome": prop("integer", enum=[1, 0, -1]),
                "human_comment": prop("string"),
            },
            ["task_id"],
        ),
    ]
def _call_tool(params: dict[str, Any], *, project_root: Path) -> dict[str, Any]:
    """Execute a ``tools/call`` request and wrap the outcome as a tool result.

    Args:
        params: ``tools/call`` params; ``name`` selects the tool and the
            optional ``arguments`` object (``None`` is treated as empty)
            carries its inputs.
        project_root: Project directory handed to the memory tools.

    Returns:
        A tool result from ``_tool_result``. Any exception raised while the
        selected tool runs (including argument validation inside the tool
        wrapper) is reported in-band with ``isError`` set to true.

    Raises:
        ValueError: the request itself is malformed: the tool name is missing
            or unknown, or ``arguments`` is not an object. The JSON-RPC layer
            turns this into an ``Invalid params`` protocol error.
    """
    name = params.get("name")
    if not isinstance(name, str) or not name:
        raise ValueError("tool name is required")

    arguments = params.get("arguments")
    if arguments is None:
        arguments = {}
    if not isinstance(arguments, dict):
        raise ValueError("tool arguments must be an object")

    # Lambdas keep the two call shapes (with / without project_root) uniform
    # and defer looking up the wrapper until the tool is actually invoked.
    handlers = {
        "memory_search": lambda: _call_memory_search(arguments, project_root=project_root),
        "memory_add": lambda: _call_memory_add(arguments, project_root=project_root),
        "global_status": lambda: _call_global_status(arguments),
        "global_project_context": lambda: _call_global_project_context(arguments),
        "task_open": lambda: _call_task_open(arguments),
        "task_step_add": lambda: _call_task_step_add(arguments),
        "task_close": lambda: _call_task_close(arguments),
    }
    handler = handlers.get(name)
    if handler is None:
        # Raised outside the try below on purpose: an unknown tool is a
        # protocol-level error, not a failed execution of a real tool.
        raise ValueError(f"unknown tool: {name}")

    try:
        return _tool_result(handler(), is_error=False)
    except Exception as exc:
        # Execution failures are reported in-band so the caller can react.
        return _tool_result({"error": str(exc)}, is_error=True)
def _call_memory_search(arguments: dict[str, Any], *, project_root: Path) -> dict[str, object]:
    """Validate ``memory_search`` arguments and run the search.

    Args:
        arguments: Tool arguments; ``query`` is required, ``top_k`` defaults
            to 5.
        project_root: Project directory forwarded to ``memory_search``.

    Returns:
        Whatever ``memory_search`` returns.

    Raises:
        ValueError: ``query`` is not a non-blank string, or ``top_k`` is not
            an integer in the range 1..50.
    """
    query = arguments.get("query")
    if not isinstance(query, str) or not query.strip():
        raise ValueError("query must be a non-empty string")

    top_k = arguments.get("top_k", 5)
    # bool is a subclass of int, so JSON true/false would otherwise slip
    # through as 1/0; reject it explicitly.
    if isinstance(top_k, bool) or not isinstance(top_k, int):
        raise ValueError("top_k must be an integer")
    if not 1 <= top_k <= 50:
        raise ValueError("top_k must be between 1 and 50")

    return memory_search(query, top_k=top_k, project_root=project_root)
def _call_memory_add(arguments: dict[str, Any], *, project_root: Path) -> dict[str, object]:
    """Validate ``memory_add`` arguments and store the memory.

    ``content`` is required; ``memory_type`` falls back to ``"note"``. Both
    must be non-blank strings, otherwise ``ValueError`` is raised. The result
    of ``memory_add`` is returned unchanged.
    """
    content = arguments.get("content")
    memory_type = arguments.get("memory_type", "note")

    # Same rule for both fields; content is checked first.
    for field, value in (("content", content), ("memory_type", memory_type)):
        if not (isinstance(value, str) and value.strip()):
            raise ValueError(f"{field} must be a non-empty string")

    return memory_add(content, memory_type=memory_type, project_root=project_root)
def _call_global_status(arguments: dict[str, Any]) -> dict[str, object]:
    """Validate ``global_status`` arguments and return the status report.

    ``home``, ``catalog`` and ``db`` are optional; each must be a string when
    present and not ``None``, otherwise ``ValueError`` is raised. Missing
    values are forwarded to ``global_status`` as ``None``.
    """
    overrides = {key: arguments.get(key) for key in ("home", "catalog", "db")}
    for key, value in overrides.items():
        if not (value is None or isinstance(value, str)):
            raise ValueError(f"{key} must be a string")
    return global_status(**overrides)
def _call_global_project_context(arguments: dict[str, Any]) -> dict[str, object]:
    """Validate ``global_project_context`` arguments and return the report.

    Args:
        arguments: Tool arguments. ``project`` is required; ``home``,
            ``catalog`` and ``db`` are optional strings; ``limit`` defaults
            to 20.

    Returns:
        Whatever ``global_project_context`` returns.

    Raises:
        ValueError: ``project`` is not a non-blank string, an override is not
            a string, or ``limit`` is not an integer in the range 1..100.
    """
    project = arguments.get("project")
    home = arguments.get("home")
    catalog = arguments.get("catalog")
    db = arguments.get("db")
    limit = arguments.get("limit", 20)

    if not isinstance(project, str) or not project.strip():
        raise ValueError("project must be a non-empty string")
    for name, val in (("home", home), ("catalog", catalog), ("db", db)):
        if val is not None and not isinstance(val, str):
            raise ValueError(f"{name} must be a string")
    # bool is a subclass of int; without the explicit check JSON ``true``
    # would be accepted as limit=1.
    if isinstance(limit, bool) or not isinstance(limit, int) or not 1 <= limit <= 100:
        raise ValueError("limit must be an integer between 1 and 100")

    return global_project_context(
        project=project,
        home=home,
        catalog=catalog,
        db=db,
        limit=limit,
    )
def _call_task_open(arguments: dict[str, Any]) -> dict[str, object]:
    """Validate ``task_open`` arguments and open a task record.

    Args:
        arguments: Tool arguments. ``title`` is required; ``task_type``
            defaults to ``"feature"``; ``project`` and ``agent_session_id``
            are optional strings.

    Returns:
        Whatever ``task_open`` returns.

    Raises:
        ValueError: ``title`` is not a non-blank string, ``task_type`` is not
            one of the values advertised in the tool schema, or an optional
            field is present with a non-string value.
    """
    title = arguments.get("title")
    if not isinstance(title, str) or not title.strip():
        raise ValueError("title must be a non-empty string")

    # Mirrors the enum published for this tool in the tools/list schema.
    allowed_types = ("bugfix", "feature", "refactor", "review", "query")
    task_type = arguments.get("task_type", "feature")
    if not isinstance(task_type, str) or task_type not in allowed_types:
        raise ValueError("task_type must be one of: " + ", ".join(allowed_types))

    project = arguments.get("project")
    agent_session_id = arguments.get("agent_session_id")
    for name, val in (("project", project), ("agent_session_id", agent_session_id)):
        if val is not None and not isinstance(val, str):
            raise ValueError(f"{name} must be a string")

    return task_open(
        title,
        task_type,
        project=project,
        agent_session_id=agent_session_id,
    )
def _call_task_step_add(arguments: dict[str, Any]) -> dict[str, object]:
    """Validate ``task_step_add`` arguments and record a task step.

    Args:
        arguments: Tool arguments. ``task_id`` is required; every other field
            is optional and forwarded as ``None`` when absent.

    Returns:
        Whatever ``task_step_add`` returns.

    Raises:
        ValueError: ``task_id`` is not a non-blank string, or an optional
            field does not match the type advertised in the tool schema.
    """
    task_id = arguments.get("task_id")
    if not isinstance(task_id, str) or not task_id.strip():
        raise ValueError("task_id must be a non-empty string")

    prompt_used = arguments.get("prompt_used")
    output = arguments.get("output")
    model_used = arguments.get("model_used")
    for name, val in (("prompt_used", prompt_used), ("output", output), ("model_used", model_used)):
        if val is not None and not isinstance(val, str):
            raise ValueError(f"{name} must be a string")

    # Mirrors the enum published for this tool in the tools/list schema.
    allowed_judgments = ("ok", "needs_revision", "rejected")
    brain_judgment = arguments.get("brain_judgment")
    if brain_judgment is not None and (
        not isinstance(brain_judgment, str) or brain_judgment not in allowed_judgments
    ):
        raise ValueError("brain_judgment must be one of: " + ", ".join(allowed_judgments))

    # bool is a subclass of int, so it is excluded explicitly from both
    # numeric fields.
    tokens_used = arguments.get("tokens_used")
    if tokens_used is not None and (isinstance(tokens_used, bool) or not isinstance(tokens_used, int)):
        raise ValueError("tokens_used must be an integer")

    duration_s = arguments.get("duration_s")
    if duration_s is not None and (
        isinstance(duration_s, bool) or not isinstance(duration_s, (int, float))
    ):
        raise ValueError("duration_s must be a number")

    return task_step_add(
        task_id,
        prompt_used=prompt_used,
        output=output,
        brain_judgment=brain_judgment,
        tokens_used=tokens_used,
        duration_s=duration_s,
        model_used=model_used,
    )
def _call_task_close(arguments: dict[str, Any]) -> dict[str, object]:
    """Validate ``task_close`` arguments and close the task.

    Args:
        arguments: Tool arguments. ``task_id`` is required; ``human_outcome``
            and ``human_comment`` are optional and forwarded as ``None`` when
            absent.

    Returns:
        Whatever ``task_close`` returns.

    Raises:
        ValueError: ``task_id`` is not a non-blank string, ``human_outcome``
            is not the integer 1, 0 or -1, or ``human_comment`` is not a
            string.
    """
    task_id = arguments.get("task_id")
    if not isinstance(task_id, str) or not task_id.strip():
        raise ValueError("task_id must be a non-empty string")

    human_outcome = arguments.get("human_outcome")
    if human_outcome is not None:
        # A bare membership test is not enough: True == 1, False == 0 and
        # 1.0 == 1 would all pass it, so the type is checked first.
        is_plain_int = isinstance(human_outcome, int) and not isinstance(human_outcome, bool)
        if not is_plain_int or human_outcome not in (1, 0, -1):
            raise ValueError("human_outcome must be 1, 0, or -1")

    human_comment = arguments.get("human_comment")
    if human_comment is not None and not isinstance(human_comment, str):
        raise ValueError("human_comment must be a string")

    return task_close(task_id, human_outcome=human_outcome, human_comment=human_comment)
def _tool_result(payload: dict[str, object], *, is_error: bool) -> dict[str, Any]:
    """Wrap ``payload`` as a tool result with a single text content item.

    The payload is serialised as pretty-printed (2-space indent), key-sorted
    JSON; ``is_error`` is copied into the ``isError`` flag.
    """
    rendered = json.dumps(payload, indent=2, sort_keys=True)
    text_item = {"type": "text", "text": rendered}
    return {"content": [text_item], "isError": is_error}
def _error_response(
    request_id: object,
    code: int,
    message: str,
    data: object | None = None,
) -> dict[str, Any]:
    """Build a JSON-RPC error response envelope.

    Args:
        request_id: Value echoed in the ``id`` field (``None`` becomes JSON
            ``null`` when serialised).
        code: Numeric JSON-RPC error code.
        message: Short error description.
        data: Optional extra detail; the ``data`` member is left out of the
            error object entirely when this is ``None``.
    """
    detail: dict[str, Any] = dict(code=code, message=message)
    if data is not None:
        detail["data"] = data
    return dict(jsonrpc=JSONRPC_VERSION, id=request_id, error=detail)