Coverage for agentos/mcp/builtin_servers.py: 29%
402 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Built-in MCP Servers for AgentOS (v1.8.1).
4Pure Python implementations of common MCP tools. 8 servers, 32+ tools total.
6Servers:
7 FilesystemServer (7) - Safe file I/O with path validation
8 WebFetchServer (3) - HTTP client with content extraction
9 MemoryServer (6) - Persistent knowledge graph
10 SearchServer (4) - Web search via DuckDuckGo
11 GitServer (4) - Git operations
12 ShellServer (3) - Safe shell command execution
13 CodeServer (3) - Python/JS code execution in sandbox
14 TextServer (4) - Text manipulation & formatting
15"""
17from __future__ import annotations
19import json
20import os
21import re
22import hashlib
23import shutil
24import subprocess
25import tempfile
26import time
27import urllib.parse
28import urllib.request
29import urllib.error
30from dataclasses import dataclass, field
31from datetime import datetime
32from pathlib import Path
33from typing import Any, Dict, List, Optional
36# ── Filesystem MCP Server (7 tools) ─────────
39class FilesystemServer:
40 """MCP-compatible filesystem server with safe path validation."""
42 NAME = "filesystem"
43 VERSION = "1.0.0"
45 def __init__(self, allowed_paths: Optional[List[str]] = None):
46 self._allowed_paths = [
47 Path(p).resolve()
48 for p in (allowed_paths or [os.getcwd(), str(Path.home())])
49 ]
50 for p in self._allowed_paths:
51 p.mkdir(parents=True, exist_ok=True)
53 def _validate_path(self, path_str: str) -> Path:
54 p = Path(path_str).expanduser().resolve()
55 for allowed in self._allowed_paths:
56 try:
57 p.relative_to(allowed)
58 return p
59 except ValueError:
60 continue
61 raise ValueError(f"Path '{path_str}' outside allowed directories")
63 def get_tools(self) -> List[Dict[str, Any]]:
64 return [
65 {"name": "read_file", "description": "Read contents of a text file",
66 "inputSchema": {"type": "object", "properties": {
67 "path": {"type": "string"}, "encoding": {"type": "string", "default": "utf-8"}},
68 "required": ["path"]}},
69 {"name": "write_file", "description": "Write text content to a file",
70 "inputSchema": {"type": "object", "properties": {
71 "path": {"type": "string"}, "content": {"type": "string"},
72 "encoding": {"type": "string", "default": "utf-8"}},
73 "required": ["path", "content"]}},
74 {"name": "list_directory", "description": "List directory contents with metadata",
75 "inputSchema": {"type": "object", "properties": {
76 "path": {"type": "string"}, "recursive": {"type": "boolean", "default": False}},
77 "required": ["path"]}},
78 {"name": "search_files", "description": "Search files by glob pattern",
79 "inputSchema": {"type": "object", "properties": {
80 "path": {"type": "string"}, "pattern": {"type": "string"}},
81 "required": ["path", "pattern"]}},
82 {"name": "get_file_info", "description": "Get file/directory metadata",
83 "inputSchema": {"type": "object", "properties": {
84 "path": {"type": "string"}}, "required": ["path"]}},
85 {"name": "create_directory", "description": "Create directory and parents",
86 "inputSchema": {"type": "object", "properties": {
87 "path": {"type": "string"}}, "required": ["path"]}},
88 {"name": "move_file", "description": "Move or rename a file/directory",
89 "inputSchema": {"type": "object", "properties": {
90 "source": {"type": "string"}, "destination": {"type": "string"}},
91 "required": ["source", "destination"]}},
92 ]
94 def call_tool(self, tool_name: str, arguments: Dict) -> Any:
95 return getattr(self, f"_handle_{tool_name}")(**arguments)
97 def _handle_read_file(self, path: str, encoding: str = "utf-8") -> str:
98 p = self._validate_path(path)
99 if not p.is_file(): raise FileNotFoundError(f"Not found: {p}")
100 return p.read_text(encoding=encoding)
102 def _handle_write_file(self, path: str, content: str, encoding: str = "utf-8") -> str:
103 p = self._validate_path(path)
104 p.parent.mkdir(parents=True, exist_ok=True)
105 p.write_text(content, encoding=encoding)
106 return f"Wrote {len(content)} bytes to {p}"
108 def _handle_list_directory(self, path: str, recursive: bool = False) -> List[Dict]:
109 p = self._validate_path(path)
110 if not p.is_dir(): raise NotADirectoryError(f"Not a directory: {p}")
111 entries = []
112 for item in (p.rglob("*") if recursive else p.iterdir()):
113 if item.name.startswith("."): continue
114 s = item.stat()
115 entries.append({"name": item.name, "path": str(item), "size": s.st_size,
116 "is_dir": item.is_dir(), "modified": datetime.fromtimestamp(s.st_mtime).isoformat()})
117 return sorted(entries, key=lambda e: (not e["is_dir"], e["name"]))
119 def _handle_search_files(self, path: str, pattern: str) -> List[Dict]:
120 p = self._validate_path(path)
121 if not p.is_dir(): raise NotADirectoryError(f"Not a directory: {p}")
122 return sorted([{"name": i.name, "path": str(i), "size": i.stat().st_size, "is_dir": i.is_dir()}
123 for i in p.rglob(pattern) if not i.name.startswith(".")], key=lambda e: e["path"])
125 def _handle_get_file_info(self, path: str) -> Dict:
126 p = self._validate_path(path)
127 if not p.exists(): raise FileNotFoundError(f"Not found: {p}")
128 s = p.stat(); ext = p.suffix.lower()
129 mime = {".txt": "text/plain", ".md": "text/markdown", ".py": "text/x-python",
130 ".json": "application/json", ".html": "text/html", ".pdf": "application/pdf"}
131 return {"name": p.name, "path": str(p), "size": s.st_size, "is_dir": p.is_dir(),
132 "is_file": p.is_file(), "extension": ext, "mime_type": mime.get(ext, "application/octet-stream"),
133 "created": datetime.fromtimestamp(s.st_ctime).isoformat(),
134 "modified": datetime.fromtimestamp(s.st_mtime).isoformat(),
135 "permissions": oct(s.st_mode)[-3:]}
137 def _handle_create_directory(self, path: str) -> str:
138 p = self._validate_path(path)
139 p.mkdir(parents=True, exist_ok=True)
140 return f"Created: {p}"
142 def _handle_move_file(self, source: str, destination: str) -> str:
143 src = self._validate_path(source); dst = self._validate_path(destination)
144 if not src.exists(): raise FileNotFoundError(f"Not found: {src}")
145 src.rename(dst)
146 return f"Moved {src} -> {dst}"
149# ── Web Fetch MCP Server (3 tools) ───────────
152class WebFetchServer:
153 """MCP-compatible HTTP client with content extraction."""
155 NAME = "webfetch"
156 VERSION = "1.0.0"
158 def __init__(self, user_agent: str = "AgentOS-MCP/1.0", timeout: int = 30):
159 self._ua = user_agent; self._timeout = timeout
161 def get_tools(self) -> List[Dict[str, Any]]:
162 return [
163 {"name": "fetch_url", "description": "Fetch a web page and return cleaned text",
164 "inputSchema": {"type": "object", "properties": {
165 "url": {"type": "string"}, "max_length": {"type": "integer", "default": 50000}},
166 "required": ["url"]}},
167 {"name": "fetch_json", "description": "Fetch and parse JSON from URL",
168 "inputSchema": {"type": "object", "properties": {
169 "url": {"type": "string"}, "headers": {"type": "object"}},
170 "required": ["url"]}},
171 {"name": "check_url", "description": "HEAD request to verify URL accessibility",
172 "inputSchema": {"type": "object", "properties": {
173 "url": {"type": "string"}}, "required": ["url"]}},
174 ]
176 def call_tool(self, tool_name: str, arguments: Dict) -> Any:
177 return getattr(self, f"_handle_{tool_name}")(**arguments)
179 @staticmethod
180 def _validate_url(url: str) -> None:
181 p = urllib.parse.urlparse(url)
182 if p.scheme not in ("http", "https"): raise ValueError(f"Invalid scheme: {p.scheme}")
184 @staticmethod
185 def _strip_html(html: str) -> str:
186 html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL | re.I)
187 html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL | re.I)
188 text = re.sub(r'<[^>]+>', ' ', html)
189 text = re.sub(r'\s+', ' ', text).strip()
190 for e, c in [('&','&'),('<','<'),('>','>'),('"','"'),(''',"'"),(' ',' ')]:
191 text = text.replace(e, c)
192 return text
194 def _handle_fetch_url(self, url: str, max_length: int = 50000) -> str:
195 self._validate_url(url)
196 req = urllib.request.Request(url, headers={"User-Agent": self._ua})
197 try:
198 with urllib.request.urlopen(req, timeout=self._timeout) as r:
199 ct = r.headers.get("Content-Type", "")
200 enc = ct.split("charset=")[-1].split(";")[0].strip() if "charset=" in ct else "utf-8"
201 txt = r.read().decode(enc, errors="replace")
202 if "text/html" in ct: txt = self._strip_html(txt)
203 return txt[:max_length] + (f"\n\n[Truncated at {max_length}]" if len(txt) > max_length else "")
204 except urllib.error.HTTPError as e: return f"HTTP {e.code}: {e.reason}"
205 except Exception as e: return f"Error: {e}"
207 def _handle_fetch_json(self, url: str, headers: Optional[Dict] = None) -> Any:
208 self._validate_url(url)
209 h = {"User-Agent": self._ua, "Accept": "application/json"}
210 if headers: h.update(headers)
211 try:
212 with urllib.request.urlopen(urllib.request.Request(url, headers=h), timeout=self._timeout) as r:
213 return json.loads(r.read())
214 except json.JSONDecodeError as e: return {"error": "Invalid JSON", "detail": str(e)}
215 except Exception as e: return {"error": str(e)}
217 def _handle_check_url(self, url: str) -> Dict:
218 self._validate_url(url)
219 req = urllib.request.Request(url, headers={"User-Agent": self._ua}, method="HEAD")
220 try:
221 with urllib.request.urlopen(req, timeout=self._timeout) as r:
222 return {"url": url, "status": r.status, "accessible": 200 <= r.status < 400,
223 "content_type": r.headers.get("Content-Type",""), "content_length": r.headers.get("Content-Length","")}
224 except urllib.error.HTTPError as e: return {"url": url, "status": e.code, "accessible": False, "reason": e.reason}
225 except Exception as e: return {"url": url, "status": 0, "accessible": False, "error": str(e)}
228# ── Memory / Knowledge Graph MCP (6 tools) ───
231class MemoryServer:
232 """Persistent knowledge graph for agent memory."""
234 NAME = "memory"
235 VERSION = "1.0.0"
237 def __init__(self, storage_path: str = ""):
238 self._path = Path(storage_path or str(Path.home() / ".agentos" / "memory" / "kg.json"))
239 self._path.parent.mkdir(parents=True, exist_ok=True)
240 self._entries: Dict[str, Dict] = {}
241 if self._path.exists():
242 try:
243 for e in json.loads(self._path.read_text()).get("entries", []): self._entries[e["id"]] = e
244 except: pass
246 def _save(self):
247 self._path.write_text(json.dumps({"version": "1.0", "updated_at": time.time(),
248 "entries": list(self._entries.values())}, indent=2, ensure_ascii=False))
250 def _make_id(self, content: str) -> str:
251 return hashlib.sha256(content.encode()).hexdigest()[:16]
253 def get_tools(self) -> List[Dict[str, Any]]:
254 return [
255 {"name": "store_memory", "description": "Store a fact/memory",
256 "inputSchema": {"type": "object", "properties": {
257 "content": {"type": "string"}, "category": {"type": "string", "default": "general"},
258 "tags": {"type": "array", "items": {"type": "string"}}, "metadata": {"type": "object"}},
259 "required": ["content"]}},
260 {"name": "retrieve_memory", "description": "Retrieve memory by ID",
261 "inputSchema": {"type": "object", "properties": {"memory_id": {"type": "string"}}, "required": ["memory_id"]}},
262 {"name": "search_memory", "description": "Search memories by keyword/category/tags",
263 "inputSchema": {"type": "object", "properties": {
264 "query": {"type": "string"}, "category": {"type": "string"},
265 "tags": {"type": "array", "items": {"type": "string"}}, "limit": {"type": "integer", "default": 20}}}},
266 {"name": "list_categories", "description": "List all categories with counts",
267 "inputSchema": {"type": "object", "properties": {}}},
268 {"name": "delete_memory", "description": "Delete a memory by ID",
269 "inputSchema": {"type": "object", "properties": {"memory_id": {"type": "string"}}, "required": ["memory_id"]}},
270 {"name": "update_memory", "description": "Update memory content/metadata",
271 "inputSchema": {"type": "object", "properties": {
272 "memory_id": {"type": "string"}, "content": {"type": "string"},
273 "category": {"type": "string"}, "tags": {"type": "array", "items": {"type": "string"}}},
274 "required": ["memory_id"]}},
275 ]
277 def call_tool(self, tool_name: str, arguments: Dict) -> Any:
278 return getattr(self, f"_handle_{tool_name}")(**arguments)
280 def _handle_store_memory(self, content: str, category: str = "general",
281 tags: list = None, metadata: dict = None) -> Dict:
282 mid = self._make_id(content)
283 entry = self._entries.get(mid, {})
284 if entry:
285 entry["updated_at"] = time.time()
286 if tags: entry.setdefault("tags", []).extend(t for t in tags if t not in entry.get("tags", []))
287 if category != "general": entry["category"] = category
288 self._save(); return {"id": mid, "action": "updated"}
289 entry = {"id": mid, "content": content, "category": category, "tags": tags or [],
290 "metadata": metadata or {}, "created_at": time.time(), "updated_at": time.time()}
291 self._entries[mid] = entry; self._save()
292 return {"id": mid, "action": "stored"}
294 def _handle_retrieve_memory(self, memory_id: str) -> Optional[Dict]:
295 return self._entries.get(memory_id)
297 def _handle_search_memory(self, query: str = "", category: str = None,
298 tags: list = None, limit: int = 20) -> List[Dict]:
299 results = []
300 for e in self._entries.values():
301 if category and e.get("category") != category: continue
302 if tags and not all(t in e.get("tags", []) for t in tags): continue
303 if query and query.lower() not in e.get("content","").lower(): continue
304 results.append({"id": e["id"], "content": e["content"], "category": e.get("category",""),
305 "tags": e.get("tags",[]), "created_at": e.get("created_at",0)})
306 return sorted(results, key=lambda r: r["created_at"], reverse=True)[:limit]
308 def _handle_list_categories(self) -> Dict[str, int]:
309 c: Dict[str, int] = {}
310 for e in self._entries.values(): c[e.get("category","general")] = c.get(e.get("category","general"), 0) + 1
311 return dict(sorted(c.items(), key=lambda x: -x[1]))
313 def _handle_delete_memory(self, memory_id: str) -> Dict:
314 if memory_id in self._entries: del self._entries[memory_id]; self._save(); return {"deleted": True, "id": memory_id}
315 return {"deleted": False, "id": memory_id, "reason": "not found"}
317 def _handle_update_memory(self, memory_id: str, content: str = None, category: str = None, tags: list = None) -> Dict:
318 e = self._entries.get(memory_id)
319 if not e: return {"updated": False, "reason": "not found"}
320 if content is not None: e["content"] = content
321 if category is not None: e["category"] = category
322 if tags is not None: e["tags"] = tags
323 e["updated_at"] = time.time(); self._save()
324 return {"updated": True, "id": memory_id}
327# ── Web Search Server (4 tools) ──────────────
330class SearchServer:
331 """MCP-compatible web search via DuckDuckGo + Google fallback."""
333 NAME = "search"
334 VERSION = "1.0.0"
336 def get_tools(self) -> List[Dict[str, Any]]:
337 return [
338 {"name": "web_search", "description": "Search the web",
339 "inputSchema": {"type": "object", "properties": {
340 "query": {"type": "string"}, "max_results": {"type": "integer", "default": 10}},
341 "required": ["query"]}},
342 {"name": "news_search", "description": "Search for recent news",
343 "inputSchema": {"type": "object", "properties": {
344 "query": {"type": "string"}, "max_results": {"type": "integer", "default": 10}},
345 "required": ["query"]}},
346 {"name": "image_search", "description": "Search for images",
347 "inputSchema": {"type": "object", "properties": {
348 "query": {"type": "string"}, "max_results": {"type": "integer", "default": 10}},
349 "required": ["query"]}},
350 {"name": "suggest", "description": "Get search autocomplete suggestions",
351 "inputSchema": {"type": "object", "properties": {
352 "query": {"type": "string"}}, "required": ["query"]}},
353 ]
355 def call_tool(self, tool_name: str, arguments: Dict) -> Any:
356 return getattr(self, f"_handle_{tool_name}")(**arguments)
358 def _handle_web_search(self, query: str, max_results: int = 10) -> List[Dict]:
359 """Search via DuckDuckGo HTML (no API key needed)."""
360 q = urllib.parse.quote_plus(query)
361 url = f"https://html.duckduckgo.com/html/?q={q}"
362 req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
363 try:
364 with urllib.request.urlopen(req, timeout=15) as r:
365 html = r.read().decode("utf-8", errors="replace")
366 except Exception as e:
367 return [{"error": f"Search failed: {e}"}]
369 results = []
370 # Parse DuckDuckGo HTML results
371 for m in re.finditer(r'<a[^>]*class="result__a"[^>]*href="([^"]*)"[^>]*>(.*?)</a>', html, re.DOTALL):
372 if len(results) >= max_results: break
373 link = m.group(1)
374 title = re.sub(r'<[^>]+>', '', m.group(2)).strip()
375 # Find snippet
376 snippet = ""
377 sn_match = re.search(r'<a[^>]*class="result__snippet"[^>]*>(.*?)</a>', html[m.end():m.end()+500], re.DOTALL)
378 if sn_match: snippet = re.sub(r'<[^>]+>', '', sn_match.group(1)).strip()
379 results.append({"title": title, "url": link, "snippet": snippet, "source": "duckduckgo"})
380 return results
382 def _handle_news_search(self, query: str, max_results: int = 10) -> List[Dict]:
383 q = urllib.parse.quote_plus(f"{query} news")
384 return self._handle_web_search(q, max_results)
386 def _handle_image_search(self, query: str, max_results: int = 10) -> List[Dict]:
387 q = urllib.parse.quote_plus(query)
388 url = f"https://duckduckgo.com/?q={q}&iax=images&ia=images"
389 req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
390 try:
391 with urllib.request.urlopen(req, timeout=15) as r:
392 html = r.read().decode("utf-8", errors="replace")
393 except Exception as e:
394 return [{"error": str(e)}]
396 # Extract vqd token for image search
397 vqd = ""
398 vqd_m = re.search(r'vqd=([\d-]+)', html)
399 if vqd_m: vqd = vqd_m.group(1)
401 if vqd:
402 img_url = f"https://duckduckgo.com/i.js?q={q}&vqd={vqd}&o=json&p=1&s=0"
403 try:
404 with urllib.request.urlopen(urllib.request.Request(img_url, headers={"User-Agent": "Mozilla/5.0"}), timeout=15) as r:
405 data = json.loads(r.read())
406 results = data.get("results", [])[:max_results]
407 return [{"title": r.get("title",""), "url": r.get("url",""),
408 "thumbnail": r.get("thumbnail",""), "source": "duckduckgo"} for r in results]
409 except: pass
410 return []
412 def _handle_suggest(self, query: str) -> List[str]:
413 q = urllib.parse.quote_plus(query)
414 url = f"https://duckduckgo.com/ac/?q={q}&type=list"
415 try:
416 with urllib.request.urlopen(url, timeout=8) as r:
417 data = json.loads(r.read())
418 return [item.get("phrase","") for item in data[:10]]
419 except: return []
422# ── Git Server (4 tools) ─────────────────────
425class GitServer:
426 """MCP-compatible git operations."""
428 NAME = "git"
429 VERSION = "1.0.0"
431 def __init__(self, repo_path: str = ""):
432 self._repo_path = Path(repo_path).resolve() if repo_path else Path.cwd()
433 self._git: Optional[str] = shutil.which("git")
435 def _run_git(self, *args) -> Dict[str, Any]:
436 if not self._git: return {"error": "git not installed"}
437 try:
438 r = subprocess.run([self._git, *args], capture_output=True, text=True,
439 cwd=str(self._repo_path), timeout=30)
440 return {"stdout": r.stdout.strip(), "stderr": r.stderr.strip(), "returncode": r.returncode}
441 except subprocess.TimeoutExpired:
442 return {"error": "Command timed out"}
443 except Exception as e:
444 return {"error": str(e)}
446 def get_tools(self) -> List[Dict[str, Any]]:
447 return [
448 {"name": "git_status", "description": "Show working tree status",
449 "inputSchema": {"type": "object", "properties": {}}},
450 {"name": "git_log", "description": "Show commit history",
451 "inputSchema": {"type": "object", "properties": {
452 "max_count": {"type": "integer", "default": 10}, "oneline": {"type": "boolean", "default": True}}}},
453 {"name": "git_diff", "description": "Show changes between commits/working tree",
454 "inputSchema": {"type": "object", "properties": {
455 "staged": {"type": "boolean", "default": False}, "commit": {"type": "string"}}}},
456 {"name": "git_branch", "description": "List branches",
457 "inputSchema": {"type": "object", "properties": {"remote": {"type": "boolean", "default": False}}}},
458 ]
460 def call_tool(self, tool_name: str, arguments: Dict) -> Any:
461 return getattr(self, f"_handle_{tool_name}")(**arguments)
463 def _handle_git_status(self) -> Dict:
464 return self._run_git("status", "--porcelain")
466 def _handle_git_log(self, max_count: int = 10, oneline: bool = True) -> Dict:
467 args = ["log", f"-n{max_count}"]
468 if oneline: args.append("--oneline")
469 return self._run_git(*args)
471 def _handle_git_diff(self, staged: bool = False, commit: str = "") -> Dict:
472 args = ["diff"]
473 if staged: args.append("--staged")
474 if commit: args.append(commit)
475 return self._run_git(*args)
477 def _handle_git_branch(self, remote: bool = False) -> Dict:
478 args = ["branch"]
479 if remote: args.append("-r")
480 return self._run_git(*args)
483# ── Shell Server (3 tools) ───────────────────
486class ShellServer:
487 """MCP-compatible safe shell command execution."""
489 NAME = "shell"
490 VERSION = "1.0.0"
492 SAFE_COMMANDS = {"ls", "cat", "head", "tail", "wc", "grep", "find", "du", "df",
493 "echo", "date", "whoami", "uname", "pwd", "which", "env", "ps",
494 "top", "htop", "tree", "file", "stat", "md5sum", "sha256sum",
495 "python3", "python", "pip", "npm", "node", "curl", "wget"}
497 def _is_safe(self, cmd: str) -> bool:
498 base = cmd.strip().split()[0] if cmd.strip() else ""
499 return base in self.SAFE_COMMANDS
501 def get_tools(self) -> List[Dict[str, Any]]:
502 return [
503 {"name": "run_command", "description": "Execute a safe shell command",
504 "inputSchema": {"type": "object", "properties": {
505 "command": {"type": "string"}, "timeout": {"type": "integer", "default": 30}},
506 "required": ["command"]}},
507 {"name": "system_info", "description": "Get system information (OS, CPU, memory)",
508 "inputSchema": {"type": "object", "properties": {}}},
509 {"name": "disk_usage", "description": "Show disk usage for a path",
510 "inputSchema": {"type": "object", "properties": {
511 "path": {"type": "string", "default": "."}}}},
512 ]
514 def call_tool(self, tool_name: str, arguments: Dict) -> Any:
515 return getattr(self, f"_handle_{tool_name}")(**arguments)
517 def _handle_run_command(self, command: str, timeout: int = 30) -> Dict:
518 if not self._is_safe(command):
519 return {"error": f"Command not in safelist. Allowed: {sorted(self.SAFE_COMMANDS)}"}
520 try:
521 r = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=timeout)
522 return {"stdout": r.stdout, "stderr": r.stderr, "returncode": r.returncode}
523 except subprocess.TimeoutExpired: return {"error": "Timeout"}
524 except Exception as e: return {"error": str(e)}
526 def _handle_system_info(self) -> Dict:
527 import platform
528 return {"os": platform.system(), "release": platform.release(), "version": platform.version(),
529 "machine": platform.machine(), "processor": platform.processor(),
530 "python": platform.python_version(), "hostname": platform.node()}
532 def _handle_disk_usage(self, path: str = ".") -> Dict:
533 try:
534 usage = shutil.disk_usage(path)
535 return {"path": path, "total_gb": round(usage.total/1024**3, 2),
536 "used_gb": round(usage.used/1024**3, 2), "free_gb": round(usage.free/1024**3, 2)}
537 except Exception as e: return {"error": str(e)}
540# ── Code Server (3 tools) ────────────────────
543class CodeServer:
544 """MCP-compatible sandboxed code execution."""
546 NAME = "code"
547 VERSION = "1.0.0"
549 def get_tools(self) -> List[Dict[str, Any]]:
550 return [
551 {"name": "run_python", "description": "Execute Python code in sandbox",
552 "inputSchema": {"type": "object", "properties": {
553 "code": {"type": "string"}, "timeout": {"type": "integer", "default": 10}},
554 "required": ["code"]}},
555 {"name": "run_shell", "description": "Execute a one-liner bash command",
556 "inputSchema": {"type": "object", "properties": {
557 "command": {"type": "string"}, "timeout": {"type": "integer", "default": 10}},
558 "required": ["command"]}},
559 {"name": "lint_code", "description": "Basic code linting (syntax check)",
560 "inputSchema": {"type": "object", "properties": {
561 "code": {"type": "string"}, "language": {"type": "string", "default": "python"}},
562 "required": ["code"]}},
563 ]
565 def call_tool(self, tool_name: str, arguments: Dict) -> Any:
566 return getattr(self, f"_handle_{tool_name}")(**arguments)
568 def _handle_run_python(self, code: str, timeout: int = 10) -> Dict:
569 try:
570 # Restricted execution via compile + eval in limited namespace
571 restricted_globals = {"__builtins__": {
572 "print": print, "len": len, "range": range, "int": int, "float": float,
573 "str": str, "list": list, "dict": dict, "bool": bool, "set": set, "tuple": tuple,
574 "sum": sum, "min": min, "max": max, "abs": abs, "round": round, "sorted": sorted,
575 "enumerate": enumerate, "zip": zip, "map": map, "filter": filter,
576 "json": __import__("json"), "math": __import__("math"),
577 "datetime": __import__("datetime"), "re": __import__("re"),
578 "collections": __import__("collections"), "itertools": __import__("itertools"),
579 }}
580 import io, sys
581 old_stdout = sys.stdout
582 sys.stdout = buffer = io.StringIO()
583 try:
584 compiled = compile(code, "<mcp_sandbox>", "exec")
585 exec(compiled, restricted_globals)
586 output = buffer.getvalue()
587 finally:
588 sys.stdout = old_stdout
589 return {"output": output, "success": True}
590 except Exception as e:
591 return {"output": str(e), "success": False, "error": type(e).__name__}
593 def _handle_run_shell(self, command: str, timeout: int = 10) -> Dict:
594 try:
595 r = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=timeout)
596 return {"stdout": r.stdout, "stderr": r.stderr, "returncode": r.returncode}
597 except subprocess.TimeoutExpired: return {"error": "Timeout"}
598 except Exception as e: return {"error": str(e)}
600 def _handle_lint_code(self, code: str, language: str = "python") -> Dict:
601 if language == "python":
602 try:
603 compile(code, "<lint>", "exec")
604 return {"valid": True, "errors": []}
605 except SyntaxError as e:
606 return {"valid": False, "errors": [{"line": e.lineno, "offset": e.offset, "message": e.msg}]}
607 return {"valid": None, "message": f"Linting not supported for {language}"}
610# ── Text Server (4 tools) ────────────────────
613class TextServer:
614 """MCP-compatible text manipulation tools."""
616 NAME = "text"
617 VERSION = "1.0.0"
619 def get_tools(self) -> List[Dict[str, Any]]:
620 return [
621 {"name": "count_tokens", "description": "Estimate token count (OpenAI tiktoken-style approximate)",
622 "inputSchema": {"type": "object", "properties": {
623 "text": {"type": "string"}}, "required": ["text"]}},
624 {"name": "extract_regex", "description": "Extract patterns from text using regex",
625 "inputSchema": {"type": "object", "properties": {
626 "text": {"type": "string"}, "pattern": {"type": "string"}, "group": {"type": "integer", "default": 0}},
627 "required": ["text", "pattern"]}},
628 {"name": "summarize_text", "description": "Simple extractive text summarization",
629 "inputSchema": {"type": "object", "properties": {
630 "text": {"type": "string"}, "max_sentences": {"type": "integer", "default": 5}},
631 "required": ["text"]}},
632 {"name": "format_json", "description": "Format/validate/prettify JSON",
633 "inputSchema": {"type": "object", "properties": {
634 "text": {"type": "string"}, "indent": {"type": "integer", "default": 2}},
635 "required": ["text"]}},
636 ]
638 def call_tool(self, tool_name: str, arguments: Dict) -> Any:
639 return getattr(self, f"_handle_{tool_name}")(**arguments)
641 def _handle_count_tokens(self, text: str) -> Dict:
642 # Approximate: ~4 chars per token for English, ~1.5 for CJK
643 words = len(re.findall(r'\w+', text))
644 chars = len(text)
645 return {"tokens_approx": max(1, words + chars // 4), "characters": chars, "words": words}
647 def _handle_extract_regex(self, text: str, pattern: str, group: int = 0) -> List[str]:
648 try:
649 return [m.group(group) if group else m.group(0) for m in re.finditer(pattern, text)]
650 except re.error as e: return [f"Invalid regex: {e}"]
652 def _handle_summarize_text(self, text: str, max_sentences: int = 5) -> str:
653 sentences = re.split(r'(?<=[.!?])\s+', text)
654 if len(sentences) <= max_sentences: return text
655 # Simple extractive: take first sentence + longest sentences
656 first = sentences[0]
657 rest = sorted(sentences[1:], key=len, reverse=True)[:max_sentences - 1]
658 return ". ".join([first] + rest) + "."
660 def _handle_format_json(self, text: str, indent: int = 2) -> Dict:
661 try:
662 data = json.loads(text)
663 formatted = json.dumps(data, indent=indent, ensure_ascii=False)
664 return {"valid": True, "formatted": formatted, "keys": list(data.keys()) if isinstance(data, dict) else None}
665 except json.JSONDecodeError as e:
666 return {"valid": False, "error": str(e)}
669# ── Built-in Server Registry ─────────────────
672class BuiltinMCPRegistry:
673 """Registry of all built-in MCP servers. Single interface for tool discovery/calling."""
675 def __init__(self):
676 self._servers: Dict[str, Any] = {}
678 def register_server(self, server: Any) -> None:
679 self._servers[server.NAME] = server
681 def list_all_tools(self) -> List[Dict[str, Any]]:
682 tools = []
683 for srv_name, server in self._servers.items():
684 for tool in server.get_tools():
685 tools.append({"server": srv_name, "name": f"mcp__{srv_name}__{tool['name']}",
686 "description": tool.get("description", ""), "inputSchema": tool.get("inputSchema", {})})
687 return tools
689 def get_tool_schemas(self, format: str = "openai") -> List[Dict[str, Any]]:
690 schemas = []
691 for srv_name, server in self._servers.items():
692 for tool in server.get_tools():
693 schemas.append({"type": "function",
694 "function": {"name": f"mcp__{srv_name}__{tool['name']}",
695 "description": tool.get("description", ""),
696 "parameters": tool.get("inputSchema", {})}})
697 return schemas
699 def call_tool(self, server_name: str, tool_name: str, arguments: Dict[str, Any]) -> Any:
700 server = self._servers.get(server_name)
701 if not server: raise ValueError(f"Server '{server_name}' not found")
702 return server.call_tool(tool_name, arguments)
704 def call_tool_by_full_name(self, full_name: str, arguments: Dict[str, Any]) -> Any:
705 parts = full_name.split("__", 2)
706 if len(parts) != 3 or parts[0] != "mcp": raise ValueError(f"Invalid tool name: {full_name}")
707 return self.call_tool(parts[1], parts[2], arguments)
709 @property
710 def server_names(self) -> List[str]: return list(self._servers.keys())
712 @property
713 def tool_count(self) -> int: return sum(len(s.get_tools()) for s in self._servers.values())
716def create_default_registry(
717 allowed_paths: Optional[List[str]] = None,
718 memory_path: Optional[str] = None,
719 repo_path: str = "",
720) -> BuiltinMCPRegistry:
721 """Create a BuiltinMCPRegistry with all 8 servers registered."""
722 if allowed_paths is None: allowed_paths = [os.getcwd(), str(Path.home())]
723 reg = BuiltinMCPRegistry()
724 reg.register_server(FilesystemServer(allowed_paths=allowed_paths))
725 reg.register_server(WebFetchServer())
726 reg.register_server(MemoryServer(storage_path=memory_path or ""))
727 reg.register_server(SearchServer())
728 reg.register_server(GitServer(repo_path=repo_path))
729 reg.register_server(ShellServer())
730 reg.register_server(CodeServer())
731 reg.register_server(TextServer())
732 return reg