Coverage for agentos/swarm/coordinator.py: 25%

652 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 18:40 +0800

1""" 

2v1.9.8: Smart Swarm Coordinator with tool registry + intelligent routing. 

3 

4Full intelligence stack: 

5- TaskDecomposer: decompose complex tasks into sub-task DAGs 

6- ResultFusion: LLM-as-Judge aggregation with confidence scoring 

7- EvalFeedbackLoop: execute → evaluate → retry → converge 

8- CodeSandbox: safe code generation & execution with test cases 

9- HumanLoop: human-in-the-loop breakpoints for approval/intervention 

10- AgentMonitor: quality gates + self-monitoring pipeline 

11- ExecutionTrace: full span-tree observability + bottleneck detection 

12- AgentMemory: three-tier memory (working/short-term/long-term) + context window 

13- ToolRegistry: schema-based tool catalog with versioning, capabilities, search 

14- ToolRouter: intelligent tool selection with LLM + semantic matching 

15- ToolExecutor: safe tool execution with validation, rate limiting, destructive confirmation 

16""" 

17 

18from __future__ import annotations 

19 

20import asyncio 

21import time 

22import uuid 

23from collections import defaultdict 

24from dataclasses import dataclass, field 

25from enum import Enum 

26from typing import Any, Callable, Optional, Awaitable 

27 

28from agentos.core.di import Agent, RunContext 

29from agentos.swarm.task_decomposer import TaskDecomposer, Decomposition, SubTask 

30from agentos.swarm.result_fusion import ResultFusion, FusedResult 

31from agentos.swarm.eval_feedback_loop import EvalFeedbackLoop, LoopResult, RetryConfig 

32from agentos.swarm.code_sandbox import CodeSandbox, SandboxResult, TestCase, CodeFeedbackExtractor 

33from agentos.swarm.human_loop import ( 

34 HITLManager, HITLConfig, Breakpoint, BreakpointType, HumanDecision, 

35) 

36from agentos.swarm.agent_monitor import ( 

37 AgentMonitor, QualityGate, MonitorReport, GateResult, GateStatus, GateAction, 

38 output_not_empty, output_length_range, no_error_output, contains_keywords, 

39 latency_max, confidence_min, 

40) 

41from agentos.swarm.execution_trace import ( 

42 ExecutionTrace, TraceSpan, TraceEvent, TraceCollector, 

43) 

44from agentos.swarm.agent_memory import ( 

45 AgentMemory, WorkingMemory, ShortTermMemory, LongTermMemory, 

46 ContextWindowManager, ContextBudget, MemoryEntry, 

47) 

48from agentos.swarm.tool_registry import ( 

49 ToolRegistry, ToolRouter, ToolExecutor, ToolSchema, ToolParam, 

50 ToolCategory, RoutingDecision, RoutingContext, ToolExecutionError, 

51 create_tool, 

52) 

53from agentos.security.guard import ( 

54 GuardPipeline, InputGuard, OutputGuard, 

55 PIIDetector, ContentSafetyFilter, 

56 create_strict_guard, create_permissive_guard, 

57 GuardChainResult, 

58) 

59 

60 

61class SwarmTopology(str, Enum): 

62 """Swarm topology types.""" 

63 STAR = "star" # Central coordinator 

64 RING = "ring" # Circular message passing 

65 MESH = "mesh" # All-to-all communication 

66 TREE = "tree" # Hierarchical structure 

67 DAG = "dag" # Workflow-based dependencies 

68 HYBRID = "hybrid" # Dynamic topology switching 

69 

70 

71class ExecutionMode(str, Enum): 

72 """Execution strategy for the coordinator.""" 

73 RAW = "raw" # Original topology-only execution 

74 SMART = "smart" # Decompose → Execute DAG → Fuse 

75 FEEDBACK = "feedback" # Smart + eval feedback loop 

76 

77 

78@dataclass 

79class AgentRole: 

80 """Agent 角色定义。""" 

81 name: str 

82 goal: str 

83 backstory: str = "" 

84 tools: list[str] = field(default_factory=list) 

85 model: str = "auto" 

86 temperature: float = 0.7 

87 allow_delegation: bool = True 

88 verbose: bool = False 

89 

90 

91class MessageBus: 

92 """Agent 间消息总线 — 黑板模式。""" 

93 

94 def __init__(self): 

95 self._messages: list[dict] = [] 

96 self._subscribers: dict[str, list[Callable]] = {} 

97 self._shared_memory: dict[str, Any] = {} 

98 

99 def publish(self, sender: str, topic: str, data: dict): 

100 msg = {"sender": sender, "topic": topic, "data": data} 

101 self._messages.append(msg) 

102 if topic in self._subscribers: 

103 for cb in self._subscribers[topic]: 

104 cb(msg) 

105 

106 def subscribe(self, topic: str, callback: Callable): 

107 self._subscribers.setdefault(topic, []).append(callback) 

108 

109 @property 

110 def messages(self) -> list[dict]: 

111 return self._messages 

112 

113 @property 

114 def shared_memory(self) -> dict[str, Any]: 

115 return self._shared_memory 

116 

117 

118@dataclass 

119class SwarmMessage: 

120 """ 

121 Message in swarm communication. 

122 

123 Attributes: 

124 id: Unique identifier 

125 sender: Sender agent name 

126 receiver: Receiver agent name (None = broadcast) 

127 content: Message content 

128 metadata: Additional metadata 

129 timestamp: Message timestamp 

130 """ 

131 id: str = field(default_factory=lambda: uuid.uuid4().hex[:12]) 

132 sender: str = "" 

133 receiver: Optional[str] = None 

134 content: Any = None 

135 metadata: dict[str, Any] = field(default_factory=dict) 

136 timestamp: float = field(default_factory=time.time) 

137 

138 def to_dict(self) -> dict[str, Any]: 

139 """Convert to dict.""" 

140 return { 

141 "id": self.id, 

142 "sender": self.sender, 

143 "receiver": self.receiver, 

144 "content": self.content, 

145 "metadata": self.metadata, 

146 "timestamp": self.timestamp, 

147 } 

148 

149 

150@dataclass 

151class SwarmResult: 

152 """ 

153 Result of swarm execution. 

154 

155 Attributes: 

156 id: Unique identifier 

157 topology: Swarm topology 

158 mode: Execution mode used 

159 outputs: Agent outputs 

160 messages: Communication messages 

161 duration: Execution duration 

162 success: Whether execution succeeded 

163 fused: ResultFusion output (smart mode only) 

164 decomposition: Task decomposition used (smart mode only) 

165 feedback_loop: Feedback loop result (feedback mode only) 

166 """ 

167 id: str = field(default_factory=lambda: uuid.uuid4().hex[:12]) 

168 topology: SwarmTopology = SwarmTopology.STAR 

169 mode: ExecutionMode = ExecutionMode.RAW 

170 outputs: dict[str, Any] = field(default_factory=dict) 

171 messages: list[SwarmMessage] = field(default_factory=list) 

172 duration: float = 0.0 

173 success: bool = True 

174 fused: Optional[FusedResult] = None 

175 decomposition: Optional[Decomposition] = None 

176 feedback_loop: Optional[LoopResult] = None 

177 

178 def to_dict(self) -> dict[str, Any]: 

179 """Convert to dict.""" 

180 d: dict[str, Any] = { 

181 "id": self.id, 

182 "topology": self.topology.value, 

183 "mode": self.mode.value, 

184 "outputs": self.outputs, 

185 "messages": [m.to_dict() for m in self.messages], 

186 "duration": f"{self.duration:.2f}s", 

187 "success": self.success, 

188 } 

189 if self.fused: 

190 d["fused"] = { 

191 "action": self.fused.action, 

192 "confidence": self.fused.confidence, 

193 "reason": self.fused.reason, 

194 } 

195 if self.decomposition: 

196 d["decomposition"] = { 

197 "sub_tasks": [st.to_dict() for st in self.decomposition.sub_tasks], 

198 "total_steps": self.decomposition.total_steps, 

199 } 

200 if self.feedback_loop: 

201 d["feedback_loop"] = { 

202 "attempts": self.feedback_loop.attempts, 

203 "best_score": self.feedback_loop.best_score, 

204 "converged": self.feedback_loop.converged, 

205 "duration": f"{self.feedback_loop.duration:.2f}s", 

206 } 

207 return d 

208 

209 

210# ── Swarm Agent Role Enum (v1.16.2, migrated from orchestration/swarm_coordinator.py) ─ 

211 

212class SwarmAgentRole(str, Enum): 

213 """Role of an agent within a swarm (enum-based, distinct from AgentRole dataclass).""" 

214 COORDINATOR = "coordinator" 

215 WORKER = "worker" 

216 REVIEWER = "reviewer" 

217 OBSERVER = "observer" 

218 SPECIALIST = "specialist" 

219 

220 

221class TaskPriority(str, Enum): 

222 """Priority level for swarm tasks.""" 

223 CRITICAL = "critical" 

224 HIGH = "high" 

225 MEDIUM = "medium" 

226 LOW = "low" 

227 

228 

229class TaskStatus(str, Enum): 

230 """Execution status of a swarm task.""" 

231 PENDING = "pending" 

232 ASSIGNED = "assigned" 

233 RUNNING = "running" 

234 COMPLETED = "completed" 

235 FAILED = "failed" 

236 CANCELED = "canceled" 

237 

238 

239@dataclass 

240class SwarmAgentInfo: 

241 """Metadata about a swarm agent (v1.16.2, migrated from orchestration).""" 

242 agent_id: str 

243 role: SwarmAgentRole 

244 capabilities: list[str] = field(default_factory=list) 

245 model: str = "" 

246 max_concurrency: int = 3 

247 current_load: int = 0 

248 is_alive: bool = True 

249 last_heartbeat: float = 0.0 

250 metadata: dict[str, Any] = field(default_factory=dict) 

251 

252 @property 

253 def is_available(self) -> bool: 

254 return self.is_alive and self.current_load < self.max_concurrency 

255 

256 

257@dataclass 

258class SwarmTask: 

259 """A task to be executed by the swarm (v1.16.2, migrated from orchestration).""" 

260 task_id: str 

261 description: str 

262 priority: TaskPriority = TaskPriority.MEDIUM 

263 status: TaskStatus = TaskStatus.PENDING 

264 assigned_to: str = "" 

265 required_capabilities: list[str] = field(default_factory=list) 

266 parent_task_id: str = "" 

267 dependencies: list[str] = field(default_factory=list) 

268 result: Any = None 

269 error: str = "" 

270 started_at: float = 0.0 

271 completed_at: float = 0.0 

272 retry_count: int = 0 

273 max_retries: int = 3 

274 metadata: dict[str, Any] = field(default_factory=dict) 

275 

276 @property 

277 def is_ready(self) -> bool: 

278 return self.status == TaskStatus.PENDING and not self.dependencies 

279 

280 @property 

281 def duration_ms(self) -> float: 

282 if self.completed_at and self.started_at: 

283 return (self.completed_at - self.started_at) * 1000 

284 return 0.0 

285 

286 

287# ── Dynamic Task Allocator ────────────────────────────────────── 

288 

289class TaskAllocator: 

290 """Workload-aware dynamic task allocation (v1.16.2, migrated from orchestration). 

291 

292 Considers: capabilities, load, priority, affinity. 

293 """ 

294 

295 def __init__(self): 

296 self._assignments: dict[str, str] = {} 

297 

298 def allocate( 

299 self, 

300 task: SwarmTask, 

301 agents: list[SwarmAgentInfo], 

302 ) -> Optional[str]: 

303 available = [a for a in agents if a.is_available] 

304 if not available: 

305 return None 

306 

307 scored: list[tuple[SwarmAgentInfo, float]] = [] 

308 for agent in available: 

309 score = 0.0 

310 

311 if task.required_capabilities: 

312 match = len(set(task.required_capabilities) & set(agent.capabilities)) 

313 total = len(task.required_capabilities) 

314 score += (match / total) * 50 if total > 0 else 25 

315 

316 score -= agent.current_load * 10 

317 

318 if agent.role == SwarmAgentRole.SPECIALIST: 

319 if any(cap in agent.capabilities for cap in task.required_capabilities): 

320 score += 20 

321 

322 if task.parent_task_id and self._assignments.get(task.parent_task_id) == agent.agent_id: 

323 score += 15 

324 

325 scored.append((agent, score)) 

326 

327 scored.sort(key=lambda x: x[1], reverse=True) 

328 

329 if scored and scored[0][1] > 0: 

330 best = scored[0][0] 

331 self._assignments[task.task_id] = best.agent_id 

332 return best.agent_id 

333 

334 return available[0].agent_id if available else None 

335 

336 

337# ── Conflict Resolver ─────────────────────────────────────────── 

338 

339class ConflictType(str, Enum): 

340 """Type of conflict between agent outputs.""" 

341 FACTUAL = "factual" 

342 METHODOLOGICAL = "methodological" 

343 OUTPUT = "output" 

344 RESOURCE = "resource" 

345 

346 

347class ConflictResolver: 

348 """Detect and resolve conflicts between agents (v1.16.2, migrated from orchestration). 

349 

350 Strategies: majority vote, weighted vote, ranked choice, escalation. 

351 """ 

352 

353 def __init__(self): 

354 self._conflict_log: list[dict] = [] 

355 

356 def detect_conflict( 

357 self, 

358 agent_outputs: dict[str, Any], 

359 expected_type: str = "text", 

360 ) -> list[dict]: 

361 conflicts = [] 

362 agents = list(agent_outputs.keys()) 

363 if len(agents) < 2: 

364 return conflicts 

365 

366 outputs = list(agent_outputs.values()) 

367 

368 if all(isinstance(o, str) for o in outputs): 

369 for i in range(len(outputs)): 

370 for j in range(i + 1, len(outputs)): 

371 similarity = self._text_similarity(outputs[i], outputs[j]) 

372 if similarity < 0.3: 

373 conflicts.append({ 

374 "type": ConflictType.OUTPUT.value, 

375 "agents": [agents[i], agents[j]], 

376 "similarity": similarity, 

377 "outputs": {agents[i]: outputs[i][:200], agents[j]: outputs[j][:200]}, 

378 }) 

379 

380 elif all(isinstance(o, (int, float)) for o in outputs): 

381 values = outputs 

382 mean_val = sum(values) / len(values) 

383 for i, val in enumerate(values): 

384 if abs(val - mean_val) / max(abs(mean_val), 1) > 0.5: 

385 conflicts.append({ 

386 "type": ConflictType.FACTUAL.value, 

387 "agents": [agents[i]], 

388 "value": val, 

389 "mean": mean_val, 

390 "deviation": abs(val - mean_val) / max(abs(mean_val), 1), 

391 }) 

392 

393 return conflicts 

394 

395 def resolve( 

396 self, 

397 agent_outputs: dict[str, Any], 

398 weights: dict[str, float] | None = None, 

399 strategy: str = "majority", 

400 expected_type: str = "text", 

401 ) -> dict[str, Any]: 

402 if len(agent_outputs) == 1: 

403 agent_id = list(agent_outputs.keys())[0] 

404 return {"output": agent_outputs[agent_id], "method": "single_agent", "conflict": False} 

405 

406 outputs = list(agent_outputs.values()) 

407 

408 if all(isinstance(o, str) for o in outputs): 

409 return self._resolve_text(agent_outputs, weights, strategy) 

410 elif all(isinstance(o, (int, float)) for o in outputs): 

411 return self._resolve_numeric(agent_outputs, weights, strategy) 

412 else: 

413 return {"output": outputs[0], "method": "first", "conflict": True} 

414 

415 def _resolve_text(self, outputs: dict[str, str], weights: dict[str, float] | None, strategy: str) -> dict: 

416 if strategy == "majority": 

417 votes: dict[str, list[str]] = defaultdict(list) 

418 agent_ids = list(outputs.keys()) 

419 for i, a1 in enumerate(agent_ids): 

420 best_match = a1 

421 best_sim = 0 

422 for j, a2 in enumerate(agent_ids): 

423 if i == j: 

424 continue 

425 sim = self._text_similarity(outputs[a1], outputs[a2]) 

426 if sim > best_sim: 

427 best_sim = sim 

428 best_match = a2 

429 key = outputs[best_match][:50] 

430 votes[key].append(a1) 

431 

432 winning_key = max(votes, key=lambda k: len(votes[k])) 

433 winning_agent = votes[winning_key][0] 

434 return { 

435 "output": outputs[winning_agent], 

436 "method": "majority", 

437 "votes": {k: len(v) for k, v in votes.items()}, 

438 "conflict": len(votes) > 1, 

439 } 

440 

441 elif strategy == "weighted": 

442 if not weights: 

443 return self._resolve_text(outputs, weights, "majority") 

444 best_agent = max(weights, key=weights.get) 

445 return {"output": outputs.get(best_agent, list(outputs.values())[0]), "method": "weighted", "conflict": False} 

446 

447 else: 

448 return {"output": list(outputs.values())[0], "method": "first", "conflict": False} 

449 

450 def _resolve_numeric(self, outputs: dict[str, float], weights: dict[str, float] | None, strategy: str) -> dict: 

451 values = list(outputs.values()) 

452 agents = list(outputs.keys()) 

453 

454 if strategy == "weighted" and weights: 

455 total_weight = sum(weights.get(a, 1.0) for a in agents) 

456 weighted = sum(weights.get(a, 1.0) * outputs[a] for a in agents) / total_weight 

457 return {"output": weighted, "method": "weighted_average", "conflict": False} 

458 else: 

459 avg = sum(values) / len(values) 

460 return {"output": avg, "method": "average", "conflict": False} 

461 

462 def _text_similarity(self, a: str, b: str) -> float: 

463 if a == b: 

464 return 1.0 

465 tokens_a = set(a.lower().split()) 

466 tokens_b = set(b.lower().split()) 

467 if not tokens_a or not tokens_b: 

468 return 0.0 

469 intersection = tokens_a & tokens_b 

470 union = tokens_a | tokens_b 

471 return len(intersection) / len(union) if union else 0.0 

472 

473 

474class SmartSwarmCoordinator: 

475 """ 

476 v1.9.4: Multi-agent coordination with intelligent orchestration. 

477 

478 Upgrades the coordinator with: 

479 - TaskDecomposer: LLM-driven sub-task DAG decomposition 

480 - ResultFusion: LLM-as-Judge aggregation with confidence scoring 

481 - EvalFeedbackLoop: execute → evaluate → retry → converge 

482 

483 Usage: 

484 coordinator = SmartSwarmCoordinator(topology=SwarmTopology.MESH) 

485 coordinator.register(agent1) 

486 coordinator.register(agent2) 

487 

488 # Smart mode with decomposition + fusion 

489 result = await coordinator.smart_execute("complex research task") 

490 

491 # Feedback mode with evaluation retry loop 

492 result = await coordinator.execute_with_feedback( 

493 task, expected_output, scorer 

494 ) 

495 """ 

496 

497 def __init__( 

498 self, 

499 topology: SwarmTopology = SwarmTopology.STAR, 

500 max_rounds: int = 10, 

501 execution_mode: ExecutionMode = ExecutionMode.SMART, 

502 decomposer: TaskDecomposer | None = None, 

503 fusion: ResultFusion | None = None, 

504 feedback_loop: EvalFeedbackLoop | None = None, 

505 sandbox: CodeSandbox | None = None, 

506 hitl_manager: HITLManager | None = None, 

507 monitor: AgentMonitor | None = None, 

508 trace_collector: TraceCollector | None = None, 

509 memory: AgentMemory | None = None, 

510 tool_registry: ToolRegistry | None = None, 

511 tool_router: ToolRouter | None = None, 

512 tool_executor: ToolExecutor | None = None, 

513 guard: GuardPipeline | None = None, 

514 ): 

515 """ 

516 Initialize smart swarm coordinator. 

517 

518 Args: 

519 topology: Swarm topology 

520 max_rounds: Maximum communication rounds 

521 execution_mode: Default execution mode 

522 decomposer: TaskDecomposer instance (created if None) 

523 fusion: ResultFusion instance (created if None) 

524 feedback_loop: EvalFeedbackLoop instance (created if None) 

525 sandbox: CodeSandbox instance for code execution (created if None) 

526 hitl_manager: HITLManager for human-in-the-loop (created if None) 

527 monitor: AgentMonitor for quality gating (created if None) 

528 trace_collector: TraceCollector for execution traces (created if None) 

529 memory: AgentMemory for layered memory (created if None) 

530 tool_registry: ToolRegistry for tool catalog (created if None) 

531 tool_router: ToolRouter for intelligent tool selection (created if None) 

532 tool_executor: ToolExecutor for safe tool execution (created if None) 

533 guard: GuardPipeline for input/output safety filtering (created if None) 

534 """ 

535 self.topology = topology 

536 self.max_rounds = max_rounds 

537 self.execution_mode = execution_mode 

538 self._agents: dict[str, Agent[Any, Any]] = {} 

539 self._message_queue: list[SwarmMessage] = [] 

540 

541 self.decomposer = decomposer or TaskDecomposer() 

542 self.fusion = fusion or ResultFusion() 

543 self.feedback = feedback_loop or EvalFeedbackLoop() 

544 self.sandbox = sandbox or CodeSandbox() 

545 self.hitl = hitl_manager or HITLManager() 

546 self.monitor = monitor or AgentMonitor() 

547 self.tracer = trace_collector or TraceCollector() 

548 self.memory = memory or AgentMemory() 

549 self.tool_registry = tool_registry or ToolRegistry() 

550 self.tool_router = tool_router or ToolRouter(self.tool_registry) 

551 self.tool_executor = tool_executor or ToolExecutor(self.tool_registry) 

552 self.guard = guard or create_strict_guard() 

553 

554 # Original topology methods bound for backward compatibility 

555 self._topo_handlers = { 

556 SwarmTopology.STAR: self._execute_star, 

557 SwarmTopology.RING: self._execute_ring, 

558 SwarmTopology.MESH: self._execute_mesh, 

559 SwarmTopology.TREE: self._execute_tree, 

560 } 

561 

562 # ── Agent management ────────────────────────────────────────── 

563 

564 def register(self, agent: Agent[Any, Any]) -> None: 

565 self._agents[agent.name] = agent 

566 

567 def unregister(self, agent_name: str) -> bool: 

568 if agent_name in self._agents: 

569 del self._agents[agent_name] 

570 return True 

571 return False 

572 

573 def get_agent(self, agent_name: str) -> Optional[Agent[Any, Any]]: 

574 return self._agents.get(agent_name) 

575 

576 def list_agents(self) -> list[str]: 

577 return list(self._agents.keys()) 

578 

579 # ── Execution API ───────────────────────────────────────────── 

580 

581 async def execute( 

582 self, 

583 task: Any, 

584 mode: ExecutionMode | None = None, 

585 **metadata, 

586 ) -> SwarmResult: 

587 """Execute a task. Delegates to smart_execute or raw topology.""" 

588 mode = mode or self.execution_mode 

589 if mode == ExecutionMode.SMART: 

590 return await self.smart_execute(task, **metadata) 

591 return await self._execute_raw(task, **metadata) 

592 

593 async def smart_execute( 

594 self, 

595 task: Any, 

596 _trace: ExecutionTrace | None = None, 

597 **metadata, 

598 ) -> SwarmResult: 

599 """Smart execution: decompose → execute DAG → fuse. 

600 

601 Uses ExecutionTrace for observability when tracer is available. 

602 

603 Args: 

604 task: Task description (string or structured) 

605 _trace: Optional trace to attach (auto-created if self.tracer exists) 

606 **metadata: Additional metadata 

607 

608 Returns: 

609 SwarmResult with fused output and decomposition trace 

610 """ 

611 start_time = time.time() 

612 task_str = str(task) 

613 

614 # Step 0: Security guard — input filtering 

615 guard_result = self.guard.process_input(task_str) 

616 if guard_result.blocked: 

617 result = SwarmResult( 

618 topology=self.topology, 

619 mode=ExecutionMode.SMART, 

620 output=f"[BLOCKED] Input rejected by guard: {guard_result.blocked_by}. Reason: {', '.join(guard_result.warnings)}", 

621 completed=False, 

622 ) 

623 return result 

624 if guard_result.final_content != task_str: 

625 task_str = guard_result.final_content # PII-redacted version 

626 

627 # Trace setup 

628 trace = _trace 

629 if trace is None and self.tracer is not None: 

630 trace = ExecutionTrace(task_name=task_str[:80]) 

631 self.tracer.add(trace) 

632 

633 if trace: 

634 root = trace.start_span(TraceEvent.TASK_START, name="smart_execute", data={"task": task_str}) 

635 

636 result = SwarmResult( 

637 topology=self.topology, 

638 mode=ExecutionMode.SMART, 

639 ) 

640 

641 agent_names = self.list_agents() 

642 

643 # Step 0: Load memory context 

644 self.memory.set_task(task_str) 

645 memory_context = self.memory.get_context(query=task_str) if self.memory else "" 

646 

647 # Step 1: Decompose 

648 if trace: 

649 dspan = trace.start_span(TraceEvent.DECOMPOSE, name="decompose") 

650 decomp = self.decomposer.decompose(task_str, agents=agent_names) 

651 result.decomposition = decomp 

652 if trace and dspan: 

653 trace.end_span(dspan.id, status="done", data={"sub_tasks": len(decomp.sub_tasks)}) 

654 

655 # Step 2: Execute sub-tasks in dependency order 

656 sub_outputs: dict[str, dict[str, Any]] = {} 

657 completed: set[str] = set() 

658 

659 for _round in range(self.max_rounds): 

660 ready = [ 

661 st for st in decomp.sub_tasks 

662 if st.status == "pending" 

663 and all(dep in completed for dep in st.depends_on) 

664 ] 

665 if not ready: 

666 break 

667 

668 for st in ready: 

669 st.status = "running" 

670 

671 if trace: 

672 stspan = trace.start_span(TraceEvent.SUBTASK_START, name=st.description[:60], data={"id": st.id}) 

673 

674 # Build context from dependencies 

675 context = task_str 

676 if memory_context: 

677 context = f"{memory_context}\n\n[Current Task]\n{task_str}" 

678 if st.depends_on: 

679 dep_contexts = [] 

680 for dep_id in st.depends_on: 

681 dep_outputs = sub_outputs.get(dep_id, {}) 

682 for name, out in dep_outputs.items(): 

683 dep_contexts.append(f"[{name}]: {str(out)[:300]}") 

684 if dep_contexts: 

685 context = f"{context}\n\nPrevious results:\n" + "\n".join(dep_contexts) 

686 

687 # Execute with all agents on this sub-task 

688 topo_result = await self._execute_raw(context, **metadata) 

689 sub_outputs[st.id] = topo_result.outputs 

690 st.output = topo_result.outputs 

691 st.status = "done" if topo_result.success else "failed" 

692 completed.add(st.id) 

693 

694 # Store sub-task result in memory 

695 self.memory.remember( 

696 content=f"SubTask [{st.description}]: {json.dumps(topo_result.outputs, default=str)[:500]}", 

697 role="assistant", 

698 importance=0.6, 

699 metadata={"subtask_id": st.id, "status": st.status}, 

700 ) 

701 

702 if trace and stspan: 

703 st_status = "done" if st.status == "done" else "failed" 

704 trace.end_span(stspan.id, status=st_status, data={"output_keys": list(topo_result.outputs.keys())}) 

705 

706 # Step 3: Fuse results from final sub-tasks 

707 if trace: 

708 fspan = trace.start_span(TraceEvent.FUSE, name="fuse_results") 

709 

710 final_subtasks = [ 

711 st for st in decomp.sub_tasks 

712 if st.status == "done" and st.id not in { 

713 s.id for s in decomp.sub_tasks 

714 if any(d == st.id for d in s.depends_on) 

715 } 

716 ] 

717 if final_subtasks: 

718 all_final: dict[str, Any] = {} 

719 for st in final_subtasks: 

720 if st.output: 

721 all_final.update(st.output) 

722 if all_final: 

723 fused = self.fusion.fuse(task_str, all_final) 

724 result.fused = fused 

725 result.outputs = all_final 

726 result.success = fused.confidence >= 0.3 

727 

728 if not result.outputs and sub_outputs: 

729 all_outputs: dict[str, Any] = {} 

730 for st_outputs in sub_outputs.values(): 

731 all_outputs.update(st_outputs) 

732 if all_outputs: 

733 fused = self.fusion.fuse(task_str, all_outputs) 

734 result.fused = fused 

735 result.outputs = all_outputs 

736 result.success = fused.confidence >= 0.3 

737 

738 if trace and fspan: 

739 trace.end_span(fspan.id, status="done", data={"confidence": result.fused.confidence if result.fused else 0}) 

740 

741 result.duration = time.time() - start_time 

742 

743 if trace and root: 

744 trace.end_span(root.id, status="done" if result.success else "failed") 

745 

746 # Output guard — filter agent output before returning to user 

747 if result.outputs: 

748 guarded_outputs: dict[str, Any] = {} 

749 for key, value in result.outputs.items(): 

750 output_str = str(value) 

751 output_guard = self.guard.process_output(output_str) 

752 if output_guard.blocked: 

753 guarded_outputs[key] = f"[BLOCKED by guard: {output_guard.blocked_by}]" 

754 elif output_guard.final_content != output_str: 

755 guarded_outputs[key] = output_guard.final_content 

756 else: 

757 guarded_outputs[key] = value 

758 result.outputs = guarded_outputs 

759 

760 return result 

761 

762 async def execute_with_feedback( 

763 self, 

764 task: Any, 

765 expected_output: str = "", 

766 scoring_strategy: str = "general", 

767 retry_config: RetryConfig | None = None, 

768 **metadata, 

769 ) -> SwarmResult: 

770 """Execution with eval-driven feedback loop. 

771 

772 Args: 

773 task: Task description 

774 expected_output: Reference for scoring 

775 scoring_strategy: Scoring strategy (qa/code/summary/translation) 

776 retry_config: Retry configuration 

777 **metadata: Additional metadata 

778 

779 Returns: 

780 SwarmResult with feedback_loop trace 

781 """ 

782 start_time = time.time() 

783 result = SwarmResult( 

784 topology=self.topology, 

785 mode=ExecutionMode.FEEDBACK, 

786 ) 

787 

788 task_str = str(task) 

789 

790 # Build executor that uses smart_execute 

791 async def executor(t: str) -> Any: 

792 r = await self.smart_execute(t, **metadata) 

793 fused = r.fused 

794 if fused and fused.merged: 

795 content = fused.merged 

796 # If it's a dict with agent outputs, stringify 

797 if isinstance(content, dict): 

798 parts = [] 

799 for k, v in content.items(): 

800 if v and not isinstance(v, dict): 

801 parts.append(str(v)) 

802 return "\n".join(parts) if parts else str(content) 

803 return str(content) 

804 return str(r.outputs) 

805 

806 # Wire scorer if available 

807 scorer = None 

808 try: 

809 from agentos.evaluation.scorers import CompositeScorerV2 

810 scorer = CompositeScorerV2() 

811 except Exception: 

812 pass 

813 

814 feedback = EvalFeedbackLoop( 

815 scorer=scorer, 

816 config=retry_config or RetryConfig(max_retries=3), 

817 ) 

818 

819 loop_result = await feedback.run( 

820 task=task_str, 

821 executor=executor, 

822 expected=expected_output, 

823 strategy=scoring_strategy, 

824 ) 

825 

826 result.feedback_loop = loop_result 

827 result.outputs = {"final": str(loop_result.final_output) if loop_result.final_output else ""} 

828 result.success = loop_result.converged 

829 result.duration = time.time() - start_time 

830 return result 

831 

832 # ── Raw topology execution (backward compatible) ────────────── 

833 

834 async def _execute_raw( 

835 self, 

836 task: Any, 

837 **metadata, 

838 ) -> SwarmResult: 

839 """Original topology-only execution.""" 

840 handler = self._topo_handlers.get(self.topology) 

841 if handler is None: 

842 raise ValueError(f"Unknown topology: {self.topology}") 

843 return await handler(task, metadata) 

844 

845 # ── Star Topology ───────────────────────────────────────────── 

846 

847 async def _execute_star( 

848 self, 

849 task: Any, 

850 metadata: dict[str, Any], 

851 ) -> SwarmResult: 

852 result = SwarmResult(topology=SwarmTopology.STAR, mode=ExecutionMode.RAW) 

853 for agent_name, agent in self._agents.items(): 

854 try: 

855 message = SwarmMessage( 

856 sender="coordinator", 

857 receiver=agent_name, 

858 content=task, 

859 metadata=metadata, 

860 ) 

861 result.messages.append(message) 

862 output = await agent.invoke(task, **metadata) 

863 result.outputs[agent_name] = output 

864 response = SwarmMessage( 

865 sender=agent_name, 

866 receiver="coordinator", 

867 content=output, 

868 ) 

869 result.messages.append(response) 

870 except Exception as e: 

871 result.outputs[agent_name] = {"error": str(e)} 

872 result.success = False 

873 return result 

874 

875 # ── Ring Topology ───────────────────────────────────────────── 

876 

877 async def _execute_ring( 

878 self, 

879 task: Any, 

880 metadata: dict[str, Any], 

881 ) -> SwarmResult: 

882 result = SwarmResult(topology=SwarmTopology.RING, mode=ExecutionMode.RAW) 

883 agent_names = list(self._agents.keys()) 

884 if not agent_names: 

885 return result 

886 

887 current_input = task 

888 for i, agent_name in enumerate(agent_names): 

889 agent = self._agents[agent_name] 

890 next_agent = agent_names[(i + 1) % len(agent_names)] 

891 try: 

892 output = await agent.invoke(current_input, **metadata) 

893 result.outputs[agent_name] = output 

894 message = SwarmMessage( 

895 sender=agent_name, 

896 receiver=next_agent, 

897 content=output, 

898 ) 

899 result.messages.append(message) 

900 current_input = output 

901 except Exception as e: 

902 result.outputs[agent_name] = {"error": str(e)} 

903 result.success = False 

904 return result 

905 

906 # ── Mesh Topology ───────────────────────────────────────────── 

907 

908 async def _execute_mesh( 

909 self, 

910 task: Any, 

911 metadata: dict[str, Any], 

912 ) -> SwarmResult: 

913 result = SwarmResult(topology=SwarmTopology.MESH, mode=ExecutionMode.RAW) 

914 tasks_ = [] 

915 for agent_name, agent in self._agents.items(): 

916 tasks_.append(self._execute_agent_mesh(agent, task, metadata, result)) 

917 await asyncio.gather(*tasks_, return_exceptions=True) 

918 for sender_name, output in result.outputs.items(): 

919 for receiver_name in self._agents.keys(): 

920 if sender_name != receiver_name: 

921 message = SwarmMessage( 

922 sender=sender_name, 

923 receiver=receiver_name, 

924 content=output, 

925 ) 

926 result.messages.append(message) 

927 return result 

928 

929 async def _execute_agent_mesh( 

930 self, 

931 agent: Agent[Any, Any], 

932 task: Any, 

933 metadata: dict[str, Any], 

934 result: SwarmResult, 

935 ) -> None: 

936 try: 

937 output = await agent.invoke(task, **metadata) 

938 result.outputs[agent.name] = output 

939 except Exception as e: 

940 result.outputs[agent.name] = {"error": str(e)} 

941 result.success = False 

942 

943 # ── Tree Topology ───────────────────────────────────────────── 

944 

945 async def _execute_tree( 

946 self, 

947 task: Any, 

948 metadata: dict[str, Any], 

949 ) -> SwarmResult: 

950 result = SwarmResult(topology=SwarmTopology.TREE, mode=ExecutionMode.RAW) 

951 agent_names = list(self._agents.keys()) 

952 if not agent_names: 

953 return result 

954 

955 root_name = agent_names[0] 

956 root_agent = self._agents[root_name] 

957 try: 

958 root_output = await root_agent.invoke(task, **metadata) 

959 result.outputs[root_name] = root_output 

960 except Exception as e: 

961 result.outputs[root_name] = {"error": str(e)} 

962 result.success = False 

963 return result 

964 

965 children = agent_names[1:] 

966 for child_name in children: 

967 child_agent = self._agents[child_name] 

968 message = SwarmMessage( 

969 sender=root_name, 

970 receiver=child_name, 

971 content=root_output, 

972 ) 

973 result.messages.append(message) 

974 try: 

975 child_output = await child_agent.invoke(root_output, **metadata) 

976 result.outputs[child_name] = child_output 

977 response = SwarmMessage( 

978 sender=child_name, 

979 receiver=root_name, 

980 content=child_output, 

981 ) 

982 result.messages.append(response) 

983 except Exception as e: 

984 result.outputs[child_name] = {"error": str(e)} 

985 result.success = False 

986 return result 

987 

988 # ── Messaging ───────────────────────────────────────────────── 

989 

990 def send_message( 

991 self, 

992 sender: str, 

993 receiver: Optional[str], 

994 content: Any, 

995 **metadata, 

996 ) -> SwarmMessage: 

997 message = SwarmMessage( 

998 sender=sender, 

999 receiver=receiver, 

1000 content=content, 

1001 metadata=metadata, 

1002 ) 

1003 self._message_queue.append(message) 

1004 return message 

1005 

1006 def get_messages( 

1007 self, 

1008 receiver: Optional[str] = None, 

1009 ) -> list[SwarmMessage]: 

1010 if receiver: 

1011 return [ 

1012 m for m in self._message_queue 

1013 if m.receiver == receiver or m.receiver is None 

1014 ] 

1015 return self._message_queue.copy() 

1016 

1017 def clear_messages(self) -> None: 

1018 self._message_queue.clear() 

1019 

1020 # ── Code Sandbox Execution (v1.9.5) ─────────────────────────── 

1021 

1022 async def execute_code( 

1023 self, 

1024 code: str, 

1025 func_name: str = "", 

1026 test_cases: list[TestCase] | None = None, 

1027 setup_code: str = "", 

1028 sandbox: CodeSandbox | None = None, 

1029 max_retries: int = 3, 

1030 code_generator: Callable[[str, list[str]], str] | None = None, 

1031 ) -> SandboxResult: 

1032 """Execute code in sandbox with test cases and feedback-driven retry. 

1033 

1034 Supports code generation: if code_generator is provided and initial run 

1035 fails, it will use the feedback extractor to guide re-generation. 

1036 

1037 Args: 

1038 code: Code to execute (or initial code if using generator) 

1039 func_name: Function name to test 

1040 test_cases: Test cases for validation 

1041 setup_code: Setup code (imports, fixtures) 

1042 sandbox: Custom sandbox instance 

1043 max_retries: Max retry attempts with code generation 

1044 code_generator: Callable(spec, feedback_suggestions) → new_code 

1045 

1046 Returns: 

1047 SandboxResult with execution details and test outcomes 

1048 """ 

1049 sb = sandbox or self.sandbox 

1050 

1051 result = sb.run(code, func_name, test_cases, setup_code) 

1052 

1053 # If initial run succeeded, we're done 

1054 if result.all_passed: 

1055 return result 

1056 

1057 # Feedback-driven retry loop 

1058 for attempt in range(1, max_retries + 1): 

1059 if not code_generator: 

1060 break 

1061 

1062 suggestions = CodeFeedbackExtractor.extract(result) 

1063 if not suggestions: 

1064 break 

1065 

1066 # Generate improved code 

1067 spec = f"Function: {func_name}, Test cases: {len(test_cases or [])}" 

1068 try: 

1069 new_code = code_generator(spec, suggestions) 

1070 except Exception: 

1071 break 

1072 

1073 if not new_code or new_code == code: 

1074 break 

1075 

1076 code = new_code 

1077 result = sb.run(code, func_name, test_cases, setup_code) 

1078 

1079 if result.all_passed: 

1080 break 

1081 

1082 if attempt == max_retries: 

1083 break # Don't overwrite last result 

1084 

1085 return result 

1086 

1087 # ── HITL-Enhanced Execution (v1.9.5) ────────────────────────── 

1088 

1089 async def smart_execute_with_hitl( 

1090 self, 

1091 task: Any, 

1092 hitl: HITLManager | None = None, 

1093 **metadata, 

1094 ) -> SwarmResult: 

1095 """Smart execution with human-in-the-loop breakpoints. 

1096 

1097 Same as smart_execute but pauses at configurable checkpoints: 

1098 - Before each sub-task (if hitl.break_on_every_task) 

1099 - On sub-task failure (if hitl.break_on_failure) 

1100 - On low-confidence fusion (if config threshold met) 

1101 

1102 Args: 

1103 task: Task description 

1104 hitl: HITLManager instance (uses self.hitl if None) 

1105 **metadata: Additional metadata 

1106 

1107 Returns: 

1108 SwarmResult with fused output 

1109 """ 

1110 hitl_mgr = hitl or self.hitl 

1111 start_time = time.time() 

1112 result = SwarmResult( 

1113 topology=self.topology, 

1114 mode=ExecutionMode.SMART, 

1115 ) 

1116 

1117 task_str = str(task) 

1118 agent_names = self.list_agents() 

1119 

1120 # Step 1: Decompose 

1121 decomp = self.decomposer.decompose(task_str, agents=agent_names) 

1122 result.decomposition = decomp 

1123 

1124 # Step 2: Execute sub-tasks with HITL gates 

1125 sub_outputs: dict[str, dict[str, Any]] = {} 

1126 completed: set[str] = set() 

1127 aborted = False 

1128 

1129 for _round in range(self.max_rounds): 

1130 if aborted: 

1131 break 

1132 

1133 ready = [ 

1134 st for st in decomp.sub_tasks 

1135 if st.status == "pending" 

1136 and all(dep in completed for dep in st.depends_on) 

1137 ] 

1138 if not ready: 

1139 break 

1140 

1141 for st in ready: 

1142 # HITL: check before executing sub-task 

1143 if hitl_mgr.config.break_on_every_task: 

1144 decision, feedback = await hitl_mgr.request_decision( 

1145 bp_type=BreakpointType.BEFORE_TASK, 

1146 task_id=st.id, 

1147 message=f"Execute sub-task: {st.description}?", 

1148 context={"task": task_str, "sub_task": st.description}, 

1149 options=["approve", "abort", "modify"], 

1150 ) 

1151 if decision == HumanDecision.ABORT: 

1152 aborted = True 

1153 break 

1154 if decision == HumanDecision.MODIFY and feedback: 

1155 st.description = f"{st.description} [modified: {feedback}]" 

1156 

1157 st.status = "running" 

1158 

1159 # Build context from dependencies 

1160 context = task_str 

1161 if st.depends_on: 

1162 dep_contexts = [] 

1163 for dep_id in st.depends_on: 

1164 dep_outputs = sub_outputs.get(dep_id, {}) 

1165 for name, out in dep_outputs.items(): 

1166 dep_contexts.append(f"[{name}]: {str(out)[:300]}") 

1167 if dep_contexts: 

1168 context = f"{task_str}\n\nPrevious results:\n" + "\n".join(dep_contexts) 

1169 

1170 # Execute 

1171 topo_result = await self._execute_raw(context, **metadata) 

1172 sub_outputs[st.id] = topo_result.outputs 

1173 st.output = topo_result.outputs 

1174 st.status = "done" if topo_result.success else "failed" 

1175 completed.add(st.id) 

1176 

1177 # HITL: check on failure 

1178 if not topo_result.success: 

1179 decision, feedback = await hitl_mgr.should_break_on_failure( 

1180 task_id=st.id, 

1181 error=topo_result.error or "Unknown error", 

1182 attempt=1, 

1183 ) 

1184 if decision == HumanDecision.ABORT: 

1185 aborted = True 

1186 break 

1187 if decision == HumanDecision.MODIFY and feedback: 

1188 st.description = f"{st.description} [retry with: {feedback}]" 

1189 st.status = "pending" # Re-queue for retry 

1190 completed.discard(st.id) 

1191 del sub_outputs[st.id] 

1192 

1193 if aborted: 

1194 result.success = False 

1195 result.error = "Aborted by human" 

1196 return result 

1197 

1198 # Step 3: Fuse results 

1199 final_subtasks = [ 

1200 st for st in decomp.sub_tasks 

1201 if st.status == "done" and st.id not in { 

1202 s.id for s in decomp.sub_tasks 

1203 if any(d == st.id for d in s.depends_on) 

1204 } 

1205 ] 

1206 if final_subtasks: 

1207 all_final: dict[str, Any] = {} 

1208 for st in final_subtasks: 

1209 if st.output: 

1210 all_final.update(st.output) 

1211 if all_final: 

1212 fused = self.fusion.fuse(task_str, all_final) 

1213 result.fused = fused 

1214 result.outputs = all_final 

1215 

1216 # HITL: check low confidence 

1217 if fused.confidence < hitl_mgr.config.break_on_low_confidence: 

1218 decision, feedback = await hitl_mgr.should_break_on_result( 

1219 task_id="final", 

1220 output=all_final, 

1221 confidence=fused.confidence, 

1222 ) 

1223 if decision == HumanDecision.ABORT: 

1224 result.success = False 

1225 result.error = "Aborted by human at final result" 

1226 return result 

1227 if decision == HumanDecision.REJECT: 

1228 result.success = False 

1229 result.error = f"Rejected: {feedback}" 

1230 return result 

1231 

1232 result.success = fused.confidence >= 0.3 

1233 

1234 if not result.outputs and sub_outputs: 

1235 all_outputs: dict[str, Any] = {} 

1236 for st_outputs in sub_outputs.values(): 

1237 all_outputs.update(st_outputs) 

1238 if all_outputs: 

1239 fused = self.fusion.fuse(task_str, all_outputs) 

1240 result.fused = fused 

1241 result.outputs = all_outputs 

1242 result.success = fused.confidence >= 0.3 

1243 

1244 result.duration = time.time() - start_time 

1245 return result 

1246 

1247 # ── Monitored Execution (v1.9.6) ───────────────────────────── 

1248 

1249 async def monitor_execute( 

1250 self, 

1251 task: Any, 

1252 quality_gates: list[QualityGate] | None = None, 

1253 fallback_fn: Callable[[], Any] | None = None, 

1254 **metadata, 

1255 ) -> tuple[Any, MonitorReport]: 

1256 """Execute with automatic quality gating. 

1257 

1258 Runs smart_execute through the AgentMonitor pipeline. If gates fail, 

1259 automatically retries or falls back based on gate configuration. 

1260 

1261 Args: 

1262 task: Task description 

1263 quality_gates: Custom quality gates (uses monitor defaults if None) 

1264 fallback_fn: Fallback function if all gates fail 

1265 **metadata: Additional metadata 

1266 

1267 Returns: 

1268 Tuple of (final_output, MonitorReport) 

1269 """ 

1270 # Configure monitor with custom gates if provided 

1271 monitor = self.monitor 

1272 if quality_gates: 

1273 monitor = AgentMonitor( 

1274 max_retries=self.monitor.max_retries, 

1275 default_fallback=self.monitor.default_fallback, 

1276 ) 

1277 monitor.add_gates(quality_gates) 

1278 elif not self.monitor._gates: 

1279 # Default gates if none configured 

1280 monitor = AgentMonitor() 

1281 monitor.add_gates([ 

1282 output_not_empty(), 

1283 no_error_output(), 

1284 ]) 

1285 

1286 # Track latency for latency gates 

1287 start = time.time() 

1288 

1289 async def execute_fn() -> Any: 

1290 result = await self.smart_execute(task, **metadata) 

1291 fused = result.fused 

1292 if fused and fused.merged: 

1293 return fused.merged 

1294 return result.outputs 

1295 

1296 output, report = await monitor.monitor_execution( 

1297 task_fn=execute_fn, 

1298 task_name=str(task)[:80], 

1299 context={"_latency_ms": 0}, 

1300 fallback_fn=fallback_fn, 

1301 ) 

1302 

1303 # Inject actual latency 

1304 elapsed = (time.time() - start) * 1000 

1305 for gate in report.gates: 

1306 gate.data["_latency_ms"] = elapsed 

1307 

1308 return output, report 

1309 

1310 # ── Tool Registry Convenience Methods ───────────────────────── 

1311 

1312 def register_tool( 

1313 self, 

1314 name: str, 

1315 description: str, 

1316 handler: Callable, 

1317 category: ToolCategory = ToolCategory.CUSTOM, 

1318 params: list[ToolParam] | None = None, 

1319 capabilities: list[str] | None = None, 

1320 tags: list[str] | None = None, 

1321 is_destructive: bool = False, 

1322 rate_limit: int = 0, 

1323 **kwargs, 

1324 ) -> ToolSchema: 

1325 """Register a tool in the coordinator's tool registry.""" 

1326 tool = create_tool( 

1327 name=name, description=description, handler=handler, 

1328 category=category, params=params or [], 

1329 capabilities=capabilities or [], tags=tags or [], 

1330 is_destructive=is_destructive, rate_limit=rate_limit, **kwargs, 

1331 ) 

1332 return self.tool_registry.register(tool) 

1333 

1334 def find_tool(self, query: str, top_k: int = 5) -> list[tuple[ToolSchema, float]]: 

1335 """Search for tools matching a natural language query.""" 

1336 return self.tool_registry.search(query, top_k=top_k) 

1337 

1338 def route_tool(self, task: str, **ctx_kwargs) -> RoutingDecision: 

1339 """Route a task to the best matching tool.""" 

1340 context = RoutingContext(task=task, **ctx_kwargs) 

1341 return self.tool_router.route(context) 

1342 

1343 def execute_tool(self, tool_name: str, params: dict[str, Any] | None = None, force: bool = False) -> Any: 

1344 """Execute a registered tool safely.""" 

1345 return self.tool_executor.execute(tool_name, params, force=force) 

1346 

1347 

1348# ── Backward-compatible alias ───────────────────────────────────── 

1349SwarmCoordinator = SmartSwarmCoordinator