Coverage for agentos/tools/async_executor.py: 31%

249 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 20:13 +0800

1""" 

2v1.15.1 — 异步工具执行优化:并发控制 + 超时熔断 + 性能监控。 

3 

4核心功能: 

51. 并发执行控制:限制同时执行的工具数量 

62. 超时熔断:工具执行超时自动中断 

73. 性能监控:记录工具执行时间、成功率 

84. 智能重试:根据错误类型自动重试 

9""" 

10 

11from __future__ import annotations 

12 

13import asyncio 

14import time 

15from dataclasses import dataclass, field 

16from enum import Enum, auto 

17from typing import Any, Callable, Dict, List, Optional, Set, Tuple 

18from concurrent.futures import TimeoutError as FutureTimeoutError 

19 

20from .base import BaseTool, ToolResult, ToolCall 

21from .validation import ToolErrorClassifier, ErrorCategory 

22 

23 

class ExecutionStatus(str, Enum):
    """Lifecycle states of a single tool execution.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value.
    """

    # Not started yet.
    PENDING = "pending"
    # Currently in progress.
    RUNNING = "running"

    # Terminal outcomes.
    SUCCESS = "success"
    TIMEOUT = "timeout"
    FAILED = "failed"
    CANCELLED = "cancelled"

32 

33 

class CircuitBreakerState(str, Enum):
    """States a circuit breaker can be in.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value.
    """

    # Normal operation: executions are allowed.
    CLOSED = "closed"
    # Tripped: executions are rejected.
    OPEN = "open"
    # Probing: a limited number of executions are let through to test recovery.
    HALF_OPEN = "half_open"

39 

40 

@dataclass
class ExecutionMetrics:
    """Running performance counters for one tool.

    Every recorded run increments ``execution_count`` and adds its duration
    to ``total_execution_time``; exactly one of ``success_count``,
    ``failure_count`` or ``timeout_count`` is incremented alongside it.
    """

    tool_name: str
    execution_count: int = 0
    success_count: int = 0
    failure_count: int = 0
    timeout_count: int = 0
    total_execution_time: float = 0.0
    last_execution_time: float = 0.0
    last_error: Optional[str] = None

    @property
    def success_rate(self) -> float:
        """Fraction of recorded runs that succeeded (0.0 when nothing was recorded)."""
        runs = self.execution_count
        return self.success_count / runs if runs else 0.0

    @property
    def average_execution_time(self) -> float:
        """Mean duration over all recorded runs (0.0 when nothing was recorded)."""
        runs = self.execution_count
        return self.total_execution_time / runs if runs else 0.0

    def _bump(self, execution_time: float, error: Optional[str]) -> None:
        """Apply the bookkeeping shared by every kind of recorded run."""
        self.execution_count += 1
        self.total_execution_time += execution_time
        self.last_execution_time = execution_time
        self.last_error = error

    def record_success(self, execution_time: float) -> None:
        """Record a successful run; clears ``last_error``."""
        self.success_count += 1
        self._bump(execution_time, None)

    def record_failure(self, execution_time: float, error: str) -> None:
        """Record a failed run and remember ``error`` as ``last_error``."""
        self.failure_count += 1
        self._bump(execution_time, error)

    def record_timeout(self, execution_time: float) -> None:
        """Record a timed-out run; ``last_error`` becomes ``"timeout"``.

        Only ``timeout_count`` is incremented here, not ``failure_count``.
        """
        self.timeout_count += 1
        self._bump(execution_time, "timeout")

85 

86 

@dataclass
class CircuitBreaker:
    """Circuit breaker that stops a tool from being called while it keeps failing.

    The breaker starts CLOSED. After ``failure_threshold`` consecutive
    failures it becomes OPEN and rejects executions. Once ``reset_timeout``
    seconds have passed since the last recorded failure, the next
    ``can_execute`` call moves it to HALF_OPEN. In HALF_OPEN a success
    closes the breaker again, while ``half_open_max_attempts`` failures
    re-open it.
    """

    # Consecutive failures that trip the breaker.
    failure_threshold: int = 5
    # Seconds to stay OPEN before probing again.
    reset_timeout: float = 30.0
    # Failed probes tolerated in HALF_OPEN before re-opening.
    half_open_max_attempts: int = 3

    # Internal state; these are still dataclass fields and therefore
    # accepted by the generated __init__.
    _state: CircuitBreakerState = CircuitBreakerState.CLOSED
    _failure_count: int = 0
    _last_failure_time: float = 0.0
    _half_open_attempts: int = 0

    def can_execute(self) -> bool:
        """Return whether an execution may proceed right now.

        May change state: an OPEN breaker whose reset timeout has elapsed is
        switched to HALF_OPEN (with its counters cleared) as a side effect.
        """
        now = time.time()

        if self._state == CircuitBreakerState.HALF_OPEN:
            # Keep probing until the failed-probe budget is used up.
            return self._half_open_attempts < self.half_open_max_attempts

        if self._state != CircuitBreakerState.OPEN:
            # CLOSED: always allowed.
            return True

        if now - self._last_failure_time < self.reset_timeout:
            # Still cooling down.
            return False

        # Cool-down finished: start probing with fresh counters.
        self._state = CircuitBreakerState.HALF_OPEN
        self._half_open_attempts = 0
        self._failure_count = 0
        return True

    def record_success(self) -> None:
        """Record a successful execution.

        Clears the consecutive-failure counter; in HALF_OPEN it also closes
        the breaker and resets the probe counter.
        """
        self._failure_count = 0
        if self._state == CircuitBreakerState.HALF_OPEN:
            self._state = CircuitBreakerState.CLOSED
            self._half_open_attempts = 0

    def record_failure(self) -> None:
        """Record a failed execution and open the breaker when a limit is hit."""
        self._failure_count += 1
        self._last_failure_time = time.time()

        if self._state != CircuitBreakerState.HALF_OPEN:
            if self._failure_count >= self.failure_threshold:
                self._state = CircuitBreakerState.OPEN
            return

        # HALF_OPEN: count the failed probe; re-open once the budget is spent.
        self._half_open_attempts += 1
        if self._half_open_attempts >= self.half_open_max_attempts:
            self._state = CircuitBreakerState.OPEN

    @property
    def state(self) -> CircuitBreakerState:
        """Current breaker state (not refreshed; see ``can_execute``)."""
        return self._state

    @property
    def time_until_reset(self) -> float:
        """Seconds left before an OPEN breaker may start probing; 0.0 otherwise."""
        if self._state == CircuitBreakerState.OPEN:
            elapsed = time.time() - self._last_failure_time
            return max(0.0, self.reset_timeout - elapsed)
        return 0.0

154 

155 

class AsyncToolExecutor:
    """Asynchronous tool executor with concurrency limiting and circuit breaking.

    Each call to ``execute`` runs in its own asyncio task. A shared semaphore
    caps how many tools run at the same time, every run is bounded by a
    timeout, and per-tool metrics and circuit breakers are updated from the
    outcome.
    """

    def __init__(
        self,
        max_concurrent: int = 10,
        default_timeout: float = 30.0,
        enable_circuit_breaker: bool = True
    ):
        """
        Initialise the executor.

        Args:
            max_concurrent: Maximum number of tools running at the same time.
            default_timeout: Default per-execution timeout in seconds.
            enable_circuit_breaker: Whether per-tool circuit breakers are used.

        Raises:
            ValueError: If ``max_concurrent`` is smaller than 1.
        """
        # A zero-sized semaphore would make every execution wait forever.
        if max_concurrent < 1:
            raise ValueError(
                f"max_concurrent must be >= 1, got {max_concurrent}"
            )

        self.max_concurrent = max_concurrent
        self.default_timeout = default_timeout
        self.enable_circuit_breaker = enable_circuit_breaker

        # Concurrency control.
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._active_tasks: Set[asyncio.Task] = set()

        # Per-tool monitoring state, keyed by tool name.
        self._metrics: Dict[str, ExecutionMetrics] = {}
        self._circuit_breakers: Dict[str, CircuitBreaker] = {}

        # Per-tool timeout overrides, keyed by tool name.
        self._tool_timeouts: Dict[str, float] = {}

    def set_tool_timeout(self, tool_name: str, timeout: float) -> None:
        """Set the timeout (seconds) used for ``tool_name``."""
        self._tool_timeouts[tool_name] = timeout

    def get_tool_timeout(self, tool_name: str) -> float:
        """Return the timeout for ``tool_name``, falling back to the default."""
        return self._tool_timeouts.get(tool_name, self.default_timeout)

    def _get_or_create_metrics(self, tool_name: str) -> ExecutionMetrics:
        """Return the metrics object for ``tool_name``, creating it on first use."""
        if tool_name not in self._metrics:
            self._metrics[tool_name] = ExecutionMetrics(tool_name=tool_name)
        return self._metrics[tool_name]

    def _get_or_create_circuit_breaker(self, tool_name: str) -> CircuitBreaker:
        """Return the circuit breaker for ``tool_name``, creating it on first use."""
        if tool_name not in self._circuit_breakers:
            self._circuit_breakers[tool_name] = CircuitBreaker()
        return self._circuit_breakers[tool_name]

    def _record_outcome(
        self,
        tool_name: str,
        metrics: ExecutionMetrics,
        execution_time: float,
        error: Optional[str] = None,
        timed_out: bool = False
    ) -> None:
        """Update metrics and, when enabled, the circuit breaker for one run."""
        failed = timed_out or error is not None

        if timed_out:
            metrics.record_timeout(execution_time)
        elif error is not None:
            metrics.record_failure(execution_time, error)
        else:
            metrics.record_success(execution_time)

        if self.enable_circuit_breaker:
            circuit_breaker = self._get_or_create_circuit_breaker(tool_name)
            if failed:
                circuit_breaker.record_failure()
            else:
                circuit_breaker.record_success()

    async def execute(
        self,
        tool: BaseTool,
        arguments: Dict[str, Any],
        call_id: Optional[str] = None,
        timeout: Optional[float] = None
    ) -> ToolResult:
        """
        Execute a tool asynchronously.

        Args:
            tool: Tool to execute.
            arguments: Arguments passed to the tool.
            call_id: Call identifier; a unique one is generated when omitted.
            timeout: Timeout in seconds; overrides the per-tool/default value
                when given (an explicit ``0`` is honoured).

        Returns:
            ToolResult: The tool's result, or a failed result when the
            circuit breaker rejects the call, the run times out, raises, or
            is cancelled.
        """
        if call_id is None:
            # Local import keeps this block self-contained.
            import uuid

            # The timestamp alone collides for calls issued within the same
            # millisecond (e.g. a batch), so a random suffix is appended.
            call_id = f"call_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"

        tool_name = tool.name or tool.__class__.__name__

        # Reject immediately while the breaker refuses executions.
        if self.enable_circuit_breaker:
            circuit_breaker = self._get_or_create_circuit_breaker(tool_name)
            if not circuit_breaker.can_execute():
                return ToolResult.fail(
                    call_id=call_id,
                    error=f"Circuit breaker is OPEN for tool '{tool_name}'. "
                          f"Try again in {circuit_breaker.time_until_reset:.1f}s."
                )

        # `is not None` rather than `or`, so that timeout=0 is not silently
        # replaced by the default.
        if timeout is not None:
            exec_timeout = timeout
        else:
            exec_timeout = self.get_tool_timeout(tool_name)

        metrics = self._get_or_create_metrics(tool_name)

        task = asyncio.create_task(
            self._execute_with_semaphore(
                tool=tool,
                arguments=arguments,
                call_id=call_id,
                timeout=exec_timeout,
                tool_name=tool_name,
                metrics=metrics
            )
        )

        # Track the task so shutdown() can cancel it; it removes itself when done.
        self._active_tasks.add(task)
        task.add_done_callback(self._active_tasks.discard)

        try:
            return await task
        except asyncio.CancelledError:
            # NOTE(review): this also absorbs a cancellation aimed at the
            # caller itself, not only one issued by shutdown() - confirm
            # that is intended.
            return ToolResult.fail(call_id=call_id, error="Execution cancelled")

    async def _execute_with_semaphore(
        self,
        tool: BaseTool,
        arguments: Dict[str, Any],
        call_id: str,
        timeout: float,
        tool_name: str,
        metrics: ExecutionMetrics
    ) -> ToolResult:
        """Run the tool under the concurrency semaphore and record the outcome.

        Returns the tool's own result when it completes, or a failed
        ``ToolResult`` when it times out or raises.
        """
        async with self._semaphore:
            # Timing starts after the slot is acquired, so time spent queued
            # behind other tools is not counted as execution time. A
            # monotonic clock keeps durations immune to wall-clock changes.
            start_time = time.perf_counter()

            try:
                result = await asyncio.wait_for(
                    tool.execute(arguments),
                    timeout=timeout
                )

                execution_time = time.perf_counter() - start_time

                # A result carrying an error counts as a failure.
                self._record_outcome(
                    tool_name, metrics, execution_time, error=result.error
                )
                return result

            except asyncio.TimeoutError:
                execution_time = time.perf_counter() - start_time
                self._record_outcome(
                    tool_name, metrics, execution_time, timed_out=True
                )
                return ToolResult.fail(
                    call_id=call_id,
                    error=f"Tool '{tool_name}' execution timed out after {timeout}s"
                )

            except Exception as e:
                execution_time = time.perf_counter() - start_time
                error_msg = str(e)
                self._record_outcome(
                    tool_name, metrics, execution_time, error=error_msg
                )
                return ToolResult.fail(call_id=call_id, error=error_msg)

    async def execute_batch(
        self,
        tool_calls: List[Tuple[BaseTool, Dict[str, Any]]],
        max_batch_size: Optional[int] = None,
        timeout_per_tool: Optional[float] = None
    ) -> List[ToolResult]:
        """
        Execute several tools.

        Args:
            tool_calls: Calls to run, as ``[(tool, arguments), ...]``.
            max_batch_size: Run the calls in consecutive chunks of this size,
                waiting for each chunk before starting the next; ``None``
                starts all calls at once.
            timeout_per_tool: Timeout in seconds applied to each call.

        Returns:
            List[ToolResult]: Results in the same order as ``tool_calls``.
        """
        if max_batch_size is None:
            # Start everything at once; the semaphore still caps concurrency.
            return await asyncio.gather(*[
                self.execute(tool, args, timeout=timeout_per_tool)
                for tool, args in tool_calls
            ])

        results = []
        for i in range(0, len(tool_calls), max_batch_size):
            batch = tool_calls[i:i + max_batch_size]
            batch_results = await asyncio.gather(*[
                self.execute(tool, args, timeout=timeout_per_tool)
                for tool, args in batch
            ])
            results.extend(batch_results)
        return results

    def get_metrics(
        self, tool_name: Optional[str] = None
    ) -> Dict[str, Optional[ExecutionMetrics]]:
        """Return collected metrics.

        With ``tool_name`` given, returns a one-entry dict whose value is
        ``None`` when nothing has been recorded for that tool. Without it,
        returns a shallow copy of all metrics.
        """
        if tool_name:
            return {tool_name: self._metrics.get(tool_name)}
        return self._metrics.copy()

    def get_circuit_breaker_state(self, tool_name: str) -> Optional[CircuitBreakerState]:
        """Return the breaker state for ``tool_name``, or ``None`` if it has no breaker."""
        if tool_name in self._circuit_breakers:
            return self._circuit_breakers[tool_name].state
        return None

    def reset_circuit_breaker(self, tool_name: str) -> bool:
        """Replace the breaker of ``tool_name`` with a fresh one.

        Returns:
            bool: ``True`` if a breaker existed and was reset, else ``False``.
        """
        if tool_name in self._circuit_breakers:
            self._circuit_breakers[tool_name] = CircuitBreaker()
            return True
        return False

    def reset_all_circuit_breakers(self) -> None:
        """Drop every circuit breaker; new ones are created on demand."""
        self._circuit_breakers.clear()

    async def shutdown(self, timeout: float = 5.0) -> None:
        """Cancel all active tasks and wait up to ``timeout`` seconds for them."""
        # Snapshot first: done-callbacks remove tasks from the live set.
        pending = list(self._active_tasks)
        for task in pending:
            task.cancel()

        if pending:
            try:
                await asyncio.wait_for(
                    asyncio.gather(*pending, return_exceptions=True),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                # Best effort: stop waiting for tasks that did not finish in time.
                pass

    @property
    def active_task_count(self) -> int:
        """Number of tracked tasks, including those still waiting for a slot."""
        return len(self._active_tasks)

    @property
    def available_slots(self) -> int:
        """Number of free concurrency slots, never negative.

        Tracked tasks include ones queued on the semaphore, so the raw
        difference can drop below zero; it is clamped at 0.
        """
        return max(0, self.max_concurrent - self.active_task_count)

414 

415 

class SmartRetryExecutor:
    """Retrying executor that decides whether to retry from the error category."""

    def __init__(
        self,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        backoff_factor: float = 2.0,
        retryable_categories: Optional[List[ErrorCategory]] = None
    ):
        """
        Initialise the retry executor.

        Args:
            max_retries: Maximum number of retries after the first attempt.
            retry_delay: Delay before the first retry, in seconds.
            backoff_factor: Multiplier applied to the delay after each retry.
            retryable_categories: Error categories that may be retried;
                defaults to NETWORK, TIMEOUT, RATE_LIMIT and UNKNOWN.
        """
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.backoff_factor = backoff_factor
        self.retryable_categories = (
            [
                ErrorCategory.NETWORK,
                ErrorCategory.TIMEOUT,
                ErrorCategory.RATE_LIMIT,
                ErrorCategory.UNKNOWN,
            ]
            if retryable_categories is None
            else retryable_categories
        )

    async def execute_with_retry(
        self,
        tool: BaseTool,
        arguments: Dict[str, Any],
        call_id: Optional[str] = None,
        base_executor: Optional[AsyncToolExecutor] = None
    ) -> ToolResult:
        """
        Execute a tool, retrying on retryable errors.

        Args:
            tool: Tool to execute.
            arguments: Arguments passed to the tool.
            call_id: Base call identifier; each attempt appends
                ``_attempt<N>`` to it.
            base_executor: Executor used for every attempt; a new
                ``AsyncToolExecutor`` is created when omitted.

        Returns:
            ToolResult: The first successful result, otherwise the result of
            the last failed attempt.
        """
        if call_id is None:
            call_id = f"retry_{int(time.time() * 1000)}"

        runner = AsyncToolExecutor() if base_executor is None else base_executor

        final_failure = None
        pause = self.retry_delay

        # One initial attempt plus up to max_retries retries.
        for attempt in range(self.max_retries + 1):
            if attempt:
                # Wait before a retry, then grow the delay for the next one.
                await asyncio.sleep(pause)
                pause *= self.backoff_factor

            outcome = await runner.execute(
                tool=tool,
                arguments=arguments,
                call_id=f"{call_id}_attempt{attempt}"
            )

            if outcome.error is None:
                return outcome

            final_failure = outcome
            category = ToolErrorClassifier.classify(outcome)
            if category not in self.retryable_categories:
                # Retrying cannot help for this kind of error.
                break

        # No attempt succeeded: hand back the last failure that was seen.
        return final_failure or ToolResult.fail(
            call_id=call_id,
            error="Execution failed after retries"
        )

511 

512 

513# 便捷函数 

async def execute_tool_with_retry(
    tool: BaseTool,
    arguments: Dict[str, Any],
    max_retries: int = 3,
    call_id: Optional[str] = None
) -> ToolResult:
    """
    Convenience wrapper: execute one tool with retries.

    Builds a ``SmartRetryExecutor`` with the given ``max_retries`` (all other
    settings at their defaults) and runs the tool through it.

    Args:
        tool: Tool to execute.
        arguments: Arguments passed to the tool.
        max_retries: Maximum number of retries.
        call_id: Call identifier.

    Returns:
        ToolResult: Result of the execution.
    """
    retrier = SmartRetryExecutor(max_retries=max_retries)
    outcome = await retrier.execute_with_retry(tool, arguments, call_id)
    return outcome

534 

535 

async def execute_tools_concurrently(
    tool_calls: List[Tuple[BaseTool, Dict[str, Any]]],
    max_concurrent: int = 10,
    timeout_per_tool: Optional[float] = None
) -> List[ToolResult]:
    """
    Convenience wrapper: execute several tools concurrently.

    Builds a fresh ``AsyncToolExecutor`` limited to ``max_concurrent`` and
    runs all calls through its ``execute_batch``.

    Args:
        tool_calls: Calls to run, as ``[(tool, arguments), ...]``.
        max_concurrent: Maximum number of concurrent executions.
        timeout_per_tool: Timeout in seconds applied to each call.

    Returns:
        List[ToolResult]: Results of the executions.
    """
    runner = AsyncToolExecutor(max_concurrent=max_concurrent)
    outcomes = await runner.execute_batch(
        tool_calls=tool_calls,
        timeout_per_tool=timeout_per_tool
    )
    return outcomes