Coverage for agentos/tools/fusion.py: 31%

156 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Fusion Toolkit for NexusAgent. 

3 

4Multi-tool coordination system. Allows agents to use 

5multiple tools in sequence or parallel, with automatic 

6result fusion and conflict resolution. 

7""" 

8 

9from __future__ import annotations 

10 

11import asyncio 

12import time 

13import uuid 

14from dataclasses import dataclass, field 

15from enum import Enum 

16from typing import Any, Callable, Optional 

17 

18 

class FusionMode(str, Enum):
    """
    Tool fusion modes.

    Members:
        SEQUENTIAL: Run tools one by one
        PARALLEL: Run tools in parallel
        CHAIN: Output of one tool feeds into the next
    """

    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"
    CHAIN = "chain"

24 

25 

@dataclass
class ToolSpec:
    """
    Tool specification.

    Attributes:
        name: Tool name
        description: Tool description
        func: Tool function; ``None`` until a callable is supplied
        parameters: Parameter schema
        timeout: Execution timeout
        retry_count: Number of retries
    """
    name: str
    description: str = ""
    # The default is None, so the annotation has to admit None as well.
    func: Optional[Callable[..., Any]] = None
    parameters: dict[str, Any] = field(default_factory=dict)
    timeout: float = 30.0
    retry_count: int = 0

    def to_dict(self) -> dict[str, Any]:
        """
        Convert to dict.

        Returns:
            Dict with every field except ``func``, which is left out.
        """
        return {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters,
            "timeout": self.timeout,
            "retry_count": self.retry_count,
        }

55 

56 

@dataclass
class ToolResult:
    """
    Outcome of running one tool.

    Attributes:
        tool_name: Name of the tool that was run
        success: True when the tool completed without error
        output: Value the tool produced
        error: Error message, set when the run failed
        duration: How long the run took
    """

    tool_name: str
    success: bool
    output: Any = None
    error: Optional[str] = None
    duration: float = 0.0

74 

75 

@dataclass
class FusionResult:
    """
    Aggregate outcome of one fusion run.

    Attributes:
        id: Unique identifier (first 12 hex characters of a random UUID)
        mode: Fusion mode that was used
        results: Individual tool results
        fused_output: Final output after fusion
        total_duration: Total execution duration
        success: Whether the fusion run succeeded
    """

    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    mode: FusionMode = FusionMode.SEQUENTIAL
    results: list[ToolResult] = field(default_factory=list)
    fused_output: Any = None
    total_duration: float = 0.0
    success: bool = True

    def to_dict(self) -> dict[str, Any]:
        """
        Convert to dict.

        Returns:
            Dict holding this result's fields, with ``mode`` given as its
            enum value and each tool result expanded into its own dict.
        """
        serialized_results = []
        for entry in self.results:
            serialized_results.append(
                {
                    "tool_name": entry.tool_name,
                    "success": entry.success,
                    "output": entry.output,
                    "error": entry.error,
                    "duration": entry.duration,
                }
            )

        payload: dict[str, Any] = {"id": self.id, "mode": self.mode.value}
        payload["results"] = serialized_results
        payload["fused_output"] = self.fused_output
        payload["total_duration"] = self.total_duration
        payload["success"] = self.success
        return payload

115 

116 

class FusionToolkit:
    """
    Multi-tool coordination system.

    Allows agents to use multiple tools in different modes:
    - Sequential: Run tools one by one
    - Parallel: Run tools in parallel
    - Chain: Output of one feeds into next

    Usage:
        toolkit = FusionToolkit()
        toolkit.register(ToolSpec(name="search", func=search_func))
        toolkit.register(ToolSpec(name="summarize", func=summarize_func))

        # Sequential execution (the default mode)
        result = await toolkit.execute(["search", "summarize"], {"query": "AI"})

        # Parallel execution
        result = await toolkit.execute(
            ["search", "summarize"], {"query": "AI"}, mode=FusionMode.PARALLEL
        )
    """

    def __init__(self, default_timeout: float = 30.0):
        """
        Initialize fusion toolkit.

        Args:
            default_timeout: Timeout used when a tool's own timeout is falsy
        """
        self._tools: dict[str, ToolSpec] = {}
        self._default_timeout = default_timeout

    def register(self, tool: ToolSpec) -> None:
        """
        Register a tool, replacing any tool already stored under its name.

        Args:
            tool: Tool specification
        """
        self._tools[tool.name] = tool

    def unregister(self, tool_name: str) -> bool:
        """
        Unregister a tool.

        Args:
            tool_name: Tool name

        Returns:
            True if unregistered, False if not found
        """
        if tool_name in self._tools:
            del self._tools[tool_name]
            return True
        return False

    def get_tool(self, tool_name: str) -> Optional[ToolSpec]:
        """
        Get a tool by name.

        Args:
            tool_name: Tool name

        Returns:
            ToolSpec if found, None otherwise
        """
        return self._tools.get(tool_name)

    def list_tools(self) -> list[ToolSpec]:
        """
        List all registered tools.

        Returns:
            New list of the registered ToolSpec objects
        """
        return list(self._tools.values())

    async def execute(
        self,
        tool_names: list[str],
        inputs: dict[str, Any],
        mode: FusionMode = FusionMode.SEQUENTIAL,
    ) -> FusionResult:
        """
        Execute multiple tools.

        Args:
            tool_names: List of tool names
            inputs: Input parameters, passed to each tool as keyword arguments
            mode: Fusion mode

        Returns:
            FusionResult with ``total_duration`` filled in

        Raises:
            ValueError: If ``mode`` is not a known fusion mode
        """
        # Monotonic clock: a system clock adjustment cannot skew the duration.
        started = time.perf_counter()

        # Compared with ``==`` so a plain string such as "parallel" still
        # matches the str-based enum member.
        if mode == FusionMode.SEQUENTIAL:
            result = await self._execute_sequential(tool_names, inputs)
        elif mode == FusionMode.PARALLEL:
            result = await self._execute_parallel(tool_names, inputs)
        elif mode == FusionMode.CHAIN:
            result = await self._execute_chain(tool_names, inputs)
        else:
            raise ValueError(f"Unknown fusion mode: {mode}")

        result.total_duration = time.perf_counter() - started
        return result

    async def _execute_sequential(
        self,
        tool_names: list[str],
        inputs: dict[str, Any],
    ) -> FusionResult:
        """
        Execute tools one after another, each with the same inputs.

        A missing or failing tool marks the run as failed but does not stop
        the remaining tools from running.
        """
        result = FusionResult(mode=FusionMode.SEQUENTIAL)

        for tool_name in tool_names:
            tool = self._tools.get(tool_name)
            if tool is None:
                outcome = self._not_found(tool_name)
            else:
                outcome = await self._execute_tool_with_result(
                    tool, inputs, label=tool_name
                )
            result.results.append(outcome)
            if not outcome.success:
                result.success = False

        result.fused_output = self._fuse_outputs(result.results)
        return result

    async def _execute_parallel(
        self,
        tool_names: list[str],
        inputs: dict[str, Any],
    ) -> FusionResult:
        """
        Execute tools concurrently, each with the same inputs.

        Results are reported in the order of ``tool_names``. A missing or
        failing tool marks the run as failed.
        """
        result = FusionResult(mode=FusionMode.PARALLEL)

        # One slot per requested tool keeps results aligned with tool_names.
        slots: list[Optional[ToolResult]] = [None] * len(tool_names)
        pending = []  # (slot index, requested name, coroutine)
        for index, tool_name in enumerate(tool_names):
            tool = self._tools.get(tool_name)
            if tool is None:
                slots[index] = self._not_found(tool_name)
            else:
                pending.append((
                    index,
                    tool_name,
                    self._execute_tool_with_result(tool, inputs, label=tool_name),
                ))

        if pending:
            gathered = await asyncio.gather(
                *(coro for _, _, coro in pending),
                return_exceptions=True,
            )
            for (index, tool_name, _), outcome in zip(pending, gathered):
                # The helper already turns Exception into a ToolResult; what
                # can still arrive here is a BaseException such as
                # CancelledError, which must not be treated as a ToolResult.
                if isinstance(outcome, BaseException):
                    outcome = ToolResult(
                        tool_name=tool_name,
                        success=False,
                        error=self._describe_error(outcome),
                    )
                slots[index] = outcome

        result.results = [slot for slot in slots if slot is not None]
        if not all(entry.success for entry in result.results):
            result.success = False

        result.fused_output = self._fuse_outputs(result.results)
        return result

    async def _execute_chain(
        self,
        tool_names: list[str],
        inputs: dict[str, Any],
    ) -> FusionResult:
        """
        Execute tools in chain (output feeds into next).

        The first tool receives ``inputs``. Every later tool receives
        ``inputs`` plus the previous tool's output under the ``"input"`` key.
        The chain stops at the first missing or failing tool. The fused
        output is the last tool's output, and is set only when that tool
        succeeded.
        """
        result = FusionResult(mode=FusionMode.CHAIN)
        current_input = dict(inputs)

        for tool_name in tool_names:
            tool = self._tools.get(tool_name)
            if tool is None:
                outcome = self._not_found(tool_name)
            else:
                outcome = await self._execute_tool_with_result(
                    tool, current_input, label=tool_name
                )
            result.results.append(outcome)

            if not outcome.success:
                result.success = False
                break

            # "input" goes last so the chained output wins over a
            # caller-supplied "input" key instead of being overwritten by it.
            current_input = {**inputs, "input": outcome.output}

        if result.results and result.results[-1].success:
            result.fused_output = result.results[-1].output

        return result

    async def _execute_tool(
        self,
        tool: ToolSpec,
        inputs: dict[str, Any],
    ) -> Any:
        """
        Execute a single tool under a timeout.

        Coroutine functions are awaited directly; any other callable runs in
        the event loop's default executor. ``tool.retry_count`` is not
        consulted here.

        Args:
            tool: Tool to run
            inputs: Keyword arguments for the tool function

        Returns:
            Whatever the tool function returns

        Raises:
            ValueError: If the tool has no callable function
            TimeoutError: If the tool does not finish within its timeout
        """
        func = tool.func
        if not callable(func):
            raise ValueError(f"Tool {tool.name} has no function")

        # A falsy tool timeout (None or 0) falls back to the toolkit default.
        timeout = tool.timeout or self._default_timeout

        if asyncio.iscoroutinefunction(func):
            awaitable = func(**inputs)
        else:
            # get_running_loop() is the supported call inside a coroutine.
            loop = asyncio.get_running_loop()
            awaitable = loop.run_in_executor(None, lambda: func(**inputs))

        try:
            return await asyncio.wait_for(awaitable, timeout=timeout)
        except asyncio.TimeoutError as exc:
            raise TimeoutError(f"Tool {tool.name} timed out") from exc

    async def _execute_tool_with_result(
        self,
        tool: ToolSpec,
        inputs: dict[str, Any],
        label: Optional[str] = None,
    ) -> ToolResult:
        """
        Execute tool and return ToolResult.

        Args:
            tool: Tool to run
            inputs: Keyword arguments for the tool function
            label: Name to report in the result; defaults to ``tool.name``

        Returns:
            ToolResult describing success or failure, with the elapsed time
            recorded in both cases
        """
        reported_name = tool.name if label is None else label
        started = time.perf_counter()
        try:
            output = await self._execute_tool(tool, inputs)
        except Exception as exc:
            # Deliberate boundary: a tool failure becomes a result entry
            # rather than aborting the whole fusion run.
            return ToolResult(
                tool_name=reported_name,
                success=False,
                error=self._describe_error(exc),
                duration=time.perf_counter() - started,
            )
        return ToolResult(
            tool_name=reported_name,
            success=True,
            output=output,
            duration=time.perf_counter() - started,
        )

    @staticmethod
    def _not_found(tool_name: str) -> ToolResult:
        """Build the failure result for a tool name that is not registered."""
        return ToolResult(
            tool_name=tool_name,
            success=False,
            error=f"Tool not found: {tool_name}",
        )

    @staticmethod
    def _describe_error(exc: BaseException) -> str:
        """Return the exception message, or its class name if the message is empty."""
        return str(exc) or type(exc).__name__

    def _fuse_outputs(self, results: list[ToolResult]) -> Any:
        """
        Fuse multiple tool outputs.

        Only successful results with a non-None output take part.

        Returns:
            None when nothing is usable; the single output when there is
            exactly one; a merged dict when all outputs are dicts (later
            keys overwrite earlier ones); a flattened list when all outputs
            are lists; otherwise the list of outputs.
        """
        outputs = [r.output for r in results if r.success and r.output is not None]

        if not outputs:
            return None

        if len(outputs) == 1:
            return outputs[0]

        if all(isinstance(o, dict) for o in outputs):
            fused: dict[Any, Any] = {}
            for o in outputs:
                fused.update(o)
            return fused

        if all(isinstance(o, list) for o in outputs):
            return [item for o in outputs for item in o]

        return outputs