Coverage for agentos/swarm/coordinator.py: 19%

484 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2v1.9.8: Smart Swarm Coordinator with tool registry + intelligent routing. 

3 

4Full intelligence stack: 

5- TaskDecomposer: decompose complex tasks into sub-task DAGs 

6- ResultFusion: LLM-as-Judge aggregation with confidence scoring 

7- EvalFeedbackLoop: execute → evaluate → retry → converge 

8- CodeSandbox: safe code generation & execution with test cases 

9- HumanLoop: human-in-the-loop breakpoints for approval/intervention 

10- AgentMonitor: quality gates + self-monitoring pipeline 

11- ExecutionTrace: full span-tree observability + bottleneck detection 

12- AgentMemory: three-tier memory (working/short-term/long-term) + context window 

13- ToolRegistry: schema-based tool catalog with versioning, capabilities, search 

14- ToolRouter: intelligent tool selection with LLM + semantic matching 

15- ToolExecutor: safe tool execution with validation, rate limiting, destructive confirmation 

16""" 

17 

18from __future__ import annotations 

19 

20import asyncio 

21import time 

22import uuid 

23from dataclasses import dataclass, field 

24from enum import Enum 

25from typing import Any, Callable, Optional 

26 

27from agentos.core.di import Agent, RunContext 

28from agentos.swarm.task_decomposer import TaskDecomposer, Decomposition, SubTask 

29from agentos.swarm.result_fusion import ResultFusion, FusedResult 

30from agentos.swarm.eval_feedback_loop import EvalFeedbackLoop, LoopResult, RetryConfig 

31from agentos.swarm.code_sandbox import CodeSandbox, SandboxResult, TestCase, CodeFeedbackExtractor 

32from agentos.swarm.human_loop import ( 

33 HITLManager, HITLConfig, Breakpoint, BreakpointType, HumanDecision, 

34) 

35from agentos.swarm.agent_monitor import ( 

36 AgentMonitor, QualityGate, MonitorReport, GateResult, GateStatus, GateAction, 

37 output_not_empty, output_length_range, no_error_output, contains_keywords, 

38 latency_max, confidence_min, 

39) 

40from agentos.swarm.execution_trace import ( 

41 ExecutionTrace, TraceSpan, TraceEvent, TraceCollector, 

42) 

43from agentos.swarm.agent_memory import ( 

44 AgentMemory, WorkingMemory, ShortTermMemory, LongTermMemory, 

45 ContextWindowManager, ContextBudget, MemoryEntry, 

46) 

47from agentos.swarm.tool_registry import ( 

48 ToolRegistry, ToolRouter, ToolExecutor, ToolSchema, ToolParam, 

49 ToolCategory, RoutingDecision, RoutingContext, ToolExecutionError, 

50 create_tool, 

51) 

52from agentos.security.guard import ( 

53 GuardPipeline, InputGuard, OutputGuard, 

54 PIIDetector, ContentSafetyFilter, 

55 create_strict_guard, create_permissive_guard, 

56 GuardChainResult, 

57) 

58 

59 

class SwarmTopology(str, Enum):
    """Communication layout the coordinator uses when running its agents.

    Members:
        STAR: the coordinator sends the task to every agent and collects replies.
        RING: each agent's output becomes the next agent's input.
        MESH: all agents run on the same input and every output is broadcast.
        TREE: a root agent runs first and its output is handed to child agents.
    """

    STAR = "star"  # hub-and-spoke around the coordinator
    RING = "ring"  # sequential hand-off around a circle
    MESH = "mesh"  # everyone-to-everyone exchange
    TREE = "tree"  # root first, then its children

66 

67 

class ExecutionMode(str, Enum):
    """Strategy the coordinator applies to a task.

    Members:
        RAW: run the configured topology directly on the task.
        SMART: decompose into sub-tasks, execute the dependency DAG, fuse results.
        FEEDBACK: smart execution wrapped in an evaluate-and-retry loop.
    """

    RAW = "raw"
    SMART = "smart"
    FEEDBACK = "feedback"

73 

74 

@dataclass
class AgentRole:
    """Agent role definition.

    A plain settings record; nothing in this module's visible code consumes
    it, so the meaning of each field is up to its users.

    Attributes:
        name: Role name.
        goal: Role goal.
        backstory: Optional background text (defaults to "").
        tools: Tool names (a fresh empty list per instance by default).
        model: Model identifier (defaults to "auto").
        temperature: Sampling temperature (defaults to 0.7).
        allow_delegation: Delegation flag (defaults to True).
        verbose: Verbosity flag (defaults to False).
    """

    name: str
    goal: str
    backstory: str = ""
    # default_factory so instances never share one list object
    tools: list[str] = field(default_factory=list)
    model: str = "auto"
    temperature: float = 0.7
    allow_delegation: bool = True
    verbose: bool = False

86 

87 

class MessageBus:
    """Inter-agent message bus (blackboard pattern).

    Each published message is appended to an in-memory log and then passed
    synchronously, in subscription order, to every callback registered for
    its topic. A free-form ``shared_memory`` dict is exposed alongside.
    """

    def __init__(self):
        self._messages: list[dict] = []
        self._subscribers: dict[str, list[Callable]] = {}
        self._shared_memory: dict[str, Any] = {}

    def publish(self, sender: str, topic: str, data: dict):
        """Log a message and hand it to each subscriber of *topic*."""
        envelope = dict(sender=sender, topic=topic, data=data)
        self._messages.append(envelope)
        for callback in self._subscribers.get(topic, ()):
            callback(envelope)

    def subscribe(self, topic: str, callback: Callable):
        """Register *callback* for every future message on *topic*."""
        listeners = self._subscribers.setdefault(topic, [])
        listeners.append(callback)

    @property
    def messages(self) -> list[dict]:
        """The live log of published messages (not a copy)."""
        return self._messages

    @property
    def shared_memory(self) -> dict[str, Any]:
        """The live shared-state dict (not a copy)."""
        return self._shared_memory

113 

114 

@dataclass
class SwarmMessage:
    """One message exchanged during a swarm run.

    Attributes:
        id: Random 12-hex-character identifier.
        sender: Sending agent name (or "coordinator").
        receiver: Target agent name; ``None`` means broadcast.
        content: Arbitrary payload.
        metadata: Extra key/value data.
        timestamp: Creation time from ``time.time()``.
    """

    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    sender: str = ""
    receiver: Optional[str] = None
    content: Any = None
    metadata: dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict, in declaration order."""
        keys = ("id", "sender", "receiver", "content", "metadata", "timestamp")
        return {key: getattr(self, key) for key in keys}

145 

146 

@dataclass
class SwarmResult:
    """
    Result of swarm execution.

    Attributes:
        id: Random 12-hex-character identifier
        topology: Swarm topology
        mode: Execution mode used
        outputs: Agent outputs (the topology runners map a failing agent to
            ``{"error": "<message>"}``)
        messages: Communication messages
        duration: Execution duration in seconds
        success: Whether execution succeeded
        fused: ResultFusion output (smart mode only)
        decomposition: Task decomposition used (smart mode only)
        feedback_loop: Feedback loop result (feedback mode only)
        error: Optional human-readable reason for a failure or abort
    """
    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    topology: SwarmTopology = SwarmTopology.STAR
    mode: ExecutionMode = ExecutionMode.RAW
    outputs: dict[str, Any] = field(default_factory=dict)
    messages: list[SwarmMessage] = field(default_factory=list)
    duration: float = 0.0
    success: bool = True
    fused: Optional[FusedResult] = None
    decomposition: Optional[Decomposition] = None
    feedback_loop: Optional[LoopResult] = None
    # Declared so ``result.error`` is always readable (previously reading it on
    # a result that never had it assigned raised AttributeError). None on success.
    error: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        """Convert to a JSON-friendly dict; optional sections appear only when set."""
        d: dict[str, Any] = {
            "id": self.id,
            "topology": self.topology.value,
            "mode": self.mode.value,
            "outputs": self.outputs,
            "messages": [m.to_dict() for m in self.messages],
            "duration": f"{self.duration:.2f}s",
            "success": self.success,
        }
        if self.fused:
            d["fused"] = {
                "action": self.fused.action,
                "confidence": self.fused.confidence,
                "reason": self.fused.reason,
            }
        if self.decomposition:
            d["decomposition"] = {
                "sub_tasks": [st.to_dict() for st in self.decomposition.sub_tasks],
                "total_steps": self.decomposition.total_steps,
            }
        if self.feedback_loop:
            d["feedback_loop"] = {
                "attempts": self.feedback_loop.attempts,
                "best_score": self.feedback_loop.best_score,
                "converged": self.feedback_loop.converged,
                "duration": f"{self.feedback_loop.duration:.2f}s",
            }
        if self.error:
            d["error"] = self.error
        return d

205 

206 

207class SmartSwarmCoordinator: 

208 """ 

209 v1.9.4: Multi-agent coordination with intelligent orchestration. 

210 

211 Upgrades the coordinator with: 

212 - TaskDecomposer: LLM-driven sub-task DAG decomposition 

213 - ResultFusion: LLM-as-Judge aggregation with confidence scoring 

214 - EvalFeedbackLoop: execute → evaluate → retry → converge 

215 

216 Usage: 

217 coordinator = SmartSwarmCoordinator(topology=SwarmTopology.MESH) 

218 coordinator.register(agent1) 

219 coordinator.register(agent2) 

220 

221 # Smart mode with decomposition + fusion 

222 result = await coordinator.smart_execute("complex research task") 

223 

224 # Feedback mode with evaluation retry loop 

225 result = await coordinator.execute_with_feedback( 

226 task, expected_output, scorer 

227 ) 

228 """ 

229 

def __init__(
    self,
    topology: SwarmTopology = SwarmTopology.STAR,
    max_rounds: int = 10,
    execution_mode: ExecutionMode = ExecutionMode.SMART,
    decomposer: TaskDecomposer | None = None,
    fusion: ResultFusion | None = None,
    feedback_loop: EvalFeedbackLoop | None = None,
    sandbox: CodeSandbox | None = None,
    hitl_manager: HITLManager | None = None,
    monitor: AgentMonitor | None = None,
    trace_collector: TraceCollector | None = None,
    memory: AgentMemory | None = None,
    tool_registry: ToolRegistry | None = None,
    tool_router: ToolRouter | None = None,
    tool_executor: ToolExecutor | None = None,
    guard: GuardPipeline | None = None,
):
    """
    Initialize smart swarm coordinator.

    Every component argument is optional; a default instance is created only
    when the argument is ``None``.

    Args:
        topology: Swarm topology
        max_rounds: Maximum sub-task scheduling rounds
        execution_mode: Default execution mode
        decomposer: TaskDecomposer instance (created if None)
        fusion: ResultFusion instance (created if None)
        feedback_loop: EvalFeedbackLoop instance (created if None)
        sandbox: CodeSandbox instance for code execution (created if None)
        hitl_manager: HITLManager for human-in-the-loop (created if None)
        monitor: AgentMonitor for quality gating (created if None)
        trace_collector: TraceCollector for execution traces (created if None)
        memory: AgentMemory for layered memory (created if None)
        tool_registry: ToolRegistry for tool catalog (created if None)
        tool_router: ToolRouter over ``self.tool_registry`` (created if None)
        tool_executor: ToolExecutor over ``self.tool_registry`` (created if None)
        guard: GuardPipeline for input/output safety filtering
            (``create_strict_guard()`` if None)
    """
    self.topology = topology
    self.max_rounds = max_rounds
    self.execution_mode = execution_mode
    self._agents: dict[str, Agent[Any, Any]] = {}
    self._message_queue: list[SwarmMessage] = []

    # ``is None`` instead of ``or``: an injected component whose class defines
    # __len__/__bool__ (e.g. a still-empty registry) would otherwise be
    # silently replaced by a fresh default instance.
    self.decomposer = decomposer if decomposer is not None else TaskDecomposer()
    self.fusion = fusion if fusion is not None else ResultFusion()
    self.feedback = feedback_loop if feedback_loop is not None else EvalFeedbackLoop()
    self.sandbox = sandbox if sandbox is not None else CodeSandbox()
    self.hitl = hitl_manager if hitl_manager is not None else HITLManager()
    self.monitor = monitor if monitor is not None else AgentMonitor()
    self.tracer = trace_collector if trace_collector is not None else TraceCollector()
    self.memory = memory if memory is not None else AgentMemory()
    self.tool_registry = tool_registry if tool_registry is not None else ToolRegistry()
    # Router and executor share the (possibly injected) registry above.
    self.tool_router = (
        tool_router if tool_router is not None else ToolRouter(self.tool_registry)
    )
    self.tool_executor = (
        tool_executor if tool_executor is not None else ToolExecutor(self.tool_registry)
    )
    self.guard = guard if guard is not None else create_strict_guard()

    # Original topology methods bound for backward compatibility
    self._topo_handlers = {
        SwarmTopology.STAR: self._execute_star,
        SwarmTopology.RING: self._execute_ring,
        SwarmTopology.MESH: self._execute_mesh,
        SwarmTopology.TREE: self._execute_tree,
    }

295 # ── Agent management ────────────────────────────────────────── 

296 

def register(self, agent: Agent[Any, Any]) -> None:
    """Add *agent* to the swarm under ``agent.name``.

    Registering another agent with the same name replaces the earlier one.
    """
    key = agent.name
    self._agents[key] = agent

299 

def unregister(self, agent_name: str) -> bool:
    """Remove the agent registered as *agent_name*.

    Returns:
        True if an agent was removed, False if none had that name.
    """
    try:
        del self._agents[agent_name]
    except KeyError:
        return False
    return True

305 

def get_agent(self, agent_name: str) -> Optional[Agent[Any, Any]]:
    """Return the agent registered as *agent_name*, or ``None`` if absent."""
    agents = self._agents
    return agents[agent_name] if agent_name in agents else None

308 

def list_agents(self) -> list[str]:
    """Return a new list of registered agent names in dict insertion order."""
    return [name for name in self._agents]

311 

312 # ── Execution API ───────────────────────────────────────────── 

313 

async def execute(
    self,
    task: Any,
    mode: ExecutionMode | None = None,
    **metadata,
) -> SwarmResult:
    """Execute *task* with the requested (or default) execution mode.

    Routing:
        SMART    -> smart_execute (decompose -> sub-task DAG -> fuse)
        FEEDBACK -> execute_with_feedback (smart execution + eval retry loop)
        RAW      -> the configured topology only

    Args:
        task: Task to run.
        mode: Overrides ``self.execution_mode`` when not None.
        **metadata: Forwarded to the chosen strategy. In FEEDBACK mode the
            keys ``expected_output``, ``scoring_strategy`` and
            ``retry_config`` bind to execute_with_feedback's parameters; the
            rest reach the agents.

    Returns:
        The SwarmResult produced by the chosen strategy.
    """
    if mode is None:
        mode = self.execution_mode
    if mode == ExecutionMode.SMART:
        return await self.smart_execute(task, **metadata)
    if mode == ExecutionMode.FEEDBACK:
        # FEEDBACK means "smart + eval feedback loop"; without this branch it
        # silently degraded to raw topology execution.
        return await self.execute_with_feedback(task, **metadata)
    return await self._execute_raw(task, **metadata)

325 

async def smart_execute(
    self,
    task: Any,
    _trace: ExecutionTrace | None = None,
    **metadata,
) -> SwarmResult:
    """Smart execution: guard -> decompose -> execute sub-task DAG -> fuse -> guard.

    Pipeline:
        0. The input guard may block the task outright, or return a changed
           (e.g. PII-redacted) version that is used from then on.
        1. ``self.decomposer`` splits the task into sub-tasks with dependencies.
        2. Sub-tasks whose dependencies have all run are executed round by
           round (at most ``self.max_rounds`` rounds) through the raw topology.
           Each prompt is the memory context (if any), the task, and the
           dependency outputs (each truncated to 300 chars). Each sub-task's
           outputs are also stored in memory (JSON, first 500 chars).
        3. Outputs of the final sub-tasks (finished sub-tasks that no other
           sub-task depends on) are fused. If none produced output, all
           sub-task outputs are fused instead. ``success`` is
           ``fused.confidence >= 0.3``.
        4. Every output value passes through the output guard.

    Uses ExecutionTrace for observability when a trace or tracer is available.

    Args:
        task: Task description (string or structured; stringified)
        _trace: Optional trace to record into (auto-created if self.tracer exists)
        **metadata: Forwarded to every agent invocation

    Returns:
        SwarmResult with fused output and the decomposition. A blocked input
        yields ``success=False`` with the guard's explanation in
        ``outputs["blocked"]`` and ``result.error``.
    """
    # Serialises sub-task outputs for memory; the module header does not import it.
    import json

    start_time = time.time()
    task_str = str(task)

    # Step 0: Security guard — input filtering
    guard_result = self.guard.process_input(task_str)
    if guard_result.blocked:
        reason = (
            f"[BLOCKED] Input rejected by guard: {guard_result.blocked_by}. "
            f"Reason: {', '.join(guard_result.warnings)}"
        )
        # SwarmResult accepts only its declared fields, so the explanation is
        # carried in ``outputs`` and on ``error``.
        blocked = SwarmResult(
            topology=self.topology,
            mode=ExecutionMode.SMART,
            outputs={"blocked": reason},
            success=False,
        )
        blocked.error = reason
        blocked.duration = time.time() - start_time
        return blocked
    if guard_result.final_content != task_str:
        task_str = guard_result.final_content  # PII-redacted version

    # Trace setup. Span handles default to None so the end_span guards below
    # never read an unbound name when tracing is off.
    trace = _trace
    if trace is None and self.tracer is not None:
        trace = ExecutionTrace(task_name=task_str[:80])
        self.tracer.add(trace)
    root = dspan = fspan = None
    if trace:
        root = trace.start_span(TraceEvent.TASK_START, name="smart_execute", data={"task": task_str})

    result = SwarmResult(
        topology=self.topology,
        mode=ExecutionMode.SMART,
    )

    agent_names = self.list_agents()

    # Load memory context (computed once; it is not refreshed during the loop)
    self.memory.set_task(task_str)
    memory_context = self.memory.get_context(query=task_str) if self.memory else ""

    # Step 1: Decompose
    if trace:
        dspan = trace.start_span(TraceEvent.DECOMPOSE, name="decompose")
    decomp = self.decomposer.decompose(task_str, agents=agent_names)
    result.decomposition = decomp
    if trace and dspan:
        trace.end_span(dspan.id, status="done", data={"sub_tasks": len(decomp.sub_tasks)})

    # Step 2: Execute sub-tasks in dependency order
    sub_outputs: dict[str, dict[str, Any]] = {}
    completed: set[str] = set()
    base_context = (
        f"{memory_context}\n\n[Current Task]\n{task_str}" if memory_context else task_str
    )

    for _round in range(self.max_rounds):
        ready = [
            st for st in decomp.sub_tasks
            if st.status == "pending"
            and all(dep in completed for dep in st.depends_on)
        ]
        if not ready:
            break

        for st in ready:
            st.status = "running"

            stspan = None
            if trace:
                stspan = trace.start_span(TraceEvent.SUBTASK_START, name=st.description[:60], data={"id": st.id})

            # Build context from dependencies
            context = base_context
            dep_contexts = [
                f"[{name}]: {str(out)[:300]}"
                for dep_id in st.depends_on
                for name, out in sub_outputs.get(dep_id, {}).items()
            ]
            if dep_contexts:
                context = f"{context}\n\nPrevious results:\n" + "\n".join(dep_contexts)

            # Execute with all agents on this sub-task
            topo_result = await self._execute_raw(context, **metadata)
            sub_outputs[st.id] = topo_result.outputs
            st.output = topo_result.outputs
            st.status = "done" if topo_result.success else "failed"
            # Failed sub-tasks still count as completed so dependents can run.
            completed.add(st.id)

            # Store sub-task result in memory
            self.memory.remember(
                content=f"SubTask [{st.description}]: {json.dumps(topo_result.outputs, default=str)[:500]}",
                role="assistant",
                importance=0.6,
                metadata={"subtask_id": st.id, "status": st.status},
            )

            if trace and stspan:
                trace.end_span(stspan.id, status=st.status, data={"output_keys": list(topo_result.outputs.keys())})

    def _fuse_into_result(outputs: dict[str, Any]) -> None:
        """Fuse *outputs* into ``result``; success means confidence >= 0.3."""
        fused = self.fusion.fuse(task_str, outputs)
        result.fused = fused
        result.outputs = outputs
        result.success = fused.confidence >= 0.3

    # Step 3: Fuse results from final sub-tasks
    if trace:
        fspan = trace.start_span(TraceEvent.FUSE, name="fuse_results")

    # Final sub-tasks are the DAG's sinks: finished sub-tasks that no other
    # sub-task lists as a dependency.
    depended_on = {dep for s in decomp.sub_tasks for dep in s.depends_on}
    all_final: dict[str, Any] = {}
    for st in decomp.sub_tasks:
        if st.status == "done" and st.id not in depended_on and st.output:
            all_final.update(st.output)
    if all_final:
        _fuse_into_result(all_final)

    # Fallback: no sink produced output, so fuse everything that ran.
    if not result.outputs and sub_outputs:
        all_outputs: dict[str, Any] = {}
        for st_outputs in sub_outputs.values():
            all_outputs.update(st_outputs)
        if all_outputs:
            _fuse_into_result(all_outputs)

    if trace and fspan:
        trace.end_span(fspan.id, status="done", data={"confidence": result.fused.confidence if result.fused else 0})

    result.duration = time.time() - start_time

    if trace and root:
        trace.end_span(root.id, status="done" if result.success else "failed")

    # Output guard — filter agent output before returning to user
    if result.outputs:
        guarded_outputs: dict[str, Any] = {}
        for key, value in result.outputs.items():
            output_str = str(value)
            output_guard = self.guard.process_output(output_str)
            if output_guard.blocked:
                guarded_outputs[key] = f"[BLOCKED by guard: {output_guard.blocked_by}]"
            elif output_guard.final_content != output_str:
                guarded_outputs[key] = output_guard.final_content
            else:
                guarded_outputs[key] = value
        result.outputs = guarded_outputs

    return result

494 

async def execute_with_feedback(
    self,
    task: Any,
    expected_output: str = "",
    scoring_strategy: str = "general",
    retry_config: RetryConfig | None = None,
    **metadata,
) -> SwarmResult:
    """Run smart execution inside an evaluate-and-retry feedback loop.

    A new EvalFeedbackLoop is built for every call (``self.feedback`` is not
    used). It is configured with ``retry_config``, or ``RetryConfig(max_retries=3)``
    when that is falsy. It gets a ``CompositeScorerV2`` scorer when that optional
    import and its construction succeed, and no scorer otherwise.

    Args:
        task: Task description (stringified)
        expected_output: Reference for scoring
        scoring_strategy: Scoring strategy (qa/code/summary/translation)
        retry_config: Retry configuration
        **metadata: Forwarded to every smart_execute attempt

    Returns:
        SwarmResult (mode FEEDBACK). ``outputs["final"]`` is the loop's final
        output as a string ("" when empty), ``success`` is whether the loop
        converged, and ``feedback_loop`` holds the full loop result.
    """
    began = time.time()
    swarm_result = SwarmResult(
        topology=self.topology,
        mode=ExecutionMode.FEEDBACK,
    )
    task_text = str(task)

    async def executor(t: str) -> Any:
        """Run one smart_execute attempt and flatten its result to text."""
        attempt = await self.smart_execute(t, **metadata)
        fused = attempt.fused
        if not (fused and fused.merged):
            return str(attempt.outputs)
        merged = fused.merged
        if not isinstance(merged, dict):
            return str(merged)
        # Keep truthy non-dict values; fall back to the whole dict's repr.
        pieces = [str(v) for v in merged.values() if v and not isinstance(v, dict)]
        return "\n".join(pieces) if pieces else str(merged)

    scorer = None
    try:
        from agentos.evaluation.scorers import CompositeScorerV2
        scorer = CompositeScorerV2()
    except Exception:
        # The scorer is optional: any import or construction failure means
        # the loop runs without one.
        scorer = None

    loop = EvalFeedbackLoop(
        scorer=scorer,
        config=retry_config or RetryConfig(max_retries=3),
    )
    outcome = await loop.run(
        task=task_text,
        executor=executor,
        expected=expected_output,
        strategy=scoring_strategy,
    )

    swarm_result.feedback_loop = outcome
    final = outcome.final_output
    swarm_result.outputs = {"final": str(final) if final else ""}
    swarm_result.success = outcome.converged
    swarm_result.duration = time.time() - began
    return swarm_result

564 

565 # ── Raw topology execution (backward compatible) ────────────── 

566 

567 async def _execute_raw( 

568 self, 

569 task: Any, 

570 **metadata, 

571 ) -> SwarmResult: 

572 """Original topology-only execution.""" 

573 handler = self._topo_handlers.get(self.topology) 

574 if handler is None: 

575 raise ValueError(f"Unknown topology: {self.topology}") 

576 return await handler(task, metadata) 

577 

578 # ── Star Topology ───────────────────────────────────────────── 

579 

async def _execute_star(
    self,
    task: Any,
    metadata: dict[str, Any],
) -> SwarmResult:
    """Star topology: the coordinator sends *task* to each agent in turn.

    Agents run sequentially in registration order. For each agent, a
    coordinator->agent message carrying the task is logged. On success it is
    followed by an agent->coordinator reply carrying the output. An agent that
    raises gets ``{"error": str(exc)}`` as its output and marks the result
    unsuccessful; the remaining agents still run.
    """
    star = SwarmResult(topology=SwarmTopology.STAR, mode=ExecutionMode.RAW)
    for name, member in self._agents.items():
        try:
            star.messages.append(
                SwarmMessage(
                    sender="coordinator",
                    receiver=name,
                    content=task,
                    metadata=metadata,
                )
            )
            reply = await member.invoke(task, **metadata)
            star.outputs[name] = reply
            star.messages.append(
                SwarmMessage(sender=name, receiver="coordinator", content=reply)
            )
        except Exception as exc:
            star.outputs[name] = {"error": str(exc)}
            star.success = False
    return star

607 

608 # ── Ring Topology ───────────────────────────────────────────── 

609 

async def _execute_ring(
    self,
    task: Any,
    metadata: dict[str, Any],
) -> SwarmResult:
    """Ring topology: each agent's output becomes the next agent's input.

    Agents run in registration order. The first agent receives *task*. Each
    successful agent logs a message to its successor (the last one wraps
    around to the first). A failing agent records ``{"error": str(exc)}``
    and marks the result unsuccessful; the next agent then receives the same
    input the failing agent got.
    """
    ring = SwarmResult(topology=SwarmTopology.RING, mode=ExecutionMode.RAW)
    names = list(self._agents.keys())
    if not names:
        return ring

    count = len(names)
    payload = task
    for idx, name in enumerate(names):
        member = self._agents[name]
        successor = names[(idx + 1) % count]
        try:
            produced = await member.invoke(payload, **metadata)
            ring.outputs[name] = produced
            ring.messages.append(
                SwarmMessage(sender=name, receiver=successor, content=produced)
            )
            payload = produced
        except Exception as exc:
            ring.outputs[name] = {"error": str(exc)}
            ring.success = False
    return ring

638 

639 # ── Mesh Topology ───────────────────────────────────────────── 

640 

async def _execute_mesh(
    self,
    task: Any,
    metadata: dict[str, Any],
) -> SwarmResult:
    """Mesh topology: all agents run concurrently on *task*.

    After all agents finish, every recorded output (including error dicts) is
    logged as a point-to-point message from its producer to each other
    registered agent.
    """
    mesh = SwarmResult(topology=SwarmTopology.MESH, mode=ExecutionMode.RAW)
    runs = [
        self._execute_agent_mesh(member, task, metadata, mesh)
        for member in self._agents.values()
    ]
    await asyncio.gather(*runs, return_exceptions=True)

    for origin, produced in mesh.outputs.items():
        mesh.messages.extend(
            SwarmMessage(sender=origin, receiver=target, content=produced)
            for target in self._agents.keys()
            if target != origin
        )
    return mesh

661 

662 async def _execute_agent_mesh( 

663 self, 

664 agent: Agent[Any, Any], 

665 task: Any, 

666 metadata: dict[str, Any], 

667 result: SwarmResult, 

668 ) -> None: 

669 try: 

670 output = await agent.invoke(task, **metadata) 

671 result.outputs[agent.name] = output 

672 except Exception as e: 

673 result.outputs[agent.name] = {"error": str(e)} 

674 result.success = False 

675 

676 # ── Tree Topology ───────────────────────────────────────────── 

677 

async def _execute_tree(
    self,
    task: Any,
    metadata: dict[str, Any],
) -> SwarmResult:
    """Tree topology: the first registered agent is the root, the rest its children.

    The root runs on *task*. If it raises, its error is recorded and the result
    is returned immediately, so children do not run. Otherwise each child, in
    registration order, gets a root->child message and is invoked with the
    root's output. A successful child replies with a child->root message.
    Child failures are recorded per child and mark the result unsuccessful.
    """
    tree = SwarmResult(topology=SwarmTopology.TREE, mode=ExecutionMode.RAW)
    names = list(self._agents.keys())
    if not names:
        return tree

    root_name, *child_names = names
    root_member = self._agents[root_name]
    try:
        root_output = await root_member.invoke(task, **metadata)
        tree.outputs[root_name] = root_output
    except Exception as exc:
        tree.outputs[root_name] = {"error": str(exc)}
        tree.success = False
        return tree

    for child in child_names:
        child_member = self._agents[child]
        tree.messages.append(
            SwarmMessage(sender=root_name, receiver=child, content=root_output)
        )
        try:
            answer = await child_member.invoke(root_output, **metadata)
            tree.outputs[child] = answer
            tree.messages.append(
                SwarmMessage(sender=child, receiver=root_name, content=answer)
            )
        except Exception as exc:
            tree.outputs[child] = {"error": str(exc)}
            tree.success = False
    return tree

720 

721 # ── Messaging ───────────────────────────────────────────────── 

722 

def send_message(
    self,
    sender: str,
    receiver: Optional[str],
    content: Any,
    **metadata,
) -> SwarmMessage:
    """Append a new message to the coordinator's queue and return it.

    Extra keyword arguments become the message's metadata. ``receiver=None``
    marks a broadcast.
    """
    queued = SwarmMessage(
        sender=sender,
        receiver=receiver,
        content=content,
        metadata=metadata,
    )
    self._message_queue.append(queued)
    return queued

738 

def get_messages(
    self,
    receiver: Optional[str] = None,
) -> list[SwarmMessage]:
    """Return queued messages, optionally filtered for one receiver.

    Args:
        receiver: When truthy, return only messages addressed to it plus
            broadcasts (receiver ``None``). When falsy (None or ""), return a
            shallow copy of the whole queue.
    """
    if not receiver:
        return self._message_queue.copy()
    return [
        msg for msg in self._message_queue
        if msg.receiver == receiver or msg.receiver is None
    ]

749 

def clear_messages(self) -> None:
    """Empty the message queue in place (the same list object is kept)."""
    del self._message_queue[:]

752 

753 # ── Code Sandbox Execution (v1.9.5) ─────────────────────────── 

754 

async def execute_code(
    self,
    code: str,
    func_name: str = "",
    test_cases: list[TestCase] | None = None,
    setup_code: str = "",
    sandbox: CodeSandbox | None = None,
    max_retries: int = 3,
    code_generator: Callable[[str, list[str]], str] | None = None,
) -> SandboxResult:
    """Execute code in a sandbox, optionally regenerating it from test feedback.

    The code runs once. If not every test passes and a ``code_generator`` is
    supplied, up to ``max_retries`` regeneration rounds follow. Each round
    extracts suggestions from the last result, asks the generator for new
    code, and reruns it. A round ends the loop early when any of these holds:
    there are no suggestions, the generator raises, it returns empty or
    unchanged code, or all tests pass.

    Args:
        code: Code to execute (or initial code if using generator)
        func_name: Function name to test
        test_cases: Test cases for validation
        setup_code: Setup code (imports, fixtures)
        sandbox: Custom sandbox instance (``self.sandbox`` otherwise)
        max_retries: Max regeneration rounds
        code_generator: Callable(spec, feedback_suggestions) -> new_code

    Returns:
        The SandboxResult of the last sandbox run.
    """
    runner = sandbox or self.sandbox
    outcome = runner.run(code, func_name, test_cases, setup_code)

    # Nothing to retry: either already passing or no way to produce new code.
    if outcome.all_passed or not code_generator:
        return outcome

    for _ in range(max_retries):
        hints = CodeFeedbackExtractor.extract(outcome)
        if not hints:
            break

        spec = f"Function: {func_name}, Test cases: {len(test_cases or [])}"
        try:
            candidate = code_generator(spec, hints)
        except Exception:
            # A failing generator ends the loop; the last sandbox result stands.
            break

        if not candidate or candidate == code:
            break

        code = candidate
        outcome = runner.run(code, func_name, test_cases, setup_code)
        if outcome.all_passed:
            break

    return outcome

819 

820 # ── HITL-Enhanced Execution (v1.9.5) ────────────────────────── 

821 

async def smart_execute_with_hitl(
    self,
    task: Any,
    hitl: HITLManager | None = None,
    **metadata,
) -> SwarmResult:
    """Smart execution with human-in-the-loop breakpoints.

    Same pipeline as smart_execute: input guard, then decompose, then the
    sub-task DAG, then fusion, then output guard. Memory and tracing are not
    used on this path. It additionally pauses at these checkpoints:
    - Before each sub-task (if ``hitl.config.break_on_every_task``): ABORT stops
      the run; MODIFY with feedback amends the sub-task description.
    - On sub-task failure: ABORT stops the run; MODIFY with feedback re-queues
      the sub-task for a later round. The attempt number reported to the
      HITL manager counts executions of that sub-task.
    - On low-confidence fusion (below ``hitl.config.break_on_low_confidence``):
      ABORT or REJECT fail the result.

    Args:
        task: Task description (stringified)
        hitl: HITLManager instance (uses self.hitl if None)
        **metadata: Forwarded to every agent invocation

    Returns:
        SwarmResult with fused output. When blocked, aborted or rejected,
        ``success`` is False and ``result.error`` explains why.
    """
    hitl_mgr = hitl if hitl is not None else self.hitl
    start_time = time.time()
    result = SwarmResult(
        topology=self.topology,
        mode=ExecutionMode.SMART,
    )

    def _finish() -> SwarmResult:
        """Stamp the duration and pass every output through the output guard."""
        result.duration = time.time() - start_time
        if result.outputs:
            guarded: dict[str, Any] = {}
            for key, value in result.outputs.items():
                text = str(value)
                verdict = self.guard.process_output(text)
                if verdict.blocked:
                    guarded[key] = f"[BLOCKED by guard: {verdict.blocked_by}]"
                elif verdict.final_content != text:
                    guarded[key] = verdict.final_content
                else:
                    guarded[key] = value
            result.outputs = guarded
        return result

    task_str = str(task)

    # Same input guard as smart_execute, so this entry point cannot be used to
    # bypass security filtering.
    guard_result = self.guard.process_input(task_str)
    if guard_result.blocked:
        reason = (
            f"[BLOCKED] Input rejected by guard: {guard_result.blocked_by}. "
            f"Reason: {', '.join(guard_result.warnings)}"
        )
        result.outputs = {"blocked": reason}
        result.success = False
        result.error = reason
        result.duration = time.time() - start_time
        return result
    if guard_result.final_content != task_str:
        task_str = guard_result.final_content  # PII-redacted version

    agent_names = self.list_agents()

    # Step 1: Decompose
    decomp = self.decomposer.decompose(task_str, agents=agent_names)
    result.decomposition = decomp

    # Step 2: Execute sub-tasks with HITL gates
    sub_outputs: dict[str, dict[str, Any]] = {}
    completed: set[str] = set()
    attempts: dict[str, int] = {}  # executions per sub-task id
    aborted = False

    for _round in range(self.max_rounds):
        if aborted:
            break

        ready = [
            st for st in decomp.sub_tasks
            if st.status == "pending"
            and all(dep in completed for dep in st.depends_on)
        ]
        if not ready:
            break

        for st in ready:
            # HITL: check before executing sub-task
            if hitl_mgr.config.break_on_every_task:
                decision, feedback = await hitl_mgr.request_decision(
                    bp_type=BreakpointType.BEFORE_TASK,
                    task_id=st.id,
                    message=f"Execute sub-task: {st.description}?",
                    context={"task": task_str, "sub_task": st.description},
                    options=["approve", "abort", "modify"],
                )
                if decision == HumanDecision.ABORT:
                    aborted = True
                    break
                if decision == HumanDecision.MODIFY and feedback:
                    st.description = f"{st.description} [modified: {feedback}]"

            st.status = "running"
            attempts[st.id] = attempts.get(st.id, 0) + 1

            # Build context from dependencies
            context = task_str
            dep_contexts = [
                f"[{name}]: {str(out)[:300]}"
                for dep_id in st.depends_on
                for name, out in sub_outputs.get(dep_id, {}).items()
            ]
            if dep_contexts:
                context = f"{task_str}\n\nPrevious results:\n" + "\n".join(dep_contexts)

            # Execute
            topo_result = await self._execute_raw(context, **metadata)
            sub_outputs[st.id] = topo_result.outputs
            st.output = topo_result.outputs
            st.status = "done" if topo_result.success else "failed"
            completed.add(st.id)

            # HITL: check on failure
            if not topo_result.success:
                # Topology runners report failures as {"error": msg} per agent;
                # an explicit ``error`` attribute is used when present.
                agent_errors = [
                    f"{name}: {out['error']}"
                    for name, out in topo_result.outputs.items()
                    if isinstance(out, dict) and "error" in out
                ]
                error_text = (
                    getattr(topo_result, "error", None)
                    or "; ".join(agent_errors)
                    or "Unknown error"
                )
                decision, feedback = await hitl_mgr.should_break_on_failure(
                    task_id=st.id,
                    error=error_text,
                    attempt=attempts[st.id],
                )
                if decision == HumanDecision.ABORT:
                    aborted = True
                    break
                if decision == HumanDecision.MODIFY and feedback:
                    st.description = f"{st.description} [retry with: {feedback}]"
                    st.status = "pending"  # Re-queue for retry
                    completed.discard(st.id)
                    sub_outputs.pop(st.id, None)

    if aborted:
        result.success = False
        result.error = "Aborted by human"
        return _finish()

    # Step 3: Fuse results from the DAG's sinks (finished sub-tasks that no
    # other sub-task lists as a dependency).
    depended_on = {dep for s in decomp.sub_tasks for dep in s.depends_on}
    all_final: dict[str, Any] = {}
    for st in decomp.sub_tasks:
        if st.status == "done" and st.id not in depended_on and st.output:
            all_final.update(st.output)
    if all_final:
        fused = self.fusion.fuse(task_str, all_final)
        result.fused = fused
        result.outputs = all_final

        # HITL: check low confidence
        if fused.confidence < hitl_mgr.config.break_on_low_confidence:
            decision, feedback = await hitl_mgr.should_break_on_result(
                task_id="final",
                output=all_final,
                confidence=fused.confidence,
            )
            if decision == HumanDecision.ABORT:
                result.success = False
                result.error = "Aborted by human at final result"
                return _finish()
            if decision == HumanDecision.REJECT:
                result.success = False
                result.error = f"Rejected: {feedback}"
                return _finish()

        result.success = fused.confidence >= 0.3

    # Fallback: no sink produced output, so fuse everything that ran.
    if not result.outputs and sub_outputs:
        all_outputs: dict[str, Any] = {}
        for st_outputs in sub_outputs.values():
            all_outputs.update(st_outputs)
        if all_outputs:
            fused = self.fusion.fuse(task_str, all_outputs)
            result.fused = fused
            result.outputs = all_outputs
            result.success = fused.confidence >= 0.3

    return _finish()

979 

980 # ── Monitored Execution (v1.9.6) ───────────────────────────── 

981 

async def monitor_execute(
    self,
    task: Any,
    quality_gates: list[QualityGate] | None = None,
    fallback_fn: Callable[[], Any] | None = None,
    **metadata,
) -> tuple[Any, MonitorReport]:
    """Execute a task via ``smart_execute`` under AgentMonitor quality gating.

    The call to ``self.smart_execute`` is wrapped in
    ``monitor.monitor_execution``. Gate evaluation, retries and fallback
    handling are delegated to that monitor.

    Args:
        task: Task description, forwarded to ``smart_execute``.
        quality_gates: Custom quality gates.
            - When given, a fresh monitor is built with these gates. It
              inherits ``max_retries`` / ``default_fallback`` from
              ``self.monitor``.
            - When None and ``self.monitor`` already has gates,
              ``self.monitor`` is used.
            - Otherwise a monitor with the default ``output_not_empty`` and
              ``no_error_output`` gates is built, with the same inherited
              settings.
        fallback_fn: Fallback passed through to the monitor.
        **metadata: Extra keyword arguments forwarded to ``smart_execute``.

    Returns:
        Tuple of (final_output, MonitorReport), as returned by the monitor.
        On a normal run the task output is the fused ``merged`` value when it
        is truthy, otherwise the raw ``outputs`` of the smart execution.
    """
    base = self.monitor
    if quality_gates:
        monitor = AgentMonitor(
            max_retries=base.max_retries,
            default_fallback=base.default_fallback,
        )
        monitor.add_gates(quality_gates)
    elif not base._gates:
        # No gates configured: use sensible defaults, but keep the configured
        # retry/fallback settings instead of resetting them to AgentMonitor
        # defaults (consistent with the custom-gates branch above).
        monitor = AgentMonitor(
            max_retries=base.max_retries,
            default_fallback=base.default_fallback,
        )
        monitor.add_gates([
            output_not_empty(),
            no_error_output(),
        ])
    else:
        monitor = base

    # Context handed to the monitor. "_latency_ms" is refreshed after every
    # attempt, so gates evaluated after task_fn returns see the real latency
    # instead of a hard-coded 0.
    # NOTE(review): assumes monitor_execution hands this same dict (not a
    # copy) to the gates — confirm against AgentMonitor.
    gate_context: dict[str, Any] = {"_latency_ms": 0}

    # perf_counter is monotonic, unlike time.time(), so durations cannot go
    # negative or jump if the wall clock is adjusted.
    start = time.perf_counter()

    async def execute_fn() -> Any:
        attempt_start = time.perf_counter()
        try:
            result = await self.smart_execute(task, **metadata)
        finally:
            # Record even when the attempt raises, so the value is never stale.
            gate_context["_latency_ms"] = (time.perf_counter() - attempt_start) * 1000
        fused = result.fused
        if fused and fused.merged:
            return fused.merged
        return result.outputs

    output, report = await monitor.monitor_execution(
        task_fn=execute_fn,
        task_name=str(task)[:80],
        context=gate_context,
        fallback_fn=fallback_fn,
    )

    # Also annotate every gate result with the total elapsed time (including
    # any retries / fallback), preserving the previous reporting behaviour.
    elapsed_ms = (time.perf_counter() - start) * 1000
    for gate in report.gates:
        gate.data["_latency_ms"] = elapsed_ms

    return output, report

1042 

1043 # ── Tool Registry Convenience Methods ───────────────────────── 

1044 

def register_tool(
    self,
    name: str,
    description: str,
    handler: Callable,
    category: ToolCategory = ToolCategory.CUSTOM,
    params: list[ToolParam] | None = None,
    capabilities: list[str] | None = None,
    tags: list[str] | None = None,
    is_destructive: bool = False,
    rate_limit: int = 0,
    **kwargs,
) -> ToolSchema:
    """Register a tool in the coordinator's tool registry.

    Builds a schema with ``create_tool`` and passes it to
    ``self.tool_registry.register``, returning whatever that call returns.
    Falsy ``params`` / ``capabilities`` / ``tags`` are replaced by empty
    lists. Any extra keyword arguments are forwarded to ``create_tool``.
    """
    tool_spec = dict(
        name=name,
        description=description,
        handler=handler,
        category=category,
        params=params or [],
        capabilities=capabilities or [],
        tags=tags or [],
        is_destructive=is_destructive,
        rate_limit=rate_limit,
    )
    schema = create_tool(**tool_spec, **kwargs)
    return self.tool_registry.register(schema)

1066 

def find_tool(self, query: str, top_k: int = 5) -> list[tuple[ToolSchema, float]]:
    """Return the registry's best matches for a natural-language *query*.

    Delegates to ``self.tool_registry.search``, passing ``top_k`` as a
    keyword argument, and returns its result unchanged.
    """
    registry = self.tool_registry
    matches = registry.search(query, top_k=top_k)
    return matches

1070 

def route_tool(self, task: str, **ctx_kwargs) -> RoutingDecision:
    """Pick the best matching tool for *task* via the tool router.

    Extra keyword arguments are used, together with ``task``, to build the
    ``RoutingContext`` handed to ``self.tool_router.route``.
    """
    routing_ctx = RoutingContext(task=task, **ctx_kwargs)
    router = self.tool_router
    return router.route(routing_ctx)

1075 

def execute_tool(self, tool_name: str, params: dict[str, Any] | None = None, force: bool = False) -> Any:
    """Run the registered tool *tool_name* through the tool executor.

    ``params`` is forwarded as-is (it may be None), and ``force`` is passed
    as a keyword argument. The executor's return value is returned unchanged.
    """
    executor = self.tool_executor
    outcome = executor.execute(tool_name, params, force=force)
    return outcome

1079 

1080 

# ── Backward-compatible alias ─────────────────────────────────────
# ``SwarmCoordinator`` is the very same class object as
# ``SmartSwarmCoordinator`` (a plain name binding, not a subclass), so
# ``isinstance`` checks and attribute access behave identically under either name.
SwarmCoordinator = SmartSwarmCoordinator