Coverage for agentos/swarm/coordinator.py: 25%
652 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 18:40 +0800
1"""
2v1.9.8: Smart Swarm Coordinator with tool registry + intelligent routing.
4Full intelligence stack:
5- TaskDecomposer: decompose complex tasks into sub-task DAGs
6- ResultFusion: LLM-as-Judge aggregation with confidence scoring
7- EvalFeedbackLoop: execute → evaluate → retry → converge
8- CodeSandbox: safe code generation & execution with test cases
9- HumanLoop: human-in-the-loop breakpoints for approval/intervention
10- AgentMonitor: quality gates + self-monitoring pipeline
11- ExecutionTrace: full span-tree observability + bottleneck detection
12- AgentMemory: three-tier memory (working/short-term/long-term) + context window
13- ToolRegistry: schema-based tool catalog with versioning, capabilities, search
14- ToolRouter: intelligent tool selection with LLM + semantic matching
15- ToolExecutor: safe tool execution with validation, rate limiting, destructive confirmation
16"""
18from __future__ import annotations
20import asyncio
21import time
22import uuid
23from collections import defaultdict
24from dataclasses import dataclass, field
25from enum import Enum
26from typing import Any, Callable, Optional, Awaitable
28from agentos.core.di import Agent, RunContext
29from agentos.swarm.task_decomposer import TaskDecomposer, Decomposition, SubTask
30from agentos.swarm.result_fusion import ResultFusion, FusedResult
31from agentos.swarm.eval_feedback_loop import EvalFeedbackLoop, LoopResult, RetryConfig
32from agentos.swarm.code_sandbox import CodeSandbox, SandboxResult, TestCase, CodeFeedbackExtractor
33from agentos.swarm.human_loop import (
34 HITLManager, HITLConfig, Breakpoint, BreakpointType, HumanDecision,
35)
36from agentos.swarm.agent_monitor import (
37 AgentMonitor, QualityGate, MonitorReport, GateResult, GateStatus, GateAction,
38 output_not_empty, output_length_range, no_error_output, contains_keywords,
39 latency_max, confidence_min,
40)
41from agentos.swarm.execution_trace import (
42 ExecutionTrace, TraceSpan, TraceEvent, TraceCollector,
43)
44from agentos.swarm.agent_memory import (
45 AgentMemory, WorkingMemory, ShortTermMemory, LongTermMemory,
46 ContextWindowManager, ContextBudget, MemoryEntry,
47)
48from agentos.swarm.tool_registry import (
49 ToolRegistry, ToolRouter, ToolExecutor, ToolSchema, ToolParam,
50 ToolCategory, RoutingDecision, RoutingContext, ToolExecutionError,
51 create_tool,
52)
53from agentos.security.guard import (
54 GuardPipeline, InputGuard, OutputGuard,
55 PIIDetector, ContentSafetyFilter,
56 create_strict_guard, create_permissive_guard,
57 GuardChainResult,
58)
class SwarmTopology(str, Enum):
    """How agents in a swarm are wired together.

    Members compare equal to their lowercase string values.
    """

    STAR = "star"      # one coordinator fans the task out to every agent
    RING = "ring"      # each agent hands its output to the next one
    MESH = "mesh"      # every agent talks to every other agent
    TREE = "tree"      # a root agent feeds its output to child agents
    DAG = "dag"        # dependency-driven workflow
    HYBRID = "hybrid"  # topology chosen dynamically
class ExecutionMode(str, Enum):
    """Which execution strategy the coordinator applies to a task."""

    RAW = "raw"            # plain topology execution, no decomposition
    SMART = "smart"        # decompose -> run sub-task DAG -> fuse results
    FEEDBACK = "feedback"  # smart execution wrapped in an eval/retry loop
@dataclass
class AgentRole:
    """Agent role definition (name, goal and execution preferences)."""

    # Display name of the role.
    name: str
    # What an agent in this role is trying to achieve.
    goal: str
    # Optional background text describing the role.
    backstory: str = ""
    # Tool names available to the role; each instance gets its own list.
    tools: list[str] = field(default_factory=list)
    # Model identifier; "auto" by default.
    model: str = "auto"
    # Sampling temperature.
    temperature: float = 0.7
    # Whether the role may delegate work.
    allow_delegation: bool = True
    # Verbose-output flag.
    verbose: bool = False
class MessageBus:
    """Inter-agent message bus using the blackboard pattern.

    Every published message is appended to an in-memory log and delivered
    synchronously to the callbacks subscribed to its topic. ``shared_memory``
    is a free-form dict that agents can use as a common blackboard.
    """

    def __init__(self) -> None:
        # Chronological log of every published message.
        self._messages: list[dict] = []
        # topic -> callbacks, in subscription order.
        self._subscribers: dict[str, list[Callable]] = {}
        # Shared key/value store (the "blackboard").
        self._shared_memory: dict[str, Any] = {}

    def publish(self, sender: str, topic: str, data: dict) -> None:
        """Record a message and deliver it to the topic's subscribers.

        Args:
            sender: Name of the publishing agent.
            topic: Topic the message is published on.
            data: Message payload.

        The message passed to callbacks is the dict
        ``{"sender": ..., "topic": ..., "data": ...}``. Exceptions raised by a
        callback propagate to the caller and stop delivery to later callbacks.
        """
        msg = {"sender": sender, "topic": topic, "data": data}
        self._messages.append(msg)
        # Deliver to a snapshot of the subscriber list: a callback that
        # subscribes to this topic while being notified must not receive the
        # current message, and must not make this loop grow forever.
        for cb in list(self._subscribers.get(topic, ())):
            cb(msg)

    def subscribe(self, topic: str, callback: Callable) -> None:
        """Register ``callback`` to be called with each message on ``topic``."""
        self._subscribers.setdefault(topic, []).append(callback)

    @property
    def messages(self) -> list[dict]:
        """The live log of published messages (not a copy)."""
        return self._messages

    @property
    def shared_memory(self) -> dict[str, Any]:
        """The live shared blackboard dict (not a copy)."""
        return self._shared_memory
@dataclass
class SwarmMessage:
    """A single message exchanged inside the swarm.

    Attributes:
        id: Random 12-hex-character identifier.
        sender: Name of the sending agent.
        receiver: Name of the receiving agent; ``None`` means broadcast.
        content: Arbitrary payload.
        metadata: Free-form extra information.
        timestamp: Creation time as returned by ``time.time()``.
    """

    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    sender: str = ""
    receiver: Optional[str] = None
    content: Any = None
    metadata: dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)

    def to_dict(self) -> dict[str, Any]:
        """Return the message as a plain dict (values are not copied)."""
        keys = ("id", "sender", "receiver", "content", "metadata", "timestamp")
        return {key: getattr(self, key) for key in keys}
@dataclass
class SwarmResult:
    """Outcome of one swarm execution.

    Attributes:
        id: Random 12-hex-character identifier.
        topology: Topology the swarm ran with.
        mode: Execution mode that produced this result.
        outputs: Agent (or fused) outputs keyed by name.
        messages: Messages exchanged during execution.
        duration: Wall-clock duration in seconds.
        success: Whether execution is considered successful.
        fused: ResultFusion output (smart mode only).
        decomposition: Task decomposition used (smart mode only).
        feedback_loop: Feedback-loop result (feedback mode only).
    """

    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    topology: SwarmTopology = SwarmTopology.STAR
    mode: ExecutionMode = ExecutionMode.RAW
    outputs: dict[str, Any] = field(default_factory=dict)
    messages: list[SwarmMessage] = field(default_factory=list)
    duration: float = 0.0
    success: bool = True
    fused: Optional[FusedResult] = None
    decomposition: Optional[Decomposition] = None
    feedback_loop: Optional[LoopResult] = None

    def to_dict(self) -> dict[str, Any]:
        """Serialise the result; optional sections appear only when truthy."""
        payload: dict[str, Any] = dict(
            id=self.id,
            topology=self.topology.value,
            mode=self.mode.value,
            outputs=self.outputs,
            messages=[message.to_dict() for message in self.messages],
            duration=f"{self.duration:.2f}s",
            success=self.success,
        )

        fused = self.fused
        if fused:
            payload["fused"] = dict(
                action=fused.action,
                confidence=fused.confidence,
                reason=fused.reason,
            )

        decomp = self.decomposition
        if decomp:
            payload["decomposition"] = dict(
                sub_tasks=[sub.to_dict() for sub in decomp.sub_tasks],
                total_steps=decomp.total_steps,
            )

        loop = self.feedback_loop
        if loop:
            payload["feedback_loop"] = dict(
                attempts=loop.attempts,
                best_score=loop.best_score,
                converged=loop.converged,
                duration=f"{loop.duration:.2f}s",
            )

        return payload
210# ── Swarm Agent Role Enum (v1.16.2, migrated from orchestration/swarm_coordinator.py) ─
class SwarmAgentRole(str, Enum):
    """Enum of the roles an agent can play inside a swarm.

    Not to be confused with the ``AgentRole`` dataclass, which describes an
    agent's goal and preferences.
    """

    COORDINATOR = "coordinator"
    WORKER = "worker"
    REVIEWER = "reviewer"
    OBSERVER = "observer"
    SPECIALIST = "specialist"
class TaskPriority(str, Enum):
    """How urgent a swarm task is, from most to least urgent."""

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
class TaskStatus(str, Enum):
    """Lifecycle state of a swarm task."""

    PENDING = "pending"
    ASSIGNED = "assigned"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"
@dataclass
class SwarmAgentInfo:
    """Bookkeeping record for one swarm agent (v1.16.2, migrated from orchestration)."""

    agent_id: str
    role: SwarmAgentRole
    # Capability tags the agent advertises.
    capabilities: list[str] = field(default_factory=list)
    model: str = ""
    # Maximum number of tasks the agent may hold at once.
    max_concurrency: int = 3
    # Number of tasks currently held.
    current_load: int = 0
    is_alive: bool = True
    last_heartbeat: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_available(self) -> bool:
        """True when the agent is alive and below its concurrency limit."""
        has_capacity = self.current_load < self.max_concurrency
        return self.is_alive and has_capacity
@dataclass
class SwarmTask:
    """Unit of work handed to the swarm (v1.16.2, migrated from orchestration)."""

    task_id: str
    description: str
    priority: TaskPriority = TaskPriority.MEDIUM
    status: TaskStatus = TaskStatus.PENDING
    # agent_id of the assignee; empty when unassigned.
    assigned_to: str = ""
    required_capabilities: list[str] = field(default_factory=list)
    # task_id of the parent task; empty for top-level tasks.
    parent_task_id: str = ""
    dependencies: list[str] = field(default_factory=list)
    result: Any = None
    error: str = ""
    # Start/finish times in seconds; 0.0 means "not recorded".
    started_at: float = 0.0
    completed_at: float = 0.0
    retry_count: int = 0
    max_retries: int = 3
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_ready(self) -> bool:
        """True when the task is still pending and lists no dependencies."""
        if self.status != TaskStatus.PENDING:
            return False
        return not self.dependencies

    @property
    def duration_ms(self) -> float:
        """Elapsed milliseconds, or 0.0 unless both timestamps are set."""
        if not (self.completed_at and self.started_at):
            return 0.0
        return 1000 * (self.completed_at - self.started_at)
287# ── Dynamic Task Allocator ──────────────────────────────────────
class TaskAllocator:
    """Workload-aware dynamic task allocation (v1.16.2, migrated from orchestration).

    Each available agent is scored on capability match, current load, a
    specialist bonus and parent-task affinity; the highest-scoring agent wins.
    Ties go to the agent that appears first in the ``agents`` list.
    """

    # Points for matching all required capabilities (scaled by match ratio).
    CAPABILITY_WEIGHT = 50.0
    # Points subtracted per task the agent is already running.
    LOAD_PENALTY = 10.0
    # Bonus for a SPECIALIST agent matching at least one required capability.
    SPECIALIST_BONUS = 20.0
    # Bonus for the agent that was assigned this task's parent.
    AFFINITY_BONUS = 15.0

    def __init__(self) -> None:
        # task_id -> agent_id of every allocation made (drives parent affinity).
        self._assignments: dict[str, str] = {}

    def allocate(
        self,
        task: SwarmTask,
        agents: list[SwarmAgentInfo],
    ) -> Optional[str]:
        """Pick the best available agent for ``task`` and record the choice.

        Args:
            task: Task to allocate.
            agents: Candidate agents; only those with ``is_available`` are scored.

        Returns:
            The chosen agent's ``agent_id``, or None if no agent is available.
        """
        available = [a for a in agents if a.is_available]
        if not available:
            return None

        # Always pick the best score, even when it is <= 0 (e.g. no required
        # capabilities and every agent has some load). The previous fallback
        # returned the first available agent, ignoring load, and did not record
        # the assignment, which broke parent affinity. `max` returns the first
        # maximal element, so ties keep list order.
        best = max(available, key=lambda agent: self._score(task, agent))
        self._assignments[task.task_id] = best.agent_id
        return best.agent_id

    def _score(self, task: SwarmTask, agent: SwarmAgentInfo) -> float:
        """Score one agent for ``task``; higher is better."""
        score = 0.0
        required = task.required_capabilities
        if required:
            matched = len(set(required) & set(agent.capabilities))
            score += matched / len(required) * self.CAPABILITY_WEIGHT

        score -= agent.current_load * self.LOAD_PENALTY

        if (
            required
            and any(cap in agent.capabilities for cap in required)
            and agent.role == SwarmAgentRole.SPECIALIST
        ):
            score += self.SPECIALIST_BONUS

        if task.parent_task_id and self._assignments.get(task.parent_task_id) == agent.agent_id:
            score += self.AFFINITY_BONUS

        return score
337# ── Conflict Resolver ───────────────────────────────────────────
class ConflictType(str, Enum):
    """Category of disagreement detected between agent outputs."""

    FACTUAL = "factual"
    METHODOLOGICAL = "methodological"
    OUTPUT = "output"
    RESOURCE = "resource"
class ConflictResolver:
    """Detect and resolve conflicts between agent outputs (v1.16.2, migrated from orchestration).

    Text outputs are compared with token-level Jaccard similarity; numeric
    outputs by their relative deviation from the mean. Resolution strategies
    are ``"majority"`` and ``"weighted"``; any other strategy name falls back
    to the first output.
    """

    # Jaccard similarity below which a pair of text outputs is a conflict.
    TEXT_SIMILARITY_THRESHOLD = 0.3
    # Relative deviation from the mean above which a numeric output is a conflict.
    NUMERIC_DEVIATION_THRESHOLD = 0.5
    # Length of the text prefix used as the vote key in majority resolution.
    VOTE_KEY_LENGTH = 50

    def __init__(self) -> None:
        # Conflict log; nothing in this class writes to it yet.
        self._conflict_log: list[dict] = []

    def detect_conflict(
        self,
        agent_outputs: dict[str, Any],
        expected_type: str = "text",
    ) -> list[dict]:
        """Report conflicts between agent outputs.

        Args:
            agent_outputs: Agent id -> output.
            expected_type: Accepted for API compatibility; currently unused.

        Returns:
            For all-string outputs, one ``"output"`` record per pair whose
            similarity is below the threshold (outputs truncated to 200 chars).
            For all-numeric outputs, one ``"factual"`` record per outlier.
            Fewer than two agents, or mixed/other types, yield an empty list.
        """
        conflicts: list[dict] = []
        if len(agent_outputs) < 2:
            return conflicts

        agents = list(agent_outputs.keys())
        outputs = list(agent_outputs.values())

        if all(isinstance(o, str) for o in outputs):
            for i, (agent_a, text_a) in enumerate(zip(agents, outputs)):
                for agent_b, text_b in zip(agents[i + 1:], outputs[i + 1:]):
                    similarity = self._text_similarity(text_a, text_b)
                    if similarity < self.TEXT_SIMILARITY_THRESHOLD:
                        conflicts.append({
                            "type": ConflictType.OUTPUT.value,
                            "agents": [agent_a, agent_b],
                            "similarity": similarity,
                            "outputs": {agent_a: text_a[:200], agent_b: text_b[:200]},
                        })

        elif all(isinstance(o, (int, float)) for o in outputs):
            mean_val = sum(outputs) / len(outputs)
            # Relative to |mean|, but never dividing by less than 1.
            scale = max(abs(mean_val), 1)
            for agent, val in zip(agents, outputs):
                deviation = abs(val - mean_val) / scale
                if deviation > self.NUMERIC_DEVIATION_THRESHOLD:
                    conflicts.append({
                        "type": ConflictType.FACTUAL.value,
                        "agents": [agent],
                        "value": val,
                        "mean": mean_val,
                        "deviation": deviation,
                    })

        return conflicts

    def resolve(
        self,
        agent_outputs: dict[str, Any],
        weights: dict[str, float] | None = None,
        strategy: str = "majority",
        expected_type: str = "text",
    ) -> dict[str, Any]:
        """Merge agent outputs into a single answer.

        Args:
            agent_outputs: Agent id -> output.
            weights: Optional agent id -> weight, used by ``"weighted"``.
            strategy: ``"majority"`` or ``"weighted"``; others pick the first output.
            expected_type: Accepted for API compatibility; currently unused.

        Returns:
            A dict with at least ``"output"``, ``"method"`` and ``"conflict"``.
            Empty input yields ``{"output": None, "method": "empty", "conflict": False}``.
        """
        if not agent_outputs:
            # Previously crashed inside majority voting (max() of an empty sequence).
            return {"output": None, "method": "empty", "conflict": False}

        if len(agent_outputs) == 1:
            agent_id = next(iter(agent_outputs))
            return {"output": agent_outputs[agent_id], "method": "single_agent", "conflict": False}

        outputs = list(agent_outputs.values())
        if all(isinstance(o, str) for o in outputs):
            return self._resolve_text(agent_outputs, weights, strategy)
        if all(isinstance(o, (int, float)) for o in outputs):
            return self._resolve_numeric(agent_outputs, weights, strategy)
        return {"output": outputs[0], "method": "first", "conflict": True}

    def _resolve_text(self, outputs: dict[str, str], weights: dict[str, float] | None, strategy: str) -> dict:
        """Resolve string outputs by majority vote or by agent weight."""
        if strategy == "majority":
            # Each agent votes for the output of its most similar peer (or for
            # itself when no peer shares any token). Votes are keyed by an
            # output prefix; `candidates` remembers which agent's output a key
            # stands for, so the returned output matches the winning vote.
            votes: dict[str, list[str]] = defaultdict(list)
            candidates: dict[str, str] = {}
            agent_ids = list(outputs.keys())
            for voter in agent_ids:
                best_match, best_sim = voter, 0.0
                for other in agent_ids:
                    if other == voter:
                        continue
                    sim = self._text_similarity(outputs[voter], outputs[other])
                    if sim > best_sim:
                        best_sim, best_match = sim, other
                key = outputs[best_match][:self.VOTE_KEY_LENGTH]
                votes[key].append(voter)
                candidates.setdefault(key, best_match)

            winning_key = max(votes, key=lambda k: len(votes[k]))
            return {
                "output": outputs[candidates[winning_key]],
                "method": "majority",
                "votes": {k: len(v) for k, v in votes.items()},
                "conflict": len(votes) > 1,
            }

        if strategy == "weighted":
            # Only agents that actually produced an output can win.
            present = {a: w for a, w in (weights or {}).items() if a in outputs}
            if not present:
                return self._resolve_text(outputs, weights, "majority")
            best_agent = max(present, key=present.get)
            return {"output": outputs[best_agent], "method": "weighted", "conflict": False}

        return {"output": next(iter(outputs.values())), "method": "first", "conflict": False}

    def _resolve_numeric(self, outputs: dict[str, float], weights: dict[str, float] | None, strategy: str) -> dict:
        """Resolve numeric outputs by (weighted) averaging; missing weights count as 1.0."""
        agents = list(outputs.keys())

        if strategy == "weighted" and weights:
            total_weight = sum(weights.get(a, 1.0) for a in agents)
            # A zero total (e.g. all weights 0) would divide by zero; fall back
            # to the plain average instead.
            if total_weight:
                weighted = sum(weights.get(a, 1.0) * outputs[a] for a in agents) / total_weight
                return {"output": weighted, "method": "weighted_average", "conflict": False}

        avg = sum(outputs.values()) / len(outputs)
        return {"output": avg, "method": "average", "conflict": False}

    def _text_similarity(self, a: str, b: str) -> float:
        """Jaccard similarity of the lowercase whitespace tokens of ``a`` and ``b``."""
        if a == b:
            return 1.0
        tokens_a = set(a.lower().split())
        tokens_b = set(b.lower().split())
        if not tokens_a or not tokens_b:
            return 0.0
        return len(tokens_a & tokens_b) / len(tokens_a | tokens_b)
474class SmartSwarmCoordinator:
475 """
476 v1.9.4: Multi-agent coordination with intelligent orchestration.
478 Upgrades the coordinator with:
479 - TaskDecomposer: LLM-driven sub-task DAG decomposition
480 - ResultFusion: LLM-as-Judge aggregation with confidence scoring
481 - EvalFeedbackLoop: execute → evaluate → retry → converge
483 Usage:
484 coordinator = SmartSwarmCoordinator(topology=SwarmTopology.MESH)
485 coordinator.register(agent1)
486 coordinator.register(agent2)
488 # Smart mode with decomposition + fusion
489 result = await coordinator.smart_execute("complex research task")
491 # Feedback mode with evaluation retry loop
492 result = await coordinator.execute_with_feedback(
493 task, expected_output, scorer
494 )
495 """
497 def __init__(
498 self,
499 topology: SwarmTopology = SwarmTopology.STAR,
500 max_rounds: int = 10,
501 execution_mode: ExecutionMode = ExecutionMode.SMART,
502 decomposer: TaskDecomposer | None = None,
503 fusion: ResultFusion | None = None,
504 feedback_loop: EvalFeedbackLoop | None = None,
505 sandbox: CodeSandbox | None = None,
506 hitl_manager: HITLManager | None = None,
507 monitor: AgentMonitor | None = None,
508 trace_collector: TraceCollector | None = None,
509 memory: AgentMemory | None = None,
510 tool_registry: ToolRegistry | None = None,
511 tool_router: ToolRouter | None = None,
512 tool_executor: ToolExecutor | None = None,
513 guard: GuardPipeline | None = None,
514 ):
515 """
516 Initialize smart swarm coordinator.
518 Args:
519 topology: Swarm topology
520 max_rounds: Maximum communication rounds
521 execution_mode: Default execution mode
522 decomposer: TaskDecomposer instance (created if None)
523 fusion: ResultFusion instance (created if None)
524 feedback_loop: EvalFeedbackLoop instance (created if None)
525 sandbox: CodeSandbox instance for code execution (created if None)
526 hitl_manager: HITLManager for human-in-the-loop (created if None)
527 monitor: AgentMonitor for quality gating (created if None)
528 trace_collector: TraceCollector for execution traces (created if None)
529 memory: AgentMemory for layered memory (created if None)
530 tool_registry: ToolRegistry for tool catalog (created if None)
531 tool_router: ToolRouter for intelligent tool selection (created if None)
532 tool_executor: ToolExecutor for safe tool execution (created if None)
533 guard: GuardPipeline for input/output safety filtering (created if None)
534 """
535 self.topology = topology
536 self.max_rounds = max_rounds
537 self.execution_mode = execution_mode
538 self._agents: dict[str, Agent[Any, Any]] = {}
539 self._message_queue: list[SwarmMessage] = []
541 self.decomposer = decomposer or TaskDecomposer()
542 self.fusion = fusion or ResultFusion()
543 self.feedback = feedback_loop or EvalFeedbackLoop()
544 self.sandbox = sandbox or CodeSandbox()
545 self.hitl = hitl_manager or HITLManager()
546 self.monitor = monitor or AgentMonitor()
547 self.tracer = trace_collector or TraceCollector()
548 self.memory = memory or AgentMemory()
549 self.tool_registry = tool_registry or ToolRegistry()
550 self.tool_router = tool_router or ToolRouter(self.tool_registry)
551 self.tool_executor = tool_executor or ToolExecutor(self.tool_registry)
552 self.guard = guard or create_strict_guard()
554 # Original topology methods bound for backward compatibility
555 self._topo_handlers = {
556 SwarmTopology.STAR: self._execute_star,
557 SwarmTopology.RING: self._execute_ring,
558 SwarmTopology.MESH: self._execute_mesh,
559 SwarmTopology.TREE: self._execute_tree,
560 }
562 # ── Agent management ──────────────────────────────────────────
564 def register(self, agent: Agent[Any, Any]) -> None:
565 self._agents[agent.name] = agent
567 def unregister(self, agent_name: str) -> bool:
568 if agent_name in self._agents:
569 del self._agents[agent_name]
570 return True
571 return False
573 def get_agent(self, agent_name: str) -> Optional[Agent[Any, Any]]:
574 return self._agents.get(agent_name)
576 def list_agents(self) -> list[str]:
577 return list(self._agents.keys())
579 # ── Execution API ─────────────────────────────────────────────
581 async def execute(
582 self,
583 task: Any,
584 mode: ExecutionMode | None = None,
585 **metadata,
586 ) -> SwarmResult:
587 """Execute a task. Delegates to smart_execute or raw topology."""
588 mode = mode or self.execution_mode
589 if mode == ExecutionMode.SMART:
590 return await self.smart_execute(task, **metadata)
591 return await self._execute_raw(task, **metadata)
593 async def smart_execute(
594 self,
595 task: Any,
596 _trace: ExecutionTrace | None = None,
597 **metadata,
598 ) -> SwarmResult:
599 """Smart execution: decompose → execute DAG → fuse.
601 Uses ExecutionTrace for observability when tracer is available.
603 Args:
604 task: Task description (string or structured)
605 _trace: Optional trace to attach (auto-created if self.tracer exists)
606 **metadata: Additional metadata
608 Returns:
609 SwarmResult with fused output and decomposition trace
610 """
611 start_time = time.time()
612 task_str = str(task)
614 # Step 0: Security guard — input filtering
615 guard_result = self.guard.process_input(task_str)
616 if guard_result.blocked:
617 result = SwarmResult(
618 topology=self.topology,
619 mode=ExecutionMode.SMART,
620 output=f"[BLOCKED] Input rejected by guard: {guard_result.blocked_by}. Reason: {', '.join(guard_result.warnings)}",
621 completed=False,
622 )
623 return result
624 if guard_result.final_content != task_str:
625 task_str = guard_result.final_content # PII-redacted version
627 # Trace setup
628 trace = _trace
629 if trace is None and self.tracer is not None:
630 trace = ExecutionTrace(task_name=task_str[:80])
631 self.tracer.add(trace)
633 if trace:
634 root = trace.start_span(TraceEvent.TASK_START, name="smart_execute", data={"task": task_str})
636 result = SwarmResult(
637 topology=self.topology,
638 mode=ExecutionMode.SMART,
639 )
641 agent_names = self.list_agents()
643 # Step 0: Load memory context
644 self.memory.set_task(task_str)
645 memory_context = self.memory.get_context(query=task_str) if self.memory else ""
647 # Step 1: Decompose
648 if trace:
649 dspan = trace.start_span(TraceEvent.DECOMPOSE, name="decompose")
650 decomp = self.decomposer.decompose(task_str, agents=agent_names)
651 result.decomposition = decomp
652 if trace and dspan:
653 trace.end_span(dspan.id, status="done", data={"sub_tasks": len(decomp.sub_tasks)})
655 # Step 2: Execute sub-tasks in dependency order
656 sub_outputs: dict[str, dict[str, Any]] = {}
657 completed: set[str] = set()
659 for _round in range(self.max_rounds):
660 ready = [
661 st for st in decomp.sub_tasks
662 if st.status == "pending"
663 and all(dep in completed for dep in st.depends_on)
664 ]
665 if not ready:
666 break
668 for st in ready:
669 st.status = "running"
671 if trace:
672 stspan = trace.start_span(TraceEvent.SUBTASK_START, name=st.description[:60], data={"id": st.id})
674 # Build context from dependencies
675 context = task_str
676 if memory_context:
677 context = f"{memory_context}\n\n[Current Task]\n{task_str}"
678 if st.depends_on:
679 dep_contexts = []
680 for dep_id in st.depends_on:
681 dep_outputs = sub_outputs.get(dep_id, {})
682 for name, out in dep_outputs.items():
683 dep_contexts.append(f"[{name}]: {str(out)[:300]}")
684 if dep_contexts:
685 context = f"{context}\n\nPrevious results:\n" + "\n".join(dep_contexts)
687 # Execute with all agents on this sub-task
688 topo_result = await self._execute_raw(context, **metadata)
689 sub_outputs[st.id] = topo_result.outputs
690 st.output = topo_result.outputs
691 st.status = "done" if topo_result.success else "failed"
692 completed.add(st.id)
694 # Store sub-task result in memory
695 self.memory.remember(
696 content=f"SubTask [{st.description}]: {json.dumps(topo_result.outputs, default=str)[:500]}",
697 role="assistant",
698 importance=0.6,
699 metadata={"subtask_id": st.id, "status": st.status},
700 )
702 if trace and stspan:
703 st_status = "done" if st.status == "done" else "failed"
704 trace.end_span(stspan.id, status=st_status, data={"output_keys": list(topo_result.outputs.keys())})
706 # Step 3: Fuse results from final sub-tasks
707 if trace:
708 fspan = trace.start_span(TraceEvent.FUSE, name="fuse_results")
710 final_subtasks = [
711 st for st in decomp.sub_tasks
712 if st.status == "done" and st.id not in {
713 s.id for s in decomp.sub_tasks
714 if any(d == st.id for d in s.depends_on)
715 }
716 ]
717 if final_subtasks:
718 all_final: dict[str, Any] = {}
719 for st in final_subtasks:
720 if st.output:
721 all_final.update(st.output)
722 if all_final:
723 fused = self.fusion.fuse(task_str, all_final)
724 result.fused = fused
725 result.outputs = all_final
726 result.success = fused.confidence >= 0.3
728 if not result.outputs and sub_outputs:
729 all_outputs: dict[str, Any] = {}
730 for st_outputs in sub_outputs.values():
731 all_outputs.update(st_outputs)
732 if all_outputs:
733 fused = self.fusion.fuse(task_str, all_outputs)
734 result.fused = fused
735 result.outputs = all_outputs
736 result.success = fused.confidence >= 0.3
738 if trace and fspan:
739 trace.end_span(fspan.id, status="done", data={"confidence": result.fused.confidence if result.fused else 0})
741 result.duration = time.time() - start_time
743 if trace and root:
744 trace.end_span(root.id, status="done" if result.success else "failed")
746 # Output guard — filter agent output before returning to user
747 if result.outputs:
748 guarded_outputs: dict[str, Any] = {}
749 for key, value in result.outputs.items():
750 output_str = str(value)
751 output_guard = self.guard.process_output(output_str)
752 if output_guard.blocked:
753 guarded_outputs[key] = f"[BLOCKED by guard: {output_guard.blocked_by}]"
754 elif output_guard.final_content != output_str:
755 guarded_outputs[key] = output_guard.final_content
756 else:
757 guarded_outputs[key] = value
758 result.outputs = guarded_outputs
760 return result
762 async def execute_with_feedback(
763 self,
764 task: Any,
765 expected_output: str = "",
766 scoring_strategy: str = "general",
767 retry_config: RetryConfig | None = None,
768 **metadata,
769 ) -> SwarmResult:
770 """Execution with eval-driven feedback loop.
772 Args:
773 task: Task description
774 expected_output: Reference for scoring
775 scoring_strategy: Scoring strategy (qa/code/summary/translation)
776 retry_config: Retry configuration
777 **metadata: Additional metadata
779 Returns:
780 SwarmResult with feedback_loop trace
781 """
782 start_time = time.time()
783 result = SwarmResult(
784 topology=self.topology,
785 mode=ExecutionMode.FEEDBACK,
786 )
788 task_str = str(task)
790 # Build executor that uses smart_execute
791 async def executor(t: str) -> Any:
792 r = await self.smart_execute(t, **metadata)
793 fused = r.fused
794 if fused and fused.merged:
795 content = fused.merged
796 # If it's a dict with agent outputs, stringify
797 if isinstance(content, dict):
798 parts = []
799 for k, v in content.items():
800 if v and not isinstance(v, dict):
801 parts.append(str(v))
802 return "\n".join(parts) if parts else str(content)
803 return str(content)
804 return str(r.outputs)
806 # Wire scorer if available
807 scorer = None
808 try:
809 from agentos.evaluation.scorers import CompositeScorerV2
810 scorer = CompositeScorerV2()
811 except Exception:
812 pass
814 feedback = EvalFeedbackLoop(
815 scorer=scorer,
816 config=retry_config or RetryConfig(max_retries=3),
817 )
819 loop_result = await feedback.run(
820 task=task_str,
821 executor=executor,
822 expected=expected_output,
823 strategy=scoring_strategy,
824 )
826 result.feedback_loop = loop_result
827 result.outputs = {"final": str(loop_result.final_output) if loop_result.final_output else ""}
828 result.success = loop_result.converged
829 result.duration = time.time() - start_time
830 return result
832 # ── Raw topology execution (backward compatible) ──────────────
834 async def _execute_raw(
835 self,
836 task: Any,
837 **metadata,
838 ) -> SwarmResult:
839 """Original topology-only execution."""
840 handler = self._topo_handlers.get(self.topology)
841 if handler is None:
842 raise ValueError(f"Unknown topology: {self.topology}")
843 return await handler(task, metadata)
845 # ── Star Topology ─────────────────────────────────────────────
847 async def _execute_star(
848 self,
849 task: Any,
850 metadata: dict[str, Any],
851 ) -> SwarmResult:
852 result = SwarmResult(topology=SwarmTopology.STAR, mode=ExecutionMode.RAW)
853 for agent_name, agent in self._agents.items():
854 try:
855 message = SwarmMessage(
856 sender="coordinator",
857 receiver=agent_name,
858 content=task,
859 metadata=metadata,
860 )
861 result.messages.append(message)
862 output = await agent.invoke(task, **metadata)
863 result.outputs[agent_name] = output
864 response = SwarmMessage(
865 sender=agent_name,
866 receiver="coordinator",
867 content=output,
868 )
869 result.messages.append(response)
870 except Exception as e:
871 result.outputs[agent_name] = {"error": str(e)}
872 result.success = False
873 return result
875 # ── Ring Topology ─────────────────────────────────────────────
877 async def _execute_ring(
878 self,
879 task: Any,
880 metadata: dict[str, Any],
881 ) -> SwarmResult:
882 result = SwarmResult(topology=SwarmTopology.RING, mode=ExecutionMode.RAW)
883 agent_names = list(self._agents.keys())
884 if not agent_names:
885 return result
887 current_input = task
888 for i, agent_name in enumerate(agent_names):
889 agent = self._agents[agent_name]
890 next_agent = agent_names[(i + 1) % len(agent_names)]
891 try:
892 output = await agent.invoke(current_input, **metadata)
893 result.outputs[agent_name] = output
894 message = SwarmMessage(
895 sender=agent_name,
896 receiver=next_agent,
897 content=output,
898 )
899 result.messages.append(message)
900 current_input = output
901 except Exception as e:
902 result.outputs[agent_name] = {"error": str(e)}
903 result.success = False
904 return result
906 # ── Mesh Topology ─────────────────────────────────────────────
908 async def _execute_mesh(
909 self,
910 task: Any,
911 metadata: dict[str, Any],
912 ) -> SwarmResult:
913 result = SwarmResult(topology=SwarmTopology.MESH, mode=ExecutionMode.RAW)
914 tasks_ = []
915 for agent_name, agent in self._agents.items():
916 tasks_.append(self._execute_agent_mesh(agent, task, metadata, result))
917 await asyncio.gather(*tasks_, return_exceptions=True)
918 for sender_name, output in result.outputs.items():
919 for receiver_name in self._agents.keys():
920 if sender_name != receiver_name:
921 message = SwarmMessage(
922 sender=sender_name,
923 receiver=receiver_name,
924 content=output,
925 )
926 result.messages.append(message)
927 return result
929 async def _execute_agent_mesh(
930 self,
931 agent: Agent[Any, Any],
932 task: Any,
933 metadata: dict[str, Any],
934 result: SwarmResult,
935 ) -> None:
936 try:
937 output = await agent.invoke(task, **metadata)
938 result.outputs[agent.name] = output
939 except Exception as e:
940 result.outputs[agent.name] = {"error": str(e)}
941 result.success = False
943 # ── Tree Topology ─────────────────────────────────────────────
945 async def _execute_tree(
946 self,
947 task: Any,
948 metadata: dict[str, Any],
949 ) -> SwarmResult:
950 result = SwarmResult(topology=SwarmTopology.TREE, mode=ExecutionMode.RAW)
951 agent_names = list(self._agents.keys())
952 if not agent_names:
953 return result
955 root_name = agent_names[0]
956 root_agent = self._agents[root_name]
957 try:
958 root_output = await root_agent.invoke(task, **metadata)
959 result.outputs[root_name] = root_output
960 except Exception as e:
961 result.outputs[root_name] = {"error": str(e)}
962 result.success = False
963 return result
965 children = agent_names[1:]
966 for child_name in children:
967 child_agent = self._agents[child_name]
968 message = SwarmMessage(
969 sender=root_name,
970 receiver=child_name,
971 content=root_output,
972 )
973 result.messages.append(message)
974 try:
975 child_output = await child_agent.invoke(root_output, **metadata)
976 result.outputs[child_name] = child_output
977 response = SwarmMessage(
978 sender=child_name,
979 receiver=root_name,
980 content=child_output,
981 )
982 result.messages.append(response)
983 except Exception as e:
984 result.outputs[child_name] = {"error": str(e)}
985 result.success = False
986 return result
988 # ── Messaging ─────────────────────────────────────────────────
990 def send_message(
991 self,
992 sender: str,
993 receiver: Optional[str],
994 content: Any,
995 **metadata,
996 ) -> SwarmMessage:
997 message = SwarmMessage(
998 sender=sender,
999 receiver=receiver,
1000 content=content,
1001 metadata=metadata,
1002 )
1003 self._message_queue.append(message)
1004 return message
def get_messages(
    self,
    receiver: Optional[str] = None,
) -> list[SwarmMessage]:
    """Return queued messages, optionally filtered for one receiver.

    Args:
        receiver: Agent name to filter for. A falsy value (``None`` or an
            empty string) disables filtering.

    Returns:
        A new list. When filtering, it holds the messages addressed to
        ``receiver`` plus those whose receiver is ``None``. Otherwise it is a
        shallow copy of the whole queue.
    """
    if not receiver:
        return self._message_queue.copy()
    return [
        msg for msg in self._message_queue
        if msg.receiver is None or msg.receiver == receiver
    ]
def clear_messages(self) -> None:
    """Empty the coordinator's message queue in place."""
    queue = self._message_queue
    queue.clear()
1020 # ── Code Sandbox Execution (v1.9.5) ───────────────────────────
async def execute_code(
    self,
    code: str,
    func_name: str = "",
    test_cases: list[TestCase] | None = None,
    setup_code: str = "",
    sandbox: CodeSandbox | None = None,
    max_retries: int = 3,
    code_generator: Callable[[str, list[str]], str] | None = None,
) -> SandboxResult:
    """Execute code in a sandbox, optionally regenerating it from test feedback.

    The code is run once. If not every test passes and ``code_generator`` is
    given, up to ``max_retries`` regeneration rounds follow. Each round feeds
    the suggestions from ``CodeFeedbackExtractor.extract`` into the generator
    and re-runs the new code.

    The retry loop stops early when:
    - the extractor has no suggestions,
    - the generator raises,
    - the generator returns empty or unchanged code, or
    - the new code passes all tests.

    Note: ``sb.run`` is called synchronously (not awaited). The generator
    receives only the spec string and the suggestions, not the failing code.

    Args:
        code: Code to execute (the initial code when a generator is used).
        func_name: Function name to test.
        test_cases: Test cases for validation.
        setup_code: Setup code (imports, fixtures).
        sandbox: Custom sandbox instance; ``self.sandbox`` is used when None.
        max_retries: Maximum number of regeneration rounds (<= 0 disables retry).
        code_generator: ``Callable(spec, feedback_suggestions) -> new_code``.

    Returns:
        The ``SandboxResult`` of the most recent sandbox run.
    """
    sb = sandbox if sandbox is not None else self.sandbox
    result = sb.run(code, func_name, test_cases, setup_code)

    # Nothing to fix, or no way to fix it: return the first run as-is.
    if result.all_passed or not code_generator:
        return result

    # Only the feedback changes between attempts; the spec is invariant.
    spec = f"Function: {func_name}, Test cases: {len(test_cases or [])}"
    for _ in range(max_retries):
        suggestions = CodeFeedbackExtractor.extract(result)
        if not suggestions:
            break
        try:
            new_code = code_generator(spec, suggestions)
        except Exception:
            # Best-effort: a failing generator ends the loop and the latest
            # sandbox result is returned unchanged.
            break
        if not new_code or new_code == code:
            break  # the generator made no progress
        code = new_code
        result = sb.run(code, func_name, test_cases, setup_code)
        if result.all_passed:
            break
    return result
1087 # ── HITL-Enhanced Execution (v1.9.5) ──────────────────────────
async def smart_execute_with_hitl(
    self,
    task: Any,
    hitl: HITLManager | None = None,
    **metadata,
) -> SwarmResult:
    """Smart execution with human-in-the-loop breakpoints.

    Decomposes ``task`` with ``self.decomposer`` and runs the sub-task DAG
    round by round, for at most ``self.max_rounds`` rounds. Each round runs
    every pending sub-task whose dependencies have all completed. The
    outputs are then fused with ``self.fusion``.

    A human is consulted:
    - before each sub-task, if ``hitl.config.break_on_every_task``.
      ABORT stops the run; MODIFY appends the feedback to the description.
    - after a failed sub-task, via ``hitl.should_break_on_failure``.
      ABORT stops the run; MODIFY re-queues the sub-task. The call receives
      the real per-sub-task attempt number.
    - when fused confidence is below ``hitl.config.break_on_low_confidence``.
      ABORT or REJECT fail the run.

    Only leaf sub-tasks (those no other sub-task depends on) that finished
    with status "done" are fused. If that yields no outputs, every sub-task
    output is fused instead.

    Args:
        task: Task description (converted with ``str``).
        hitl: HITLManager instance; ``self.hitl`` is used when None.
        **metadata: Forwarded to ``self._execute_raw`` for each sub-task.

    Returns:
        SwarmResult with the fused output. ``success`` is False on abort or
        rejection. When outputs were fused, ``success`` means fused
        confidence >= 0.3. ``duration`` is set on every return path.
    """
    hitl_mgr = hitl if hitl is not None else self.hitl
    start_time = time.time()
    result = SwarmResult(
        topology=self.topology,
        mode=ExecutionMode.SMART,
    )

    task_str = str(task)
    agent_names = self.list_agents()

    # Step 1: Decompose
    decomp = self.decomposer.decompose(task_str, agents=agent_names)
    result.decomposition = decomp

    # Step 2: Execute sub-tasks with HITL gates
    sub_outputs: dict[str, dict[str, Any]] = {}
    completed: set[str] = set()
    # How many times each sub-task has been executed, so a re-queued task
    # that fails again reports its real attempt number to the HITL manager.
    attempts: dict[str, int] = {}
    aborted = False

    for _round in range(self.max_rounds):
        ready = [
            st for st in decomp.sub_tasks
            if st.status == "pending"
            and all(dep in completed for dep in st.depends_on)
        ]
        if not ready:
            break

        for st in ready:
            # HITL: check before executing sub-task
            if hitl_mgr.config.break_on_every_task:
                decision, feedback = await hitl_mgr.request_decision(
                    bp_type=BreakpointType.BEFORE_TASK,
                    task_id=st.id,
                    message=f"Execute sub-task: {st.description}?",
                    context={"task": task_str, "sub_task": st.description},
                    options=["approve", "abort", "modify"],
                )
                if decision == HumanDecision.ABORT:
                    aborted = True
                    break
                if decision == HumanDecision.MODIFY and feedback:
                    st.description = f"{st.description} [modified: {feedback}]"

            st.status = "running"

            # Build context from dependencies (each dependency output is
            # truncated to 300 characters).
            # NOTE(review): st.description, including any HITL "modify"
            # feedback, is not part of the prompt sent to the agents; only
            # task_str and dependency results are. Confirm this is intended.
            context = task_str
            if st.depends_on:
                dep_contexts = []
                for dep_id in st.depends_on:
                    dep_outputs = sub_outputs.get(dep_id, {})
                    for name, out in dep_outputs.items():
                        dep_contexts.append(f"[{name}]: {str(out)[:300]}")
                if dep_contexts:
                    context = f"{task_str}\n\nPrevious results:\n" + "\n".join(dep_contexts)

            # Execute
            topo_result = await self._execute_raw(context, **metadata)
            attempts[st.id] = attempts.get(st.id, 0) + 1
            sub_outputs[st.id] = topo_result.outputs
            st.output = topo_result.outputs
            st.status = "done" if topo_result.success else "failed"
            completed.add(st.id)

            # HITL: check on failure
            if not topo_result.success:
                decision, feedback = await hitl_mgr.should_break_on_failure(
                    task_id=st.id,
                    error=topo_result.error or "Unknown error",
                    attempt=attempts[st.id],
                )
                if decision == HumanDecision.ABORT:
                    aborted = True
                    break
                if decision == HumanDecision.MODIFY and feedback:
                    st.description = f"{st.description} [retry with: {feedback}]"
                    st.status = "pending"  # Re-queue for retry next round
                    completed.discard(st.id)
                    del sub_outputs[st.id]

        if aborted:
            break

    if aborted:
        result.success = False
        result.error = "Aborted by human"
        result.duration = time.time() - start_time
        return result

    # Step 3: Fuse results from leaf sub-tasks (no other sub-task depends
    # on them). The dependency set is built once rather than per sub-task.
    depended_on = {dep for s in decomp.sub_tasks for dep in s.depends_on}
    final_subtasks = [
        st for st in decomp.sub_tasks
        if st.status == "done" and st.id not in depended_on
    ]
    if final_subtasks:
        all_final: dict[str, Any] = {}
        for st in final_subtasks:
            if st.output:
                all_final.update(st.output)
        if all_final:
            fused = self.fusion.fuse(task_str, all_final)
            result.fused = fused
            result.outputs = all_final

            # HITL: check low confidence
            if fused.confidence < hitl_mgr.config.break_on_low_confidence:
                decision, feedback = await hitl_mgr.should_break_on_result(
                    task_id="final",
                    output=all_final,
                    confidence=fused.confidence,
                )
                if decision == HumanDecision.ABORT:
                    result.success = False
                    result.error = "Aborted by human at final result"
                    result.duration = time.time() - start_time
                    return result
                if decision == HumanDecision.REJECT:
                    result.success = False
                    result.error = f"Rejected: {feedback}"
                    result.duration = time.time() - start_time
                    return result

            result.success = fused.confidence >= 0.3

    # Fallback: nothing fused from leaves, so fuse every sub-task's outputs.
    if not result.outputs and sub_outputs:
        all_outputs: dict[str, Any] = {}
        for st_outputs in sub_outputs.values():
            all_outputs.update(st_outputs)
        if all_outputs:
            fused = self.fusion.fuse(task_str, all_outputs)
            result.fused = fused
            result.outputs = all_outputs
            result.success = fused.confidence >= 0.3

    result.duration = time.time() - start_time
    return result
1247 # ── Monitored Execution (v1.9.6) ─────────────────────────────
async def monitor_execute(
    self,
    task: Any,
    quality_gates: list[QualityGate] | None = None,
    fallback_fn: Callable[[], Any] | None = None,
    **metadata,
) -> tuple[Any, MonitorReport]:
    """Execute ``smart_execute`` behind an AgentMonitor quality-gate pipeline.

    Monitor selection:
    - non-empty ``quality_gates``: a fresh AgentMonitor with those gates;
    - otherwise, if ``self.monitor`` already has gates: ``self.monitor``;
    - otherwise: a fresh AgentMonitor with ``output_not_empty`` and
      ``no_error_output``.
    Fresh monitors copy ``max_retries`` and ``default_fallback`` from
    ``self.monitor``. Retrying and falling back on gate failure are handled
    by ``monitor.monitor_execution``.

    The gate context dict passed to the monitor has its ``_latency_ms`` key
    updated with the measured latency of each execution attempt. This helps
    only if ``monitor_execution`` evaluates gates against that same dict
    after ``task_fn`` returns — TODO confirm against AgentMonitor.

    Args:
        task: Task description.
        quality_gates: Custom quality gates (monitor defaults are used if None).
        fallback_fn: Fallback function if all gates fail.
        **metadata: Forwarded to ``smart_execute``.

    Returns:
        Tuple of (final_output, MonitorReport). The task output is
        ``result.fused.merged`` when truthy, else ``result.outputs``. Every gate
        result in the report gets ``data["_latency_ms"]`` set to the total
        elapsed time in milliseconds.
    """
    monitor = self.monitor
    if quality_gates or not self.monitor._gates:
        # Use a fresh monitor so the shared one is not mutated, but keep its
        # retry/fallback configuration in both cases.
        monitor = AgentMonitor(
            max_retries=self.monitor.max_retries,
            default_fallback=self.monitor.default_fallback,
        )
        monitor.add_gates(quality_gates or [
            output_not_empty(),
            no_error_output(),
        ])

    # Shared with the monitor so latency gates can see the real latency of
    # the most recent attempt instead of a hard-coded 0.
    gate_context: dict[str, Any] = {"_latency_ms": 0}
    start = time.perf_counter()

    async def execute_fn() -> Any:
        attempt_start = time.perf_counter()
        try:
            result = await self.smart_execute(task, **metadata)
        finally:
            gate_context["_latency_ms"] = (time.perf_counter() - attempt_start) * 1000
        fused = result.fused
        if fused and fused.merged:
            return fused.merged
        return result.outputs

    output, report = await monitor.monitor_execution(
        task_fn=execute_fn,
        task_name=str(task)[:80],
        context=gate_context,
        fallback_fn=fallback_fn,
    )

    # Record total latency (all attempts plus fallback) on every gate result.
    elapsed = (time.perf_counter() - start) * 1000
    for gate in report.gates:
        gate.data["_latency_ms"] = elapsed

    return output, report
1310 # ── Tool Registry Convenience Methods ─────────────────────────
def register_tool(
    self,
    name: str,
    description: str,
    handler: Callable,
    category: ToolCategory = ToolCategory.CUSTOM,
    params: list[ToolParam] | None = None,
    capabilities: list[str] | None = None,
    tags: list[str] | None = None,
    is_destructive: bool = False,
    rate_limit: int = 0,
    **kwargs,
) -> ToolSchema:
    """Build a tool schema with ``create_tool`` and add it to ``self.tool_registry``.

    ``None`` for ``params``, ``capabilities`` or ``tags`` is replaced by an
    empty list. Any extra keyword arguments are passed straight to
    ``create_tool``.

    Returns:
        Whatever ``self.tool_registry.register`` returns for the new tool.
    """
    schema = create_tool(
        name=name,
        description=description,
        handler=handler,
        category=category,
        params=params or [],
        capabilities=capabilities or [],
        tags=tags or [],
        is_destructive=is_destructive,
        rate_limit=rate_limit,
        **kwargs,
    )
    registered = self.tool_registry.register(schema)
    return registered
def find_tool(self, query: str, top_k: int = 5) -> list[tuple[ToolSchema, float]]:
    """Delegate a natural-language tool search to ``self.tool_registry``.

    Args:
        query: Free-text description of the wanted tool.
        top_k: Passed to the registry's ``search`` as ``top_k``.

    Returns:
        The registry's ``(ToolSchema, float)`` pairs, unchanged.
    """
    registry = self.tool_registry
    matches = registry.search(query, top_k=top_k)
    return matches
def route_tool(self, task: str, **ctx_kwargs) -> RoutingDecision:
    """Ask ``self.tool_router`` which tool best fits ``task``.

    Extra keyword arguments are passed to ``RoutingContext`` alongside
    ``task``.
    """
    routing_context = RoutingContext(task=task, **ctx_kwargs)
    decision = self.tool_router.route(routing_context)
    return decision
def execute_tool(self, tool_name: str, params: dict[str, Any] | None = None, force: bool = False) -> Any:
    """Run a registered tool through ``self.tool_executor``.

    Args:
        tool_name: Registered name of the tool.
        params: Tool arguments, passed through unchanged (may be None).
        force: Forwarded as ``force=``. Presumably this bypasses the
            executor's destructive-operation confirmation — confirm against
            ToolExecutor.

    Returns:
        Whatever the executor returns.
    """
    executor = self.tool_executor
    outcome = executor.execute(tool_name, params, force=force)
    return outcome
# ── Backward-compatible alias ─────────────────────────────────────
# ``SwarmCoordinator`` is the same class object as ``SmartSwarmCoordinator``
# (a plain rebinding, not a subclass), so existing references keep working.
SwarmCoordinator = SmartSwarmCoordinator