Coverage for agentos/tools/fusion.py: 31%
156 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Fusion Toolkit for NexusAgent.
4Multi-tool coordination system. Allows agents to use
5multiple tools in sequence or parallel, with automatic
6result fusion and conflict resolution.
7"""
9from __future__ import annotations
11import asyncio
12import time
13import uuid
14from dataclasses import dataclass, field
15from enum import Enum
16from typing import Any, Callable, Optional
class FusionMode(str, Enum):
    """
    Strategies for coordinating several tools in one call.

    Members are also ``str`` instances, so each one compares equal
    to its plain string value.
    """

    # Tools run one after another.
    SEQUENTIAL = "sequential"
    # Tools run in parallel.
    PARALLEL = "parallel"
    # Each tool's output feeds into the next tool.
    CHAIN = "chain"
@dataclass
class ToolSpec:
    """
    Tool specification.

    Attributes:
        name: Tool name
        description: Tool description
        func: Tool function; ``None`` until a callable is attached
        parameters: Parameter schema
        timeout: Execution timeout
        retry_count: Number of retries
    """

    name: str
    description: str = ""
    # The default is None, so the annotation has to admit it.
    func: Optional[Callable[..., Any]] = None
    parameters: dict[str, Any] = field(default_factory=dict)
    timeout: float = 30.0
    retry_count: int = 0

    def to_dict(self) -> dict[str, Any]:
        """
        Convert to dict.

        Returns:
            Dict with the name, description, parameters, timeout and
            retry_count fields. ``func`` is not included.
        """
        return {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters,
            "timeout": self.timeout,
            "retry_count": self.retry_count,
        }
@dataclass
class ToolResult:
    """
    Outcome of running one tool.

    Attributes:
        tool_name: Name of the tool that produced this result
        success: True when the tool ran without error
        output: Value the tool produced (defaults to None)
        error: Error message for a failed run (defaults to None)
        duration: Execution duration (defaults to 0.0)
    """

    tool_name: str
    success: bool
    output: Any = None
    error: Optional[str] = None
    duration: float = 0.0
@dataclass
class FusionResult:
    """
    Aggregate outcome of one multi-tool run.

    Attributes:
        id: Unique identifier (first 12 hex characters of a UUID4)
        mode: Fusion mode used
        results: Individual tool results
        fused_output: Fused final output
        total_duration: Total execution duration
        success: Whether fusion succeeded
    """

    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    mode: FusionMode = FusionMode.SEQUENTIAL
    results: list[ToolResult] = field(default_factory=list)
    fused_output: Any = None
    total_duration: float = 0.0
    success: bool = True

    def to_dict(self) -> dict[str, Any]:
        """
        Convert to dict.

        Returns:
            Dict holding every field; ``mode`` is reduced to its
            string value and each tool result becomes a plain dict.
        """
        serialized_results = [
            {
                "tool_name": entry.tool_name,
                "success": entry.success,
                "output": entry.output,
                "error": entry.error,
                "duration": entry.duration,
            }
            for entry in self.results
        ]
        return {
            "id": self.id,
            "mode": self.mode.value,
            "results": serialized_results,
            "fused_output": self.fused_output,
            "total_duration": self.total_duration,
            "success": self.success,
        }
class FusionToolkit:
    """
    Multi-tool coordination system.

    Allows agents to use multiple tools in different modes:
    - Sequential: Run tools one by one
    - Parallel: Run tools in parallel
    - Chain: Output of one feeds into next

    Usage:
        toolkit = FusionToolkit()
        toolkit.register(ToolSpec(name="search", func=search_func))
        toolkit.register(ToolSpec(name="summarize", func=summarize_func))

        # Sequential execution
        result = await toolkit.execute(["search", "summarize"], {"query": "AI"})

        # Parallel execution
        result = await toolkit.execute(
            ["search", "summarize"],
            {"query": "AI"},
            mode=FusionMode.PARALLEL,
        )
    """

    def __init__(self, default_timeout: float = 30.0):
        """
        Initialize fusion toolkit.

        Args:
            default_timeout: Timeout used for tools whose own
                ``timeout`` is falsy (``0`` or ``None``)
        """
        self._tools: dict[str, ToolSpec] = {}
        self._default_timeout = default_timeout

    def register(self, tool: ToolSpec) -> None:
        """
        Register a tool.

        A tool registered under an existing name replaces the
        previous one.

        Args:
            tool: Tool specification
        """
        self._tools[tool.name] = tool

    def unregister(self, tool_name: str) -> bool:
        """
        Unregister a tool.

        Args:
            tool_name: Tool name

        Returns:
            True if unregistered, False if not found
        """
        if tool_name in self._tools:
            del self._tools[tool_name]
            return True
        return False

    def get_tool(self, tool_name: str) -> Optional[ToolSpec]:
        """
        Get a tool by name.

        Args:
            tool_name: Tool name

        Returns:
            ToolSpec if found, None otherwise
        """
        return self._tools.get(tool_name)

    def list_tools(self) -> list[ToolSpec]:
        """
        List all registered tools.

        Returns:
            List of ToolSpec
        """
        return list(self._tools.values())

    async def execute(
        self,
        tool_names: list[str],
        inputs: dict[str, Any],
        mode: FusionMode = FusionMode.SEQUENTIAL,
    ) -> FusionResult:
        """
        Execute multiple tools.

        Args:
            tool_names: List of tool names
            inputs: Input parameters, passed to each tool as keyword
                arguments
            mode: Fusion mode

        Returns:
            FusionResult with ``total_duration`` filled in

        Raises:
            ValueError: If ``mode`` is not a known fusion mode
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        started = time.perf_counter()

        # Compared with == (not looked up in a dict) so plain string
        # values such as "parallel" keep working.
        if mode == FusionMode.SEQUENTIAL:
            result = await self._execute_sequential(tool_names, inputs)
        elif mode == FusionMode.PARALLEL:
            result = await self._execute_parallel(tool_names, inputs)
        elif mode == FusionMode.CHAIN:
            result = await self._execute_chain(tool_names, inputs)
        else:
            raise ValueError(f"Unknown fusion mode: {mode}")

        result.total_duration = time.perf_counter() - started
        return result

    @staticmethod
    def _missing_tool_result(tool_name: str) -> ToolResult:
        """Build the failed ToolResult reported for an unregistered tool."""
        return ToolResult(
            tool_name=tool_name,
            success=False,
            error=f"Tool not found: {tool_name}",
        )

    async def _execute_sequential(
        self,
        tool_names: list[str],
        inputs: dict[str, Any],
    ) -> FusionResult:
        """
        Execute tools sequentially.

        Every tool receives the same ``inputs``. A missing or failing
        tool marks the run as failed, but the remaining tools still run.
        """
        result = FusionResult(mode=FusionMode.SEQUENTIAL)

        for tool_name in tool_names:
            tool = self._tools.get(tool_name)
            if tool is None:
                tool_result = self._missing_tool_result(tool_name)
            else:
                tool_result = await self._execute_tool_with_result(tool, inputs)

            result.results.append(tool_result)
            if not tool_result.success:
                result.success = False

        # Fuse outputs
        result.fused_output = self._fuse_outputs(result.results)
        return result

    async def _execute_parallel(
        self,
        tool_names: list[str],
        inputs: dict[str, Any],
    ) -> FusionResult:
        """
        Execute tools in parallel.

        Results are reported in the order the tools were requested.
        A missing or failing tool marks the run as failed.
        """
        result = FusionResult(mode=FusionMode.PARALLEL)

        # One slot per requested name keeps results in request order.
        ordered: list[Any] = []
        runnable: list[tuple[int, str]] = []
        pending = []
        for index, tool_name in enumerate(tool_names):
            tool = self._tools.get(tool_name)
            if tool is None:
                ordered.append(self._missing_tool_result(tool_name))
            else:
                ordered.append(None)
                runnable.append((index, tool_name))
                pending.append(self._execute_tool_with_result(tool, inputs))

        # Execute in parallel
        if pending:
            outcomes = await asyncio.gather(*pending, return_exceptions=True)
            # gather returns outcomes in the order the awaitables were
            # passed, so each one can be matched back to its tool name.
            for (index, tool_name), outcome in zip(runnable, outcomes):
                # BaseException also covers a returned CancelledError,
                # which is not an Exception subclass.
                if isinstance(outcome, BaseException):
                    outcome = ToolResult(
                        tool_name=tool_name,
                        success=False,
                        error=str(outcome) or type(outcome).__name__,
                    )
                ordered[index] = outcome

        result.results = ordered
        result.success = all(tr.success for tr in ordered)

        # Fuse outputs
        result.fused_output = self._fuse_outputs(result.results)
        return result

    async def _execute_chain(
        self,
        tool_names: list[str],
        inputs: dict[str, Any],
    ) -> FusionResult:
        """
        Execute tools in chain (output feeds into next).

        The first tool receives ``inputs``. Each later tool receives
        ``inputs`` plus an ``"input"`` key holding the previous tool's
        output. The chain stops at the first missing or failing tool.
        ``fused_output`` is the last tool's output when every step
        succeeded, otherwise it stays None.
        """
        result = FusionResult(mode=FusionMode.CHAIN)
        current_input = dict(inputs)

        for tool_name in tool_names:
            tool = self._tools.get(tool_name)
            if tool is None:
                tool_result = self._missing_tool_result(tool_name)
            else:
                tool_result = await self._execute_tool_with_result(
                    tool, current_input
                )

            result.results.append(tool_result)
            if not tool_result.success:
                result.success = False
                break

            # Feed output into next tool. "input" is written last so a
            # caller-supplied "input" key cannot overwrite the chained value.
            current_input = {**inputs, "input": tool_result.output}

        # Final output is last tool's output
        if result.success and result.results:
            result.fused_output = result.results[-1].output

        return result

    async def _execute_tool(
        self,
        tool: ToolSpec,
        inputs: dict[str, Any],
    ) -> Any:
        """
        Execute a single tool, honouring its timeout and retry count.

        Args:
            tool: Tool specification
            inputs: Keyword arguments for the tool function

        Returns:
            Whatever the tool function returns

        Raises:
            ValueError: If the tool has no function
            TimeoutError: If the final attempt timed out
            Exception: Whatever the tool function raised on the
                final attempt
        """
        if tool.func is None:
            raise ValueError(f"Tool {tool.name} has no function")

        timeout = tool.timeout or self._default_timeout
        # retry_count is the number of extra attempts after the first.
        attempts = max(0, tool.retry_count or 0) + 1

        for attempt in range(1, attempts + 1):
            try:
                return await self._call_tool_once(tool, inputs, timeout)
            except Exception:
                if attempt == attempts:
                    raise

    async def _call_tool_once(
        self,
        tool: ToolSpec,
        inputs: dict[str, Any],
        timeout: float,
    ) -> Any:
        """Run the tool function once under ``timeout``."""
        if asyncio.iscoroutinefunction(tool.func):
            awaitable = tool.func(**inputs)
        else:
            # Plain callables run in the default executor so they do not
            # block the event loop.
            loop = asyncio.get_running_loop()
            awaitable = loop.run_in_executor(None, lambda: tool.func(**inputs))

        try:
            return await asyncio.wait_for(awaitable, timeout=timeout)
        except asyncio.TimeoutError as exc:
            raise TimeoutError(f"Tool {tool.name} timed out") from exc

    async def _execute_tool_with_result(
        self,
        tool: ToolSpec,
        inputs: dict[str, Any],
    ) -> ToolResult:
        """
        Execute tool and return ToolResult.

        Exceptions raised by the tool are captured in a failed
        ToolResult rather than propagated. The duration is recorded
        for both successful and failed runs.
        """
        started = time.perf_counter()
        try:
            output = await self._execute_tool(tool, inputs)
        except Exception as exc:
            return ToolResult(
                tool_name=tool.name,
                success=False,
                # Some exceptions stringify to "", so fall back to the
                # exception type name.
                error=str(exc) or type(exc).__name__,
                duration=time.perf_counter() - started,
            )

        return ToolResult(
            tool_name=tool.name,
            success=True,
            output=output,
            duration=time.perf_counter() - started,
        )

    def _fuse_outputs(self, results: list[ToolResult]) -> Any:
        """
        Fuse multiple tool outputs.

        Only successful results with a non-None output take part.

        Returns:
            None when there is nothing to fuse; the single output when
            there is exactly one; a merged dict when all outputs are
            dicts (later results overwrite earlier keys); a flattened
            list when all outputs are lists; otherwise the list of
            outputs.
        """
        outputs = [r.output for r in results if r.success and r.output is not None]

        if not outputs:
            return None

        if len(outputs) == 1:
            return outputs[0]

        # Default fusion: merge dicts, concatenate lists
        if all(isinstance(o, dict) for o in outputs):
            fused: dict[Any, Any] = {}
            for o in outputs:
                fused.update(o)
            return fused

        if all(isinstance(o, list) for o in outputs):
            return [item for o in outputs for item in o]

        # Default: return list of outputs
        return outputs