Coverage for agentos/core/loop.py: 34%
304 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v0.70 核心循环 — Gemini + Metrics + CostAnalytics 集成版。
3v0.40: Swarm多Agent并行、Agent间通信、语义缓存、任务队列。
4v0.70: MetricsCollector、CostAnalytics实时监控。
5"""
7from __future__ import annotations
9import asyncio
10import time
11from dataclasses import dataclass, field
12from enum import Enum
13from typing import Any, AsyncIterator, Callable
15from agentos.core.context import ContextManager
16from agentos.tools.registry import ToolRegistry
17from agentos.models.router import ModelRouter, AllModelsFailed
18from agentos.security.sandbox import SandboxManager
19from agentos.observability.tracer import Tracer
20from agentos.observability.metrics import MetricsCollector
21from agentos.observability.cost_analytics import CostAnalytics
22from agentos.core.streaming import StreamChunk, StreamEvent
23from agentos.storage.base import CheckpointStore
24from agentos.checkpoint.base import Checkpoint, CheckpointMetadata, CheckpointBackend
25from agentos.cost.tracker import CostTracker
26from agentos.swarm.coordinator import SwarmCoordinator, SwarmTopology, AgentRole, SwarmResult, MessageBus
27from agentos.comm.layer import CommunicationLayer
28from agentos.cache.llm_cache import LLMCache
29from agentos.multimodal.manager import MultimodalManager
class LoopState(str, Enum):
    """Lifecycle state of an agent loop run (members compare equal to their string values)."""

    # active states
    RUNNING = "running"
    PAUSED = "paused"
    WAITING_HUMAN = "waiting_human"
    # end states
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
@dataclass
class AgentResult:
    """Final outcome of an agent loop run (single-agent or swarm)."""

    output: str
    iterations: int
    tokens_used: dict[str, int] = field(default_factory=dict)
    cost_usd: float = field(default=0.0)
    duration_ms: float = field(default=0.0)
    tool_calls_total: int = field(default=0)
    reflections_count: int = field(default=0)
    human_interrupts: int = field(default=0)
    final_state: LoopState = field(default=LoopState.COMPLETED)
    error: str | None = field(default=None)
    # Added in v0.40: populated by swarm runs / semantic cache.
    swarm_result: SwarmResult | None = field(default=None)
    cache_hit: bool = field(default=False)
@dataclass
class LoopConfig:
    """Runtime settings for ``AgentLoop``; every field has a default, so ``LoopConfig()`` works as-is."""

    # Core loop limits.
    max_iterations: int = field(default=100)
    max_retries_per_step: int = field(default=2)
    step_timeout_seconds: int = field(default=120)
    enable_streaming: bool = field(default=False)
    enable_checkpoints: bool = field(default=True)
    checkpoint_interval: int = field(default=5)
    # v0.30: reflection, human-in-the-loop, model routing, cost tracking.
    enable_reflection: bool = field(default=True)
    reflection_frequency: int = field(default=3)
    max_reflection_loops: int = field(default=3)
    enable_self_critique: bool = field(default=True)
    enable_human_in_the_loop: bool = field(default=False)
    human_approval_trigger: str = field(default="high_risk")
    enable_cost_tracking: bool = field(default=True)
    auto_select_model: bool = field(default=True)
    # v0.40: swarm execution, communication layer, caching.
    enable_swarm: bool = field(default=False)
    swarm_topology: str = field(default="sequential")
    swarm_roles: list[AgentRole] = field(default_factory=list)
    max_parallel_agents: int = field(default=4)
    enable_comm_layer: bool = field(default=True)
    enable_semantic_cache: bool = field(default=True)
    # v1.11.0: long-running task support.
    checkpoint_backend: CheckpointBackend | None = field(default=None)  # full backend for crash recovery
    enable_auto_paging: bool = field(default=True)  # page out old memories as the context fills
    auto_page_threshold: float = field(default=0.85)  # context-usage ratio that triggers paging
class MaxIterationsExceeded(Exception):
    """Raised when the loop hits ``LoopConfig.max_iterations`` without a terminal answer."""
class HumanInterruptNeeded(Exception):
    """Raised when a step needs human review before the loop may proceed.

    ``context`` carries details for the reviewer; it is an empty dict when none (or a falsy value) is given.
    """

    def __init__(self, message: str, context: dict | None = None):
        super().__init__(message)
        self.context = context if context else {}
@dataclass
class ReflectionResult:
    """Parsed outcome of a reflection pass over the agent's progress."""

    quality_score: float  # model-reported quality (prompt asks for 0.0-1.0)
    issues: list[str]
    suggestions: list[str]
    should_continue: bool
    new_plan: str | None = field(default=None)
class AgentLoop:
    """Core agent loop with reflection, human-in-the-loop, self-critique, model routing and cost tracking.

    A run alternates model calls and tool executions until the model answers
    without tool calls (terminal), :meth:`cancel` is called, a step times out,
    or ``config.max_iterations`` is reached. Multi-agent execution is available
    through :meth:`run_swarm`.
    """

    # Substring keywords that raise the complexity score used for model routing.
    _COMPLEXITY_KEYWORDS = (
        "分析", "对比", "设计", "架构", "review", "refactor", "实现", "优化", "诊断",
        "troubleshoot", "debug", "deploy", "migrate", "安全", "security",
    )
    # Substrings of tool names treated as high risk. Deliberately broad: false
    # positives (e.g. "format_date") only cause an extra human check.
    _RISKY_TOOL_KEYWORDS = ("delete", "rm", "uninstall", "format", "sudo", "kill", "drop")

    def __init__(
        self,
        model_router: ModelRouter,
        tool_registry: ToolRegistry,
        context_manager: ContextManager,
        sandbox_manager: SandboxManager | None = None,
        tracer: Tracer | None = None,
        checkpoint_store: CheckpointStore | None = None,
        checkpoint_backend: CheckpointBackend | None = None,
        cost_tracker: CostTracker | None = None,
        config: LoopConfig | None = None,
        on_iteration: Callable | None = None,
        on_stream: Callable[[StreamChunk], None] | None = None,
        on_human_interrupt: Callable[[str, dict], str | None] | None = None,
        on_reflection: Callable[[ReflectionResult], None] | None = None,
        metrics_collector: MetricsCollector | None = None,
        cost_analytics: CostAnalytics | None = None,
    ):
        """Wire the loop's collaborators.

        Optional collaborators fall back to no-op/default instances (``Tracer.noop()``,
        ``CostTracker.noop()``, ``LoopConfig()``, ``MetricsCollector()``, ``CostAnalytics()``).
        ``checkpoint_backend`` takes precedence over the thinner ``checkpoint_store``.
        Callbacks are invoked synchronously from the loop.
        """
        self.model_router = model_router
        self.tool_registry = tool_registry
        self.context_manager = context_manager
        self.sandbox_manager = sandbox_manager
        self.tracer = tracer or Tracer.noop()
        self.checkpoint_store = checkpoint_store
        self.cost_tracker = cost_tracker or CostTracker.noop()
        self.config = config or LoopConfig()
        self.checkpoint_backend = checkpoint_backend  # v1.11.0 full checkpoint integration
        self._auto_page_callback: Callable | None = None  # v1.11.0, set via set_auto_paging()
        self.on_iteration = on_iteration
        self.on_stream = on_stream
        self.on_human_interrupt = on_human_interrupt
        self.on_reflection = on_reflection
        # NOTE(review): metrics / cost_analytics are stored but not used by this class yet.
        self.metrics = metrics_collector or MetricsCollector()
        self.cost_analytics = cost_analytics or CostAnalytics()
        self._cancelled = False
        self._reflection_history: list[ReflectionResult] = []
        self._human_interrupts = 0
        self._last_checkpoint_id: str | None = None  # parent link for the next full checkpoint

    # ── Entry point ───────────────────────────────

    async def run(self, task: str, session_id: str = "") -> AgentResult:
        """Run the loop for *task* until a terminal answer, cancellation or failure.

        Args:
            task: The user task. It is also used to pick a preferred model when
                ``config.auto_select_model`` is set.
            session_id: Key for the context session, sandbox and checkpoints. If a
                checkpoint exists for it, the loop resumes from the stored iteration.

        Returns:
            An ``AgentResult`` whose ``final_state`` is COMPLETED on a terminal step,
            FAILED on a step timeout, or CANCELLED when :meth:`cancel` stopped the loop.

        Raises:
            MaxIterationsExceeded: No terminal step within ``config.max_iterations``.
            AllModelsFailed: Every attempt of a step failed in the model router.
        """
        start_time = time.time()
        # Counters are per run; _try_restore may reload the interrupt count.
        self._reflection_history = []
        self._human_interrupts = 0
        self.context_manager.init_session(session_id, task)

        if self.config.auto_select_model:
            await self._auto_route_model(task)

        iteration = await self._try_restore(session_id)
        tool_calls_total = 0
        reflection_loops = 0
        cfg = self.config

        while iteration < cfg.max_iterations and not self._cancelled:
            iteration += 1

            if cfg.enable_reflection and self._is_due(iteration, cfg.reflection_frequency):
                with self.tracer.step("reflection"):
                    reflection = await self._reflect(session_id)
                    self._reflection_history.append(reflection)
                    # A negative reflection spends this iteration on re-planning
                    # (bounded by max_reflection_loops) instead of executing a step.
                    if not reflection.should_continue and reflection_loops < cfg.max_reflection_loops:
                        reflection_loops += 1
                        if reflection.new_plan:
                            self.context_manager.update_plan(reflection.new_plan)
                        continue

            try:
                with self.tracer.step(f"loop_{iteration}"):
                    step_result = await self._execute_step_sync(iteration, session_id)

                if step_result.is_terminal:
                    return self._build_result(step_result.content, iteration, start_time, tool_calls_total)

                if step_result.tool_results:
                    tool_calls_total += len(step_result.tool_results)

                if self.on_iteration:
                    self.on_iteration(iteration, step_result.tool_results or [])

            except HumanInterruptNeeded as e:
                self._human_interrupts += 1
                if self.on_human_interrupt:
                    feedback = self.on_human_interrupt(str(e), e.context)
                    if feedback:
                        self.context_manager.append_user_message(feedback)
                continue

            except StepTimeoutError:
                return self._build_result(
                    "", iteration, start_time, tool_calls_total,
                    final_state=LoopState.FAILED, error="Step timeout",
                )

            if cfg.enable_checkpoints and self._is_due(iteration, cfg.checkpoint_interval):
                await self._save_checkpoint(session_id, iteration)

        # The loop also exits on cancel(); that is not an iteration-limit failure.
        if self._cancelled:
            return self._build_result(
                "", iteration, start_time, tool_calls_total,
                final_state=LoopState.CANCELLED, error="Cancelled",
            )
        raise MaxIterationsExceeded(f"超过 {self.config.max_iterations} 步")

    @staticmethod
    def _is_due(iteration: int, every: int) -> bool:
        """Return True when *iteration* is a multiple of *every*; never when *every* <= 0 (avoids ZeroDivisionError)."""
        return every > 0 and iteration % every == 0

    def _build_result(
        self,
        output: str,
        iteration: int,
        start_time: float,
        tool_calls_total: int,
        final_state: LoopState | None = None,
        error: str | None = None,
    ) -> AgentResult:
        """Assemble an ``AgentResult`` from the current run counters (final_state defaults to COMPLETED)."""
        return AgentResult(
            output=output,
            iterations=iteration,
            tokens_used=self.tracer.token_summary(),
            cost_usd=self.cost_tracker.total_cost,
            duration_ms=(time.time() - start_time) * 1000,
            tool_calls_total=tool_calls_total,
            reflections_count=len(self._reflection_history),
            human_interrupts=self._human_interrupts,
            final_state=final_state if final_state is not None else LoopState.COMPLETED,
            error=error,
        )

    # ── Reflection ────────────────────────────────

    async def _reflect(self, session_id: str) -> ReflectionResult:
        """Ask the model to critique progress so far and return the parsed ReflectionResult.

        The reply should contain a JSON object. Markdown fences or prose around it
        are tolerated. An unparseable reply yields the neutral
        ``ReflectionResult(0.5, [], [], True)``, so reflection never stops the loop
        by accident. ``on_reflection`` is called with the result when set.
        ``session_id`` is currently unused.
        """
        import json

        prompt = f"""你是一个反思者。审核以下Agent执行过程:

任务: {self.context_manager.current_task}
已执行: {self.context_manager.step_count} 步

评估并返回JSON:
{{"quality_score": 0.0-1.0, "issues": [...], "suggestions": [...], "should_continue": true/false, "new_plan": "如果调整,新计划"}}"""
        resp = await self.model_router.call_simple(prompt)
        try:
            d = json.loads(self._extract_json_object(resp))
            result = ReflectionResult(
                quality_score=d.get("quality_score", 0.5),
                issues=d.get("issues", []),
                suggestions=d.get("suggestions", []),
                should_continue=d.get("should_continue", True),
                new_plan=d.get("new_plan"),
            )
        except (ValueError, TypeError, AttributeError):
            # ValueError: invalid JSON; TypeError: non-string reply; AttributeError: JSON is not an object.
            result = ReflectionResult(0.5, [], [], True)
        if self.on_reflection:
            self.on_reflection(result)
        return result

    @staticmethod
    def _extract_json_object(text):
        """Return the span from the first ``{`` to the last ``}`` in *text*.

        This strips markdown fences or prose around a JSON object. Non-string
        input, or text without such a span, is returned unchanged so that
        ``json.loads`` reports the error.
        """
        if not isinstance(text, str):
            return text
        start = text.find("{")
        end = text.rfind("}")
        if start == -1 or end <= start:
            return text
        return text[start:end + 1]

    # ── Self-Critique ─────────────────────────────

    async def _self_critique(self, text: str) -> str:
        """Ask the model to review *text* (first 3000 chars) and return its revision.

        Returns *text* unchanged when self-critique is disabled, when *text* is
        empty or None, or when the model returns an empty reply.
        """
        if not self.config.enable_self_critique or not text:
            return text
        prompt = f"""审视以下回答,找出逻辑错误或不准确之处。如果已足够好就原样返回。

{text[:3000]}"""
        improved = await self.model_router.call_simple(prompt)
        return improved or text

    # ── Auto Route ────────────────────────────────

    async def _auto_route_model(self, task: str):
        """Set the router's preferred model from the task's estimated complexity."""
        score = self._estimate_complexity(task)
        if score > 0.7:
            self.model_router.set_preferred("deepseek-r1")
        elif score > 0.4:
            self.model_router.set_preferred("kimi-k2.6")
        else:
            self.model_router.set_preferred("deepseek-v3.1")

    def _estimate_complexity(self, task: str) -> float:
        """Heuristic complexity in [0, 1].

        Adds 0.15 per keyword found (case-insensitive substring) plus up to 0.3
        for task length (1 point per 2000 chars), capped at 1.0.
        """
        lowered = task.lower()
        score = sum(0.15 for k in self._COMPLEXITY_KEYWORDS if k in lowered)
        return min(score + min(len(task) / 2000, 0.3), 1.0)

    # ── Step execution ────────────────────────────

    async def _execute_step_sync(self, iteration: int, session_id: str) -> "StepResult":
        """Run one step under ``step_timeout_seconds``, retrying on timeouts and router failures.

        Makes ``max_retries_per_step + 1`` attempts (at least one). After an
        ``AllModelsFailed`` the next attempt waits 1s, 2s, 4s, ...; there is no
        wait after the final attempt.

        NOTE(review): a retry re-runs the whole step, including tool execution, so
        a step that timed out mid-tool may repeat tool side effects.

        Raises:
            StepTimeoutError: The last attempt timed out.
            AllModelsFailed: The last attempt failed in the router.
        """
        attempts = max(self.config.max_retries_per_step, 0) + 1
        last_error: Exception | None = None
        for attempt in range(attempts):
            try:
                return await asyncio.wait_for(
                    self._do_step(iteration, session_id),
                    timeout=self.config.step_timeout_seconds,
                )
            except asyncio.TimeoutError:
                last_error = StepTimeoutError(f"Step {iteration} timeout")
            except AllModelsFailed as e:
                last_error = e
                if attempt < attempts - 1:
                    await asyncio.sleep(2 ** attempt)
        raise last_error

    async def _do_step(self, iteration: int, session_id: str) -> "StepResult":
        """Make one model call and execute any tool calls it requests.

        Returns a terminal ``StepResult`` (with optionally self-critiqued content)
        when the model requests no tools. Otherwise it executes the tool calls in
        conflict-free batches, appends the results to the context, and returns
        them in a non-terminal result.

        Raises:
            HumanInterruptNeeded: Human-in-the-loop is enabled and a requested tool looks high risk.
        """
        # v1.11.0: page out old memories *before* building the context so this call benefits.
        if self.config.enable_auto_paging and self._auto_page_callback:
            usage_ratio = self.context_manager.estimate_context_usage()
            if usage_ratio > self.config.auto_page_threshold:
                await self._auto_page_callback(usage_ratio)

        model_type = self.model_router.model_type
        ctx = self.context_manager.build_context(
            model_type=model_type,
            tools=self.tool_registry.get_schemas_for_model(model_type),
        )
        resp = await self.model_router.call(ctx)

        # Cost accounting; skipped when the response carries no usage data.
        usage = getattr(resp, "usage", None)
        if self.config.enable_cost_tracking and usage is not None:
            self.cost_tracker.record(self.model_router.current_model, usage)

        if not resp.tool_calls:
            # _self_critique is itself a no-op when self-critique is disabled.
            return StepResult(content=await self._self_critique(resp.content), is_terminal=True)

        # Human-in-the-loop gate: stop before running any high-risk tool.
        if self.config.enable_human_in_the_loop:
            for tc in resp.tool_calls:
                if self._is_high_risk(tc):
                    raise HumanInterruptNeeded(f"高风险操作需确认: {tc.name}", {"tool": tc.name, "args": tc.arguments})

        sandbox = self.sandbox_manager.get_sandbox(session_id) if self.sandbox_manager else None
        all_results = []
        # Batches run in order; calls within a batch are passed together to execute_batch.
        for group in self._group_independent_calls(resp.tool_calls):
            all_results.extend(await self.tool_registry.execute_batch(group, sandbox=sandbox))

        self.context_manager.append_tool_results(all_results)
        return StepResult(content="", is_terminal=False, tool_results=all_results)

    def _is_high_risk(self, tc) -> bool:
        """Return True if the tool call's name (object ``.name`` or dict ``"name"``) contains a risky keyword."""
        name = tc.name.lower() if hasattr(tc, "name") else tc.get("name", "").lower()
        return any(r in name for r in self._RISKY_TOOL_KEYWORDS)

    def _group_independent_calls(self, tool_calls: list) -> list[list]:
        """Split tool calls into ordered batches with no same-path conflicts inside a batch.

        Each call goes into the batch right after the last batch it conflicts
        with. A call that must follow another (same path, at least one write)
        therefore always lands in a later batch, and non-conflicting calls share
        the earliest possible batch.
        """
        if len(tool_calls) <= 1:
            return [tool_calls] if tool_calls else []
        groups: list[list] = []
        for call in tool_calls:
            last_conflict = -1
            for idx, group in enumerate(groups):
                if self._has_conflict(call, group):
                    last_conflict = idx
            target = last_conflict + 1
            if target < len(groups):
                groups[target].append(call)
            else:
                groups.append([call])
        return groups

    def _has_conflict(self, call, group: list) -> bool:
        """Return True if *call* touches the same target path as a call in *group* and either side writes.

        Read/read on the same path is not a conflict. Calls for unknown tools,
        calls that are neither reads nor writes, and calls without a target path
        never conflict.
        """
        cur = self.tool_registry.get(call.name)
        if not cur:
            return False
        cur_writes = cur.is_write_operation(call.arguments)
        if not cur_writes and not cur.is_read_operation(call.arguments):
            return False
        target = cur.extract_target_path(call.arguments)
        if not target:
            return False
        for tc in group:
            tool = self.tool_registry.get(tc.name)
            if not tool:
                continue
            other_writes = tool.is_write_operation(tc.arguments)
            if not other_writes and not (cur_writes and tool.is_read_operation(tc.arguments)):
                continue  # read/read, or the other call neither reads nor writes
            if tool.extract_target_path(tc.arguments) == target:
                return True
        return False

    # ── v1.11.0 full checkpoint (complete state snapshot) ────

    def _snapshot_messages(self) -> list[dict]:
        """Return the context's messages as ``{"role", "content"}`` dicts (reads the private ``_messages``)."""
        return [{"role": m.role, "content": m.content} for m in self.context_manager._messages]

    async def _save_checkpoint(self, session_id: str, iteration: int):
        """Persist a snapshot of the run on a best-effort basis; failures are logged, never raised.

        With a ``CheckpointBackend``, writes a full ``Checkpoint`` (messages plus
        loop state, linked to the previous checkpoint id). Otherwise writes a thin
        dict snapshot to ``checkpoint_store`` when one is configured.
        """
        import logging
        from datetime import datetime, timezone

        backend = self.checkpoint_backend
        try:
            if not backend:
                # Fallback to the thin CheckpointStore.
                if not self.checkpoint_store:
                    return
                snap = {
                    "session_id": session_id, "iteration": iteration,
                    "messages": self._snapshot_messages(),
                    "timestamp": time.time(),
                }
                await self.checkpoint_store.save(session_id, snap)
                return

            checkpoint_id = f"ckpt-{session_id}-{iteration:06d}"
            cp = Checkpoint(
                metadata=CheckpointMetadata(
                    thread_id=session_id,
                    checkpoint_id=checkpoint_id,
                    step=iteration,
                    parent_checkpoint_id=getattr(self, "_last_checkpoint_id", None),
                    created_at=datetime.now(timezone.utc).isoformat(),
                    tags=["auto", f"iter_{iteration}"],
                ),
                messages=self._snapshot_messages(),
                state={
                    "iteration": iteration,
                    "task": self.context_manager.current_task,
                    "session_id": session_id,
                    "cost_usd": self.cost_tracker.total_cost,
                    "reflections": len(self._reflection_history),
                    "human_interrupts": self._human_interrupts,
                    "loop_state": getattr(self.context_manager, "current_state", "running"),
                },
                tools_result={},
                next_node="loop",
            )
            await backend.put(cp)
            self._last_checkpoint_id = checkpoint_id
        except Exception:
            # Checkpoint failure must not crash the loop, but it must not be invisible either.
            logging.getLogger(__name__).warning(
                "Checkpoint save failed for session %r at iteration %d", session_id, iteration, exc_info=True
            )

    async def _try_restore(self, session_id: str) -> int:
        """Restore messages/state from the latest checkpoint and return the iteration to resume from (0 = fresh).

        Returns 0 when checkpoints are disabled or none exist. Errors from the
        full backend are logged and treated as "no checkpoint".
        """
        backend = self.checkpoint_backend
        if not backend:
            # Fallback to the thin CheckpointStore.
            if not self.checkpoint_store or not self.config.enable_checkpoints:
                return 0
            snap = await self.checkpoint_store.load(session_id)
            if not snap:
                return 0
            iter_count = snap.get("iteration", 0)
            if iter_count > 0:
                for msg in snap.get("messages", []):
                    self.context_manager.append_message(msg["role"], msg["content"])
            return iter_count

        if not self.config.enable_checkpoints:
            return 0

        try:
            latest = await backend.get_latest(session_id)
            if not latest:
                return 0
            self._last_checkpoint_id = latest.metadata.checkpoint_id
            iter_count = latest.metadata.step

            for msg in latest.messages:
                self.context_manager.append_message(msg.get("role", "user"), msg.get("content", ""))

            self._human_interrupts = latest.state.get("human_interrupts", 0)
            return iter_count
        except Exception:
            import logging

            logging.getLogger(__name__).warning(
                "Checkpoint restore failed for session %r; starting fresh", session_id, exc_info=True
            )
            return 0

    def set_auto_paging(self, callback: Callable):
        """Register an async callback ``callback(usage_ratio)`` for automatic memory paging (v1.11.0)."""
        self._auto_page_callback = callback

    def cancel(self):
        """Request cancellation; the run loop stops before its next iteration and reports CANCELLED."""
        self._cancelled = True

    # ── v0.40 swarm execution ────────────────────

    async def run_swarm(self, task: str, roles: list[AgentRole] | None = None) -> AgentResult:
        """Run *task* with a multi-agent swarm (roles default to ``config.swarm_roles``).

        Returns a FAILED result when no roles are available. Otherwise returns a
        COMPLETED result carrying the swarm's combined output and ``SwarmResult``.
        """
        start_time = time.time()
        roles = roles or self.config.swarm_roles
        if not roles:
            return AgentResult(output="[Swarm] No roles defined", iterations=0, final_state=LoopState.FAILED, error="No roles")

        topology = SwarmTopology(self.config.swarm_topology)
        # NOTE(review): config.enable_comm_layer is not wired into SwarmCoordinator here;
        # the previous CommunicationLayer() instance was created but never used.
        swarm = SwarmCoordinator(
            router=self.model_router,
            tool_registry=self.tool_registry,
            topology=topology,
            max_parallel=self.config.max_parallel_agents,
        )
        swarm.register_roles(roles)

        swarm_result = await swarm.execute(task, roles)
        duration_ms = (time.time() - start_time) * 1000

        return AgentResult(
            output=swarm_result.combined_output,
            iterations=1,
            cost_usd=self.cost_tracker.total_cost,
            duration_ms=duration_ms,
            tool_calls_total=0,
            reflections_count=0,
            human_interrupts=0,
            final_state=LoopState.COMPLETED,
            swarm_result=swarm_result,
        )
class StepTimeoutError(Exception):
    """Raised when a single loop step exceeds ``LoopConfig.step_timeout_seconds`` on every attempt."""
@dataclass
class StepResult:
    """Outcome of a single loop step: the model's answer (terminal) or the tool results it produced."""

    content: str
    is_terminal: bool = field(default=False)
    tool_results: list | None = field(default=None)