Coverage for agentos/mcp/server.py: 28%

221 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 16:36 +0800

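"""MCP server implementation — exposes AgentOS as an MCP server.

Supports the stdio JSON-RPC 2.0 transport and exposes LLM chat, tool calls,
agent runs and related capabilities. Other MCP clients (for example Claude
Desktop or Cursor) can connect to it directly.

Usage:
    agentos mcp-server              # start in stdio mode
    agentos mcp-server --port 9000  # HTTP SSE mode: not implemented yet, exits with an error
"""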

10 

11from __future__ import annotations 

12 

13import asyncio 

14import json 

15import logging 

16import os 

17import sys 

18import uuid 

19from typing import Any, Dict, List, Optional 

20 

21logger = logging.getLogger(__name__) 

22 

23# ── MCP Server 核心 ───────────────────────── 

24 

25 

class MCPServer:
    """MCP server that speaks JSON-RPC 2.0 over stdio.

    Implements the server side of the MCP protocol and exposes AgentOS
    capabilities. Clients write newline-delimited JSON-RPC messages to the
    server's stdin and read one JSON response per line from its stdout.

    Supported methods:
        - initialize: protocol handshake, returns capabilities
        - tools/list: list the registered tools
        - tools/call: invoke a tool
        - resources/list / resources/read: list and read resources
        - prompts/list / prompts/get: list and render prompts
    """

    def __init__(
        self,
        server_info=None,
        *,
        name: str = "agentos",
        version: str = "1.5.2",
        tools: Optional[List] = None,
        resources: Optional[List] = None,
        prompts: Optional[List] = None,
    ):
        """Create a server and register user-supplied and built-in tools.

        Args:
            server_info: Either a ``ServerInfo`` (its name and version are
                used) or, for backward compatibility, the server name itself.
            name: Server name used when ``server_info`` is not given.
            version: Server version used unless a ``ServerInfo`` supplies one.
            tools: Tool objects to register; each must expose ``name``.
            resources: Resource objects, indexed by their ``uri``.
            prompts: Prompt objects, indexed by their ``name``.
        """
        if server_info is None:
            self.name = name
            self.version = version
        elif isinstance(server_info, ServerInfo):
            self.name = server_info.name
            self.version = server_info.version
        else:
            # Backward compatibility: the first positional argument is the name.
            self.name = server_info
            self.version = version

        self._tools: Dict[str, Any] = {}
        self._resources: Dict[str, MCPResource] = {}
        self._prompts: Dict[str, MCPPromptDef] = {}
        self._initialized = False

        for tool in tools or []:
            self.register_tool(tool)
        for resource in resources or []:
            self._resources[resource.uri] = resource
        for prompt in prompts or []:
            self._prompts[prompt.name] = prompt

        # Built-ins are registered last, so they replace a user tool that
        # happens to use the same name.
        self._register_builtin_tools()

    @property
    def info(self) -> "ServerInfo":
        """Return the server name and version as a ``ServerInfo``."""
        return ServerInfo(name=self.name, version=self.version)

    @property
    def tools(self) -> Dict[str, Any]:
        """Return the live name -> tool mapping (not a copy)."""
        return self._tools

    def register_tool(self, tool):
        """Register a tool under ``tool.name``, replacing any previous entry.

        Accepts any object exposing ``name`` (``MCPToolDef`` and the ``Tool``
        dataclass both qualify).

        Raises:
            AttributeError: If ``tool`` has no ``name`` attribute.
        """
        if not hasattr(tool, "name"):
            raise AttributeError(
                f"Cannot register {type(tool).__name__!r}: tool objects must "
                "expose a 'name' attribute"
            )
        self._tools[tool.name] = tool

    @staticmethod
    def _tool_entry(tool) -> dict:
        """Build the MCP wire description of one tool.

        ``input_schema`` is optional on tool objects; an empty schema is
        reported when it is missing.
        """
        return {
            "name": tool.name,
            "description": tool.description,
            "inputSchema": getattr(tool, "input_schema", {}),
        }

    @staticmethod
    def _error_response(req_id, code: int, message: str) -> dict:
        """Build a JSON-RPC 2.0 error response envelope."""
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "error": {"code": code, "message": message},
        }

    async def list_tools(self) -> list:
        """Return the wire descriptions of all registered tools."""
        return [self._tool_entry(tool) for tool in self._tools.values()]

    def run_stdio(self):
        """Run the server on stdio; blocks until the input stream ends."""
        asyncio.run(self._run_stdio_async())

    async def _run_stdio_async(self):
        """Serve newline-delimited JSON-RPC over stdin/stdout until EOF.

        Any unexpected failure in the loop is logged with its traceback and
        ends the loop.
        """
        # We are inside a coroutine, so the running loop is the right one.
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader()
        protocol = asyncio.StreamReaderProtocol(reader)
        await loop.connect_read_pipe(lambda: protocol, sys.stdin)

        writer_transport, writer_protocol = await loop.connect_write_pipe(
            asyncio.streams.FlowControlMixin, os.fdopen(sys.stdout.fileno(), "wb")
        )
        writer = asyncio.StreamWriter(writer_transport, writer_protocol, reader, loop)

        logger.info("MCP Server '%s' v%s started (stdio)", self.name, self.version)

        while True:
            try:
                line = await reader.readline()
                if not line:
                    break  # EOF: the client closed our stdin.
                response = await self._process_line(line)
                if response is not None:
                    payload = json.dumps(response, ensure_ascii=False) + "\n"
                    writer.write(payload.encode("utf-8"))
                    await writer.drain()
            except Exception as e:
                logger.exception("MCP Server error: %s", e)
                break

    async def _process_line(self, line: bytes) -> Optional[dict]:
        """Decode one input line and produce its response, if any.

        Returns:
            ``None`` for blank lines and notifications, a JSON-RPC parse
            error (-32700, ``id`` null) for undecodable or malformed input,
            otherwise whatever ``_handle_request`` returns.
        """
        try:
            text = line.decode("utf-8").strip()
            if not text:
                return None
            request = json.loads(text)
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Tell the client instead of silently dropping the message.
            return self._error_response(None, -32700, "Parse error")
        return await self._handle_request(request)

    async def _handle_request(self, request: dict) -> Optional[dict]:
        """Handle one decoded JSON-RPC message.

        Returns:
            ``None`` for notifications (messages without an ``id``), else a
            JSON-RPC response dict carrying either ``result`` or ``error``.
        """
        if not isinstance(request, dict):
            # Arrays / scalars are valid JSON but not a request object.
            return self._error_response(None, -32600, "Invalid Request")

        req_id = request.get("id")
        method = request.get("method", "")

        # Notifications carry no id and must never be answered.
        if req_id is None:
            if method == "notifications/initialized":
                self._initialized = True
            return None

        # An explicit ``"params": null`` is treated like an omitted field.
        params = request.get("params") or {}
        if not isinstance(params, dict):
            return self._error_response(req_id, -32602, "Invalid params")

        try:
            result = await self._dispatch(method, params)
        except MCPError as e:
            return self._error_response(req_id, e.code, e.message)
        except Exception as e:
            return self._error_response(req_id, -32603, str(e))
        return {"jsonrpc": "2.0", "id": req_id, "result": result}

    async def _dispatch(self, method: str, params: dict) -> Any:
        """Route ``method`` to its handler.

        Raises:
            MCPError: With code -32601 when the method is unknown.
        """
        handlers = {
            "initialize": self._handle_initialize,
            "tools/list": self._handle_tools_list,
            "tools/call": self._handle_tools_call,
            "resources/list": self._handle_resources_list,
            "resources/read": self._handle_resources_read,
            "prompts/list": self._handle_prompts_list,
            "prompts/get": self._handle_prompts_get,
        }
        handler = handlers.get(method)
        if handler is None:
            raise MCPError(-32601, f"Method not found: {method}")
        return await handler(params)

    # ── MCP protocol methods ──────────────────────

    async def _handle_initialize(self, params: dict) -> dict:
        """Answer the handshake with protocol version, identity and capabilities."""
        return {
            "protocolVersion": "2024-11-05",
            "serverInfo": {
                "name": self.name,
                "version": self.version,
            },
            "capabilities": {
                "tools": {"listChanged": False},
                "resources": {"subscribe": False, "listChanged": False},
                "prompts": {"listChanged": False},
            },
        }

    async def _handle_tools_list(self, params: dict) -> dict:
        """Return all registered tools under the ``tools`` key."""
        return {"tools": [self._tool_entry(tool) for tool in self._tools.values()]}

    async def _handle_tools_call(self, params: dict) -> dict:
        """Invoke a registered tool and wrap its result as MCP text content.

        The callable is taken from ``tool.handler`` (``MCPToolDef``) or,
        failing that, ``tool.call`` (``Tool`` dataclass). It may be sync or
        async. Failures inside the tool are reported in-band with
        ``isError`` set, not as JSON-RPC errors.

        Raises:
            MCPError: With code -32602 when the tool name is unknown.
        """
        import inspect  # local import: only this method needs it

        tool_name = params.get("name", "")
        arguments = params.get("arguments") or {}
        tool = self._tools.get(tool_name)
        if tool is None:
            raise MCPError(-32602, f"Unknown tool: {tool_name}")

        try:
            handler = getattr(tool, "handler", None) or getattr(tool, "call", None)
            if handler is None:
                raise TypeError(f"Tool '{tool_name}' has no callable handler")
            result = handler(arguments)
            # Covers coroutine functions as well as sync callables that hand
            # back an awaitable (partials, callable objects, ...).
            if inspect.isawaitable(result):
                result = await result
            text = result if isinstance(result, str) else str(result)
            return {"content": [{"type": "text", "text": text}]}
        except Exception as e:
            return {
                "content": [
                    {"type": "text", "text": f"Error: {str(e)}"}
                ],
                "isError": True,
            }

    async def _handle_resources_list(self, params: dict) -> dict:
        """Return metadata for all registered resources."""
        return {
            "resources": [
                {
                    "uri": r.uri,
                    "name": r.name,
                    "description": r.description,
                    "mimeType": r.mime_type,
                }
                for r in self._resources.values()
            ]
        }

    async def _handle_resources_read(self, params: dict) -> dict:
        """Return the text of one resource.

        ``content`` may be a plain value or a zero-argument callable that
        produces it; either way the value is converted with ``str``.

        Raises:
            MCPError: With code -32602 when the URI is unknown.
        """
        uri = params.get("uri", "")
        r = self._resources.get(uri)
        if r is None:
            raise MCPError(-32602, f"Unknown resource: {uri}")
        text = r.content() if callable(r.content) else r.content
        return {
            "contents": [
                {"uri": uri, "mimeType": r.mime_type, "text": str(text)}
            ]
        }

    async def _handle_prompts_list(self, params: dict) -> dict:
        """Return metadata for all registered prompts."""
        return {
            "prompts": [
                {
                    "name": p.name,
                    "description": p.description,
                    "arguments": p.arguments,
                }
                for p in self._prompts.values()
            ]
        }

    async def _handle_prompts_get(self, params: dict) -> dict:
        """Render one prompt as a single user message.

        ``template`` may be a plain value or a callable that receives the
        request's ``arguments`` dict.

        Raises:
            MCPError: With code -32602 when the prompt name is unknown.
        """
        prompt_name = params.get("name", "")
        prompt_args = params.get("arguments") or {}
        p = self._prompts.get(prompt_name)
        if p is None:
            raise MCPError(-32602, f"Unknown prompt: {prompt_name}")
        template = p.template(prompt_args) if callable(p.template) else p.template
        return {
            "description": p.description,
            "messages": [
                {"role": "user", "content": {"type": "text", "text": template}}
            ],
        }

    # ── Built-in tools ────────────────────────────

    def _register_builtin_tools(self):
        """Register the built-in AgentOS tools (chat, tool listing, version)."""

        self.register_tool(MCPToolDef(
            name="agentos_chat",
            description="使用 AgentOS LLM 进行对话(支持 OpenAI/DeepSeek/Anthropic/Claude/Ollama)",
            input_schema={
                "type": "object",
                "properties": {
                    "messages": {
                        "type": "array",
                        "description": "对话消息列表",
                        "items": {
                            "type": "object",
                            "properties": {
                                "role": {"type": "string", "enum": ["system", "user", "assistant"]},
                                "content": {"type": "string"},
                            },
                            "required": ["role", "content"],
                        },
                    },
                    "model": {"type": "string", "description": "模型名称,默认从配置读取"},
                    "temperature": {"type": "number", "description": "温度参数(0-2)"},
                    "max_tokens": {"type": "integer", "description": "最大输出 token 数"},
                },
                "required": ["messages"],
            },
            handler=self._tool_agentos_chat,
        ))

        self.register_tool(MCPToolDef(
            name="agentos_list_tools",
            description="列出 AgentOS 中所有可用的工具(含 MCP 工具)",
            input_schema={
                "type": "object",
                "properties": {
                    "format": {"type": "string", "enum": ["openai", "anthropic"], "description": "输出格式"},
                },
            },
            handler=self._tool_list_tools,
        ))

        self.register_tool(MCPToolDef(
            name="agentos_version",
            description="获取 AgentOS 版本信息",
            input_schema={"type": "object", "properties": {}},
            handler=self._tool_version,
        ))

    async def _tool_agentos_chat(self, args: dict) -> str:
        """Built-in tool: send ``args["messages"]`` to the AgentOS LLM client.

        Optional ``model``, ``temperature`` (default 0.7) and ``max_tokens``
        (default 4096) are forwarded. Returns the ``content`` of the reply,
        or an error string when ``agentos.llm`` cannot be imported.
        """
        try:
            from agentos.llm import LLMClient, LLMMessage
        except ImportError:
            return "Error: AgentOS LLM 模块不可用。请确认已安装 nexus-agentos。"

        messages_raw = args.get("messages", [])
        model = args.get("model")
        temperature = args.get("temperature", 0.7)
        max_tokens = args.get("max_tokens", 4096)

        messages = [LLMMessage(role=m["role"], content=m["content"]) for m in messages_raw]

        client = LLMClient(model=model)
        response = await client.chat(messages, temperature=temperature, max_tokens=max_tokens)
        return response.content

    def _tool_list_tools(self, args: dict) -> str:
        """Built-in tool: dump the registered tools as pretty-printed JSON.

        ``args["format"]`` selects the layout: ``"openai"`` (the default)
        produces function-tool entries, any other value produces flat
        ``name`` / ``description`` / ``input_schema`` entries.
        """
        fmt = args.get("format", "openai")
        tools_list = []
        for name, tool in self._tools.items():
            # input_schema is optional on tool objects (same fallback as tools/list).
            schema = getattr(tool, "input_schema", {})
            if fmt == "openai":
                tools_list.append({
                    "type": "function",
                    "function": {"name": name, "description": tool.description, "parameters": schema},
                })
            else:
                tools_list.append({"name": name, "description": tool.description, "input_schema": schema})
        return json.dumps(tools_list, ensure_ascii=False, indent=2)

    def _tool_version(self, args: dict) -> str:
        """Built-in tool: report server and AgentOS versions as a JSON string.

        Falls back to the server's own version when ``agentos.__version__``
        cannot be imported.
        """
        try:
            from agentos import __version__
        except ImportError:
            __version__ = self.version
        return json.dumps({
            "name": self.name,
            "version": self.version,
            "agentos_version": __version__,
            "tools_count": len(self._tools),
        }, ensure_ascii=False)

382 

383 

384# ── 数据结构 ─────────────────────────────── 

385 

386 

class MCPToolDef:
    """Definition of one MCP tool: its metadata plus the callable behind it."""

    def __init__(
        self,
        name: str,
        description: str,
        input_schema: dict,
        handler,
    ):
        """Store the tool's name, description, input schema and handler.

        Args:
            name: Name the tool is registered and called under.
            description: Human-readable description of the tool.
            input_schema: Schema describing the tool's arguments.
            handler: Callable invoked with the call's arguments dict.
        """
        # Metadata advertised to clients.
        self.name, self.description = name, description
        self.input_schema = input_schema
        # Callable executed when the tool is invoked.
        self.handler = handler

401 

402 

class MCPResource:
    """Definition of one MCP resource, identified by its URI."""

    def __init__(
        self,
        uri: str,
        name: str = "",
        description: str = "",
        mime_type: str = "text/plain",
        content: Any = "",
    ):
        """Store the resource's identity, metadata and content.

        Args:
            uri: Identifier of the resource.
            name: Display name.
            description: Human-readable description.
            mime_type: MIME type reported for the content.
            content: The content itself, stored as given.
        """
        self.uri = uri
        self.name, self.description = name, description
        self.mime_type, self.content = mime_type, content

419 

420 

class MCPPromptDef:
    """Definition of one MCP prompt template."""

    def __init__(
        self,
        name: str,
        description: str = "",
        arguments: Optional[list] = None,
        template: Any = "",
    ):
        """Store the prompt's name, description, argument list and template.

        Args:
            name: Name the prompt is registered under.
            description: Human-readable description.
            arguments: Argument descriptors for the prompt; ``None`` or an
                empty list is stored as a fresh empty list.
            template: The template itself, stored as given.
        """
        self.name = name
        self.description = description
        # A non-empty list is stored as-is (not copied).
        self.arguments = arguments or []
        self.template = template

435 

436 

class MCPError(Exception):
    """MCP protocol error carrying a numeric code and a message."""

    def __init__(self, code: int, message: str):
        """Record ``code`` and ``message`` and build the exception text.

        Args:
            code: Numeric error code.
            message: Human-readable error message.
        """
        # The exception text embeds both parts; the raw values stay
        # available as attributes.
        super().__init__(f"MCP Error [{code}]: {message}")
        self.code, self.message = code, message

444 

445 

446# ── 便捷函数 ─────────────────────────────── 

447 

448 

def create_default_server() -> MCPServer:
    """Build an ``MCPServer`` named "agentos" at version "1.5.2"."""
    default_identity = {"name": "agentos", "version": "1.5.2"}
    return MCPServer(**default_identity)

455 

456 

def start_mcp_server(port: int = 0):
    """Start the MCP server.

    Args:
        port: 0 selects stdio mode. Any other value requests HTTP SSE mode,
            which is not implemented yet: an error is printed to stderr and
            the process exits with status 1.

    Raises:
        SystemExit: With code 1 when ``port`` is not 0.
    """
    if port != 0:
        # Diagnostics belong on stderr, not stdout.
        print("MCP HTTP SSE 模式暂未实现。请使用 stdio 模式(port=0)。", file=sys.stderr)
        sys.exit(1)

    server = create_default_server()
    server.run_stdio()

469 

470# ── ServerInfo & Tool (test compatibility) ── 

471from dataclasses import dataclass, field 

472 

@dataclass
class ServerInfo:
    """Identity of an MCP server: name, version and optional description."""

    # Server name.
    name: str
    # Server version string.
    version: str
    # Free-form description; empty by default.
    description: str = ""

478 

@dataclass
class Tool:
    """Dataclass form of a tool: metadata plus the callable in ``call``.

    ``handler`` is a read-only alias of ``call`` so that code written
    against ``MCPToolDef`` (which looks up ``.handler``) can also invoke a
    ``Tool``.
    """

    # Name the tool is registered and called under.
    name: str
    # Human-readable description.
    description: str
    # Schema describing the tool's arguments.
    input_schema: dict
    # Callable invoked with the arguments dict; the default is a no-op
    # that returns None.
    call: callable = field(default=lambda params: None)

    @property
    def handler(self):
        """Return ``call`` under the attribute name ``MCPToolDef`` uses."""
        return self.call

485 

@dataclass
class AgentCard:
    """Descriptor of an agent: identity, version, capabilities and endpoint."""

    # Identifier of the agent.
    agent_id: str
    # Display name.
    name: str
    # Version string.
    version: str
    # Capability entries; every instance gets its own empty list by default.
    capabilities: list = field(default_factory=list)
    # Endpoint string; empty by default.
    endpoint: str = ""