Coverage for src \ truenex_memory \ mcp \ server.py: 69%

162 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

1"""MCP stdio server for Truenex Memory. 

2 

3The stdio transport uses newline-delimited JSON-RPC messages. This module keeps 

4the implementation dependency-free so local tests can exercise the MCP contract 

5without installing the optional Python MCP SDK. 

6""" 

7 

8from __future__ import annotations 

9 

10import json 

11from pathlib import Path 

12import sys 

13from typing import Any 

14 

15from truenex_memory import __version__ 

16from truenex_memory.mcp.tools import ( 

17 global_project_context, global_status, memory_add, memory_search, 

18 task_open, task_step_add, task_close, 

19) 

20 

21 

22SUPPORTED_PROTOCOL_VERSIONS = ( 

23 "2025-11-25", 

24 "2025-06-18", 

25 "2025-03-26", 

26 "2024-11-05", 

27) 

28DEFAULT_PROTOCOL_VERSION = SUPPORTED_PROTOCOL_VERSIONS[0] 

29 

30JSONRPC_VERSION = "2.0" 

31 

32 

33def run_stdio_server(project_root: Path | str = ".") -> None: 

34 """Run the MCP stdio server.""" 

35 

36 root = Path(project_root).resolve() 

37 for line in sys.stdin: 

38 line = line.strip() 

39 if not line: 

40 continue 

41 response = handle_jsonrpc_line(line, project_root=root) 

42 if response is None: 

43 continue 

44 print(json.dumps(response, separators=(",", ":"), sort_keys=True), flush=True) 

45 

46 

47def handle_jsonrpc_line(line: str, *, project_root: Path | str = ".") -> dict[str, Any] | list[dict[str, Any]] | None: 

48 """Handle one newline-delimited JSON-RPC message or batch.""" 

49 

50 try: 

51 message = json.loads(line) 

52 except json.JSONDecodeError as exc: 

53 return _error_response(None, -32700, "Parse error", str(exc)) 

54 

55 if isinstance(message, list): 

56 responses = [ 

57 response 

58 for item in message 

59 if (response := handle_jsonrpc_message(item, project_root=project_root)) is not None 

60 ] 

61 return responses or None 

62 return handle_jsonrpc_message(message, project_root=project_root) 

63 

64 

65def handle_jsonrpc_message(message: object, *, project_root: Path | str = ".") -> dict[str, Any] | None: 

66 """Handle one JSON-RPC request or notification.""" 

67 

68 if not isinstance(message, dict): 

69 return _error_response(None, -32600, "Invalid Request", "message must be an object") 

70 request_id = message.get("id") 

71 method = message.get("method") 

72 if message.get("jsonrpc") != JSONRPC_VERSION or not isinstance(method, str): 

73 return _error_response(request_id, -32600, "Invalid Request", "expected JSON-RPC 2.0 request") 

74 

75 is_notification = "id" not in message 

76 params = message.get("params", {}) 

77 if params is None: 

78 params = {} 

79 if not isinstance(params, dict): 

80 return None if is_notification else _error_response(request_id, -32602, "Invalid params") 

81 

82 try: 

83 result = _dispatch(method, params, project_root=Path(project_root)) 

84 except KeyError as exc: 

85 return None if is_notification else _error_response(request_id, -32601, "Method not found", str(exc)) 

86 except ValueError as exc: 

87 return None if is_notification else _error_response(request_id, -32602, "Invalid params", str(exc)) 

88 except Exception as exc: # pragma: no cover - defensive protocol boundary 

89 return None if is_notification else _error_response(request_id, -32603, "Internal error", str(exc)) 

90 

91 if is_notification: 

92 return None 

93 return {"jsonrpc": JSONRPC_VERSION, "id": request_id, "result": result} 

94 

95 

96def _dispatch(method: str, params: dict[str, Any], *, project_root: Path) -> dict[str, Any]: 

97 if method == "initialize": 

98 return _initialize(params) 

99 if method == "ping": 

100 return {} 

101 if method == "tools/list": 

102 return {"tools": _tool_definitions()} 

103 if method == "tools/call": 

104 return _call_tool(params, project_root=project_root) 

105 if method.startswith("notifications/"): 

106 return {} 

107 raise KeyError(method) 

108 

109 

110def _initialize(params: dict[str, Any]) -> dict[str, Any]: 

111 requested = params.get("protocolVersion") 

112 protocol_version = requested if requested in SUPPORTED_PROTOCOL_VERSIONS else DEFAULT_PROTOCOL_VERSION 

113 return { 

114 "protocolVersion": protocol_version, 

115 "capabilities": {"tools": {"listChanged": False}}, 

116 "serverInfo": {"name": "truenex-memory", "version": __version__}, 

117 } 

118 

119 

120def _tool_definitions() -> list[dict[str, Any]]: 

121 """Return the explicit stdio MCP tool surface. 

122 

123 This is intentionally smaller than the local Python registry in 

124 `truenex_memory.mcp`: agent-facing stdio tools are limited to memory and 

125 read-only global bootstrap operations. 

126 """ 

127 return [ 

128 { 

129 "name": "memory_search", 

130 "description": "Search local Truenex Memory for project context and source-backed decisions.", 

131 "inputSchema": { 

132 "type": "object", 

133 "properties": { 

134 "query": { 

135 "type": "string", 

136 "description": "Natural-language query for project memory.", 

137 }, 

138 "top_k": { 

139 "type": "integer", 

140 "description": "Maximum number of results to return.", 

141 "minimum": 1, 

142 "maximum": 50, 

143 "default": 5, 

144 }, 

145 }, 

146 "required": ["query"], 

147 "additionalProperties": False, 

148 }, 

149 }, 

150 { 

151 "name": "memory_add", 

152 "description": "Add a local memory note or decision for the current project.", 

153 "inputSchema": { 

154 "type": "object", 

155 "properties": { 

156 "content": { 

157 "type": "string", 

158 "description": "Memory content to store locally.", 

159 }, 

160 "memory_type": { 

161 "type": "string", 

162 "description": "Memory type, such as note or decision.", 

163 "default": "note", 

164 }, 

165 }, 

166 "required": ["content"], 

167 "additionalProperties": False, 

168 }, 

169 }, 

170 { 

171 "name": "global_status", 

172 "description": "Read-only report of the Truenex Memory global store status (catalog, database, ledger, indexed, problems).", 

173 "inputSchema": { 

174 "type": "object", 

175 "properties": { 

176 "home": { 

177 "type": "string", 

178 "description": "Override the global home directory (default: user home).", 

179 }, 

180 "catalog": { 

181 "type": "string", 

182 "description": "Override path to sources.json catalog.", 

183 }, 

184 "db": { 

185 "type": "string", 

186 "description": "Override path to SQLite global database.", 

187 }, 

188 }, 

189 "additionalProperties": False, 

190 }, 

191 }, 

192 { 

193 "name": "global_project_context", 

194 "description": "Read-only project context report from the Truenex Memory global store (catalog roots, ledger, indexed documents/chunks). Server aliases are hints only; no SSH/network/DB execution.", 

195 "inputSchema": { 

196 "type": "object", 

197 "properties": { 

198 "project": { 

199 "type": "string", 

200 "description": "Project name, path, or alias to resolve.", 

201 }, 

202 "home": { 

203 "type": "string", 

204 "description": "Override the global home directory (default: user home).", 

205 }, 

206 "catalog": { 

207 "type": "string", 

208 "description": "Override path to sources.json catalog.", 

209 }, 

210 "db": { 

211 "type": "string", 

212 "description": "Override path to SQLite global database.", 

213 }, 

214 "limit": { 

215 "type": "integer", 

216 "description": "Maximum indexed documents/chunks to return.", 

217 "minimum": 1, 

218 "maximum": 100, 

219 "default": 20, 

220 }, 

221 }, 

222 "required": ["project"], 

223 "additionalProperties": False, 

224 }, 

225 }, 

226 { 

227 "name": "task_open", 

228 "description": "Open a new adaptive task record. Call at the start of every agent session to enable outcome tracking.", 

229 "inputSchema": { 

230 "type": "object", 

231 "properties": { 

232 "title": {"type": "string", "description": "Short description of the task."}, 

233 "task_type": {"type": "string", "enum": ["bugfix", "feature", "refactor", "review", "query"], "default": "feature"}, 

234 "project": {"type": "string", "description": "Project name (e.g. AI_Agent)."}, 

235 "agent_session_id": {"type": "string", "description": "Current agent session ID."}, 

236 }, 

237 "required": ["title"], 

238 "additionalProperties": False, 

239 }, 

240 }, 

241 { 

242 "name": "task_step_add", 

243 "description": "Record a step taken within the current task (prompt, output, brain judgment, token usage).", 

244 "inputSchema": { 

245 "type": "object", 

246 "properties": { 

247 "task_id": {"type": "string", "description": "Task ID returned by task_open."}, 

248 "prompt_used": {"type": "string"}, 

249 "output": {"type": "string"}, 

250 "brain_judgment": {"type": "string", "enum": ["ok", "needs_revision", "rejected"]}, 

251 "tokens_used": {"type": "integer"}, 

252 "duration_s": {"type": "number"}, 

253 "model_used": {"type": "string"}, 

254 }, 

255 "required": ["task_id"], 

256 "additionalProperties": False, 

257 }, 

258 }, 

259 { 

260 "name": "task_close", 

261 "description": "Close the current task. Ask the human for a quality judgment (1=positive, 0=partial, -1=negative) before calling. Omit human_outcome if the human did not respond.", 

262 "inputSchema": { 

263 "type": "object", 

264 "properties": { 

265 "task_id": {"type": "string"}, 

266 "human_outcome": {"type": "integer", "enum": [1, 0, -1]}, 

267 "human_comment": {"type": "string"}, 

268 }, 

269 "required": ["task_id"], 

270 "additionalProperties": False, 

271 }, 

272 }, 

273 ] 

274 

275 

276def _call_tool(params: dict[str, Any], *, project_root: Path) -> dict[str, Any]: 

277 name = params.get("name") 

278 arguments = params.get("arguments", {}) 

279 if not isinstance(name, str) or not name: 

280 raise ValueError("tool name is required") 

281 if arguments is None: 

282 arguments = {} 

283 if not isinstance(arguments, dict): 

284 raise ValueError("tool arguments must be an object") 

285 

286 try: 

287 if name == "memory_search": 

288 payload = _call_memory_search(arguments, project_root=project_root) 

289 elif name == "memory_add": 

290 payload = _call_memory_add(arguments, project_root=project_root) 

291 elif name == "global_status": 

292 payload = _call_global_status(arguments) 

293 elif name == "global_project_context": 

294 payload = _call_global_project_context(arguments) 

295 elif name == "task_open": 

296 payload = _call_task_open(arguments) 

297 elif name == "task_step_add": 

298 payload = _call_task_step_add(arguments) 

299 elif name == "task_close": 

300 payload = _call_task_close(arguments) 

301 else: 

302 raise ValueError(f"unknown tool: {name}") 

303 return _tool_result(payload, is_error=False) 

304 except Exception as exc: 

305 return _tool_result({"error": str(exc)}, is_error=True) 

306 

307 

308def _call_memory_search(arguments: dict[str, Any], *, project_root: Path) -> dict[str, object]: 

309 query = arguments.get("query") 

310 top_k = arguments.get("top_k", 5) 

311 if not isinstance(query, str) or not query.strip(): 

312 raise ValueError("query must be a non-empty string") 

313 if not isinstance(top_k, int): 

314 raise ValueError("top_k must be an integer") 

315 if top_k < 1 or top_k > 50: 

316 raise ValueError("top_k must be between 1 and 50") 

317 return memory_search(query, top_k=top_k, project_root=project_root) 

318 

319 

320def _call_memory_add(arguments: dict[str, Any], *, project_root: Path) -> dict[str, object]: 

321 content = arguments.get("content") 

322 memory_type = arguments.get("memory_type", "note") 

323 if not isinstance(content, str) or not content.strip(): 

324 raise ValueError("content must be a non-empty string") 

325 if not isinstance(memory_type, str) or not memory_type.strip(): 

326 raise ValueError("memory_type must be a non-empty string") 

327 return memory_add(content, memory_type=memory_type, project_root=project_root) 

328 

329 

330def _call_global_status(arguments: dict[str, Any]) -> dict[str, object]: 

331 home = arguments.get("home") 

332 catalog = arguments.get("catalog") 

333 db = arguments.get("db") 

334 

335 for name, val in (("home", home), ("catalog", catalog), ("db", db)): 

336 if val is not None and not isinstance(val, str): 

337 raise ValueError(f"{name} must be a string") 

338 

339 return global_status(home=home, catalog=catalog, db=db) 

340 

341 

342def _call_global_project_context(arguments: dict[str, Any]) -> dict[str, object]: 

343 project = arguments.get("project") 

344 home = arguments.get("home") 

345 catalog = arguments.get("catalog") 

346 db = arguments.get("db") 

347 limit = arguments.get("limit", 20) 

348 

349 if not isinstance(project, str) or not project.strip(): 

350 raise ValueError("project must be a non-empty string") 

351 for name, val in (("home", home), ("catalog", catalog), ("db", db)): 

352 if val is not None and not isinstance(val, str): 

353 raise ValueError(f"{name} must be a string") 

354 if not isinstance(limit, int) or limit < 1 or limit > 100: 

355 raise ValueError("limit must be an integer between 1 and 100") 

356 

357 return global_project_context( 

358 project=project, 

359 home=home, 

360 catalog=catalog, 

361 db=db, 

362 limit=limit, 

363 ) 

364 

365 

366def _call_task_open(arguments: dict[str, Any]) -> dict[str, object]: 

367 title = arguments.get("title") 

368 if not isinstance(title, str) or not title.strip(): 

369 raise ValueError("title must be a non-empty string") 

370 return task_open( 

371 title, 

372 arguments.get("task_type", "feature"), 

373 project=arguments.get("project"), 

374 agent_session_id=arguments.get("agent_session_id"), 

375 ) 

376 

377 

378def _call_task_step_add(arguments: dict[str, Any]) -> dict[str, object]: 

379 task_id = arguments.get("task_id") 

380 if not isinstance(task_id, str) or not task_id.strip(): 

381 raise ValueError("task_id must be a non-empty string") 

382 return task_step_add( 

383 task_id, 

384 prompt_used=arguments.get("prompt_used"), 

385 output=arguments.get("output"), 

386 brain_judgment=arguments.get("brain_judgment"), 

387 tokens_used=arguments.get("tokens_used"), 

388 duration_s=arguments.get("duration_s"), 

389 model_used=arguments.get("model_used"), 

390 ) 

391 

392 

393def _call_task_close(arguments: dict[str, Any]) -> dict[str, object]: 

394 task_id = arguments.get("task_id") 

395 if not isinstance(task_id, str) or not task_id.strip(): 

396 raise ValueError("task_id must be a non-empty string") 

397 human_outcome = arguments.get("human_outcome") 

398 if human_outcome is not None and human_outcome not in (1, 0, -1): 

399 raise ValueError("human_outcome must be 1, 0, or -1") 

400 return task_close(task_id, human_outcome=human_outcome, human_comment=arguments.get("human_comment")) 

401 

402 

403def _tool_result(payload: dict[str, object], *, is_error: bool) -> dict[str, Any]: 

404 return { 

405 "content": [{"type": "text", "text": json.dumps(payload, indent=2, sort_keys=True)}], 

406 "isError": is_error, 

407 } 

408 

409 

410def _error_response( 

411 request_id: object, 

412 code: int, 

413 message: str, 

414 data: object | None = None, 

415) -> dict[str, Any]: 

416 error: dict[str, Any] = {"code": code, "message": message} 

417 if data is not None: 

418 error["data"] = data 

419 return {"jsonrpc": JSONRPC_VERSION, "id": request_id, "error": error}