Coverage for agentos/mcp/builtin_servers.py: 29%

402 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Built-in MCP Servers for AgentOS (v1.8.1). 

3 

4Pure Python implementations of common MCP tools. 8 servers, 32+ tools total. 

5 

6Servers: 

7 FilesystemServer (7) - Safe file I/O with path validation 

8 WebFetchServer (3) - HTTP client with content extraction 

9 MemoryServer (6) - Persistent knowledge graph 

10 SearchServer (4) - Web search via DuckDuckGo 

11 GitServer (4) - Git operations 

12 ShellServer (3) - Safe shell command execution 

13 CodeServer (3) - Python/JS code execution in sandbox 

14 TextServer (4) - Text manipulation & formatting 

15""" 

16 

17from __future__ import annotations 

18 

19import json 

20import os 

21import re 

22import hashlib 

23import shutil 

24import subprocess 

25import tempfile 

26import time 

27import urllib.parse 

28import urllib.request 

29import urllib.error 

30from dataclasses import dataclass, field 

31from datetime import datetime 

32from pathlib import Path 

33from typing import Any, Dict, List, Optional 

34 

35 

36# ── Filesystem MCP Server (7 tools) ───────── 

37 

38 

39class FilesystemServer: 

40 """MCP-compatible filesystem server with safe path validation.""" 

41 

42 NAME = "filesystem" 

43 VERSION = "1.0.0" 

44 

45 def __init__(self, allowed_paths: Optional[List[str]] = None): 

46 self._allowed_paths = [ 

47 Path(p).resolve() 

48 for p in (allowed_paths or [os.getcwd(), str(Path.home())]) 

49 ] 

50 for p in self._allowed_paths: 

51 p.mkdir(parents=True, exist_ok=True) 

52 

53 def _validate_path(self, path_str: str) -> Path: 

54 p = Path(path_str).expanduser().resolve() 

55 for allowed in self._allowed_paths: 

56 try: 

57 p.relative_to(allowed) 

58 return p 

59 except ValueError: 

60 continue 

61 raise ValueError(f"Path '{path_str}' outside allowed directories") 

62 

63 def get_tools(self) -> List[Dict[str, Any]]: 

64 return [ 

65 {"name": "read_file", "description": "Read contents of a text file", 

66 "inputSchema": {"type": "object", "properties": { 

67 "path": {"type": "string"}, "encoding": {"type": "string", "default": "utf-8"}}, 

68 "required": ["path"]}}, 

69 {"name": "write_file", "description": "Write text content to a file", 

70 "inputSchema": {"type": "object", "properties": { 

71 "path": {"type": "string"}, "content": {"type": "string"}, 

72 "encoding": {"type": "string", "default": "utf-8"}}, 

73 "required": ["path", "content"]}}, 

74 {"name": "list_directory", "description": "List directory contents with metadata", 

75 "inputSchema": {"type": "object", "properties": { 

76 "path": {"type": "string"}, "recursive": {"type": "boolean", "default": False}}, 

77 "required": ["path"]}}, 

78 {"name": "search_files", "description": "Search files by glob pattern", 

79 "inputSchema": {"type": "object", "properties": { 

80 "path": {"type": "string"}, "pattern": {"type": "string"}}, 

81 "required": ["path", "pattern"]}}, 

82 {"name": "get_file_info", "description": "Get file/directory metadata", 

83 "inputSchema": {"type": "object", "properties": { 

84 "path": {"type": "string"}}, "required": ["path"]}}, 

85 {"name": "create_directory", "description": "Create directory and parents", 

86 "inputSchema": {"type": "object", "properties": { 

87 "path": {"type": "string"}}, "required": ["path"]}}, 

88 {"name": "move_file", "description": "Move or rename a file/directory", 

89 "inputSchema": {"type": "object", "properties": { 

90 "source": {"type": "string"}, "destination": {"type": "string"}}, 

91 "required": ["source", "destination"]}}, 

92 ] 

93 

94 def call_tool(self, tool_name: str, arguments: Dict) -> Any: 

95 return getattr(self, f"_handle_{tool_name}")(**arguments) 

96 

97 def _handle_read_file(self, path: str, encoding: str = "utf-8") -> str: 

98 p = self._validate_path(path) 

99 if not p.is_file(): raise FileNotFoundError(f"Not found: {p}") 

100 return p.read_text(encoding=encoding) 

101 

102 def _handle_write_file(self, path: str, content: str, encoding: str = "utf-8") -> str: 

103 p = self._validate_path(path) 

104 p.parent.mkdir(parents=True, exist_ok=True) 

105 p.write_text(content, encoding=encoding) 

106 return f"Wrote {len(content)} bytes to {p}" 

107 

108 def _handle_list_directory(self, path: str, recursive: bool = False) -> List[Dict]: 

109 p = self._validate_path(path) 

110 if not p.is_dir(): raise NotADirectoryError(f"Not a directory: {p}") 

111 entries = [] 

112 for item in (p.rglob("*") if recursive else p.iterdir()): 

113 if item.name.startswith("."): continue 

114 s = item.stat() 

115 entries.append({"name": item.name, "path": str(item), "size": s.st_size, 

116 "is_dir": item.is_dir(), "modified": datetime.fromtimestamp(s.st_mtime).isoformat()}) 

117 return sorted(entries, key=lambda e: (not e["is_dir"], e["name"])) 

118 

119 def _handle_search_files(self, path: str, pattern: str) -> List[Dict]: 

120 p = self._validate_path(path) 

121 if not p.is_dir(): raise NotADirectoryError(f"Not a directory: {p}") 

122 return sorted([{"name": i.name, "path": str(i), "size": i.stat().st_size, "is_dir": i.is_dir()} 

123 for i in p.rglob(pattern) if not i.name.startswith(".")], key=lambda e: e["path"]) 

124 

125 def _handle_get_file_info(self, path: str) -> Dict: 

126 p = self._validate_path(path) 

127 if not p.exists(): raise FileNotFoundError(f"Not found: {p}") 

128 s = p.stat(); ext = p.suffix.lower() 

129 mime = {".txt": "text/plain", ".md": "text/markdown", ".py": "text/x-python", 

130 ".json": "application/json", ".html": "text/html", ".pdf": "application/pdf"} 

131 return {"name": p.name, "path": str(p), "size": s.st_size, "is_dir": p.is_dir(), 

132 "is_file": p.is_file(), "extension": ext, "mime_type": mime.get(ext, "application/octet-stream"), 

133 "created": datetime.fromtimestamp(s.st_ctime).isoformat(), 

134 "modified": datetime.fromtimestamp(s.st_mtime).isoformat(), 

135 "permissions": oct(s.st_mode)[-3:]} 

136 

137 def _handle_create_directory(self, path: str) -> str: 

138 p = self._validate_path(path) 

139 p.mkdir(parents=True, exist_ok=True) 

140 return f"Created: {p}" 

141 

142 def _handle_move_file(self, source: str, destination: str) -> str: 

143 src = self._validate_path(source); dst = self._validate_path(destination) 

144 if not src.exists(): raise FileNotFoundError(f"Not found: {src}") 

145 src.rename(dst) 

146 return f"Moved {src} -> {dst}" 

147 

148 

149# ── Web Fetch MCP Server (3 tools) ─────────── 

150 

151 

152class WebFetchServer: 

153 """MCP-compatible HTTP client with content extraction.""" 

154 

155 NAME = "webfetch" 

156 VERSION = "1.0.0" 

157 

158 def __init__(self, user_agent: str = "AgentOS-MCP/1.0", timeout: int = 30): 

159 self._ua = user_agent; self._timeout = timeout 

160 

161 def get_tools(self) -> List[Dict[str, Any]]: 

162 return [ 

163 {"name": "fetch_url", "description": "Fetch a web page and return cleaned text", 

164 "inputSchema": {"type": "object", "properties": { 

165 "url": {"type": "string"}, "max_length": {"type": "integer", "default": 50000}}, 

166 "required": ["url"]}}, 

167 {"name": "fetch_json", "description": "Fetch and parse JSON from URL", 

168 "inputSchema": {"type": "object", "properties": { 

169 "url": {"type": "string"}, "headers": {"type": "object"}}, 

170 "required": ["url"]}}, 

171 {"name": "check_url", "description": "HEAD request to verify URL accessibility", 

172 "inputSchema": {"type": "object", "properties": { 

173 "url": {"type": "string"}}, "required": ["url"]}}, 

174 ] 

175 

176 def call_tool(self, tool_name: str, arguments: Dict) -> Any: 

177 return getattr(self, f"_handle_{tool_name}")(**arguments) 

178 

179 @staticmethod 

180 def _validate_url(url: str) -> None: 

181 p = urllib.parse.urlparse(url) 

182 if p.scheme not in ("http", "https"): raise ValueError(f"Invalid scheme: {p.scheme}") 

183 

184 @staticmethod 

185 def _strip_html(html: str) -> str: 

186 html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL | re.I) 

187 html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL | re.I) 

188 text = re.sub(r'<[^>]+>', ' ', html) 

189 text = re.sub(r'\s+', ' ', text).strip() 

190 for e, c in [('&amp;','&'),('&lt;','<'),('&gt;','>'),('&quot;','"'),('&#39;',"'"),('&nbsp;',' ')]: 

191 text = text.replace(e, c) 

192 return text 

193 

194 def _handle_fetch_url(self, url: str, max_length: int = 50000) -> str: 

195 self._validate_url(url) 

196 req = urllib.request.Request(url, headers={"User-Agent": self._ua}) 

197 try: 

198 with urllib.request.urlopen(req, timeout=self._timeout) as r: 

199 ct = r.headers.get("Content-Type", "") 

200 enc = ct.split("charset=")[-1].split(";")[0].strip() if "charset=" in ct else "utf-8" 

201 txt = r.read().decode(enc, errors="replace") 

202 if "text/html" in ct: txt = self._strip_html(txt) 

203 return txt[:max_length] + (f"\n\n[Truncated at {max_length}]" if len(txt) > max_length else "") 

204 except urllib.error.HTTPError as e: return f"HTTP {e.code}: {e.reason}" 

205 except Exception as e: return f"Error: {e}" 

206 

207 def _handle_fetch_json(self, url: str, headers: Optional[Dict] = None) -> Any: 

208 self._validate_url(url) 

209 h = {"User-Agent": self._ua, "Accept": "application/json"} 

210 if headers: h.update(headers) 

211 try: 

212 with urllib.request.urlopen(urllib.request.Request(url, headers=h), timeout=self._timeout) as r: 

213 return json.loads(r.read()) 

214 except json.JSONDecodeError as e: return {"error": "Invalid JSON", "detail": str(e)} 

215 except Exception as e: return {"error": str(e)} 

216 

217 def _handle_check_url(self, url: str) -> Dict: 

218 self._validate_url(url) 

219 req = urllib.request.Request(url, headers={"User-Agent": self._ua}, method="HEAD") 

220 try: 

221 with urllib.request.urlopen(req, timeout=self._timeout) as r: 

222 return {"url": url, "status": r.status, "accessible": 200 <= r.status < 400, 

223 "content_type": r.headers.get("Content-Type",""), "content_length": r.headers.get("Content-Length","")} 

224 except urllib.error.HTTPError as e: return {"url": url, "status": e.code, "accessible": False, "reason": e.reason} 

225 except Exception as e: return {"url": url, "status": 0, "accessible": False, "error": str(e)} 

226 

227 

228# ── Memory / Knowledge Graph MCP (6 tools) ─── 

229 

230 

231class MemoryServer: 

232 """Persistent knowledge graph for agent memory.""" 

233 

234 NAME = "memory" 

235 VERSION = "1.0.0" 

236 

237 def __init__(self, storage_path: str = ""): 

238 self._path = Path(storage_path or str(Path.home() / ".agentos" / "memory" / "kg.json")) 

239 self._path.parent.mkdir(parents=True, exist_ok=True) 

240 self._entries: Dict[str, Dict] = {} 

241 if self._path.exists(): 

242 try: 

243 for e in json.loads(self._path.read_text()).get("entries", []): self._entries[e["id"]] = e 

244 except: pass 

245 

246 def _save(self): 

247 self._path.write_text(json.dumps({"version": "1.0", "updated_at": time.time(), 

248 "entries": list(self._entries.values())}, indent=2, ensure_ascii=False)) 

249 

250 def _make_id(self, content: str) -> str: 

251 return hashlib.sha256(content.encode()).hexdigest()[:16] 

252 

253 def get_tools(self) -> List[Dict[str, Any]]: 

254 return [ 

255 {"name": "store_memory", "description": "Store a fact/memory", 

256 "inputSchema": {"type": "object", "properties": { 

257 "content": {"type": "string"}, "category": {"type": "string", "default": "general"}, 

258 "tags": {"type": "array", "items": {"type": "string"}}, "metadata": {"type": "object"}}, 

259 "required": ["content"]}}, 

260 {"name": "retrieve_memory", "description": "Retrieve memory by ID", 

261 "inputSchema": {"type": "object", "properties": {"memory_id": {"type": "string"}}, "required": ["memory_id"]}}, 

262 {"name": "search_memory", "description": "Search memories by keyword/category/tags", 

263 "inputSchema": {"type": "object", "properties": { 

264 "query": {"type": "string"}, "category": {"type": "string"}, 

265 "tags": {"type": "array", "items": {"type": "string"}}, "limit": {"type": "integer", "default": 20}}}}, 

266 {"name": "list_categories", "description": "List all categories with counts", 

267 "inputSchema": {"type": "object", "properties": {}}}, 

268 {"name": "delete_memory", "description": "Delete a memory by ID", 

269 "inputSchema": {"type": "object", "properties": {"memory_id": {"type": "string"}}, "required": ["memory_id"]}}, 

270 {"name": "update_memory", "description": "Update memory content/metadata", 

271 "inputSchema": {"type": "object", "properties": { 

272 "memory_id": {"type": "string"}, "content": {"type": "string"}, 

273 "category": {"type": "string"}, "tags": {"type": "array", "items": {"type": "string"}}}, 

274 "required": ["memory_id"]}}, 

275 ] 

276 

277 def call_tool(self, tool_name: str, arguments: Dict) -> Any: 

278 return getattr(self, f"_handle_{tool_name}")(**arguments) 

279 

280 def _handle_store_memory(self, content: str, category: str = "general", 

281 tags: list = None, metadata: dict = None) -> Dict: 

282 mid = self._make_id(content) 

283 entry = self._entries.get(mid, {}) 

284 if entry: 

285 entry["updated_at"] = time.time() 

286 if tags: entry.setdefault("tags", []).extend(t for t in tags if t not in entry.get("tags", [])) 

287 if category != "general": entry["category"] = category 

288 self._save(); return {"id": mid, "action": "updated"} 

289 entry = {"id": mid, "content": content, "category": category, "tags": tags or [], 

290 "metadata": metadata or {}, "created_at": time.time(), "updated_at": time.time()} 

291 self._entries[mid] = entry; self._save() 

292 return {"id": mid, "action": "stored"} 

293 

294 def _handle_retrieve_memory(self, memory_id: str) -> Optional[Dict]: 

295 return self._entries.get(memory_id) 

296 

297 def _handle_search_memory(self, query: str = "", category: str = None, 

298 tags: list = None, limit: int = 20) -> List[Dict]: 

299 results = [] 

300 for e in self._entries.values(): 

301 if category and e.get("category") != category: continue 

302 if tags and not all(t in e.get("tags", []) for t in tags): continue 

303 if query and query.lower() not in e.get("content","").lower(): continue 

304 results.append({"id": e["id"], "content": e["content"], "category": e.get("category",""), 

305 "tags": e.get("tags",[]), "created_at": e.get("created_at",0)}) 

306 return sorted(results, key=lambda r: r["created_at"], reverse=True)[:limit] 

307 

308 def _handle_list_categories(self) -> Dict[str, int]: 

309 c: Dict[str, int] = {} 

310 for e in self._entries.values(): c[e.get("category","general")] = c.get(e.get("category","general"), 0) + 1 

311 return dict(sorted(c.items(), key=lambda x: -x[1])) 

312 

313 def _handle_delete_memory(self, memory_id: str) -> Dict: 

314 if memory_id in self._entries: del self._entries[memory_id]; self._save(); return {"deleted": True, "id": memory_id} 

315 return {"deleted": False, "id": memory_id, "reason": "not found"} 

316 

317 def _handle_update_memory(self, memory_id: str, content: str = None, category: str = None, tags: list = None) -> Dict: 

318 e = self._entries.get(memory_id) 

319 if not e: return {"updated": False, "reason": "not found"} 

320 if content is not None: e["content"] = content 

321 if category is not None: e["category"] = category 

322 if tags is not None: e["tags"] = tags 

323 e["updated_at"] = time.time(); self._save() 

324 return {"updated": True, "id": memory_id} 

325 

326 

327# ── Web Search Server (4 tools) ────────────── 

328 

329 

330class SearchServer: 

331 """MCP-compatible web search via DuckDuckGo + Google fallback.""" 

332 

333 NAME = "search" 

334 VERSION = "1.0.0" 

335 

336 def get_tools(self) -> List[Dict[str, Any]]: 

337 return [ 

338 {"name": "web_search", "description": "Search the web", 

339 "inputSchema": {"type": "object", "properties": { 

340 "query": {"type": "string"}, "max_results": {"type": "integer", "default": 10}}, 

341 "required": ["query"]}}, 

342 {"name": "news_search", "description": "Search for recent news", 

343 "inputSchema": {"type": "object", "properties": { 

344 "query": {"type": "string"}, "max_results": {"type": "integer", "default": 10}}, 

345 "required": ["query"]}}, 

346 {"name": "image_search", "description": "Search for images", 

347 "inputSchema": {"type": "object", "properties": { 

348 "query": {"type": "string"}, "max_results": {"type": "integer", "default": 10}}, 

349 "required": ["query"]}}, 

350 {"name": "suggest", "description": "Get search autocomplete suggestions", 

351 "inputSchema": {"type": "object", "properties": { 

352 "query": {"type": "string"}}, "required": ["query"]}}, 

353 ] 

354 

355 def call_tool(self, tool_name: str, arguments: Dict) -> Any: 

356 return getattr(self, f"_handle_{tool_name}")(**arguments) 

357 

358 def _handle_web_search(self, query: str, max_results: int = 10) -> List[Dict]: 

359 """Search via DuckDuckGo HTML (no API key needed).""" 

360 q = urllib.parse.quote_plus(query) 

361 url = f"https://html.duckduckgo.com/html/?q={q}" 

362 req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) 

363 try: 

364 with urllib.request.urlopen(req, timeout=15) as r: 

365 html = r.read().decode("utf-8", errors="replace") 

366 except Exception as e: 

367 return [{"error": f"Search failed: {e}"}] 

368 

369 results = [] 

370 # Parse DuckDuckGo HTML results 

371 for m in re.finditer(r'<a[^>]*class="result__a"[^>]*href="([^"]*)"[^>]*>(.*?)</a>', html, re.DOTALL): 

372 if len(results) >= max_results: break 

373 link = m.group(1) 

374 title = re.sub(r'<[^>]+>', '', m.group(2)).strip() 

375 # Find snippet 

376 snippet = "" 

377 sn_match = re.search(r'<a[^>]*class="result__snippet"[^>]*>(.*?)</a>', html[m.end():m.end()+500], re.DOTALL) 

378 if sn_match: snippet = re.sub(r'<[^>]+>', '', sn_match.group(1)).strip() 

379 results.append({"title": title, "url": link, "snippet": snippet, "source": "duckduckgo"}) 

380 return results 

381 

382 def _handle_news_search(self, query: str, max_results: int = 10) -> List[Dict]: 

383 q = urllib.parse.quote_plus(f"{query} news") 

384 return self._handle_web_search(q, max_results) 

385 

386 def _handle_image_search(self, query: str, max_results: int = 10) -> List[Dict]: 

387 q = urllib.parse.quote_plus(query) 

388 url = f"https://duckduckgo.com/?q={q}&iax=images&ia=images" 

389 req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) 

390 try: 

391 with urllib.request.urlopen(req, timeout=15) as r: 

392 html = r.read().decode("utf-8", errors="replace") 

393 except Exception as e: 

394 return [{"error": str(e)}] 

395 

396 # Extract vqd token for image search 

397 vqd = "" 

398 vqd_m = re.search(r'vqd=([\d-]+)', html) 

399 if vqd_m: vqd = vqd_m.group(1) 

400 

401 if vqd: 

402 img_url = f"https://duckduckgo.com/i.js?q={q}&vqd={vqd}&o=json&p=1&s=0" 

403 try: 

404 with urllib.request.urlopen(urllib.request.Request(img_url, headers={"User-Agent": "Mozilla/5.0"}), timeout=15) as r: 

405 data = json.loads(r.read()) 

406 results = data.get("results", [])[:max_results] 

407 return [{"title": r.get("title",""), "url": r.get("url",""), 

408 "thumbnail": r.get("thumbnail",""), "source": "duckduckgo"} for r in results] 

409 except: pass 

410 return [] 

411 

412 def _handle_suggest(self, query: str) -> List[str]: 

413 q = urllib.parse.quote_plus(query) 

414 url = f"https://duckduckgo.com/ac/?q={q}&type=list" 

415 try: 

416 with urllib.request.urlopen(url, timeout=8) as r: 

417 data = json.loads(r.read()) 

418 return [item.get("phrase","") for item in data[:10]] 

419 except: return [] 

420 

421 

422# ── Git Server (4 tools) ───────────────────── 

423 

424 

425class GitServer: 

426 """MCP-compatible git operations.""" 

427 

428 NAME = "git" 

429 VERSION = "1.0.0" 

430 

431 def __init__(self, repo_path: str = ""): 

432 self._repo_path = Path(repo_path).resolve() if repo_path else Path.cwd() 

433 self._git: Optional[str] = shutil.which("git") 

434 

435 def _run_git(self, *args) -> Dict[str, Any]: 

436 if not self._git: return {"error": "git not installed"} 

437 try: 

438 r = subprocess.run([self._git, *args], capture_output=True, text=True, 

439 cwd=str(self._repo_path), timeout=30) 

440 return {"stdout": r.stdout.strip(), "stderr": r.stderr.strip(), "returncode": r.returncode} 

441 except subprocess.TimeoutExpired: 

442 return {"error": "Command timed out"} 

443 except Exception as e: 

444 return {"error": str(e)} 

445 

446 def get_tools(self) -> List[Dict[str, Any]]: 

447 return [ 

448 {"name": "git_status", "description": "Show working tree status", 

449 "inputSchema": {"type": "object", "properties": {}}}, 

450 {"name": "git_log", "description": "Show commit history", 

451 "inputSchema": {"type": "object", "properties": { 

452 "max_count": {"type": "integer", "default": 10}, "oneline": {"type": "boolean", "default": True}}}}, 

453 {"name": "git_diff", "description": "Show changes between commits/working tree", 

454 "inputSchema": {"type": "object", "properties": { 

455 "staged": {"type": "boolean", "default": False}, "commit": {"type": "string"}}}}, 

456 {"name": "git_branch", "description": "List branches", 

457 "inputSchema": {"type": "object", "properties": {"remote": {"type": "boolean", "default": False}}}}, 

458 ] 

459 

460 def call_tool(self, tool_name: str, arguments: Dict) -> Any: 

461 return getattr(self, f"_handle_{tool_name}")(**arguments) 

462 

463 def _handle_git_status(self) -> Dict: 

464 return self._run_git("status", "--porcelain") 

465 

466 def _handle_git_log(self, max_count: int = 10, oneline: bool = True) -> Dict: 

467 args = ["log", f"-n{max_count}"] 

468 if oneline: args.append("--oneline") 

469 return self._run_git(*args) 

470 

471 def _handle_git_diff(self, staged: bool = False, commit: str = "") -> Dict: 

472 args = ["diff"] 

473 if staged: args.append("--staged") 

474 if commit: args.append(commit) 

475 return self._run_git(*args) 

476 

477 def _handle_git_branch(self, remote: bool = False) -> Dict: 

478 args = ["branch"] 

479 if remote: args.append("-r") 

480 return self._run_git(*args) 

481 

482 

483# ── Shell Server (3 tools) ─────────────────── 

484 

485 

486class ShellServer: 

487 """MCP-compatible safe shell command execution.""" 

488 

489 NAME = "shell" 

490 VERSION = "1.0.0" 

491 

492 SAFE_COMMANDS = {"ls", "cat", "head", "tail", "wc", "grep", "find", "du", "df", 

493 "echo", "date", "whoami", "uname", "pwd", "which", "env", "ps", 

494 "top", "htop", "tree", "file", "stat", "md5sum", "sha256sum", 

495 "python3", "python", "pip", "npm", "node", "curl", "wget"} 

496 

497 def _is_safe(self, cmd: str) -> bool: 

498 base = cmd.strip().split()[0] if cmd.strip() else "" 

499 return base in self.SAFE_COMMANDS 

500 

501 def get_tools(self) -> List[Dict[str, Any]]: 

502 return [ 

503 {"name": "run_command", "description": "Execute a safe shell command", 

504 "inputSchema": {"type": "object", "properties": { 

505 "command": {"type": "string"}, "timeout": {"type": "integer", "default": 30}}, 

506 "required": ["command"]}}, 

507 {"name": "system_info", "description": "Get system information (OS, CPU, memory)", 

508 "inputSchema": {"type": "object", "properties": {}}}, 

509 {"name": "disk_usage", "description": "Show disk usage for a path", 

510 "inputSchema": {"type": "object", "properties": { 

511 "path": {"type": "string", "default": "."}}}}, 

512 ] 

513 

514 def call_tool(self, tool_name: str, arguments: Dict) -> Any: 

515 return getattr(self, f"_handle_{tool_name}")(**arguments) 

516 

517 def _handle_run_command(self, command: str, timeout: int = 30) -> Dict: 

518 if not self._is_safe(command): 

519 return {"error": f"Command not in safelist. Allowed: {sorted(self.SAFE_COMMANDS)}"} 

520 try: 

521 r = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=timeout) 

522 return {"stdout": r.stdout, "stderr": r.stderr, "returncode": r.returncode} 

523 except subprocess.TimeoutExpired: return {"error": "Timeout"} 

524 except Exception as e: return {"error": str(e)} 

525 

526 def _handle_system_info(self) -> Dict: 

527 import platform 

528 return {"os": platform.system(), "release": platform.release(), "version": platform.version(), 

529 "machine": platform.machine(), "processor": platform.processor(), 

530 "python": platform.python_version(), "hostname": platform.node()} 

531 

532 def _handle_disk_usage(self, path: str = ".") -> Dict: 

533 try: 

534 usage = shutil.disk_usage(path) 

535 return {"path": path, "total_gb": round(usage.total/1024**3, 2), 

536 "used_gb": round(usage.used/1024**3, 2), "free_gb": round(usage.free/1024**3, 2)} 

537 except Exception as e: return {"error": str(e)} 

538 

539 

540# ── Code Server (3 tools) ──────────────────── 

541 

542 

543class CodeServer: 

544 """MCP-compatible sandboxed code execution.""" 

545 

546 NAME = "code" 

547 VERSION = "1.0.0" 

548 

549 def get_tools(self) -> List[Dict[str, Any]]: 

550 return [ 

551 {"name": "run_python", "description": "Execute Python code in sandbox", 

552 "inputSchema": {"type": "object", "properties": { 

553 "code": {"type": "string"}, "timeout": {"type": "integer", "default": 10}}, 

554 "required": ["code"]}}, 

555 {"name": "run_shell", "description": "Execute a one-liner bash command", 

556 "inputSchema": {"type": "object", "properties": { 

557 "command": {"type": "string"}, "timeout": {"type": "integer", "default": 10}}, 

558 "required": ["command"]}}, 

559 {"name": "lint_code", "description": "Basic code linting (syntax check)", 

560 "inputSchema": {"type": "object", "properties": { 

561 "code": {"type": "string"}, "language": {"type": "string", "default": "python"}}, 

562 "required": ["code"]}}, 

563 ] 

564 

565 def call_tool(self, tool_name: str, arguments: Dict) -> Any: 

566 return getattr(self, f"_handle_{tool_name}")(**arguments) 

567 

568 def _handle_run_python(self, code: str, timeout: int = 10) -> Dict: 

569 try: 

570 # Restricted execution via compile + eval in limited namespace 

571 restricted_globals = {"__builtins__": { 

572 "print": print, "len": len, "range": range, "int": int, "float": float, 

573 "str": str, "list": list, "dict": dict, "bool": bool, "set": set, "tuple": tuple, 

574 "sum": sum, "min": min, "max": max, "abs": abs, "round": round, "sorted": sorted, 

575 "enumerate": enumerate, "zip": zip, "map": map, "filter": filter, 

576 "json": __import__("json"), "math": __import__("math"), 

577 "datetime": __import__("datetime"), "re": __import__("re"), 

578 "collections": __import__("collections"), "itertools": __import__("itertools"), 

579 }} 

580 import io, sys 

581 old_stdout = sys.stdout 

582 sys.stdout = buffer = io.StringIO() 

583 try: 

584 compiled = compile(code, "<mcp_sandbox>", "exec") 

585 exec(compiled, restricted_globals) 

586 output = buffer.getvalue() 

587 finally: 

588 sys.stdout = old_stdout 

589 return {"output": output, "success": True} 

590 except Exception as e: 

591 return {"output": str(e), "success": False, "error": type(e).__name__} 

592 

593 def _handle_run_shell(self, command: str, timeout: int = 10) -> Dict: 

594 try: 

595 r = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=timeout) 

596 return {"stdout": r.stdout, "stderr": r.stderr, "returncode": r.returncode} 

597 except subprocess.TimeoutExpired: return {"error": "Timeout"} 

598 except Exception as e: return {"error": str(e)} 

599 

600 def _handle_lint_code(self, code: str, language: str = "python") -> Dict: 

601 if language == "python": 

602 try: 

603 compile(code, "<lint>", "exec") 

604 return {"valid": True, "errors": []} 

605 except SyntaxError as e: 

606 return {"valid": False, "errors": [{"line": e.lineno, "offset": e.offset, "message": e.msg}]} 

607 return {"valid": None, "message": f"Linting not supported for {language}"} 

608 

609 

610# ── Text Server (4 tools) ──────────────────── 

611 

612 

613class TextServer: 

614 """MCP-compatible text manipulation tools.""" 

615 

616 NAME = "text" 

617 VERSION = "1.0.0" 

618 

619 def get_tools(self) -> List[Dict[str, Any]]: 

620 return [ 

621 {"name": "count_tokens", "description": "Estimate token count (OpenAI tiktoken-style approximate)", 

622 "inputSchema": {"type": "object", "properties": { 

623 "text": {"type": "string"}}, "required": ["text"]}}, 

624 {"name": "extract_regex", "description": "Extract patterns from text using regex", 

625 "inputSchema": {"type": "object", "properties": { 

626 "text": {"type": "string"}, "pattern": {"type": "string"}, "group": {"type": "integer", "default": 0}}, 

627 "required": ["text", "pattern"]}}, 

628 {"name": "summarize_text", "description": "Simple extractive text summarization", 

629 "inputSchema": {"type": "object", "properties": { 

630 "text": {"type": "string"}, "max_sentences": {"type": "integer", "default": 5}}, 

631 "required": ["text"]}}, 

632 {"name": "format_json", "description": "Format/validate/prettify JSON", 

633 "inputSchema": {"type": "object", "properties": { 

634 "text": {"type": "string"}, "indent": {"type": "integer", "default": 2}}, 

635 "required": ["text"]}}, 

636 ] 

637 

638 def call_tool(self, tool_name: str, arguments: Dict) -> Any: 

639 return getattr(self, f"_handle_{tool_name}")(**arguments) 

640 

641 def _handle_count_tokens(self, text: str) -> Dict: 

642 # Approximate: ~4 chars per token for English, ~1.5 for CJK 

643 words = len(re.findall(r'\w+', text)) 

644 chars = len(text) 

645 return {"tokens_approx": max(1, words + chars // 4), "characters": chars, "words": words} 

646 

647 def _handle_extract_regex(self, text: str, pattern: str, group: int = 0) -> List[str]: 

648 try: 

649 return [m.group(group) if group else m.group(0) for m in re.finditer(pattern, text)] 

650 except re.error as e: return [f"Invalid regex: {e}"] 

651 

652 def _handle_summarize_text(self, text: str, max_sentences: int = 5) -> str: 

653 sentences = re.split(r'(?<=[.!?])\s+', text) 

654 if len(sentences) <= max_sentences: return text 

655 # Simple extractive: take first sentence + longest sentences 

656 first = sentences[0] 

657 rest = sorted(sentences[1:], key=len, reverse=True)[:max_sentences - 1] 

658 return ". ".join([first] + rest) + "." 

659 

660 def _handle_format_json(self, text: str, indent: int = 2) -> Dict: 

661 try: 

662 data = json.loads(text) 

663 formatted = json.dumps(data, indent=indent, ensure_ascii=False) 

664 return {"valid": True, "formatted": formatted, "keys": list(data.keys()) if isinstance(data, dict) else None} 

665 except json.JSONDecodeError as e: 

666 return {"valid": False, "error": str(e)} 

667 

668 

669# ── Built-in Server Registry ───────────────── 

670 

671 

672class BuiltinMCPRegistry: 

673 """Registry of all built-in MCP servers. Single interface for tool discovery/calling.""" 

674 

675 def __init__(self): 

676 self._servers: Dict[str, Any] = {} 

677 

678 def register_server(self, server: Any) -> None: 

679 self._servers[server.NAME] = server 

680 

681 def list_all_tools(self) -> List[Dict[str, Any]]: 

682 tools = [] 

683 for srv_name, server in self._servers.items(): 

684 for tool in server.get_tools(): 

685 tools.append({"server": srv_name, "name": f"mcp__{srv_name}__{tool['name']}", 

686 "description": tool.get("description", ""), "inputSchema": tool.get("inputSchema", {})}) 

687 return tools 

688 

689 def get_tool_schemas(self, format: str = "openai") -> List[Dict[str, Any]]: 

690 schemas = [] 

691 for srv_name, server in self._servers.items(): 

692 for tool in server.get_tools(): 

693 schemas.append({"type": "function", 

694 "function": {"name": f"mcp__{srv_name}__{tool['name']}", 

695 "description": tool.get("description", ""), 

696 "parameters": tool.get("inputSchema", {})}}) 

697 return schemas 

698 

699 def call_tool(self, server_name: str, tool_name: str, arguments: Dict[str, Any]) -> Any: 

700 server = self._servers.get(server_name) 

701 if not server: raise ValueError(f"Server '{server_name}' not found") 

702 return server.call_tool(tool_name, arguments) 

703 

704 def call_tool_by_full_name(self, full_name: str, arguments: Dict[str, Any]) -> Any: 

705 parts = full_name.split("__", 2) 

706 if len(parts) != 3 or parts[0] != "mcp": raise ValueError(f"Invalid tool name: {full_name}") 

707 return self.call_tool(parts[1], parts[2], arguments) 

708 

709 @property 

710 def server_names(self) -> List[str]: return list(self._servers.keys()) 

711 

712 @property 

713 def tool_count(self) -> int: return sum(len(s.get_tools()) for s in self._servers.values()) 

714 

715 

716def create_default_registry( 

717 allowed_paths: Optional[List[str]] = None, 

718 memory_path: Optional[str] = None, 

719 repo_path: str = "", 

720) -> BuiltinMCPRegistry: 

721 """Create a BuiltinMCPRegistry with all 8 servers registered.""" 

722 if allowed_paths is None: allowed_paths = [os.getcwd(), str(Path.home())] 

723 reg = BuiltinMCPRegistry() 

724 reg.register_server(FilesystemServer(allowed_paths=allowed_paths)) 

725 reg.register_server(WebFetchServer()) 

726 reg.register_server(MemoryServer(storage_path=memory_path or "")) 

727 reg.register_server(SearchServer()) 

728 reg.register_server(GitServer(repo_path=repo_path)) 

729 reg.register_server(ShellServer()) 

730 reg.register_server(CodeServer()) 

731 reg.register_server(TextServer()) 

732 return reg