Coverage for agentos/mcp/server.py: 28%
221 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 16:36 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 16:36 +0800
1"""MCP Server 实现 — 将 AgentOS 暴露为 MCP Server。
3支持 stdio JSON-RPC 2.0 传输,暴露 LLM 对话、工具调用、Agent 运行等能力。
4其他 MCP 客户端(如 Claude Desktop、Cursor)可直接连接使用。
6用法:
7 agentos mcp-server # 以 stdio 模式启动
8 agentos mcp-server --port 9000 # 以 HTTP SSE 模式启动(可选)
9"""
11from __future__ import annotations
13import asyncio
14import json
15import logging
16import os
17import sys
18import uuid
19from typing import Any, Dict, List, Optional
21logger = logging.getLogger(__name__)
23# ── MCP Server 核心 ─────────────────────────
26class MCPServer:
27 """MCP Server — stdio JSON-RPC 2.0 传输。
29 实现 MCP 协议的 server 端,暴露 AgentOS 能力。
30 客户端通过 stdio 发送 JSON-RPC 请求,服务器响应。
32 支持的操作:
33 - initialize: 协议握手,返回 capabilities
34 - tools/list: 列出可用工具
35 - tools/call: 调用工具
36 - resources/list: 列出可用资源
37 - prompts/list: 列出可用提示
38 """
40 def __init__(
41 self,
42 server_info=None,
43 *,
44 name: str = "agentos",
45 version: str = "1.5.2",
46 tools: Optional[List] = None,
47 resources: Optional[List] = None,
48 prompts: Optional[List] = None,
49 ):
50 if server_info is not None:
51 if isinstance(server_info, ServerInfo):
52 self.name = server_info.name
53 self.version = server_info.version
54 else:
55 # backward compat: first arg is name
56 self.name = server_info
57 self.version = version
58 else:
59 self.name = name
60 self.version = version
61 self._tools: Dict[str, Any] = {}
62 self._resources: Dict[str, MCPResource] = {}
63 self._prompts: Dict[str, MCPPromptDef] = {}
64 self._initialized = False
66 for t in (tools or []):
67 self.register_tool(t)
68 for r in (resources or []):
69 self._resources[r.uri] = r
70 for p in (prompts or []):
71 self._prompts[p.name] = p
73 # 内置工具
74 self._register_builtin_tools()
76 @property
77 def info(self) -> "ServerInfo":
78 return ServerInfo(name=self.name, version=self.version)
80 @property
81 def tools(self) -> Dict[str, Any]:
82 return self._tools
84 def register_tool(self, tool):
85 """注册一个 MCP 工具。兼容 MCPToolDef 和 Tool dataclass。"""
86 if hasattr(tool, 'name'):
87 self._tools[tool.name] = tool
88 else:
89 # MCPToolDef compat
90 self._tools[tool.name] = tool
92 async def list_tools(self) -> list:
93 """异步列出所有工具。"""
94 result = []
95 for t in self._tools.values():
96 result.append({
97 "name": t.name,
98 "description": t.description,
99 "inputSchema": getattr(t, 'input_schema', {}),
100 })
101 return result
103 def run_stdio(self):
104 """以 stdio 模式运行 MCP Server(同步阻塞)。"""
105 asyncio.run(self._run_stdio_async())
107 async def _run_stdio_async(self):
108 """异步 stdio 循环。"""
109 loop = asyncio.get_event_loop()
110 reader = asyncio.StreamReader()
111 protocol = asyncio.StreamReaderProtocol(reader)
112 await loop.connect_read_pipe(lambda: protocol, sys.stdin)
114 writer_transport, writer_protocol = await loop.connect_write_pipe(
115 asyncio.streams.FlowControlMixin, os.fdopen(sys.stdout.fileno(), "wb")
116 )
117 writer = asyncio.StreamWriter(writer_transport, writer_protocol, reader, loop)
119 logger.info(f"MCP Server '{self.name}' v{self.version} started (stdio)")
121 while True:
122 try:
123 line = await reader.readline()
124 if not line:
125 break
126 line_str = line.decode("utf-8").strip()
127 if not line_str:
128 continue
130 try:
131 request = json.loads(line_str)
132 except json.JSONDecodeError:
133 continue
135 response = await self._handle_request(request)
136 if response is not None:
137 payload = json.dumps(response, ensure_ascii=False) + "\n"
138 writer.write(payload.encode("utf-8"))
139 await writer.drain()
140 except Exception as e:
141 logger.error(f"MCP Server error: {e}")
142 break
144 async def _handle_request(self, request: dict) -> dict | None:
145 """处理单个 JSON-RPC 请求。"""
146 req_id = request.get("id")
147 method = request.get("method", "")
148 params = request.get("params", {})
150 # 通知类(无 id),不回复
151 if req_id is None:
152 if method == "notifications/initialized":
153 self._initialized = True
154 return None
156 try:
157 result = await self._dispatch(method, params)
158 return {
159 "jsonrpc": "2.0",
160 "id": req_id,
161 "result": result,
162 }
163 except MCPError as e:
164 return {
165 "jsonrpc": "2.0",
166 "id": req_id,
167 "error": {"code": e.code, "message": e.message},
168 }
169 except Exception as e:
170 return {
171 "jsonrpc": "2.0",
172 "id": req_id,
173 "error": {"code": -32603, "message": str(e)},
174 }
176 async def _dispatch(self, method: str, params: dict) -> Any:
177 """路由到对应的处理器。"""
178 handlers = {
179 "initialize": self._handle_initialize,
180 "tools/list": self._handle_tools_list,
181 "tools/call": self._handle_tools_call,
182 "resources/list": self._handle_resources_list,
183 "resources/read": self._handle_resources_read,
184 "prompts/list": self._handle_prompts_list,
185 "prompts/get": self._handle_prompts_get,
186 }
187 handler = handlers.get(method)
188 if handler is None:
189 raise MCPError(-32601, f"Method not found: {method}")
190 return await handler(params)
192 # ── MCP 协议方法 ──────────────────────
194 async def _handle_initialize(self, params: dict) -> dict:
195 return {
196 "protocolVersion": "2024-11-05",
197 "serverInfo": {
198 "name": self.name,
199 "version": self.version,
200 },
201 "capabilities": {
202 "tools": {"listChanged": False},
203 "resources": {"subscribe": False, "listChanged": False},
204 "prompts": {"listChanged": False},
205 },
206 }
208 async def _handle_tools_list(self, params: dict) -> dict:
209 tools = []
210 for t in self._tools.values():
211 tools.append({
212 "name": t.name,
213 "description": t.description,
214 "inputSchema": t.input_schema,
215 })
216 return {"tools": tools}
218 async def _handle_tools_call(self, params: dict) -> dict:
219 tool_name = params.get("name", "")
220 arguments = params.get("arguments", {})
221 tool = self._tools.get(tool_name)
222 if tool is None:
223 raise MCPError(-32602, f"Unknown tool: {tool_name}")
225 try:
226 result = tool.handler(arguments) if not asyncio.iscoroutinefunction(tool.handler) else await tool.handler(arguments)
227 return {
228 "content": [
229 {"type": "text", "text": str(result) if not isinstance(result, str) else result}
230 ]
231 }
232 except Exception as e:
233 return {
234 "content": [
235 {"type": "text", "text": f"Error: {str(e)}"}
236 ],
237 "isError": True,
238 }
240 async def _handle_resources_list(self, params: dict) -> dict:
241 resources = []
242 for r in self._resources.values():
243 resources.append({
244 "uri": r.uri,
245 "name": r.name,
246 "description": r.description,
247 "mimeType": r.mime_type,
248 })
249 return {"resources": resources}
251 async def _handle_resources_read(self, params: dict) -> dict:
252 uri = params.get("uri", "")
253 r = self._resources.get(uri)
254 if r is None:
255 raise MCPError(-32602, f"Unknown resource: {uri}")
256 text = r.content() if callable(r.content) else r.content
257 return {
258 "contents": [
259 {"uri": uri, "mimeType": r.mime_type, "text": str(text)}
260 ]
261 }
263 async def _handle_prompts_list(self, params: dict) -> dict:
264 prompts = []
265 for p in self._prompts.values():
266 prompts.append({
267 "name": p.name,
268 "description": p.description,
269 "arguments": p.arguments,
270 })
271 return {"prompts": prompts}
273 async def _handle_prompts_get(self, params: dict) -> dict:
274 prompt_name = params.get("name", "")
275 prompt_args = params.get("arguments", {})
276 p = self._prompts.get(prompt_name)
277 if p is None:
278 raise MCPError(-32602, f"Unknown prompt: {prompt_name}")
279 template = p.template(prompt_args) if callable(p.template) else p.template
280 return {
281 "description": p.description,
282 "messages": [
283 {"role": "user", "content": {"type": "text", "text": template}}
284 ]
285 }
287 # ── 内置工具 ──────────────────────────
289 def _register_builtin_tools(self):
290 """注册 AgentOS 内置 MCP 工具。"""
292 self.register_tool(MCPToolDef(
293 name="agentos_chat",
294 description="使用 AgentOS LLM 进行对话(支持 OpenAI/DeepSeek/Anthropic/Claude/Ollama)",
295 input_schema={
296 "type": "object",
297 "properties": {
298 "messages": {
299 "type": "array",
300 "description": "对话消息列表",
301 "items": {
302 "type": "object",
303 "properties": {
304 "role": {"type": "string", "enum": ["system", "user", "assistant"]},
305 "content": {"type": "string"},
306 },
307 "required": ["role", "content"],
308 },
309 },
310 "model": {"type": "string", "description": "模型名称,默认从配置读取"},
311 "temperature": {"type": "number", "description": "温度参数(0-2)"},
312 "max_tokens": {"type": "integer", "description": "最大输出 token 数"},
313 },
314 "required": ["messages"],
315 },
316 handler=self._tool_agentos_chat,
317 ))
319 self.register_tool(MCPToolDef(
320 name="agentos_list_tools",
321 description="列出 AgentOS 中所有可用的工具(含 MCP 工具)",
322 input_schema={
323 "type": "object",
324 "properties": {
325 "format": {"type": "string", "enum": ["openai", "anthropic"], "description": "输出格式"},
326 },
327 },
328 handler=self._tool_list_tools,
329 ))
331 self.register_tool(MCPToolDef(
332 name="agentos_version",
333 description="获取 AgentOS 版本信息",
334 input_schema={"type": "object", "properties": {}},
335 handler=self._tool_version,
336 ))
338 async def _tool_agentos_chat(self, args: dict) -> str:
339 """调用 AgentOS LLM 对话。"""
340 try:
341 from agentos.llm import LLMClient, LLMMessage
342 except ImportError:
343 return "Error: AgentOS LLM 模块不可用。请确认已安装 nexus-agentos。"
345 messages_raw = args.get("messages", [])
346 model = args.get("model")
347 temperature = args.get("temperature", 0.7)
348 max_tokens = args.get("max_tokens", 4096)
350 messages = [LLMMessage(role=m["role"], content=m["content"]) for m in messages_raw]
352 client = LLMClient(model=model)
353 response = await client.chat(messages, temperature=temperature, max_tokens=max_tokens)
354 return response.content
356 def _tool_list_tools(self, args: dict) -> str:
357 """列出可用工具。"""
358 fmt = args.get("format", "openai")
359 tools_list = []
360 for name, tool in self._tools.items():
361 if fmt == "openai":
362 tools_list.append({
363 "type": "function",
364 "function": {"name": name, "description": tool.description, "parameters": tool.input_schema},
365 })
366 else:
367 tools_list.append({"name": name, "description": tool.description, "input_schema": tool.input_schema})
368 return json.dumps(tools_list, ensure_ascii=False, indent=2)
370 def _tool_version(self, args: dict) -> str:
371 """返回版本信息。"""
372 try:
373 from agentos import __version__
374 except ImportError:
375 __version__ = self.version
376 return json.dumps({
377 "name": self.name,
378 "version": self.version,
379 "agentos_version": __version__,
380 "tools_count": len(self._tools),
381 }, ensure_ascii=False)
384# ── 数据结构 ───────────────────────────────
387class MCPToolDef:
388 """MCP 工具定义。"""
390 def __init__(
391 self,
392 name: str,
393 description: str,
394 input_schema: dict,
395 handler,
396 ):
397 self.name = name
398 self.description = description
399 self.input_schema = input_schema
400 self.handler = handler
403class MCPResource:
404 """MCP 资源定义。"""
406 def __init__(
407 self,
408 uri: str,
409 name: str = "",
410 description: str = "",
411 mime_type: str = "text/plain",
412 content: Any = "",
413 ):
414 self.uri = uri
415 self.name = name
416 self.description = description
417 self.mime_type = mime_type
418 self.content = content
421class MCPPromptDef:
422 """MCP 提示模板定义。"""
424 def __init__(
425 self,
426 name: str,
427 description: str = "",
428 arguments: list = None,
429 template: Any = "",
430 ):
431 self.name = name
432 self.description = description
433 self.arguments = arguments or []
434 self.template = template
437class MCPError(Exception):
438 """MCP 协议错误(与服务端共用异常类)。"""
440 def __init__(self, code: int, message: str):
441 self.code = code
442 self.message = message
443 super().__init__(f"MCP Error [{code}]: {message}")
446# ── 便捷函数 ───────────────────────────────
449def create_default_server() -> MCPServer:
450 """创建预配置了 AgentOS 内置工具的 MCP Server。"""
451 return MCPServer(
452 name="agentos",
453 version="1.5.2",
454 )
457def start_mcp_server(port: int = 0):
458 """启动 MCP Server。
460 Args:
461 port: 0 表示 stdio 模式,>0 表示 HTTP SSE 模式(暂未实现)。
462 """
463 if port == 0:
464 server = create_default_server()
465 server.run_stdio()
466 else:
467 print(f"MCP HTTP SSE 模式暂未实现。请使用 stdio 模式(port=0)。")
468 sys.exit(1)
470# ── ServerInfo & Tool (test compatibility) ──
471from dataclasses import dataclass, field
473@dataclass
474class ServerInfo:
475 name: str
476 version: str
477 description: str = ""
479@dataclass
480class Tool:
481 name: str
482 description: str
483 input_schema: dict
484 call: callable = field(default=lambda params: None)
486@dataclass
487class AgentCard:
488 agent_id: str
489 name: str
490 version: str
491 capabilities: list = field(default_factory=list)
492 endpoint: str = ""