Coverage for agentos/swarm/coordinator.py: 19%
484 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2v1.9.8: Smart Swarm Coordinator with tool registry + intelligent routing.
4Full intelligence stack:
5- TaskDecomposer: decompose complex tasks into sub-task DAGs
6- ResultFusion: LLM-as-Judge aggregation with confidence scoring
7- EvalFeedbackLoop: execute → evaluate → retry → converge
8- CodeSandbox: safe code generation & execution with test cases
9- HumanLoop: human-in-the-loop breakpoints for approval/intervention
10- AgentMonitor: quality gates + self-monitoring pipeline
11- ExecutionTrace: full span-tree observability + bottleneck detection
12- AgentMemory: three-tier memory (working/short-term/long-term) + context window
13- ToolRegistry: schema-based tool catalog with versioning, capabilities, search
14- ToolRouter: intelligent tool selection with LLM + semantic matching
15- ToolExecutor: safe tool execution with validation, rate limiting, destructive confirmation
16"""
from __future__ import annotations

import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Optional

from agentos.core.di import Agent, RunContext
from agentos.swarm.task_decomposer import TaskDecomposer, Decomposition, SubTask
from agentos.swarm.result_fusion import ResultFusion, FusedResult
from agentos.swarm.eval_feedback_loop import EvalFeedbackLoop, LoopResult, RetryConfig
from agentos.swarm.code_sandbox import CodeSandbox, SandboxResult, TestCase, CodeFeedbackExtractor
from agentos.swarm.human_loop import (
    HITLManager, HITLConfig, Breakpoint, BreakpointType, HumanDecision,
)
from agentos.swarm.agent_monitor import (
    AgentMonitor, QualityGate, MonitorReport, GateResult, GateStatus, GateAction,
    output_not_empty, output_length_range, no_error_output, contains_keywords,
    latency_max, confidence_min,
)
from agentos.swarm.execution_trace import (
    ExecutionTrace, TraceSpan, TraceEvent, TraceCollector,
)
from agentos.swarm.agent_memory import (
    AgentMemory, WorkingMemory, ShortTermMemory, LongTermMemory,
    ContextWindowManager, ContextBudget, MemoryEntry,
)
from agentos.swarm.tool_registry import (
    ToolRegistry, ToolRouter, ToolExecutor, ToolSchema, ToolParam,
    ToolCategory, RoutingDecision, RoutingContext, ToolExecutionError,
    create_tool,
)
from agentos.security.guard import (
    GuardPipeline, InputGuard, OutputGuard,
    PIIDetector, ContentSafetyFilter,
    create_strict_guard, create_permissive_guard,
    GuardChainResult,
)
class SwarmTopology(str, Enum):
    """Swarm topology types.

    Members:
        STAR: central coordinator.
        RING: circular message passing.
        MESH: all-to-all communication.
        TREE: hierarchical structure.
    """

    STAR = "star"
    RING = "ring"
    MESH = "mesh"
    TREE = "tree"
class ExecutionMode(str, Enum):
    """Execution strategy for the coordinator.

    Members:
        RAW: original topology-only execution.
        SMART: decompose → execute the sub-task DAG → fuse results.
        FEEDBACK: smart execution wrapped in an evaluation feedback loop.
    """

    RAW = "raw"
    SMART = "smart"
    FEEDBACK = "feedback"
@dataclass
class AgentRole:
    """Agent role definition.

    Only ``name`` and ``goal`` are required; every other attribute has a
    default. ``tools`` gets a fresh list per instance.
    """

    name: str
    goal: str
    backstory: str = field(default="")
    tools: list[str] = field(default_factory=list)
    model: str = field(default="auto")
    temperature: float = field(default=0.7)
    allow_delegation: bool = field(default=True)
    verbose: bool = field(default=False)
class MessageBus:
    """Inter-agent message bus (blackboard pattern).

    Every published message is appended to an in-memory log and delivered
    synchronously to the callbacks subscribed to its topic. A free-form
    ``shared_memory`` dict is also exposed for agents to share state.
    """

    def __init__(self):
        self._messages: list[dict] = []
        self._subscribers: dict[str, list[Callable]] = {}
        self._shared_memory: dict[str, Any] = {}

    def publish(self, sender: str, topic: str, data: dict):
        """Log a message and deliver it to the subscribers of *topic*.

        Callbacks run synchronously, in subscription order. The subscriber
        list is snapshotted before dispatch. As a result, a callback that
        subscribes to the same topic while handling this message does not
        also receive this message; it only receives later ones.
        """
        msg = {"sender": sender, "topic": topic, "data": data}
        self._messages.append(msg)
        # Snapshot: iterating the live list while a callback appends to it
        # would extend the iteration mid-dispatch.
        for cb in list(self._subscribers.get(topic, ())):
            cb(msg)

    def subscribe(self, topic: str, callback: Callable):
        """Register *callback* to be called with each message on *topic*."""
        self._subscribers.setdefault(topic, []).append(callback)

    @property
    def messages(self) -> list[dict]:
        """The message log (the internal list itself, not a copy)."""
        return self._messages

    @property
    def shared_memory(self) -> dict[str, Any]:
        """Shared key/value state (the internal dict itself, not a copy)."""
        return self._shared_memory
@dataclass
class SwarmMessage:
    """
    A single message exchanged during swarm execution.

    Attributes:
        id: Unique identifier (first 12 hex characters of a random UUID4)
        sender: Name of the sending agent
        receiver: Name of the receiving agent; ``None`` means broadcast
        content: Message payload
        metadata: Additional metadata
        timestamp: Creation time, as returned by ``time.time()``
    """

    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    sender: str = ""
    receiver: Optional[str] = None
    content: Any = None
    metadata: dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)

    def to_dict(self) -> dict[str, Any]:
        """Return a shallow dict of all fields; values are not copied."""
        keys = ("id", "sender", "receiver", "content", "metadata", "timestamp")
        return {key: getattr(self, key) for key in keys}
@dataclass
class SwarmResult:
    """
    Result of swarm execution.

    Attributes:
        id: Unique identifier (first 12 hex characters of a random UUID4)
        topology: Swarm topology
        mode: Execution mode used
        outputs: Agent outputs
        messages: Communication messages
        duration: Execution duration in seconds
        success: Whether execution succeeded
        fused: ResultFusion output (smart mode only)
        decomposition: Task decomposition used (smart mode only)
        feedback_loop: Feedback loop result (feedback mode only)
        error: Optional human-readable failure reason (e.g. an abort or
            rejection message); ``None`` when no reason was recorded
    """

    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    topology: SwarmTopology = SwarmTopology.STAR
    mode: ExecutionMode = ExecutionMode.RAW
    outputs: dict[str, Any] = field(default_factory=dict)
    messages: list[SwarmMessage] = field(default_factory=list)
    duration: float = 0.0
    success: bool = True
    fused: Optional[FusedResult] = None
    decomposition: Optional[Decomposition] = None
    feedback_loop: Optional[LoopResult] = None
    # Declared explicitly: coordinator code assigns and reads ``.error``.
    # Declaring it means the attribute exists on every instance and is
    # serialized by to_dict(). Added last, so positional construction is
    # unaffected.
    error: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        """Convert to a plain dict.

        Optional sections (``error``, ``fused``, ``decomposition``,
        ``feedback_loop``) are included only when they are set. Durations
        are rendered as strings with two decimals and an ``s`` suffix.
        """
        d: dict[str, Any] = {
            "id": self.id,
            "topology": self.topology.value,
            "mode": self.mode.value,
            "outputs": self.outputs,
            "messages": [m.to_dict() for m in self.messages],
            "duration": f"{self.duration:.2f}s",
            "success": self.success,
        }
        if self.error is not None:
            d["error"] = self.error
        if self.fused is not None:
            d["fused"] = {
                "action": self.fused.action,
                "confidence": self.fused.confidence,
                "reason": self.fused.reason,
            }
        if self.decomposition is not None:
            d["decomposition"] = {
                "sub_tasks": [st.to_dict() for st in self.decomposition.sub_tasks],
                "total_steps": self.decomposition.total_steps,
            }
        if self.feedback_loop is not None:
            d["feedback_loop"] = {
                "attempts": self.feedback_loop.attempts,
                "best_score": self.feedback_loop.best_score,
                "converged": self.feedback_loop.converged,
                "duration": f"{self.feedback_loop.duration:.2f}s",
            }
        return d
class SmartSwarmCoordinator:
    """
    Multi-agent coordination with intelligent orchestration.

    Wraps the classic topology executors (star / ring / mesh / tree) with:
    - TaskDecomposer: LLM-driven sub-task DAG decomposition
    - ResultFusion: LLM-as-Judge aggregation with confidence scoring
    - EvalFeedbackLoop: execute → evaluate → retry → converge
    - CodeSandbox: code execution with test cases and feedback-driven retry
    - HITLManager: human-in-the-loop breakpoints
    - AgentMonitor: quality-gated execution
    - TraceCollector / AgentMemory: execution traces and layered memory
    - ToolRegistry / ToolRouter / ToolExecutor: tool catalog and execution
    - GuardPipeline: input/output safety filtering

    Usage:
        coordinator = SmartSwarmCoordinator(topology=SwarmTopology.MESH)
        coordinator.register(agent1)
        coordinator.register(agent2)

        # Smart mode with decomposition + fusion
        result = await coordinator.smart_execute("complex research task")

        # Feedback mode with evaluation retry loop
        result = await coordinator.execute_with_feedback(
            task, expected_output=expected, scoring_strategy="qa"
        )
    """

    # A fused result at or above this confidence counts as a successful run.
    MIN_FUSION_CONFIDENCE: float = 0.3

    def __init__(
        self,
        topology: SwarmTopology = SwarmTopology.STAR,
        max_rounds: int = 10,
        execution_mode: ExecutionMode = ExecutionMode.SMART,
        decomposer: TaskDecomposer | None = None,
        fusion: ResultFusion | None = None,
        feedback_loop: EvalFeedbackLoop | None = None,
        sandbox: CodeSandbox | None = None,
        hitl_manager: HITLManager | None = None,
        monitor: AgentMonitor | None = None,
        trace_collector: TraceCollector | None = None,
        memory: AgentMemory | None = None,
        tool_registry: ToolRegistry | None = None,
        tool_router: ToolRouter | None = None,
        tool_executor: ToolExecutor | None = None,
        guard: GuardPipeline | None = None,
    ):
        """
        Initialize smart swarm coordinator.

        Args:
            topology: Swarm topology
            max_rounds: Maximum number of sub-task scheduling rounds
            execution_mode: Default execution mode
            decomposer: TaskDecomposer instance (created if None)
            fusion: ResultFusion instance (created if None)
            feedback_loop: EvalFeedbackLoop instance (created if None)
            sandbox: CodeSandbox instance for code execution (created if None)
            hitl_manager: HITLManager for human-in-the-loop (created if None)
            monitor: AgentMonitor for quality gating (created if None)
            trace_collector: TraceCollector for execution traces (created if None)
            memory: AgentMemory for layered memory (created if None)
            tool_registry: ToolRegistry for tool catalog (created if None)
            tool_router: ToolRouter for intelligent tool selection (created if None)
            tool_executor: ToolExecutor for safe tool execution (created if None)
            guard: GuardPipeline for input/output safety filtering
                (``create_strict_guard()`` if None)
        """
        self.topology = topology
        self.max_rounds = max_rounds
        self.execution_mode = execution_mode
        self._agents: dict[str, Agent[Any, Any]] = {}
        self._message_queue: list[SwarmMessage] = []

        self.decomposer = decomposer or TaskDecomposer()
        self.fusion = fusion or ResultFusion()
        self.feedback = feedback_loop or EvalFeedbackLoop()
        self.sandbox = sandbox or CodeSandbox()
        self.hitl = hitl_manager or HITLManager()
        self.monitor = monitor or AgentMonitor()
        self.tracer = trace_collector or TraceCollector()
        self.memory = memory or AgentMemory()
        self.tool_registry = tool_registry or ToolRegistry()
        self.tool_router = tool_router or ToolRouter(self.tool_registry)
        self.tool_executor = tool_executor or ToolExecutor(self.tool_registry)
        self.guard = guard or create_strict_guard()

        # Original topology methods bound for backward compatibility
        self._topo_handlers = {
            SwarmTopology.STAR: self._execute_star,
            SwarmTopology.RING: self._execute_ring,
            SwarmTopology.MESH: self._execute_mesh,
            SwarmTopology.TREE: self._execute_tree,
        }

    # ── Agent management ──────────────────────────────────────────

    def register(self, agent: Agent[Any, Any]) -> None:
        """Register *agent* under ``agent.name``, replacing any same-named agent."""
        self._agents[agent.name] = agent

    def unregister(self, agent_name: str) -> bool:
        """Remove the named agent; return True if it was registered."""
        if agent_name in self._agents:
            del self._agents[agent_name]
            return True
        return False

    def get_agent(self, agent_name: str) -> Optional[Agent[Any, Any]]:
        """Return the named agent, or None if it is not registered."""
        return self._agents.get(agent_name)

    def list_agents(self) -> list[str]:
        """Return the registered agent names in insertion order."""
        return list(self._agents.keys())

    # ── Execution API ─────────────────────────────────────────────

    async def execute(
        self,
        task: Any,
        mode: ExecutionMode | None = None,
        **metadata,
    ) -> SwarmResult:
        """Execute a task with *mode* (or the coordinator's default mode).

        - SMART    → :meth:`smart_execute`
        - FEEDBACK → :meth:`execute_with_feedback`; ``metadata`` may carry
          ``expected_output``, ``scoring_strategy`` and ``retry_config``
        - RAW      → plain topology execution
        """
        mode = mode or self.execution_mode
        if mode == ExecutionMode.SMART:
            return await self.smart_execute(task, **metadata)
        if mode == ExecutionMode.FEEDBACK:
            # Previously FEEDBACK fell through to raw topology execution.
            return await self.execute_with_feedback(task, **metadata)
        return await self._execute_raw(task, **metadata)

    async def smart_execute(
        self,
        task: Any,
        _trace: ExecutionTrace | None = None,
        **metadata,
    ) -> SwarmResult:
        """Smart execution: guard input → decompose → execute DAG → fuse → guard output.

        Uses ExecutionTrace for observability when a tracer is available.

        Args:
            task: Task description (string or structured; stringified)
            _trace: Optional trace to attach (auto-created if self.tracer exists)
            **metadata: Forwarded to every agent invocation

        Returns:
            SwarmResult with fused output and decomposition. If the input
            guard blocks the task, the result has ``success=False`` and the
            rejection message in ``outputs["guard"]`` and ``error``.
        """
        start_time = time.time()
        task_str = str(task)

        # Step 0: Security guard — input filtering
        guard_result = self.guard.process_input(task_str)
        if guard_result.blocked:
            reason = (
                f"[BLOCKED] Input rejected by guard: {guard_result.blocked_by}. "
                f"Reason: {', '.join(guard_result.warnings)}"
            )
            # SwarmResult has no ``output``/``completed`` fields (passing them
            # raised TypeError). Report the rejection via outputs/success.
            result = SwarmResult(
                topology=self.topology,
                mode=ExecutionMode.SMART,
                outputs={"guard": reason},
                success=False,
            )
            result.error = reason
            result.duration = time.time() - start_time
            return result
        if guard_result.final_content != task_str:
            task_str = guard_result.final_content  # PII-redacted version

        # Trace setup
        trace = _trace
        if trace is None and self.tracer is not None:
            trace = ExecutionTrace(task_name=task_str[:80])
            self.tracer.add(trace)
        root = (
            trace.start_span(TraceEvent.TASK_START, name="smart_execute", data={"task": task_str})
            if trace else None
        )

        result = SwarmResult(topology=self.topology, mode=ExecutionMode.SMART)
        agent_names = self.list_agents()

        # Load memory context. Every sub-task context is built on top of it.
        self.memory.set_task(task_str)
        memory_context = self.memory.get_context(query=task_str) if self.memory else ""
        base_context = (
            f"{memory_context}\n\n[Current Task]\n{task_str}" if memory_context else task_str
        )

        # Step 1: Decompose
        dspan = trace.start_span(TraceEvent.DECOMPOSE, name="decompose") if trace else None
        decomp = self.decomposer.decompose(task_str, agents=agent_names)
        result.decomposition = decomp
        if trace and dspan:
            trace.end_span(dspan.id, status="done", data={"sub_tasks": len(decomp.sub_tasks)})

        # Step 2: Execute sub-tasks in dependency order (bounded by max_rounds)
        sub_outputs: dict[str, dict[str, Any]] = {}
        completed: set[str] = set()

        for _round in range(self.max_rounds):
            ready = self._ready_subtasks(decomp, completed)
            if not ready:
                break

            for st in ready:
                st.status = "running"
                stspan = (
                    trace.start_span(TraceEvent.SUBTASK_START, name=st.description[:60], data={"id": st.id})
                    if trace else None
                )

                context = self._build_subtask_context(base_context, st, sub_outputs)
                topo_result = await self._execute_raw(context, **metadata)
                sub_outputs[st.id] = topo_result.outputs
                st.output = topo_result.outputs
                st.status = "done" if topo_result.success else "failed"
                # Failed sub-tasks still count as completed so dependents run.
                completed.add(st.id)

                # Store sub-task result in memory
                self.memory.remember(
                    content=f"SubTask [{st.description}]: {json.dumps(topo_result.outputs, default=str)[:500]}",
                    role="assistant",
                    importance=0.6,
                    metadata={"subtask_id": st.id, "status": st.status},
                )

                if trace and stspan:
                    trace.end_span(
                        stspan.id,
                        status=st.status,
                        data={"output_keys": list(topo_result.outputs.keys())},
                    )

        # Step 3: Fuse results — prefer leaf sub-tasks, else everything.
        fspan = trace.start_span(TraceEvent.FUSE, name="fuse_results") if trace else None

        final_outputs = self._final_subtask_outputs(decomp)
        if final_outputs:
            self._apply_fusion(result, task_str, final_outputs)
        if not result.outputs and sub_outputs:
            all_outputs = self._merge_outputs(sub_outputs)
            if all_outputs:
                self._apply_fusion(result, task_str, all_outputs)

        if trace and fspan:
            trace.end_span(
                fspan.id,
                status="done",
                data={"confidence": result.fused.confidence if result.fused else 0},
            )

        result.duration = time.time() - start_time

        if trace and root:
            trace.end_span(root.id, status="done" if result.success else "failed")

        # Output guard — filter agent output before returning to user
        if result.outputs:
            result.outputs = self._guard_outputs(result.outputs)

        return result

    async def execute_with_feedback(
        self,
        task: Any,
        expected_output: str = "",
        scoring_strategy: str = "general",
        retry_config: RetryConfig | None = None,
        **metadata,
    ) -> SwarmResult:
        """Execution with eval-driven feedback loop.

        Each attempt runs :meth:`smart_execute` and is scored by a fresh
        EvalFeedbackLoop. NOTE(review): ``self.feedback`` is not used here.

        Args:
            task: Task description
            expected_output: Reference for scoring
            scoring_strategy: Scoring strategy name (e.g. qa/code/summary/translation)
            retry_config: Retry configuration (defaults to max_retries=3)
            **metadata: Forwarded to smart_execute

        Returns:
            SwarmResult with ``outputs == {"final": ...}`` and the feedback_loop trace
        """
        start_time = time.time()
        result = SwarmResult(
            topology=self.topology,
            mode=ExecutionMode.FEEDBACK,
        )

        task_str = str(task)

        # Executor that runs smart_execute and flattens the output to text.
        async def executor(t: str) -> Any:
            r = await self.smart_execute(t, **metadata)
            fused = r.fused
            if fused and fused.merged:
                content = fused.merged
                # Per-agent dict: join the non-empty, non-dict values.
                if isinstance(content, dict):
                    parts = [str(v) for v in content.values() if v and not isinstance(v, dict)]
                    return "\n".join(parts) if parts else str(content)
                return str(content)
            return str(r.outputs)

        # Wire the scorer if available. Deliberately best-effort: any
        # failure to import or construct it falls back to scorer=None.
        scorer = None
        try:
            from agentos.evaluation.scorers import CompositeScorerV2
            scorer = CompositeScorerV2()
        except Exception:
            scorer = None

        feedback = EvalFeedbackLoop(
            scorer=scorer,
            config=retry_config or RetryConfig(max_retries=3),
        )

        loop_result = await feedback.run(
            task=task_str,
            executor=executor,
            expected=expected_output,
            strategy=scoring_strategy,
        )

        result.feedback_loop = loop_result
        result.outputs = {"final": str(loop_result.final_output) if loop_result.final_output else ""}
        result.success = loop_result.converged
        result.duration = time.time() - start_time
        return result

    # ── Smart-execution helpers ───────────────────────────────────

    @staticmethod
    def _ready_subtasks(decomp: Decomposition, completed: set[str]) -> list[SubTask]:
        """Return pending sub-tasks whose dependencies are all completed."""
        return [
            st for st in decomp.sub_tasks
            if st.status == "pending" and all(dep in completed for dep in st.depends_on)
        ]

    @staticmethod
    def _build_subtask_context(
        base: str,
        st: SubTask,
        sub_outputs: dict[str, dict[str, Any]],
    ) -> str:
        """Append truncated dependency outputs (300 chars each) to *base*."""
        dep_lines = [
            f"[{name}]: {str(out)[:300]}"
            for dep_id in st.depends_on
            for name, out in sub_outputs.get(dep_id, {}).items()
        ]
        if not dep_lines:
            return base
        return f"{base}\n\nPrevious results:\n" + "\n".join(dep_lines)

    @staticmethod
    def _final_subtask_outputs(decomp: Decomposition) -> dict[str, Any]:
        """Merge the outputs of completed *leaf* sub-tasks.

        A leaf is a sub-task that no other sub-task depends on. The previous
        check tested whether a sub-task depended on itself, which selected
        every completed sub-task. Later sub-tasks overwrite earlier ones on
        key collisions (keys are agent names).
        """
        depended_on = {dep for st in decomp.sub_tasks for dep in st.depends_on}
        merged: dict[str, Any] = {}
        for st in decomp.sub_tasks:
            if st.status == "done" and st.id not in depended_on and st.output:
                merged.update(st.output)
        return merged

    @staticmethod
    def _merge_outputs(sub_outputs: dict[str, dict[str, Any]]) -> dict[str, Any]:
        """Merge all sub-task outputs, later sub-tasks winning on key collisions."""
        merged: dict[str, Any] = {}
        for outputs in sub_outputs.values():
            merged.update(outputs)
        return merged

    def _apply_fusion(self, result: SwarmResult, task_str: str, outputs: dict[str, Any]) -> FusedResult:
        """Fuse *outputs* into *result* and set success from the fused confidence."""
        fused = self.fusion.fuse(task_str, outputs)
        result.fused = fused
        result.outputs = outputs
        result.success = fused.confidence >= self.MIN_FUSION_CONFIDENCE
        return fused

    def _guard_outputs(self, outputs: dict[str, Any]) -> dict[str, Any]:
        """Run every output through the output guard.

        A blocked value is replaced by a marker string. A rewritten value
        (e.g. redacted) is replaced by the guard's text. Otherwise the
        original value is kept as-is, not stringified.
        """
        guarded: dict[str, Any] = {}
        for key, value in outputs.items():
            text = str(value)
            verdict = self.guard.process_output(text)
            if verdict.blocked:
                guarded[key] = f"[BLOCKED by guard: {verdict.blocked_by}]"
            elif verdict.final_content != text:
                guarded[key] = verdict.final_content
            else:
                guarded[key] = value
        return guarded

    @staticmethod
    def _failure_message(topo_result: SwarmResult) -> str:
        """Describe why a topology run failed.

        Uses an explicit ``error`` attribute if one is set. Otherwise it
        collects the per-agent ``{"error": ...}`` outputs recorded by the
        topology executors. SwarmResult may not declare ``error``, so it is
        read defensively.
        """
        explicit = getattr(topo_result, "error", None)
        if explicit:
            return str(explicit)
        errors = [
            f"{name}: {out['error']}"
            for name, out in topo_result.outputs.items()
            if isinstance(out, dict) and "error" in out
        ]
        return "; ".join(errors) or "Unknown error"

    # ── Raw topology execution (backward compatible) ──────────────

    async def _execute_raw(
        self,
        task: Any,
        **metadata,
    ) -> SwarmResult:
        """Original topology-only execution.

        Raises:
            ValueError: if ``self.topology`` has no registered handler.
        """
        handler = self._topo_handlers.get(self.topology)
        if handler is None:
            raise ValueError(f"Unknown topology: {self.topology}")
        return await handler(task, metadata)

    # ── Star Topology ─────────────────────────────────────────────

    async def _execute_star(
        self,
        task: Any,
        metadata: dict[str, Any],
    ) -> SwarmResult:
        """Send *task* to each agent in turn and collect outputs by agent name.

        An agent exception is recorded as ``{"error": ...}`` and marks the
        result unsuccessful; remaining agents still run.
        """
        result = SwarmResult(topology=SwarmTopology.STAR, mode=ExecutionMode.RAW)
        for agent_name, agent in self._agents.items():
            try:
                result.messages.append(SwarmMessage(
                    sender="coordinator",
                    receiver=agent_name,
                    content=task,
                    # Copy so messages do not alias one shared metadata dict.
                    metadata=dict(metadata),
                ))
                output = await agent.invoke(task, **metadata)
                result.outputs[agent_name] = output
                result.messages.append(SwarmMessage(
                    sender=agent_name,
                    receiver="coordinator",
                    content=output,
                ))
            except Exception as e:
                result.outputs[agent_name] = {"error": str(e)}
                result.success = False
        return result

    # ── Ring Topology ─────────────────────────────────────────────

    async def _execute_ring(
        self,
        task: Any,
        metadata: dict[str, Any],
    ) -> SwarmResult:
        """Run agents sequentially, each receiving the previous agent's output.

        On an agent exception the error is recorded and the next agent
        receives the last successful input.
        """
        result = SwarmResult(topology=SwarmTopology.RING, mode=ExecutionMode.RAW)
        agent_names = list(self._agents.keys())
        if not agent_names:
            return result

        current_input = task
        for i, agent_name in enumerate(agent_names):
            agent = self._agents[agent_name]
            next_agent = agent_names[(i + 1) % len(agent_names)]
            try:
                output = await agent.invoke(current_input, **metadata)
                result.outputs[agent_name] = output
                result.messages.append(SwarmMessage(
                    sender=agent_name,
                    receiver=next_agent,
                    content=output,
                ))
                current_input = output
            except Exception as e:
                result.outputs[agent_name] = {"error": str(e)}
                result.success = False
        return result

    # ── Mesh Topology ─────────────────────────────────────────────

    async def _execute_mesh(
        self,
        task: Any,
        metadata: dict[str, Any],
    ) -> SwarmResult:
        """Run all agents concurrently, then broadcast each output to every other agent."""
        result = SwarmResult(topology=SwarmTopology.MESH, mode=ExecutionMode.RAW)
        await asyncio.gather(
            *(self._execute_agent_mesh(agent, task, metadata, result) for agent in self._agents.values()),
            return_exceptions=True,
        )
        for sender_name, output in result.outputs.items():
            for receiver_name in self._agents:
                if sender_name != receiver_name:
                    result.messages.append(SwarmMessage(
                        sender=sender_name,
                        receiver=receiver_name,
                        content=output,
                    ))
        return result

    async def _execute_agent_mesh(
        self,
        agent: Agent[Any, Any],
        task: Any,
        metadata: dict[str, Any],
        result: SwarmResult,
    ) -> None:
        """Invoke one agent and record its output (or error) on *result*."""
        try:
            output = await agent.invoke(task, **metadata)
            result.outputs[agent.name] = output
        except Exception as e:
            result.outputs[agent.name] = {"error": str(e)}
            result.success = False

    # ── Tree Topology ─────────────────────────────────────────────

    async def _execute_tree(
        self,
        task: Any,
        metadata: dict[str, Any],
    ) -> SwarmResult:
        """The first registered agent is the root; others run on the root's output.

        If the root fails, the children are skipped.
        """
        result = SwarmResult(topology=SwarmTopology.TREE, mode=ExecutionMode.RAW)
        agent_names = list(self._agents.keys())
        if not agent_names:
            return result

        root_name = agent_names[0]
        root_agent = self._agents[root_name]
        try:
            root_output = await root_agent.invoke(task, **metadata)
            result.outputs[root_name] = root_output
        except Exception as e:
            result.outputs[root_name] = {"error": str(e)}
            result.success = False
            return result

        for child_name in agent_names[1:]:
            child_agent = self._agents[child_name]
            result.messages.append(SwarmMessage(
                sender=root_name,
                receiver=child_name,
                content=root_output,
            ))
            try:
                child_output = await child_agent.invoke(root_output, **metadata)
                result.outputs[child_name] = child_output
                result.messages.append(SwarmMessage(
                    sender=child_name,
                    receiver=root_name,
                    content=child_output,
                ))
            except Exception as e:
                result.outputs[child_name] = {"error": str(e)}
                result.success = False
        return result

    # ── Messaging ─────────────────────────────────────────────────

    def send_message(
        self,
        sender: str,
        receiver: Optional[str],
        content: Any,
        **metadata,
    ) -> SwarmMessage:
        """Queue a message (``receiver=None`` broadcasts) and return it."""
        message = SwarmMessage(
            sender=sender,
            receiver=receiver,
            content=content,
            metadata=metadata,
        )
        self._message_queue.append(message)
        return message

    def get_messages(
        self,
        receiver: Optional[str] = None,
    ) -> list[SwarmMessage]:
        """Return messages for *receiver* (plus broadcasts), or a copy of all messages."""
        if receiver:
            return [
                m for m in self._message_queue
                if m.receiver == receiver or m.receiver is None
            ]
        return self._message_queue.copy()

    def clear_messages(self) -> None:
        """Drop all queued messages."""
        self._message_queue.clear()

    # ── Code Sandbox Execution (v1.9.5) ───────────────────────────

    async def execute_code(
        self,
        code: str,
        func_name: str = "",
        test_cases: list[TestCase] | None = None,
        setup_code: str = "",
        sandbox: CodeSandbox | None = None,
        max_retries: int = 3,
        code_generator: Callable[[str, list[str]], str] | None = None,
    ) -> SandboxResult:
        """Execute code in sandbox with test cases and feedback-driven retry.

        If code_generator is provided and the initial run fails, feedback
        from CodeFeedbackExtractor is used to regenerate the code, up to
        max_retries times. Retrying stops early when there is no feedback,
        the generator raises, returns empty code, or returns unchanged code.

        Args:
            code: Code to execute (or initial code if using generator)
            func_name: Function name to test
            test_cases: Test cases for validation
            setup_code: Setup code (imports, fixtures)
            sandbox: Custom sandbox instance
            max_retries: Max retry attempts with code generation
            code_generator: Callable(spec, feedback_suggestions) → new_code

        Returns:
            The most recent SandboxResult
        """
        sb = sandbox or self.sandbox
        result = sb.run(code, func_name, test_cases, setup_code)

        # Done if the first run passed, or there is nothing to regenerate with.
        if result.all_passed or not code_generator:
            return result

        spec = f"Function: {func_name}, Test cases: {len(test_cases or [])}"
        for _attempt in range(max_retries):
            suggestions = CodeFeedbackExtractor.extract(result)
            if not suggestions:
                break

            try:
                new_code = code_generator(spec, suggestions)
            except Exception:
                # A failing generator ends the retries; keep the last result.
                break

            if not new_code or new_code == code:
                break

            code = new_code
            result = sb.run(code, func_name, test_cases, setup_code)
            if result.all_passed:
                break

        return result

    # ── HITL-Enhanced Execution (v1.9.5) ──────────────────────────

    async def smart_execute_with_hitl(
        self,
        task: Any,
        hitl: HITLManager | None = None,
        **metadata,
    ) -> SwarmResult:
        """Smart execution with human-in-the-loop breakpoints.

        Same DAG flow as smart_execute, but pauses at checkpoints:
        - Before each sub-task (if hitl.config.break_on_every_task)
        - On sub-task failure (via hitl.should_break_on_failure)
        - When fused confidence < hitl.config.break_on_low_confidence

        NOTE(review): unlike smart_execute, this path applies no input/output
        guard, memory, or tracing — confirm that is intended.

        Args:
            task: Task description
            hitl: HITLManager instance (uses self.hitl if None)
            **metadata: Forwarded to every agent invocation

        Returns:
            SwarmResult with fused output; on abort/reject ``success`` is
            False and ``error`` holds the reason.
        """
        hitl_mgr = hitl or self.hitl
        start_time = time.time()
        result = SwarmResult(
            topology=self.topology,
            mode=ExecutionMode.SMART,
        )

        task_str = str(task)
        agent_names = self.list_agents()

        # Step 1: Decompose
        decomp = self.decomposer.decompose(task_str, agents=agent_names)
        result.decomposition = decomp

        # Step 2: Execute sub-tasks with HITL gates
        sub_outputs: dict[str, dict[str, Any]] = {}
        completed: set[str] = set()
        aborted = False

        for _round in range(self.max_rounds):
            if aborted:
                break

            ready = self._ready_subtasks(decomp, completed)
            if not ready:
                break

            for st in ready:
                # HITL: check before executing sub-task
                if hitl_mgr.config.break_on_every_task:
                    decision, feedback = await hitl_mgr.request_decision(
                        bp_type=BreakpointType.BEFORE_TASK,
                        task_id=st.id,
                        message=f"Execute sub-task: {st.description}?",
                        context={"task": task_str, "sub_task": st.description},
                        options=["approve", "abort", "modify"],
                    )
                    if decision == HumanDecision.ABORT:
                        aborted = True
                        break
                    if decision == HumanDecision.MODIFY and feedback:
                        st.description = f"{st.description} [modified: {feedback}]"

                st.status = "running"

                context = self._build_subtask_context(task_str, st, sub_outputs)
                topo_result = await self._execute_raw(context, **metadata)
                sub_outputs[st.id] = topo_result.outputs
                st.output = topo_result.outputs
                st.status = "done" if topo_result.success else "failed"
                completed.add(st.id)

                # HITL: check on failure. SwarmResult may not declare
                # ``error`` (reading it raised AttributeError), so derive the
                # message via _failure_message.
                if not topo_result.success:
                    decision, feedback = await hitl_mgr.should_break_on_failure(
                        task_id=st.id,
                        error=self._failure_message(topo_result),
                        attempt=1,
                    )
                    if decision == HumanDecision.ABORT:
                        aborted = True
                        break
                    if decision == HumanDecision.MODIFY and feedback:
                        st.description = f"{st.description} [retry with: {feedback}]"
                        st.status = "pending"  # Re-queue for a later round
                        completed.discard(st.id)
                        del sub_outputs[st.id]

        if aborted:
            result.success = False
            result.error = "Aborted by human"
            result.duration = time.time() - start_time
            return result

        # Step 3: Fuse results from leaf sub-tasks, with a low-confidence gate
        final_outputs = self._final_subtask_outputs(decomp)
        if final_outputs:
            fused = self._apply_fusion(result, task_str, final_outputs)

            # HITL: check low confidence
            if fused.confidence < hitl_mgr.config.break_on_low_confidence:
                decision, feedback = await hitl_mgr.should_break_on_result(
                    task_id="final",
                    output=final_outputs,
                    confidence=fused.confidence,
                )
                if decision == HumanDecision.ABORT:
                    result.success = False
                    result.error = "Aborted by human at final result"
                    result.duration = time.time() - start_time
                    return result
                if decision == HumanDecision.REJECT:
                    result.success = False
                    result.error = f"Rejected: {feedback}"
                    result.duration = time.time() - start_time
                    return result

        if not result.outputs and sub_outputs:
            all_outputs = self._merge_outputs(sub_outputs)
            if all_outputs:
                self._apply_fusion(result, task_str, all_outputs)

        result.duration = time.time() - start_time
        return result

    # ── Monitored Execution (v1.9.6) ─────────────────────────────

    async def monitor_execute(
        self,
        task: Any,
        quality_gates: list[QualityGate] | None = None,
        fallback_fn: Callable[[], Any] | None = None,
        **metadata,
    ) -> tuple[Any, MonitorReport]:
        """Execute with automatic quality gating.

        Runs smart_execute through the AgentMonitor pipeline. If gates fail,
        the monitor retries or falls back based on gate configuration.

        Args:
            task: Task description
            quality_gates: Custom quality gates (uses monitor defaults if None;
                output_not_empty + no_error_output if the monitor has none)
            fallback_fn: Fallback function if all gates fail
            **metadata: Forwarded to smart_execute

        Returns:
            Tuple of (final_output, MonitorReport)
        """
        # Configure monitor with custom gates if provided
        monitor = self.monitor
        if quality_gates:
            monitor = AgentMonitor(
                max_retries=self.monitor.max_retries,
                default_fallback=self.monitor.default_fallback,
            )
            monitor.add_gates(quality_gates)
        elif not self.monitor._gates:
            # Default gates if none configured
            monitor = AgentMonitor()
            monitor.add_gates([
                output_not_empty(),
                no_error_output(),
            ])

        # Shared context: each attempt records its own latency here, so
        # latency gates see a real value instead of the constant 0 passed
        # previously. NOTE(review): this only takes effect if AgentMonitor
        # evaluates gates against this same dict (not a copy) — confirm.
        gate_context: dict[str, Any] = {"_latency_ms": 0}
        start = time.time()

        async def execute_fn() -> Any:
            attempt_start = time.time()
            result = await self.smart_execute(task, **metadata)
            gate_context["_latency_ms"] = (time.time() - attempt_start) * 1000
            fused = result.fused
            if fused and fused.merged:
                return fused.merged
            return result.outputs

        output, report = await monitor.monitor_execution(
            task_fn=execute_fn,
            task_name=str(task)[:80],
            context=gate_context,
            fallback_fn=fallback_fn,
        )

        # Annotate the report with total wall-clock latency (all attempts)
        elapsed = (time.time() - start) * 1000
        for gate in report.gates:
            gate.data["_latency_ms"] = elapsed

        return output, report

    # ── Tool Registry Convenience Methods ─────────────────────────

    def register_tool(
        self,
        name: str,
        description: str,
        handler: Callable,
        category: ToolCategory = ToolCategory.CUSTOM,
        params: list[ToolParam] | None = None,
        capabilities: list[str] | None = None,
        tags: list[str] | None = None,
        is_destructive: bool = False,
        rate_limit: int = 0,
        **kwargs,
    ) -> ToolSchema:
        """Register a tool in the coordinator's tool registry."""
        tool = create_tool(
            name=name, description=description, handler=handler,
            category=category, params=params or [],
            capabilities=capabilities or [], tags=tags or [],
            is_destructive=is_destructive, rate_limit=rate_limit, **kwargs,
        )
        return self.tool_registry.register(tool)

    def find_tool(self, query: str, top_k: int = 5) -> list[tuple[ToolSchema, float]]:
        """Search for tools matching a natural language query."""
        return self.tool_registry.search(query, top_k=top_k)

    def route_tool(self, task: str, **ctx_kwargs) -> RoutingDecision:
        """Route a task to the best matching tool."""
        context = RoutingContext(task=task, **ctx_kwargs)
        return self.tool_router.route(context)

    def execute_tool(self, tool_name: str, params: dict[str, Any] | None = None, force: bool = False) -> Any:
        """Execute a registered tool via the tool executor."""
        return self.tool_executor.execute(tool_name, params, force=force)
# ── Backward-compatible alias ─────────────────────────────────────
# ``SwarmCoordinator`` is bound to the very same class object as
# ``SmartSwarmCoordinator`` (not a subclass or wrapper). Existing code that
# uses the ``SwarmCoordinator`` name keeps working unchanged.
SwarmCoordinator = SmartSwarmCoordinator