Coverage for agentos/api/server.py: 49%
224 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 16:36 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 16:36 +0800
1"""
2AgentOS API Server — FastAPI-based REST + WebSocket server for agent endpoints.
4v1.14.5: Production-ready API server with REST endpoints, WebSocket streaming,
5 agent lifecycle management, and OpenAPI documentation.
6"""
8import asyncio
9import json
10import logging
11import os
12import time
13import uuid
14from contextlib import asynccontextmanager
15from dataclasses import dataclass, field
16from pathlib import Path
17from typing import Any, AsyncIterator, Dict, List, Optional
19logger = logging.getLogger(__name__)
21try:
22 from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Query
23 from fastapi.middleware.cors import CORSMiddleware
24 from fastapi.responses import StreamingResponse, JSONResponse
25 from pydantic import BaseModel, Field
26 import uvicorn
27 HAS_API_DEPS = True
28except ImportError:
29 HAS_API_DEPS = False
30 logger.warning("FastAPI/uvicorn not installed. API server unavailable. pip install nexus-agentos[api]")
33# ---------------------------------------------------------------------------
34# Pydantic models
35# ---------------------------------------------------------------------------
37if HAS_API_DEPS:
39 class AgentConfigRequest(BaseModel):
40 name: str = "default"
41 model: str = "gpt-4o"
42 system_prompt: str = "You are a helpful agent."
43 tools: List[str] = Field(default_factory=list)
44 memory: bool = False
45 max_tokens: int = 4096
46 temperature: float = 0.7
47 metadata: Dict[str, Any] = Field(default_factory=dict)
49 class RunRequest(BaseModel):
50 agent_id: str
51 prompt: str
52 stream: bool = False
53 metadata: Dict[str, Any] = Field(default_factory=dict)
55 class RunResponse(BaseModel):
56 task_id: str
57 agent_id: str
58 result: str
59 elapsed: float
60 tokens_used: int = 0
62 class AgentInfo(BaseModel):
63 id: str
64 name: str
65 model: str
66 status: str
67 tasks_completed: int = 0
68 uptime: float = 0.0
70 class WorkflowRunRequest(BaseModel):
71 workflow_yaml: str
72 variables: Dict[str, Any] = Field(default_factory=dict)
74 class HealthResponse(BaseModel):
75 status: str
76 version: str
77 uptime: float
78 agents_count: int
79 active_websockets: int
82# ---------------------------------------------------------------------------
83# Agent Manager
84# ---------------------------------------------------------------------------
@dataclass
class ManagedAgent:
    """Record of a single agent registered with the server."""

    id: str
    name: str
    model: str
    # Full creation-time configuration as a plain dict.
    config: Dict[str, Any]
    # Unix timestamp captured when the record is instantiated.
    created_at: float = field(default_factory=time.time)
    # Number of prompts this agent has finished handling.
    tasks_completed: int = 0
class AgentManager:
    """In-memory registry covering the agent lifecycle: create, look up, list, delete."""

    def __init__(self):
        # agent id -> record
        self._agents: Dict[str, ManagedAgent] = {}
        # Reference point for the `uptime` property.
        self._start_time = time.time()

    def create(self, config: "AgentConfigRequest") -> ManagedAgent:
        """Register a new agent built from *config* and return its record.

        The id is the first 12 hex characters of a random UUID4.
        """
        agent_id = uuid.uuid4().hex[:12]
        record = ManagedAgent(
            id=agent_id,
            name=config.name,
            model=config.model,
            config=config.model_dump(),
        )
        self._agents[agent_id] = record
        logger.info(f"[API] Agent created: {agent_id} ({config.name})")
        return record

    def get(self, agent_id: str) -> Optional[ManagedAgent]:
        """Return the record for *agent_id*, or None when it is unknown."""
        return self._agents.get(agent_id)

    def list_all(self) -> List[ManagedAgent]:
        """Return a new list holding every registered record."""
        return [*self._agents.values()]

    def delete(self, agent_id: str) -> bool:
        """Remove *agent_id*; return True if it existed, False otherwise."""
        try:
            del self._agents[agent_id]
        except KeyError:
            return False
        return True

    @property
    def count(self) -> int:
        """Number of agents currently registered."""
        return len(self._agents)

    @property
    def uptime(self) -> float:
        """Seconds elapsed since this manager was constructed."""
        return time.time() - self._start_time
137# ---------------------------------------------------------------------------
138# FastAPI Application
139# ---------------------------------------------------------------------------
141if HAS_API_DEPS:
143 agent_manager = AgentManager()
144 active_ws: Dict[str, WebSocket] = {}
146 @asynccontextmanager
147 async def lifespan(app: FastAPI):
148 logger.info("[API] AgentOS API server starting...")
149 yield
150 logger.info("[API] AgentOS API server shutting down...")
152 app = FastAPI(
153 title="AgentOS API",
154 description="Production Multi-Agent Framework REST API",
155 version="1.14.5",
156 lifespan=lifespan,
157 )
159 app.add_middleware(
160 CORSMiddleware,
161 allow_origins=["*"],
162 allow_credentials=True,
163 allow_methods=["*"],
164 allow_headers=["*"],
165 )
167 # -----------------------------------------------------------------------
168 # REST Endpoints
169 # -----------------------------------------------------------------------
171 @app.get("/health", response_model=HealthResponse)
172 async def health():
173 from agentos import __version__
174 return HealthResponse(
175 status="healthy",
176 version=__version__,
177 uptime=agent_manager.uptime,
178 agents_count=agent_manager.count,
179 active_websockets=len(active_ws),
180 )
182 @app.post("/agents", response_model=AgentInfo, status_code=201)
183 async def create_agent(config: AgentConfigRequest):
184 agent = agent_manager.create(config)
185 return AgentInfo(
186 id=agent.id,
187 name=agent.name,
188 model=agent.model,
189 status="ready",
190 )
192 @app.get("/agents", response_model=List[AgentInfo])
193 async def list_agents():
194 return [
195 AgentInfo(
196 id=a.id, name=a.name, model=a.model,
197 status="ready", tasks_completed=a.tasks_completed,
198 uptime=time.time() - a.created_at,
199 )
200 for a in agent_manager.list_all()
201 ]
203 @app.get("/agents/{agent_id}", response_model=AgentInfo)
204 async def get_agent(agent_id: str):
205 agent = agent_manager.get(agent_id)
206 if not agent:
207 raise HTTPException(status_code=404, detail="Agent not found")
208 return AgentInfo(
209 id=agent.id, name=agent.name, model=agent.model,
210 status="ready", tasks_completed=agent.tasks_completed,
211 uptime=time.time() - agent.created_at,
212 )
214 @app.delete("/agents/{agent_id}")
215 async def delete_agent(agent_id: str):
216 if not agent_manager.delete(agent_id):
217 raise HTTPException(status_code=404, detail="Agent not found")
218 return {"deleted": agent_id}
220 @app.post("/agents/{agent_id}/run", response_model=RunResponse)
221 async def run_agent(agent_id: str, request: RunRequest):
222 agent = agent_manager.get(agent_id)
223 if not agent:
224 raise HTTPException(status_code=404, detail="Agent not found")
226 t0 = time.time()
227 try:
228 result = f"[{agent.name}] Response to: {request.prompt[:100]}"
229 await asyncio.sleep(0.1)
230 agent.tasks_completed += 1
231 elapsed = time.time() - t0
233 return RunResponse(
234 task_id=uuid.uuid4().hex[:8],
235 agent_id=agent_id,
236 result=result,
237 elapsed=elapsed,
238 tokens_used=len(request.prompt.split()),
239 )
240 except Exception as e:
241 raise HTTPException(status_code=500, detail=str(e))
243 @app.post("/agents/{agent_id}/stream")
244 async def stream_agent(agent_id: str, request: RunRequest):
245 agent = agent_manager.get(agent_id)
246 if not agent:
247 raise HTTPException(status_code=404, detail="Agent not found")
249 async def event_generator():
250 words = f"Hello! Processing your request: {request.prompt[:50]}...".split()
251 for i, word in enumerate(words):
252 yield f"data: {json.dumps({'token': word, 'seq': i})}\n\n"
253 await asyncio.sleep(0.05)
254 yield f"data: {json.dumps({'token': '', 'seq': len(words), 'done': True})}\n\n"
256 return StreamingResponse(event_generator(), media_type="text/event-stream")
258 @app.post("/workflows/run")
259 async def run_workflow(request: WorkflowRunRequest):
260 try:
261 import yaml
262 from agentos.workflow import WorkflowParser, WorkflowEngine
264 wf_data = yaml.safe_load(request.workflow_yaml)
265 wf = WorkflowParser.parse_dict(wf_data)
266 wf.variables.update(request.variables)
267 ctx = await WorkflowEngine().execute(wf)
268 return {"result": ctx.variables, "history": ctx.history}
269 except Exception as e:
270 raise HTTPException(status_code=400, detail=str(e))
272 @app.post("/workflows/validate")
273 async def validate_workflow(request: WorkflowRunRequest):
274 try:
275 import yaml
276 from agentos.workflow import WorkflowParser, WorkflowEngine
277 wf_data = yaml.safe_load(request.workflow_yaml)
278 wf = WorkflowParser.parse_dict(wf_data)
279 result = await WorkflowEngine().dry_run(wf)
280 return result
281 except Exception as e:
282 return {"valid": False, "issues": [str(e)]}
284 # -----------------------------------------------------------------------
285 # WebSocket endpoint
286 # -----------------------------------------------------------------------
288 @app.websocket("/ws/{agent_id}")
289 async def websocket_endpoint(websocket: WebSocket, agent_id: str):
290 agent = agent_manager.get(agent_id)
291 if not agent:
292 await websocket.close(code=4004, reason="Agent not found")
293 return
295 await websocket.accept()
296 active_ws[agent_id] = websocket
297 logger.info(f"[API] WebSocket connected: {agent_id}")
299 try:
300 await websocket.send_json({"type": "connected", "agent_id": agent_id})
302 while True:
303 data = await websocket.receive_text()
304 msg = json.loads(data)
305 prompt = msg.get("prompt", "")
307 words = f"[{agent.name}] {prompt[:50]}...".split()
308 for i, word in enumerate(words):
309 await websocket.send_json({
310 "type": "token",
311 "data": word,
312 "seq": i,
313 })
314 await asyncio.sleep(0.03)
316 await websocket.send_json({"type": "done", "total_tokens": len(words)})
317 agent.tasks_completed += 1
319 except WebSocketDisconnect:
320 logger.info(f"[API] WebSocket disconnected: {agent_id}")
321 except Exception as e:
322 logger.error(f"[API] WebSocket error: {e}")
323 finally:
324 active_ws.pop(agent_id, None)
326 # -----------------------------------------------------------------------
327 # Marketplace endpoints
328 # -----------------------------------------------------------------------
330 @app.get("/marketplace/search")
331 async def marketplace_search(q: str = "", category: Optional[str] = None, limit: int = 20):
332 try:
333 from agentos.marketplace import MarketplaceManager, MarketSearchQuery, TemplateCategory
334 manager = MarketplaceManager()
335 cat = TemplateCategory(category) if category else None
336 results = await manager.search(MarketSearchQuery(keywords=q, category=cat, limit=limit))
337 return {
338 "results": [
339 {
340 "id": r.template.id,
341 "name": r.template.name,
342 "description": r.template.description,
343 "category": r.template.category.value,
344 "rating": r.template.rating,
345 "stars": r.template.stars,
346 "downloads": r.template.downloads,
347 "tags": r.template.tags,
348 }
349 for r in results
350 ]
351 }
352 except Exception as e:
353 raise HTTPException(status_code=500, detail=str(e))
355 @app.get("/marketplace/stats")
356 async def marketplace_stats():
357 from agentos.marketplace import MarketplaceManager, seed_default_templates
358 manager = MarketplaceManager()
359 seed_default_templates(manager)
360 return await manager.get_stats()
362else:
363 app = None
def serve(host: str = "0.0.0.0", port: int = 8000, reload: bool = False):
    """Run the API under uvicorn.

    When the optional API dependencies are not installed, print install
    instructions and return without starting anything.
    """
    if HAS_API_DEPS:
        # The app is passed as an import string rather than as an object.
        uvicorn.run("agentos.api.server:app", host=host, port=port, reload=reload)
        return

    print("Install API dependencies: pip install nexus-agentos[api]")
    print("Required: fastapi, uvicorn, websockets")
# Public API of this module.
# NOTE(review): AgentConfigRequest and RunRequest are only defined when the
# optional API dependencies import successfully — confirm that a star-import
# is not expected to work without them.
__all__ = [
    "app",
    "serve",
    "AgentManager",
    "AgentConfigRequest",
    "RunRequest",
    "RunResponse",
]
# ── Auto-generated compat stubs ──
# Placeholder classes so these names can always be imported from this module.
# Each stub is created only when the name is not already bound: an unconditional
# `class RunResponse: pass` here would rebind the module global and replace the
# Pydantic RunResponse model defined earlier, so any handler resolving the name
# at call time would receive the empty stub instead of the real model.
if "AgentAPI" not in globals():
    class AgentAPI:
        """Compatibility placeholder; carries no behavior."""

if "RunResponse" not in globals():
    class RunResponse:
        """Compatibility placeholder used only when the real model is unavailable."""