Coverage for agentos/core/loop.py: 34%

304 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v0.70 核心循环 — Gemini + Metrics + CostAnalytics 集成版。 

3v0.40: Swarm多Agent并行、Agent间通信、语义缓存、任务队列。 

4v0.70: MetricsCollector、CostAnalytics实时监控。 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio 

10import time 

11from dataclasses import dataclass, field 

12from enum import Enum 

13from typing import Any, AsyncIterator, Callable 

14 

15from agentos.core.context import ContextManager 

16from agentos.tools.registry import ToolRegistry 

17from agentos.models.router import ModelRouter, AllModelsFailed 

18from agentos.security.sandbox import SandboxManager 

19from agentos.observability.tracer import Tracer 

20from agentos.observability.metrics import MetricsCollector 

21from agentos.observability.cost_analytics import CostAnalytics 

22from agentos.core.streaming import StreamChunk, StreamEvent 

23from agentos.storage.base import CheckpointStore 

24from agentos.checkpoint.base import Checkpoint, CheckpointMetadata, CheckpointBackend 

25from agentos.cost.tracker import CostTracker 

26from agentos.swarm.coordinator import SwarmCoordinator, SwarmTopology, AgentRole, SwarmResult, MessageBus 

27from agentos.comm.layer import CommunicationLayer 

28from agentos.cache.llm_cache import LLMCache 

29from agentos.multimodal.manager import MultimodalManager 

30 

31 

class LoopState(str, Enum):
    """Lifecycle states of the main agent loop.

    Subclasses ``str`` so each member compares equal to its plain string value.
    """

    RUNNING = "running"
    PAUSED = "paused"
    WAITING_HUMAN = "waiting_human"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

42 

43 

@dataclass
class AgentResult:
    """Final outcome of one run of the agent loop (single-agent or swarm)."""

    output: str  # final answer text produced by the run
    iterations: int  # loop iterations consumed
    tokens_used: dict[str, int] = field(default_factory=dict)
    cost_usd: float = 0.0
    duration_ms: float = 0.0  # wall-clock duration in milliseconds
    tool_calls_total: int = 0
    reflections_count: int = 0
    human_interrupts: int = 0
    final_state: LoopState = LoopState.COMPLETED
    error: str | None = None  # failure description, None on success
    # --- added in v0.40 ---
    swarm_result: SwarmResult | None = None  # populated by swarm runs
    cache_hit: bool = False

61 

62 

@dataclass
class LoopConfig:
    """Runtime knobs for the agent main loop."""

    # --- core loop ---
    max_iterations: int = 100
    max_retries_per_step: int = 2  # extra attempts after the first one
    step_timeout_seconds: int = 120
    enable_streaming: bool = False
    enable_checkpoints: bool = True
    checkpoint_interval: int = 5  # checkpoint every N iterations

    # --- v0.30: reflection, self-critique, human-in-the-loop, routing ---
    enable_reflection: bool = True
    reflection_frequency: int = 3  # reflect every N iterations
    max_reflection_loops: int = 3
    enable_self_critique: bool = True
    enable_human_in_the_loop: bool = False
    human_approval_trigger: str = "high_risk"
    enable_cost_tracking: bool = True
    auto_select_model: bool = True

    # --- v0.40: swarm, communication layer, semantic cache ---
    enable_swarm: bool = False
    swarm_topology: str = "sequential"
    swarm_roles: list[AgentRole] = field(default_factory=list)
    max_parallel_agents: int = 4
    enable_comm_layer: bool = True
    enable_semantic_cache: bool = True

    # --- v1.11.0: long-running task support ---
    checkpoint_backend: CheckpointBackend | None = None  # full checkpoint backend for crash recovery
    enable_auto_paging: bool = True  # evict old memories when the context fills up
    auto_page_threshold: float = 0.85  # page out above 85% context-window usage

93 

94 

class MaxIterationsExceeded(Exception):
    """Raised when the agent loop reaches its iteration cap without finishing."""

100 

101 

class HumanInterruptNeeded(Exception):
    """Raised when a pending action needs human confirmation before it runs.

    Args:
        message: reason shown to the human; also the exception message.
        context: extra details for the reviewer. A falsy value is replaced
            by a fresh empty dict.
    """

    def __init__(self, message: str, context: dict | None = None):
        super().__init__(message)
        self.context = context if context else {}

109 

110 

@dataclass
class ReflectionResult:
    """Verdict of one reflection pass over the agent's progress."""

    quality_score: float  # the reflection prompt asks for a score in 0.0-1.0
    issues: list[str]
    suggestions: list[str]
    should_continue: bool  # False asks the loop to re-plan instead of stepping
    new_plan: str | None = None  # optional replacement plan

119 

120 

class AgentLoop:
    """Core agent loop: reflection, human-in-the-loop, self-critique, model auto-routing, cost tracking.

    Later additions: checkpoint save/restore (a full ``CheckpointBackend`` or
    the thinner ``CheckpointStore``), optional context auto-paging, and a
    multi-agent swarm mode (:meth:`run_swarm`).
    """

    def __init__(
        self,
        model_router: ModelRouter,
        tool_registry: ToolRegistry,
        context_manager: ContextManager,
        sandbox_manager: SandboxManager | None = None,
        tracer: Tracer | None = None,
        checkpoint_store: CheckpointStore | None = None,
        checkpoint_backend: CheckpointBackend | None = None,
        cost_tracker: CostTracker | None = None,
        config: LoopConfig | None = None,
        on_iteration: Callable | None = None,
        on_stream: Callable[[StreamChunk], None] | None = None,
        on_human_interrupt: Callable[[str, dict], str | None] | None = None,
        on_reflection: Callable[[ReflectionResult], None] | None = None,
        metrics_collector: MetricsCollector | None = None,
        cost_analytics: CostAnalytics | None = None,
    ):
        """Store collaborators and callbacks, filling in defaults for optional ones.

        An explicit ``checkpoint_backend`` takes precedence. When it is ``None``,
        ``config.checkpoint_backend`` is used.
        """
        self.model_router = model_router
        self.tool_registry = tool_registry
        self.context_manager = context_manager
        self.sandbox_manager = sandbox_manager
        self.tracer = tracer or Tracer.noop()
        self.checkpoint_store = checkpoint_store
        self.cost_tracker = cost_tracker or CostTracker.noop()
        self.config = config or LoopConfig()
        # LoopConfig.checkpoint_backend used to be silently ignored; honour it as a fallback.
        self.checkpoint_backend = (
            checkpoint_backend if checkpoint_backend is not None else self.config.checkpoint_backend
        )
        self._auto_page_callback: Callable | None = None  # registered via set_auto_paging()
        self._last_checkpoint_id: str | None = None  # parent link for the next backend checkpoint
        self.on_iteration = on_iteration
        self.on_stream = on_stream
        self.on_human_interrupt = on_human_interrupt
        self.on_reflection = on_reflection
        self.metrics = metrics_collector or MetricsCollector()
        self.cost_analytics = cost_analytics or CostAnalytics()
        self._cancelled = False
        self._reflection_history: list[ReflectionResult] = []
        self._human_interrupts = 0

    # ── Entry point ──────────────────────────────────

    async def run(self, task: str, session_id: str = "") -> AgentResult:
        """Run the loop on *task* until a terminal step, a failure, or cancellation.

        Args:
            task: the task handed to ``context_manager.init_session``.
            session_id: key used for the context, sandboxes and checkpoints.

        Returns:
            AgentResult whose ``final_state`` is COMPLETED when a step returns a
            final answer, FAILED on a step timeout, or CANCELLED when
            :meth:`cancel` stopped the loop.

        Raises:
            MaxIterationsExceeded: ``config.max_iterations`` was reached.
            AllModelsFailed: the model router still failed after all retries.
        """
        start_time = time.time()
        self.context_manager.init_session(session_id, task)

        if self.config.auto_select_model:
            await self._auto_route_model(task)

        # Resume numbering from the last checkpoint (0 for a fresh session).
        iteration = await self._try_restore(session_id)
        tool_calls_total = 0
        reflection_loops = 0

        while iteration < self.config.max_iterations and not self._cancelled:
            iteration += 1

            # A non-positive frequency disables reflection instead of raising ZeroDivisionError.
            frequency = self.config.reflection_frequency
            if self.config.enable_reflection and frequency > 0 and iteration % frequency == 0:
                with self.tracer.step("reflection"):
                    reflection = await self._reflect(session_id)
                self._reflection_history.append(reflection)
                if not reflection.should_continue and reflection_loops < self.config.max_reflection_loops:
                    reflection_loops += 1
                    if reflection.new_plan:
                        self.context_manager.update_plan(reflection.new_plan)
                    # Spend this iteration on re-planning instead of a model step.
                    continue

            try:
                with self.tracer.step(f"loop_{iteration}"):
                    step_result = await self._execute_step_sync(iteration, session_id)

                if step_result.is_terminal:
                    return self._build_result(
                        step_result.content, iteration, tool_calls_total, start_time,
                        final_state=LoopState.COMPLETED,
                    )

                if step_result.tool_results:
                    tool_calls_total += len(step_result.tool_results)

                if self.on_iteration:
                    self.on_iteration(iteration, step_result.tool_results or [])

            except HumanInterruptNeeded as e:
                self._human_interrupts += 1
                if self.on_human_interrupt:
                    feedback = self.on_human_interrupt(str(e), e.context)
                    if feedback:
                        self.context_manager.append_user_message(feedback)
                # NOTE(review): the step is simply re-run. Nothing marks the risky
                # call as approved, so if the model proposes it again the loop
                # interrupts again.
                continue

            except StepTimeoutError:
                return self._build_result(
                    "", iteration, tool_calls_total, start_time,
                    final_state=LoopState.FAILED, error="Step timeout",
                )

            interval = self.config.checkpoint_interval
            if self.config.enable_checkpoints and interval > 0 and iteration % interval == 0:
                await self._save_checkpoint(session_id, iteration)

        # cancel() used to surface as a misleading MaxIterationsExceeded.
        if self._cancelled:
            return self._build_result(
                "", iteration, tool_calls_total, start_time,
                final_state=LoopState.CANCELLED,
            )
        raise MaxIterationsExceeded(f"超过 {self.config.max_iterations} 步")

    def _build_result(
        self,
        output: str,
        iteration: int,
        tool_calls_total: int,
        start_time: float,
        *,
        final_state: LoopState,
        error: str | None = None,
    ) -> AgentResult:
        """Assemble an AgentResult from the run's counters, cost, tokens and elapsed time."""
        return AgentResult(
            output=output,
            iterations=iteration,
            tokens_used=self.tracer.token_summary(),
            cost_usd=self.cost_tracker.total_cost,
            duration_ms=(time.time() - start_time) * 1000,
            tool_calls_total=tool_calls_total,
            reflections_count=len(self._reflection_history),
            human_interrupts=self._human_interrupts,
            final_state=final_state,
            error=error,
        )

    # ── Reflection ────────────────────────────────

    async def _reflect(self, session_id: str) -> ReflectionResult:
        """Ask the model to review progress so far and parse its JSON verdict.

        Falls back to a neutral result (score 0.5, no issues, continue) when the
        reply is not a JSON object. ``on_reflection`` is notified in both cases.
        ``session_id`` is currently unused.
        """
        import json

        prompt = f"""你是一个反思者。审核以下Agent执行过程:

任务: {self.context_manager.current_task}
已执行: {self.context_manager.step_count} 步

评估并返回JSON:
{{"quality_score": 0.0-1.0, "issues": [...], "suggestions": [...], "should_continue": true/false, "new_plan": "如果调整,新计划"}}"""

        resp = await self.model_router.call_simple(prompt)
        try:
            d = json.loads(resp)
            result = ReflectionResult(
                quality_score=d.get("quality_score", 0.5),
                issues=d.get("issues", []),
                suggestions=d.get("suggestions", []),
                should_continue=d.get("should_continue", True),
                new_plan=d.get("new_plan"),
            )
        except (TypeError, ValueError, AttributeError):
            # Non-string reply, invalid JSON (JSONDecodeError is a ValueError),
            # or JSON that is not an object (no .get): use neutral defaults.
            result = ReflectionResult(0.5, [], [], True)
        if self.on_reflection:
            self.on_reflection(result)
        return result

    # ── Self-Critique ─────────────────────────────

    async def _self_critique(self, text: str, max_chars: int = 3000) -> str:
        """Ask the model to fix logical errors or inaccuracies in *text*.

        Returns *text* unchanged when self-critique is disabled, when *text* is
        empty, or when it exceeds *max_chars*. The critic previously saw only a
        ``max_chars`` prefix, and its rewrite replaced the whole answer, which
        silently truncated long answers. An empty model reply also keeps the
        original text.
        """
        if not self.config.enable_self_critique or not text:
            return text
        if len(text) > max_chars:
            return text
        prompt = f"""审视以下回答,找出逻辑错误或不准确之处。如果已足够好就原样返回。

{text}"""
        improved = await self.model_router.call_simple(prompt)
        return improved or text

    # ── Auto Route ────────────────────────────────

    async def _auto_route_model(self, task: str):
        """Set the router's preferred model from a keyword/length complexity heuristic."""
        score = self._estimate_complexity(task)
        if score > 0.7:
            self.model_router.set_preferred("deepseek-r1")
        elif score > 0.4:
            self.model_router.set_preferred("kimi-k2.6")
        else:
            self.model_router.set_preferred("deepseek-v3.1")

    def _estimate_complexity(self, task: str) -> float:
        """Score *task* in [0, 1].

        Adds 0.15 per matched keyword, plus up to 0.3 for length (length / 2000).
        """
        kw = ["分析", "对比", "设计", "架构", "review", "refactor", "实现", "优化", "诊断", "troubleshoot", "debug", "deploy", "migrate", "安全", "security"]
        lowered = task.lower()
        score = sum(0.15 for k in kw if k in lowered)
        return min(score + min(len(task) / 2000, 0.3), 1.0)

    # ── Step execution ──────────────────────────────────

    async def _execute_step_sync(self, iteration: int, session_id: str) -> "StepResult":
        """Run one step with a per-attempt timeout, retrying timeouts and model failures.

        Makes ``max_retries_per_step + 1`` attempts (at least one). After an
        ``AllModelsFailed`` it sleeps 2**attempt seconds before the next attempt.
        There is no sleep after the final attempt.

        Raises:
            StepTimeoutError: the last attempt timed out.
            AllModelsFailed: the last attempt failed in the model router.
        """
        attempts = max(self.config.max_retries_per_step, 0) + 1
        last_error: Exception | None = None
        for attempt in range(attempts):
            try:
                return await asyncio.wait_for(
                    self._do_step(iteration, session_id),
                    timeout=self.config.step_timeout_seconds,
                )
            except asyncio.TimeoutError:
                last_error = StepTimeoutError(f"Step {iteration} timeout")
            except AllModelsFailed as e:
                last_error = e
                if attempt < attempts - 1:
                    await asyncio.sleep(2 ** attempt)
        raise last_error

    async def _do_step(self, iteration: int, session_id: str) -> "StepResult":
        """Perform one model call.

        Finishes with the (optionally self-critiqued) answer when the model
        requests no tools. Otherwise runs the requested tools in conflict-free
        batches and appends their results to the context.

        Raises:
            HumanInterruptNeeded: human-in-the-loop is enabled and a requested
                tool looks high-risk.
        """
        ctx = self.context_manager.build_context(
            model_type=self.model_router.model_type,
            tools=self.tool_registry.get_schemas_for_model(self.model_router.model_type),
        )

        # v1.11.0: auto-page old memories when the context nears its limit.
        # NOTE(review): this runs after `ctx` was built, so it presumably only
        # shrinks the context for the next step. Confirm whether it should run first.
        if self.config.enable_auto_paging and self._auto_page_callback:
            usage_ratio = self.context_manager.estimate_context_usage()
            if usage_ratio > self.config.auto_page_threshold:
                await self._auto_page_callback(usage_ratio)

        resp = await self.model_router.call(ctx)

        # Cost accounting
        if self.config.enable_cost_tracking and hasattr(resp, "usage"):
            self.cost_tracker.record(self.model_router.current_model, resp.usage)

        if not resp.tool_calls:
            # _self_critique returns the text unchanged when critique is disabled.
            content = await self._self_critique(resp.content)
            return StepResult(content=content, is_terminal=True)

        # Human-in-the-loop gate: stop before executing anything risky.
        if self.config.enable_human_in_the_loop:
            for tc in resp.tool_calls:
                if self._is_high_risk(tc):
                    raise HumanInterruptNeeded(f"高风险操作需确认: {tc.name}", {"tool": tc.name, "args": tc.arguments})

        all_results: list = []
        for group in self._group_independent_calls(resp.tool_calls):
            sandbox = self.sandbox_manager.get_sandbox(session_id) if self.sandbox_manager else None
            all_results.extend(await self.tool_registry.execute_batch(group, sandbox=sandbox))

        self.context_manager.append_tool_results(all_results)
        return StepResult(content="", is_terminal=False, tool_results=all_results)

    def _is_high_risk(self, tc) -> bool:
        """Return True if the tool name contains a risky substring.

        Accepts either an object with ``.name`` or a dict with a ``"name"`` key.
        """
        risky = ["delete", "rm", "uninstall", "format", "sudo", "kill", "drop"]
        name = tc.name.lower() if hasattr(tc, "name") else tc.get("name", "").lower()
        return any(r in name for r in risky)

    def _group_independent_calls(self, tool_calls: list) -> list[list]:
        """Partition tool calls into batches that run one after another.

        Calls within a batch may run concurrently. Each call goes into the
        first batch after the last batch it conflicts with. This way a call
        never runs concurrently with, or before, an earlier call that touches
        the same path.
        """
        if len(tool_calls) <= 1:
            return [tool_calls] if tool_calls else []
        groups: list[list] = []
        for call in tool_calls:
            target = 0
            for idx, group in enumerate(groups):
                if self._has_conflict(call, group):
                    target = idx + 1
            if target < len(groups):
                groups[target].append(call)
            else:
                groups.append([call])
        return groups

    def _has_conflict(self, call, group: list) -> bool:
        """Return True if *call* must not run concurrently with *group*.

        A read conflicts with a path that some group member writes. A write
        conflicts with a path that some member reads or writes. Tools unknown to
        the registry, and calls without a target path, never conflict.
        """
        read_paths: set = set()
        write_paths: set = set()
        for tc in group:
            tool = self.tool_registry.get(tc.name)
            if not tool:
                continue
            is_write = tool.is_write_operation(tc.arguments)
            is_read = tool.is_read_operation(tc.arguments)
            if not (is_write or is_read):
                continue
            path = tool.extract_target_path(tc.arguments)
            if not path:
                continue
            if is_write:
                write_paths.add(path)
            if is_read:
                read_paths.add(path)

        cur = self.tool_registry.get(call.name)
        if not cur:
            return False
        cur_write = cur.is_write_operation(call.arguments)
        cur_read = cur.is_read_operation(call.arguments)
        if not (cur_write or cur_read):
            return False
        path = cur.extract_target_path(call.arguments)
        if not path:
            return False
        if cur_read and path in write_paths:
            return True
        return bool(cur_write) and (path in write_paths or path in read_paths)

    # ── v1.11.0 full checkpoint (complete state snapshot) ────

    async def _save_checkpoint(self, session_id: str, iteration: int):
        """Persist a snapshot of the session at *iteration* on a best-effort basis.

        Uses the full CheckpointBackend when one is set, otherwise the thin
        CheckpointStore, and does nothing when neither is configured. Failures
        are logged and swallowed so a broken store never aborts the loop.
        """
        import logging
        from datetime import datetime, timezone

        backend = self.checkpoint_backend
        if not backend and not self.checkpoint_store:
            return

        try:
            messages = [{"role": m.role, "content": m.content} for m in self.context_manager._messages]

            if not backend:
                # Fallback to the thin CheckpointStore.
                snap = {
                    "session_id": session_id, "iteration": iteration,
                    "messages": messages,
                    "timestamp": time.time(),
                }
                await self.checkpoint_store.save(session_id, snap)
                return

            checkpoint_id = f"ckpt-{session_id}-{iteration:06d}"
            cp = Checkpoint(
                metadata=CheckpointMetadata(
                    thread_id=session_id,
                    checkpoint_id=checkpoint_id,
                    step=iteration,
                    parent_checkpoint_id=getattr(self, "_last_checkpoint_id", None),
                    created_at=datetime.now(timezone.utc).isoformat(),
                    tags=["auto", f"iter_{iteration}"],
                ),
                messages=messages,
                state={
                    "iteration": iteration,
                    "task": self.context_manager.current_task,
                    "session_id": session_id,
                    "cost_usd": self.cost_tracker.total_cost,
                    "reflections": len(self._reflection_history),
                    "human_interrupts": self._human_interrupts,
                    "loop_state": getattr(self.context_manager, "current_state", "running"),
                },
                tools_result={},
                next_node="loop",
            )
            await backend.put(cp)
            self._last_checkpoint_id = checkpoint_id
        except Exception:
            # Checkpoint failure must not crash the loop, but it must be visible.
            logging.getLogger(__name__).warning(
                "Checkpoint save failed for session %r at iteration %d",
                session_id, iteration, exc_info=True,
            )

    async def _try_restore(self, session_id: str) -> int:
        """Replay the latest checkpoint for *session_id* into the context.

        Returns the iteration to resume from. Returns 0 when checkpoints are
        disabled, none exists, or the backend read fails.
        """
        import logging

        backend = self.checkpoint_backend
        if not backend:
            # Fallback to the thin CheckpointStore.
            if not self.checkpoint_store or not self.config.enable_checkpoints:
                return 0
            snap = await self.checkpoint_store.load(session_id)
            if not snap:
                return 0
            iter_count = snap.get("iteration", 0)
            if iter_count > 0:
                for msg in snap.get("messages", []):
                    self.context_manager.append_message(msg["role"], msg["content"])
            return iter_count

        if not self.config.enable_checkpoints:
            return 0

        try:
            latest = await backend.get_latest(session_id)
            if not latest:
                return 0
            # Read every field before mutating anything. Otherwise a malformed
            # checkpoint could leave half-restored messages behind while we report 0.
            checkpoint_id = latest.metadata.checkpoint_id
            iter_count = latest.metadata.step
            messages = [(m.get("role", "user"), m.get("content", "")) for m in latest.messages]
            human_interrupts = (latest.state or {}).get("human_interrupts", 0)

            for role, content in messages:
                self.context_manager.append_message(role, content)
            self._human_interrupts = human_interrupts
            self._last_checkpoint_id = checkpoint_id
            return iter_count
        except Exception:
            logging.getLogger(__name__).warning(
                "Checkpoint restore failed for session %r; starting from scratch",
                session_id, exc_info=True,
            )
            return 0

    def set_auto_paging(self, callback: Callable):
        """Register an async callback awaited with the usage ratio when the context passes the paging threshold (v1.11.0)."""
        self._auto_page_callback = callback

    def cancel(self):
        """Request cancellation; run() stops before its next iteration and returns a CANCELLED result."""
        self._cancelled = True

    # ── v0.40 swarm execution ──────────────────────────

    async def run_swarm(self, task: str, roles: list[AgentRole] | None = None) -> AgentResult:
        """Execute *task* with a SwarmCoordinator over *roles* (default: ``config.swarm_roles``).

        Returns a FAILED result when no roles are available.
        """
        start_time = time.time()
        roles = roles or self.config.swarm_roles
        if not roles:
            return AgentResult(output="[Swarm] No roles defined", iterations=0, final_state=LoopState.FAILED, error="No roles")

        topology = SwarmTopology(self.config.swarm_topology)
        # NOTE(review): comm_layer is created but never handed to the coordinator.
        comm_layer = CommunicationLayer() if self.config.enable_comm_layer else None

        swarm = SwarmCoordinator(
            router=self.model_router,
            tool_registry=self.tool_registry,
            topology=topology,
            max_parallel=self.config.max_parallel_agents,
        )
        swarm.register_roles(roles)

        swarm_result = await swarm.execute(task, roles)
        duration_ms = (time.time() - start_time) * 1000

        return AgentResult(
            output=swarm_result.combined_output,
            iterations=1,
            cost_usd=self.cost_tracker.total_cost,
            duration_ms=duration_ms,
            tool_calls_total=0,
            reflections_count=0,
            human_interrupts=0,
            final_state=LoopState.COMPLETED,
            swarm_result=swarm_result,
        )

499 

500 

class StepTimeoutError(Exception):
    """Raised when a single loop step runs past its configured timeout."""

506 

507 

@dataclass
class StepResult:
    """Outcome of executing one loop step."""

    content: str  # final answer text for terminal steps, "" otherwise
    is_terminal: bool = False  # True when the loop should stop and return content
    tool_results: list | None = None  # results of tools run in this step, if any