Coverage for agentos/api/server.py: 49%

224 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 16:36 +0800

1""" 

2AgentOS API Server — FastAPI-based REST + WebSocket server for agent endpoints. 

3 

4v1.14.5: Production-ready API server with REST endpoints, WebSocket streaming, 

5 agent lifecycle management, and OpenAPI documentation. 

6""" 

7 

8import asyncio 

9import json 

10import logging 

11import os 

12import time 

13import uuid 

14from contextlib import asynccontextmanager 

15from dataclasses import dataclass, field 

16from pathlib import Path 

17from typing import Any, AsyncIterator, Dict, List, Optional 

18 

19logger = logging.getLogger(__name__) 

20 

21try: 

22 from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Query 

23 from fastapi.middleware.cors import CORSMiddleware 

24 from fastapi.responses import StreamingResponse, JSONResponse 

25 from pydantic import BaseModel, Field 

26 import uvicorn 

27 HAS_API_DEPS = True 

28except ImportError: 

29 HAS_API_DEPS = False 

30 logger.warning("FastAPI/uvicorn not installed. API server unavailable. pip install nexus-agentos[api]") 

31 

32 

33# --------------------------------------------------------------------------- 

34# Pydantic models 

35# --------------------------------------------------------------------------- 

36 

if HAS_API_DEPS:

    # NOTE: these models are documented with comments rather than docstrings,
    # so the descriptions in the generated OpenAPI schema stay unchanged.

    # Body of POST /agents: the configuration a new agent is created with.
    class AgentConfigRequest(BaseModel):
        name: str = "default"
        model: str = "gpt-4o"
        system_prompt: str = "You are a helpful agent."
        tools: List[str] = Field(default_factory=list)
        memory: bool = False
        max_tokens: int = 4096
        temperature: float = 0.7
        metadata: Dict[str, Any] = Field(default_factory=dict)

    # Body of the run and stream endpoints.
    class RunRequest(BaseModel):
        agent_id: str
        prompt: str
        stream: bool = False
        metadata: Dict[str, Any] = Field(default_factory=dict)

    # Response of POST /agents/{agent_id}/run.
    class RunResponse(BaseModel):
        task_id: str
        agent_id: str
        result: str
        elapsed: float
        tokens_used: int = 0

    # Public view of a tracked agent returned by the /agents endpoints.
    class AgentInfo(BaseModel):
        id: str
        name: str
        model: str
        status: str
        tasks_completed: int = 0
        uptime: float = 0.0

    # Body of the /workflows endpoints; workflow_yaml is parsed as YAML text.
    class WorkflowRunRequest(BaseModel):
        workflow_yaml: str
        variables: Dict[str, Any] = Field(default_factory=dict)

    # Response of GET /health.
    class HealthResponse(BaseModel):
        status: str
        version: str
        uptime: float
        agents_count: int
        active_websockets: int

80 

81 

82# --------------------------------------------------------------------------- 

83# Agent Manager 

84# --------------------------------------------------------------------------- 

85 

@dataclass
class ManagedAgent:
    """Bookkeeping record for one agent registered with the server."""

    id: str
    name: str
    model: str
    config: Dict[str, Any]
    # Wall-clock timestamp (time.time()) taken when the record is built.
    created_at: float = field(default_factory=time.time)
    # Starts at zero; the endpoints increment it after each finished task.
    tasks_completed: int = field(default=0)

95 

96 

class AgentManager:
    """In-memory registry of agents: create, look up, list and delete.

    Agents are kept in a plain dict keyed by agent id, so the registry lives
    only as long as this object does.
    """

    def __init__(self):
        self._agents: Dict[str, "ManagedAgent"] = {}
        self._start_time = time.time()

    def create(self, config: "AgentConfigRequest") -> "ManagedAgent":
        """Register a new agent built from *config* and return its record.

        Args:
            config: Object exposing ``name``, ``model`` and ``model_dump()``.

        Returns:
            The newly stored ``ManagedAgent``.
        """
        agent_id = uuid.uuid4().hex[:12]
        # The id is a truncated UUID; re-draw on the (unlikely) chance that it
        # matches a live agent so an existing record is never overwritten.
        while agent_id in self._agents:
            agent_id = uuid.uuid4().hex[:12]

        agent = ManagedAgent(
            id=agent_id,
            name=config.name,
            model=config.model,
            config=config.model_dump(),
        )
        self._agents[agent_id] = agent
        logger.info("[API] Agent created: %s (%s)", agent_id, config.name)
        return agent

    def get(self, agent_id: str) -> Optional["ManagedAgent"]:
        """Return the agent stored under *agent_id*, or ``None`` if absent."""
        return self._agents.get(agent_id)

    def list_all(self) -> List["ManagedAgent"]:
        """Return a new list holding every tracked agent."""
        return list(self._agents.values())

    def delete(self, agent_id: str) -> bool:
        """Remove *agent_id*; return ``True`` if it existed, else ``False``."""
        try:
            del self._agents[agent_id]
        except KeyError:
            return False
        return True

    @property
    def count(self) -> int:
        """Number of agents currently tracked."""
        return len(self._agents)

    @property
    def uptime(self) -> float:
        """Seconds elapsed since this manager was constructed."""
        return time.time() - self._start_time

135 

136 

137# --------------------------------------------------------------------------- 

138# FastAPI Application 

139# --------------------------------------------------------------------------- 

140 

if HAS_API_DEPS:

    # Process-wide state shared by every endpoint below.
    agent_manager = AgentManager()
    # Open WebSocket connections, keyed by the agent id they are attached to.
    active_ws: Dict[str, WebSocket] = {}

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Log server startup and shutdown around the application's lifetime."""
        logger.info("[API] AgentOS API server starting...")
        yield
        logger.info("[API] AgentOS API server shutting down...")

    app = FastAPI(
        title="AgentOS API",
        description="Production Multi-Agent Framework REST API",
        version="1.14.5",
        lifespan=lifespan,
    )

    # NOTE(review): a wildcard origin combined with allow_credentials=True is a
    # very permissive CORS policy — confirm it is intended before exposing the
    # server beyond a trusted network.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    def _agent_info(agent: ManagedAgent) -> AgentInfo:
        """Build the public AgentInfo view of a tracked agent."""
        return AgentInfo(
            id=agent.id,
            name=agent.name,
            model=agent.model,
            status="ready",
            tasks_completed=agent.tasks_completed,
            uptime=time.time() - agent.created_at,
        )

    # -----------------------------------------------------------------------
    # REST Endpoints
    # -----------------------------------------------------------------------

    @app.get("/health", response_model=HealthResponse)
    async def health():
        """Report server status, version, uptime and connection counts."""
        from agentos import __version__

        return HealthResponse(
            status="healthy",
            version=__version__,
            uptime=agent_manager.uptime,
            agents_count=agent_manager.count,
            active_websockets=len(active_ws),
        )

    @app.post("/agents", response_model=AgentInfo, status_code=201)
    async def create_agent(config: AgentConfigRequest):
        """Create an agent from the posted configuration."""
        agent = agent_manager.create(config)
        # A brand-new agent is reported with the model defaults
        # (tasks_completed=0, uptime=0.0), so those fields are not passed.
        return AgentInfo(
            id=agent.id,
            name=agent.name,
            model=agent.model,
            status="ready",
        )

    @app.get("/agents", response_model=List[AgentInfo])
    async def list_agents():
        """List every tracked agent."""
        return [_agent_info(agent) for agent in agent_manager.list_all()]

    @app.get("/agents/{agent_id}", response_model=AgentInfo)
    async def get_agent(agent_id: str):
        """Return one agent, or 404 if the id is unknown."""
        agent = agent_manager.get(agent_id)
        if agent is None:
            raise HTTPException(status_code=404, detail="Agent not found")
        return _agent_info(agent)

    @app.delete("/agents/{agent_id}")
    async def delete_agent(agent_id: str):
        """Delete one agent, or 404 if the id is unknown."""
        if not agent_manager.delete(agent_id):
            raise HTTPException(status_code=404, detail="Agent not found")
        return {"deleted": agent_id}

    @app.post("/agents/{agent_id}/run", response_model=RunResponse)
    async def run_agent(agent_id: str, request: RunRequest):
        """Run a prompt against an agent and return the full result.

        The execution here is a placeholder: it echoes the first 100
        characters of the prompt after a short sleep.
        """
        agent = agent_manager.get(agent_id)
        if agent is None:
            raise HTTPException(status_code=404, detail="Agent not found")

        t0 = time.time()
        try:
            result = f"[{agent.name}] Response to: {request.prompt[:100]}"
            await asyncio.sleep(0.1)
            agent.tasks_completed += 1
            elapsed = time.time() - t0

            return RunResponse(
                task_id=uuid.uuid4().hex[:8],
                agent_id=agent_id,
                result=result,
                elapsed=elapsed,
                # Whitespace-separated word count of the prompt.
                tokens_used=len(request.prompt.split()),
            )
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e

    @app.post("/agents/{agent_id}/stream")
    async def stream_agent(agent_id: str, request: RunRequest):
        """Stream a placeholder response word by word as server-sent events."""
        agent = agent_manager.get(agent_id)
        if agent is None:
            raise HTTPException(status_code=404, detail="Agent not found")

        async def event_generator():
            words = f"Hello! Processing your request: {request.prompt[:50]}...".split()
            for i, word in enumerate(words):
                yield f"data: {json.dumps({'token': word, 'seq': i})}\n\n"
                await asyncio.sleep(0.05)
            # Final frame marks the end of the stream.
            yield f"data: {json.dumps({'token': '', 'seq': len(words), 'done': True})}\n\n"

        return StreamingResponse(event_generator(), media_type="text/event-stream")

    @app.post("/workflows/run")
    async def run_workflow(request: WorkflowRunRequest):
        """Parse the posted YAML workflow and execute it.

        Any failure (import, parsing or execution) is reported as a 400.
        """
        try:
            import yaml
            from agentos.workflow import WorkflowParser, WorkflowEngine

            wf_data = yaml.safe_load(request.workflow_yaml)
            wf = WorkflowParser.parse_dict(wf_data)
            wf.variables.update(request.variables)
            ctx = await WorkflowEngine().execute(wf)
            return {"result": ctx.variables, "history": ctx.history}
        except Exception as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

    @app.post("/workflows/validate")
    async def validate_workflow(request: WorkflowRunRequest):
        """Dry-run the posted YAML workflow and return the engine's report.

        Failures are returned in the body as ``{"valid": False, ...}`` rather
        than as an HTTP error.
        """
        try:
            import yaml
            from agentos.workflow import WorkflowParser, WorkflowEngine

            wf_data = yaml.safe_load(request.workflow_yaml)
            wf = WorkflowParser.parse_dict(wf_data)
            return await WorkflowEngine().dry_run(wf)
        except Exception as e:
            return {"valid": False, "issues": [str(e)]}

    # -----------------------------------------------------------------------
    # WebSocket endpoint
    # -----------------------------------------------------------------------

    @app.websocket("/ws/{agent_id}")
    async def websocket_endpoint(websocket: WebSocket, agent_id: str):
        """Serve a token-streaming session for one agent over a WebSocket.

        Each incoming text frame must be a JSON object; its ``prompt`` string
        is echoed back as a sequence of ``token`` messages followed by a
        ``done`` message. Malformed frames get an ``error`` message and the
        session continues.
        """
        agent = agent_manager.get(agent_id)
        if agent is None:
            await websocket.close(code=4004, reason="Agent not found")
            return

        await websocket.accept()
        active_ws[agent_id] = websocket
        logger.info("[API] WebSocket connected: %s", agent_id)

        try:
            await websocket.send_json({"type": "connected", "agent_id": agent_id})

            while True:
                data = await websocket.receive_text()

                # One bad frame should not tear down the whole session.
                try:
                    msg = json.loads(data)
                except json.JSONDecodeError:
                    await websocket.send_json({"type": "error", "message": "Invalid JSON"})
                    continue
                prompt = msg.get("prompt", "") if isinstance(msg, dict) else None
                if not isinstance(prompt, str):
                    await websocket.send_json({
                        "type": "error",
                        "message": "Expected a JSON object with a string 'prompt'",
                    })
                    continue

                words = f"[{agent.name}] {prompt[:50]}...".split()
                for i, word in enumerate(words):
                    await websocket.send_json({
                        "type": "token",
                        "data": word,
                        "seq": i,
                    })
                    await asyncio.sleep(0.03)

                await websocket.send_json({"type": "done", "total_tokens": len(words)})
                agent.tasks_completed += 1

        except WebSocketDisconnect:
            logger.info("[API] WebSocket disconnected: %s", agent_id)
        except Exception as e:
            logger.error("[API] WebSocket error: %s", e)
        finally:
            # Only drop the registry entry if it still refers to this socket; a
            # newer connection for the same agent may have replaced it.
            if active_ws.get(agent_id) is websocket:
                del active_ws[agent_id]

    # -----------------------------------------------------------------------
    # Marketplace endpoints
    # -----------------------------------------------------------------------

    @app.get("/marketplace/search")
    async def marketplace_search(q: str = "", category: Optional[str] = None, limit: int = 20):
        """Search marketplace templates by keywords and optional category.

        Raises:
            HTTPException: 400 when *category* is rejected with ``ValueError``,
                500 for any other failure.
        """
        try:
            from agentos.marketplace import MarketplaceManager, MarketSearchQuery, TemplateCategory

            manager = MarketplaceManager()
            cat = None
            if category:
                # Presumably TemplateCategory is an Enum, so an unknown value
                # raises ValueError — that is a client error, not a 500.
                try:
                    cat = TemplateCategory(category)
                except ValueError:
                    raise HTTPException(
                        status_code=400, detail=f"Unknown category: {category}"
                    ) from None

            results = await manager.search(MarketSearchQuery(keywords=q, category=cat, limit=limit))
            return {
                "results": [
                    {
                        "id": r.template.id,
                        "name": r.template.name,
                        "description": r.template.description,
                        "category": r.template.category.value,
                        "rating": r.template.rating,
                        "stars": r.template.stars,
                        "downloads": r.template.downloads,
                        "tags": r.template.tags,
                    }
                    for r in results
                ]
            }
        except HTTPException:
            # Let deliberate HTTP errors through instead of rewrapping as 500.
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e

    @app.get("/marketplace/stats")
    async def marketplace_stats():
        """Return marketplace statistics after seeding the default templates."""
        from agentos.marketplace import MarketplaceManager, seed_default_templates

        manager = MarketplaceManager()
        seed_default_templates(manager)
        return await manager.get_stats()

else:
    # Without the optional API dependencies there is no application object.
    app = None

364 

365 

def serve(host: str = "0.0.0.0", port: int = 8000, reload: bool = False):
    """Run the API under uvicorn, or print install hints if deps are missing.

    Args:
        host: Interface address handed to uvicorn.
        port: TCP port handed to uvicorn.
        reload: Passed through to uvicorn's ``reload`` option.
    """
    if HAS_API_DEPS:
        uvicorn.run("agentos.api.server:app", host=host, port=port, reload=reload)
        return

    # Optional dependencies are absent: tell the user how to get them.
    for hint in (
        "Install API dependencies: pip install nexus-agentos[api]",
        "Required: fastapi, uvicorn, websockets",
    ):
        print(hint)

373 

374 

# ── Compat stubs ──
# Bare placeholder classes, presumably kept so older imports keep resolving —
# TODO confirm against callers.
class AgentAPI:
    """Empty placeholder class."""


# Define the RunResponse placeholder only when the real model is absent.
# Rebinding the name unconditionally would replace the Pydantic RunResponse
# that run_agent() looks up at call time, so constructing it with keyword
# arguments would fail and every run would come back as a 500.
if "RunResponse" not in globals():

    class RunResponse:
        """Empty placeholder used when the Pydantic model was not defined."""


# Export only the names that actually exist in this module, so a star-import
# still works when the optional API dependencies are missing.
__all__ = [
    name
    for name in (
        "app",
        "serve",
        "AgentManager",
        "AgentConfigRequest",
        "RunRequest",
        "RunResponse",
    )
    if name in globals()
]

382class RunResponse: pass