Coverage for agentos/tools/async_executor.py: 89%
249 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 17:47 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 17:47 +0800
1"""
2v1.15.1 — 异步工具执行优化:并发控制 + 超时熔断 + 性能监控。
4核心功能:
51. 并发执行控制:限制同时执行的工具数量
62. 超时熔断:工具执行超时自动中断
73. 性能监控:记录工具执行时间、成功率
84. 智能重试:根据错误类型自动重试
9"""
11from __future__ import annotations
13import asyncio
14import time
15from dataclasses import dataclass, field
16from enum import Enum, auto
17from typing import Any, Callable, Dict, List, Optional, Set, Tuple
18from concurrent.futures import TimeoutError as FutureTimeoutError
20from .base import BaseTool, ToolResult, ToolCall
21from .validation import ToolErrorClassifier, ErrorCategory
class ExecutionStatus(str, Enum):
    """Status of a tool execution.

    Members also subclass ``str``, so each one compares equal to its raw
    string value (for example ``ExecutionStatus.SUCCESS == "success"``).
    """

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    TIMEOUT = "timeout"
    FAILED = "failed"
    CANCELLED = "cancelled"
class CircuitBreakerState(str, Enum):
    """State of a circuit breaker.

    Members also subclass ``str``, so each one compares equal to its raw
    string value.
    """

    # Normal operation: executions are allowed.
    CLOSED = "closed"
    # Tripped: executions are rejected.
    OPEN = "open"
    # Recovery probe: executions are tentatively allowed again.
    HALF_OPEN = "half_open"
@dataclass
class ExecutionMetrics:
    """Running performance counters for a single tool.

    Each ``record_*`` call counts as one execution, adds its duration to the
    running total and overwrites ``last_execution_time`` / ``last_error``.
    Timeouts are tallied in ``timeout_count`` only; they do not increment
    ``failure_count``.
    """

    tool_name: str
    execution_count: int = 0
    success_count: int = 0
    failure_count: int = 0
    timeout_count: int = 0
    total_execution_time: float = 0.0
    last_execution_time: float = 0.0
    last_error: Optional[str] = None

    @property
    def success_rate(self) -> float:
        """Return successes divided by executions, or 0.0 if nothing ran yet."""
        return self.success_count / self.execution_count if self.execution_count else 0.0

    @property
    def average_execution_time(self) -> float:
        """Return the mean recorded duration, or 0.0 if nothing ran yet."""
        return self.total_execution_time / self.execution_count if self.execution_count else 0.0

    def _register(self, elapsed: float, error: Optional[str]) -> None:
        """Apply the bookkeeping shared by every kind of outcome."""
        self.execution_count += 1
        self.total_execution_time += elapsed
        self.last_execution_time = elapsed
        self.last_error = error

    def record_success(self, execution_time: float) -> None:
        """Record a successful run; this clears ``last_error``."""
        self.success_count += 1
        self._register(execution_time, None)

    def record_failure(self, execution_time: float, error: str) -> None:
        """Record a failed run and remember ``error`` as the last error."""
        self.failure_count += 1
        self._register(execution_time, error)

    def record_timeout(self, execution_time: float) -> None:
        """Record a timed-out run; ``last_error`` becomes ``"timeout"``."""
        self.timeout_count += 1
        self._register(execution_time, "timeout")
@dataclass
class CircuitBreaker:
    """Circuit breaker that stops a persistently failing tool from being run.

    State machine implemented below:

    * CLOSED -> OPEN once ``_failure_count`` reaches ``failure_threshold``
      (any success resets the count to zero).
    * OPEN -> HALF_OPEN the first time ``can_execute`` is called at least
      ``reset_timeout`` seconds after the last recorded failure.
    * HALF_OPEN -> CLOSED on the first recorded success.
    * HALF_OPEN -> OPEN once ``half_open_max_attempts`` failures have been
      recorded while half-open.

    Elapsed time is measured with ``time.time()``.
    """

    # Number of consecutive failures that trips the breaker.
    failure_threshold: int = 5
    # Seconds a tripped breaker waits before allowing a recovery probe.
    reset_timeout: float = 30.0
    # Number of half-open failures tolerated before tripping again.
    half_open_max_attempts: int = 3

    _state: CircuitBreakerState = CircuitBreakerState.CLOSED
    _failure_count: int = 0
    _last_failure_time: float = 0.0
    _half_open_attempts: int = 0

    def can_execute(self) -> bool:
        """Return whether an execution may start now.

        Calling this on an OPEN breaker whose reset timeout has elapsed moves
        it to HALF_OPEN and zeroes both counters as a side effect.
        """
        now = time.time()

        if self._state == CircuitBreakerState.HALF_OPEN:
            # The attempt counter is only advanced by record_failure().
            return self._half_open_attempts < self.half_open_max_attempts

        if self._state != CircuitBreakerState.OPEN:
            return True  # CLOSED

        cooled_down = now - self._last_failure_time >= self.reset_timeout
        if not cooled_down:
            return False

        # Reset timeout elapsed: let a probe through.
        self._state = CircuitBreakerState.HALF_OPEN
        self._half_open_attempts = 0
        self._failure_count = 0
        return True

    def record_success(self) -> None:
        """Record a success; a half-open breaker closes again."""
        if self._state == CircuitBreakerState.HALF_OPEN:
            self._state = CircuitBreakerState.CLOSED
            self._half_open_attempts = 0
        self._failure_count = 0

    def record_failure(self) -> None:
        """Record a failure and trip the breaker when the limit is reached."""
        self._failure_count += 1
        self._last_failure_time = time.time()

        if self._state == CircuitBreakerState.HALF_OPEN:
            self._half_open_attempts += 1
            tripped = self._half_open_attempts >= self.half_open_max_attempts
        else:
            tripped = self._failure_count >= self.failure_threshold

        if tripped:
            self._state = CircuitBreakerState.OPEN

    @property
    def state(self) -> CircuitBreakerState:
        """Current breaker state."""
        return self._state

    @property
    def time_until_reset(self) -> float:
        """Seconds left before an OPEN breaker may recover; 0.0 otherwise."""
        if self._state == CircuitBreakerState.OPEN:
            waited = time.time() - self._last_failure_time
            return max(0.0, self.reset_timeout - waited)
        return 0.0
class AsyncToolExecutor:
    """Asynchronous tool executor with a concurrency limit, timeouts and circuit breaking.

    Every call to :meth:`execute` runs the tool in its own asyncio task. A
    shared semaphore caps how many tools run at once, ``asyncio.wait_for``
    enforces the timeout, and per-tool ``ExecutionMetrics`` / ``CircuitBreaker``
    objects (keyed by tool name, created lazily) track outcomes.
    """

    # Sequence number appended to auto-generated call ids so that calls
    # created within the same millisecond still get distinct ids.
    _call_seq: int = 0

    def __init__(
        self,
        max_concurrent: int = 10,
        default_timeout: float = 30.0,
        enable_circuit_breaker: bool = True
    ):
        """
        Initialise the executor.

        Args:
            max_concurrent: Maximum number of tools executing at the same time.
            default_timeout: Timeout in seconds used when neither the call nor
                a per-tool override supplies one.
            enable_circuit_breaker: Whether per-tool circuit breakers are used.
        """
        self.max_concurrent = max_concurrent
        self.default_timeout = default_timeout
        self.enable_circuit_breaker = enable_circuit_breaker

        # Concurrency control.
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._active_tasks: Set[asyncio.Task] = set()

        # Monitoring state, keyed by tool name.
        self._metrics: Dict[str, ExecutionMetrics] = {}
        self._circuit_breakers: Dict[str, CircuitBreaker] = {}

        # Per-tool timeout overrides.
        self._tool_timeouts: Dict[str, float] = {}

    def set_tool_timeout(self, tool_name: str, timeout: float) -> None:
        """Set a timeout override (seconds) for one tool."""
        self._tool_timeouts[tool_name] = timeout

    def get_tool_timeout(self, tool_name: str) -> float:
        """Return the tool's timeout override, or the default timeout."""
        return self._tool_timeouts.get(tool_name, self.default_timeout)

    def _resolve_timeout(self, tool_name: str, timeout: Optional[float]) -> float:
        """Pick the timeout for one call: explicit value first, then the tool's.

        Only ``None`` means "not supplied"; an explicit ``0`` is honoured
        instead of being replaced by the default.
        """
        if timeout is not None:
            return timeout
        return self.get_tool_timeout(tool_name)

    def _next_call_id(self) -> str:
        """Generate a default call id that is unique within this executor."""
        self._call_seq += 1
        return f"call_{int(time.time() * 1000)}_{self._call_seq}"

    def _get_or_create_metrics(self, tool_name: str) -> ExecutionMetrics:
        """Return the metrics object for ``tool_name``, creating it if needed."""
        if tool_name not in self._metrics:
            self._metrics[tool_name] = ExecutionMetrics(tool_name=tool_name)
        return self._metrics[tool_name]

    def _get_or_create_circuit_breaker(self, tool_name: str) -> CircuitBreaker:
        """Return the circuit breaker for ``tool_name``, creating it if needed."""
        if tool_name not in self._circuit_breakers:
            self._circuit_breakers[tool_name] = CircuitBreaker()
        return self._circuit_breakers[tool_name]

    def _record_breaker_outcome(self, tool_name: str, succeeded: bool) -> None:
        """Forward one execution outcome to the tool's breaker, if enabled."""
        if not self.enable_circuit_breaker:
            return
        breaker = self._get_or_create_circuit_breaker(tool_name)
        if succeeded:
            breaker.record_success()
        else:
            breaker.record_failure()

    async def execute(
        self,
        tool: BaseTool,
        arguments: Dict[str, Any],
        call_id: Optional[str] = None,
        timeout: Optional[float] = None
    ) -> ToolResult:
        """
        Execute one tool asynchronously.

        Args:
            tool: Tool to run.
            arguments: Arguments passed to ``tool.execute``.
            call_id: Identifier used in failure results; generated if omitted.
            timeout: Timeout in seconds for this call; ``None`` falls back to
                the per-tool override and then the executor default.

        Returns:
            The tool's own result on completion, or a ``ToolResult.fail`` when
            the breaker rejects the call, the call times out, raises, or is
            cancelled.
        """
        if call_id is None:
            call_id = self._next_call_id()

        tool_name = tool.name or tool.__class__.__name__

        # Reject immediately while the breaker does not allow execution.
        if self.enable_circuit_breaker:
            circuit_breaker = self._get_or_create_circuit_breaker(tool_name)
            if not circuit_breaker.can_execute():
                return ToolResult.fail(
                    call_id=call_id,
                    error=f"Circuit breaker is OPEN for tool '{tool_name}'. "
                          f"Try again in {circuit_breaker.time_until_reset:.1f}s."
                )

        exec_timeout = self._resolve_timeout(tool_name, timeout)
        metrics = self._get_or_create_metrics(tool_name)

        task = asyncio.create_task(
            self._execute_with_semaphore(
                tool=tool,
                arguments=arguments,
                call_id=call_id,
                timeout=exec_timeout,
                tool_name=tool_name,
                metrics=metrics
            )
        )

        # Track the task so shutdown() can cancel it; it removes itself when done.
        self._active_tasks.add(task)
        task.add_done_callback(self._active_tasks.discard)

        try:
            return await task
        except asyncio.CancelledError:
            # NOTE(review): besides shutdown() cancelling the inner task, this
            # also absorbs cancellation of the caller's own task - confirm
            # that is intended.
            return ToolResult.fail(call_id=call_id, error="Execution cancelled")

    async def _execute_with_semaphore(
        self,
        tool: BaseTool,
        arguments: Dict[str, Any],
        call_id: str,
        timeout: float,
        tool_name: str,
        metrics: ExecutionMetrics
    ) -> ToolResult:
        """Run the tool under the semaphore, recording metrics and breaker state.

        The recorded duration starts once the semaphore has been acquired, so
        time spent queueing for a slot is not counted as execution time (the
        timeout does not cover the queueing time either).
        """
        async with self._semaphore:
            # perf_counter is monotonic; wall-clock jumps cannot skew durations.
            started = time.perf_counter()
            try:
                result = await asyncio.wait_for(
                    tool.execute(arguments),
                    timeout=timeout
                )

                elapsed = time.perf_counter() - started

                # A result carrying an error counts as a failure.
                if result.error is not None:
                    metrics.record_failure(elapsed, result.error)
                    self._record_breaker_outcome(tool_name, succeeded=False)
                else:
                    metrics.record_success(elapsed)
                    self._record_breaker_outcome(tool_name, succeeded=True)

                return result

            except asyncio.TimeoutError:
                metrics.record_timeout(time.perf_counter() - started)
                self._record_breaker_outcome(tool_name, succeeded=False)

                return ToolResult.fail(
                    call_id=call_id,
                    error=f"Tool '{tool_name}' execution timed out after {timeout}s"
                )

            except Exception as exc:
                # Some exceptions stringify to ""; fall back to the class name
                # so the failure is never reported with an empty message.
                error_msg = str(exc) or type(exc).__name__
                metrics.record_failure(time.perf_counter() - started, error_msg)
                self._record_breaker_outcome(tool_name, succeeded=False)

                return ToolResult.fail(call_id=call_id, error=error_msg)

    async def execute_batch(
        self,
        tool_calls: List[Tuple[BaseTool, Dict[str, Any]]],
        max_batch_size: Optional[int] = None,
        timeout_per_tool: Optional[float] = None
    ) -> List[ToolResult]:
        """
        Execute several tools.

        Args:
            tool_calls: Calls to make, as ``[(tool, arguments), ...]``.
            max_batch_size: If given, calls are launched in consecutive chunks
                of at most this size, each chunk awaited before the next
                starts. ``None`` launches everything at once.
            timeout_per_tool: Timeout in seconds applied to every call.

        Returns:
            Results in the same order as ``tool_calls``.

        Raises:
            ValueError: If ``max_batch_size`` is given but smaller than 1.
        """
        if max_batch_size is None:
            # Launch everything; the semaphore still caps real concurrency.
            return list(await asyncio.gather(*(
                self.execute(tool, args, timeout=timeout_per_tool)
                for tool, args in tool_calls
            )))

        if max_batch_size < 1:
            # A negative step would make range() empty and drop every call.
            raise ValueError("max_batch_size must be a positive integer or None")

        results: List[ToolResult] = []
        for start in range(0, len(tool_calls), max_batch_size):
            chunk = tool_calls[start:start + max_batch_size]
            results.extend(await asyncio.gather(*(
                self.execute(tool, args, timeout=timeout_per_tool)
                for tool, args in chunk
            )))
        return results

    def get_metrics(
        self, tool_name: Optional[str] = None
    ) -> Dict[str, Optional[ExecutionMetrics]]:
        """Return metrics for one tool, or a shallow copy of all metrics.

        When ``tool_name`` is given but nothing has been recorded for it, the
        returned dict maps that name to ``None``.
        """
        if tool_name:
            return {tool_name: self._metrics.get(tool_name)}
        return dict(self._metrics)

    def get_circuit_breaker_state(self, tool_name: str) -> Optional[CircuitBreakerState]:
        """Return the breaker state for ``tool_name``, or ``None`` if it has no breaker."""
        breaker = self._circuit_breakers.get(tool_name)
        return breaker.state if breaker is not None else None

    def reset_circuit_breaker(self, tool_name: str) -> bool:
        """Replace the tool's breaker with a fresh one; return False if it had none."""
        if tool_name in self._circuit_breakers:
            self._circuit_breakers[tool_name] = CircuitBreaker()
            return True
        return False

    def reset_all_circuit_breakers(self) -> None:
        """Discard every circuit breaker; they are recreated lazily."""
        self._circuit_breakers.clear()

    async def shutdown(self, timeout: float = 5.0) -> None:
        """Cancel all tracked tasks and wait up to ``timeout`` seconds for them."""
        pending = list(self._active_tasks)
        for task in pending:
            task.cancel()

        if pending:
            try:
                await asyncio.wait_for(
                    asyncio.gather(*pending, return_exceptions=True),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                # Best effort: stop waiting once the grace period is over.
                pass

    @property
    def active_task_count(self) -> int:
        """Number of tracked tasks, including those still waiting for a slot."""
        return len(self._active_tasks)

    @property
    def available_slots(self) -> int:
        """Number of free concurrency slots, never negative.

        Tracked tasks include ones queued on the semaphore, so their count can
        exceed ``max_concurrent``; the result is clamped at zero.
        """
        return max(0, self.max_concurrent - self.active_task_count)
class SmartRetryExecutor:
    """Retrying executor that decides whether to retry from the error category."""

    def __init__(
        self,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        backoff_factor: float = 2.0,
        retryable_categories: Optional[List[ErrorCategory]] = None
    ):
        """
        Initialise the retry executor.

        Args:
            max_retries: Maximum number of retries after the first attempt.
            retry_delay: Delay in seconds before the first retry.
            backoff_factor: Multiplier applied to the delay after each retry.
            retryable_categories: Error categories that may be retried;
                defaults to NETWORK, TIMEOUT, RATE_LIMIT and UNKNOWN.
        """
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.backoff_factor = backoff_factor

        if retryable_categories is None:
            retryable_categories = [
                ErrorCategory.NETWORK,
                ErrorCategory.TIMEOUT,
                ErrorCategory.RATE_LIMIT,
                ErrorCategory.UNKNOWN
            ]
        self.retryable_categories = retryable_categories

    async def execute_with_retry(
        self,
        tool: BaseTool,
        arguments: Dict[str, Any],
        call_id: Optional[str] = None,
        base_executor: Optional[AsyncToolExecutor] = None
    ) -> ToolResult:
        """
        Execute a tool, retrying failures whose category is retryable.

        Each attempt is delegated to ``base_executor.execute`` with the call
        id ``"<call_id>_attempt<n>"``. Retries wait ``retry_delay`` seconds,
        multiplied by ``backoff_factor`` after every retry.

        Args:
            tool: Tool to run.
            arguments: Arguments for the tool.
            call_id: Base call id; generated if omitted.
            base_executor: Executor used for the attempts. When omitted a new
                ``AsyncToolExecutor`` is created for this call only.

        Returns:
            The first successful result, otherwise the result of the last
            failed attempt. If no attempt ran at all, a generic failure.
        """
        if call_id is None:
            call_id = f"retry_{int(time.time() * 1000)}"

        if base_executor is None:
            base_executor = AsyncToolExecutor()

        last_result: Optional[ToolResult] = None
        delay = self.retry_delay

        for attempt in range(self.max_retries + 1):
            if attempt > 0:
                # Exponential backoff between attempts.
                await asyncio.sleep(delay)
                delay *= self.backoff_factor

            result = await base_executor.execute(
                tool=tool,
                arguments=arguments,
                call_id=f"{call_id}_attempt{attempt}"
            )

            if result.error is None:
                return result

            last_result = result

            # Stop early on errors that retrying cannot fix; running out of
            # attempts simply ends the loop.
            if ToolErrorClassifier.classify(result) not in self.retryable_categories:
                break

        # Compare against None explicitly so a result object that happens to
        # be falsy is still returned instead of the generic fallback.
        if last_result is not None:
            return last_result
        return ToolResult.fail(
            call_id=call_id,
            error="Execution failed after retries"
        )
513# 便捷函数
async def execute_tool_with_retry(
    tool: BaseTool,
    arguments: Dict[str, Any],
    max_retries: int = 3,
    call_id: Optional[str] = None
) -> ToolResult:
    """
    Convenience wrapper: run one tool through a fresh ``SmartRetryExecutor``.

    Args:
        tool: Tool to run.
        arguments: Arguments for the tool.
        max_retries: Maximum number of retries.
        call_id: Call id forwarded to the retry executor.

    Returns:
        Whatever ``SmartRetryExecutor.execute_with_retry`` returns.
    """
    retrier = SmartRetryExecutor(max_retries=max_retries)
    outcome = await retrier.execute_with_retry(tool, arguments, call_id)
    return outcome
async def execute_tools_concurrently(
    tool_calls: List[Tuple[BaseTool, Dict[str, Any]]],
    max_concurrent: int = 10,
    timeout_per_tool: Optional[float] = None
) -> List[ToolResult]:
    """
    Convenience wrapper: run several tools through a fresh ``AsyncToolExecutor``.

    Args:
        tool_calls: Calls to make, as ``(tool, arguments)`` pairs.
        max_concurrent: Concurrency limit given to the executor.
        timeout_per_tool: Timeout in seconds forwarded for every call.

    Returns:
        Whatever ``AsyncToolExecutor.execute_batch`` returns.
    """
    runner = AsyncToolExecutor(max_concurrent=max_concurrent)
    batch_results = await runner.execute_batch(
        tool_calls=tool_calls,
        timeout_per_tool=timeout_per_tool
    )
    return batch_results