Coverage for agentos/orchestration/distributed.py: 47%

179 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v1.14.2 — Distributed Orchestration (Ray-based Agent Swarm). 

3 

4受 Ray Serve / Ray Core 启发,为 AgentOS 增加分布式编排层。 

5Agent 不再局限于单进程,可以在多台机器上组成 Swarm, 

6自动负载均衡、容错恢复、跨节点通信。 

7 

8Core features: 

9- RayAgentActor: Ray Actor 封装的 Agent 实例 

10- DistSwarmCoordinator: 分布式 Swarm 协调器 

11- AgentPlacementStrategy: 智能 Agent 放置(CPU/GPU/内存感知) 

12- DistTaskQueue: 分布式任务队列(Ray 原生) 

13- CrossNodeBus: 跨节点消息总线 

14- FaultTolerance: Actor 重启/状态恢复 

15 

16Architecture: 

17 DistSwarmCoordinator (head node) 

18 ├── RayAgentActor[0] (worker node 1) 

19 │ ├── ToolAgent instance 

20 │ └── Local memory store 

21 ├── RayAgentActor[1] (worker node 2) 

22 ├── ... 

23 └── DistTaskQueue → automatic load balancing 

24 

25Usage: 

26 coordinator = DistSwarmCoordinator(num_workers=4) 

27 await coordinator.start() 

28 result = await coordinator.submit({"task": "Summarize all PDFs in /data/"}) 

29 await coordinator.stop() 

30""" 

31 

32from __future__ import annotations 

33 

34import asyncio 

35import json 

36import os 

37import time 

38import uuid 

39from dataclasses import dataclass, field 

40from enum import Enum 

41from typing import ( 

42 Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union, 

43) 

44 

45# ── Ray Optional Import ──────────────────── 

46 

47_HAS_RAY = False 

48try: 

49 import ray 

50 _HAS_RAY = True 

51except ImportError: 

52 ray = None # type: ignore 

53 

54 

def _require_ray():
    """Fail fast with an actionable message when ray could not be imported.

    Raises:
        ImportError: if the optional ``ray`` import at module load failed.
    """
    if _HAS_RAY:
        return
    message = (
        "The distributed orchestration module requires 'ray'. "
        "Install it with: pip install ray"
    )
    raise ImportError(message)

62 

63 

64# ── Data Models ───────────────────────────── 

65 

@dataclass
class AgentPlacementSpec:
    """Resource and placement request for deploying one agent.

    NOTE(review): no visible code in this module consumes this spec yet.
    """

    cpu: float = field(default=1.0)
    gpu: float = field(default=0.0)
    memory_mb: int = field(default=512)
    # Optional node identifier to pin the agent to; None means no preference.
    node_affinity: Optional[str] = field(default=None)
    # Free-form strategy name; the original author lists: spread | pack | custom
    strategy: str = field(default="spread")

74 

75 

class DistTaskStatus(str, Enum):
    """Lifecycle state of a distributed task record.

    Mixing in ``str`` makes each member compare equal to its string value.
    """

    PENDING = "pending"  # default for a freshly created task record
    RUNNING = "running"
    COMPLETED = "completed"  # set by DistTaskQueue.mark_complete()
    FAILED = "failed"
    CANCELLED = "cancelled"

82 

83 

class DistAgentStatus(str, Enum):
    """State reported by an agent actor; members compare equal to their strings."""

    IDLE = "idle"  # initial state, and restored after each execute() call
    BUSY = "busy"  # held while execute() is running
    OFFLINE = "offline"  # set by the actor's shutdown()

88 

89 

class PlacementStrategy(str, Enum):
    """How worker actors should be distributed across nodes.

    NOTE(review): stored in DistSwarmConfig.placement but not consulted by the
    visible coordinator code.
    """

    SPREAD = "spread"
    PACK = "pack"
    RANDOM = "random"
    CUSTOM = "custom"

95 

96 

@dataclass
class DistSwarmConfig:
    """Tunables for a DistSwarmCoordinator.

    NOTE(review): only ``num_workers`` is read by the visible coordinator code;
    the remaining fields are carried but not yet applied.
    """

    # Number of agent actors spawned by DistSwarmCoordinator.start().
    num_workers: int = field(default=4)
    cpus_per_worker: float = field(default=1.0)
    gpus_per_worker: float = field(default=0.0)
    memory_per_worker_mb: int = field(default=1024)
    placement: PlacementStrategy = field(default=PlacementStrategy.SPREAD)
    # Presumably seconds (matches the seconds-based timeouts used elsewhere) — confirm.
    heartbeat_interval: float = field(default=5.0)
    task_timeout: float = field(default=300.0)

107 

108 

@dataclass
class DistTaskRecord:
    """Bookkeeping entry for one submitted task."""

    task_id: str
    status: DistTaskStatus = field(default=DistTaskStatus.PENDING)
    # Identifier of the actor handling the task, if any.
    assigned_actor: Optional[str] = field(default=None)
    # Timestamps; DistTaskQueue fills created_at/completed_at from time.time().
    created_at: float = field(default=0.0)
    started_at: Optional[float] = field(default=None)
    completed_at: Optional[float] = field(default=None)
    result: Any = field(default=None)
    error: Optional[str] = field(default=None)

120 

121 

class CrossNodeMailbox:
    """FIFO mailbox of message dicts for a single recipient.

    Messages are held in this process's memory; nothing in this class moves
    them between nodes by itself.
    """

    def __init__(self, mailbox_id: str = ""):
        # Fall back to a short random id when no explicit id is given.
        self.mailbox_id = mailbox_id or uuid.uuid4().hex[:8]
        self._messages: List[Dict[str, Any]] = []
        # One Event per receive() currently waiting; each is created inside the
        # waiting coroutine so it is bound to that caller's event loop.
        self._waiters: List[asyncio.Event] = []

    async def send(self, message: Dict[str, Any]) -> None:
        """Append *message* and wake every receive() waiting for one."""
        self._messages.append(message)
        for waiter in self._waiters:
            waiter.set()

    async def receive(self, timeout: float = 5.0) -> Optional[Dict[str, Any]]:
        """Pop the oldest message, waiting up to *timeout* seconds if empty.

        Args:
            timeout: Maximum seconds to wait for a message when the mailbox is
                empty. ``0``, a negative value or ``None`` means "don't wait".

        Returns:
            The oldest queued message, or ``None`` if none arrived in time.
        """
        if not self._messages and timeout and timeout > 0:
            await self._wait_for_message(timeout)
        return self._messages.pop(0) if self._messages else None

    async def _wait_for_message(self, timeout: float) -> None:
        """Return once a message is queued or *timeout* seconds have elapsed."""
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        waiter = asyncio.Event()
        self._waiters.append(waiter)
        try:
            # Loop because another receiver may take the message we were woken for.
            while not self._messages:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    return
                waiter.clear()
                try:
                    await asyncio.wait_for(waiter.wait(), remaining)
                except asyncio.TimeoutError:
                    return
        finally:
            self._waiters.remove(waiter)

    async def receive_all(self) -> List[Dict[str, Any]]:
        """Drain and return every queued message, oldest first (never waits)."""
        drained = list(self._messages)
        self._messages.clear()
        return drained

140 

141 

class CrossNodeBus:
    """Registry of named mailboxes plus topic-based publish/subscribe.

    All state is held in this process's memory.
    """

    def __init__(self, bus_id: str = ""):
        # Fall back to a short random id when no explicit id is given.
        self.bus_id = bus_id or uuid.uuid4().hex[:8]
        self._mailboxes: Dict[str, CrossNodeMailbox] = {}
        self._subscribers: Dict[str, List[Callable]] = {}

    def create_mailbox(self, name: str = "") -> CrossNodeMailbox:
        """Create, register and return a mailbox whose id is *name* (random if empty).

        NOTE(review): reusing an existing *name* replaces the registered mailbox.
        """
        mbox = CrossNodeMailbox(name)
        self._mailboxes[mbox.mailbox_id] = mbox
        return mbox

    def get_mailbox(self, mailbox_id: str) -> Optional[CrossNodeMailbox]:
        """Return the registered mailbox with *mailbox_id*, or ``None``."""
        return self._mailboxes.get(mailbox_id)

    async def broadcast(self, topic: str, payload: Dict[str, Any]) -> None:
        """Deliver *payload* to every subscriber of *topic*, in subscription order.

        Callbacks may be plain functions or coroutine functions. A callback that
        raises is logged and does not prevent delivery to the others.
        """
        import inspect
        import logging

        # Snapshot so callbacks subscribing during delivery don't join this round.
        for callback in list(self._subscribers.get(topic, ())):
            try:
                outcome = callback(payload)
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception:
                logging.getLogger(__name__).exception(
                    "CrossNodeBus %s: subscriber %r failed for topic %r",
                    self.bus_id,
                    callback,
                    topic,
                )

    def subscribe(self, topic: str, callback: Callable) -> None:
        """Register *callback* to receive every payload broadcast on *topic*."""
        self._subscribers.setdefault(topic, []).append(callback)

166 

167 

168# ── Ray Agent Actor (only if ray is available) ── 

169 

if _HAS_RAY:

    @ray.remote
    class RayAgentActor:
        """Ray actor hosting one agent worker.

        NOTE(review): ``execute`` is currently a stub that echoes the payload
        back inside a result dict; no agent logic runs yet.
        """

        def __init__(self, actor_name: str = "", node_id: str = ""):
            # Default to a short random name and to the Ray node hosting this actor.
            self.name = actor_name or uuid.uuid4().hex[:8]
            self.node_id = node_id or ray.get_runtime_context().get_node_id()
            self.status = DistAgentStatus.IDLE
            self.tasks_completed: int = 0
            self.tasks_failed: int = 0
            self._shutdown: bool = False

        def get_status(self) -> Dict[str, Any]:
            """Return a plain-dict snapshot of identity, state and task counters."""
            return {
                "name": self.name,
                "node_id": self.node_id,
                "status": self.status.value,
                "tasks_completed": self.tasks_completed,
                "tasks_failed": self.tasks_failed,
            }

        async def execute(
            self, task_id: str, payload: Dict[str, Any]
        ) -> Dict[str, Any]:
            """Run one task and report the outcome as a dict instead of raising.

            Returns:
                ``{"task_id", "status": "ok", "data"}`` on success, or
                ``{"task_id", "status": "error", "error"}`` on failure.
                ``status`` is BUSY while running and IDLE again afterwards.
            """
            self.status = DistAgentStatus.BUSY
            try:
                result = {"task_id": task_id, "status": "ok", "data": payload}
                self.tasks_completed += 1
                return result
            except Exception as e:
                self.tasks_failed += 1
                return {"task_id": task_id, "status": "error", "error": str(e)}
            finally:
                self.status = DistAgentStatus.IDLE

        def shutdown(self) -> None:
            """Mark the actor offline.

            Only flips local flags; the Ray actor process itself keeps running.
            """
            self._shutdown = True
            self.status = DistAgentStatus.OFFLINE

else:

    class RayAgentActor:  # type: ignore[no-redef]
        """Stand-in used when ray is not installed; every entry point raises ImportError."""

        def __init__(self, *args, **kwargs):
            _require_ray()

        @classmethod
        def remote(cls, *args, **kwargs):
            """Mirror Ray's ``ActorClass.remote`` so Ray-style construction fails clearly."""
            _require_ray()

        @classmethod
        def options(cls, *args, **kwargs):
            """Mirror Ray's ``ActorClass.options`` so it fails with the same clear error."""
            _require_ray()

215 

216 

217# ── Distributed Task Queue ────────────────── 

218 

class DistTaskQueue:
    """In-process registry of submitted tasks, their payloads and results.

    NOTE(review): despite the name, nothing in this class is distributed or
    load-balanced; all state lives in this process's memory.
    """

    def __init__(self, max_size: int = 1000):
        # NOTE(review): stored for callers but not enforced by this class.
        self.max_size = max_size
        self._queue: List[DistTaskRecord] = []  # records in submission order
        self._results: Dict[str, Any] = {}
        # task_id -> record, so completion updates don't scan the whole queue.
        self._records: Dict[str, DistTaskRecord] = {}
        # task_id -> submitted payload (previously discarded on submit).
        self._payloads: Dict[str, Dict[str, Any]] = {}

    def submit(
        self, payload: Dict[str, Any], timeout: float = 300.0
    ) -> DistTaskRecord:
        """Register *payload* as a new PENDING task and return its record.

        Args:
            payload: Task data; retained and retrievable via ``get_payload``.
            timeout: Currently unused; kept for API compatibility.
        """
        task_id = uuid.uuid4().hex[:16]
        record = DistTaskRecord(
            task_id=task_id,
            created_at=time.time(),
        )
        self._queue.append(record)
        self._records[task_id] = record
        self._payloads[task_id] = payload
        return record

    def get_payload(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the payload submitted for *task_id*, or ``None`` if unknown."""
        return self._payloads.get(task_id)

    def get_result(self, task_id: str, timeout: float = 30.0) -> Any:
        """Return the stored result for *task_id*, or ``None`` if none yet.

        Non-blocking: *timeout* is currently unused and kept for API compatibility.
        """
        return self._results.get(task_id)

    def mark_complete(self, task_id: str, result: Any) -> None:
        """Store *result* and, if *task_id* was submitted here, mark it COMPLETED."""
        self._results[task_id] = result
        record = self._records.get(task_id)
        if record is None:
            return
        record.status = DistTaskStatus.COMPLETED
        record.result = result
        record.completed_at = time.time()

    def list_pending(self) -> List[DistTaskRecord]:
        """Return records still PENDING, in submission order."""
        return [r for r in self._queue if r.status == DistTaskStatus.PENDING]

250 

251 

252# ── Distributed Swarm Coordinator ─────────── 

253 

class DistSwarmCoordinator:
    """Coordinates a swarm of RayAgentActor workers.

    Actors are spawned by ``start()`` (or lazily by the first ``submit()``) and
    tasks are dispatched to them round-robin.
    """

    def __init__(
        self,
        config: Optional[DistSwarmConfig] = None,
        num_workers: int = 4,
    ):
        """
        Args:
            config: Full swarm configuration; when given, *num_workers* is ignored.
            num_workers: Worker count used only when *config* is None.
        """
        self.config = config or DistSwarmConfig(num_workers=num_workers)
        self._actors: List[Any] = []
        self._rr_index: int = 0  # next position in the round-robin rotation
        self.bus = CrossNodeBus()
        self._started: bool = False

    async def start(self) -> None:
        """Initialise Ray if needed and spawn ``config.num_workers`` actors.

        Calling it again while already started is a no-op, so repeated calls
        do not spawn duplicate workers.

        Raises:
            ImportError: if ray is not installed.
        """
        _require_ray()
        if self._started:
            return
        if not ray.is_initialized():
            ray.init(ignore_reinit_error=True)
        for i in range(self.config.num_workers):
            actor = RayAgentActor.remote(  # type: ignore[union-attr]
                actor_name=f"worker-{i}",
            )
            self._actors.append(actor)
        self._rr_index = 0
        self._started = True

    async def stop(self) -> None:
        """Ask every actor to shut down (best effort) and forget them."""
        for actor in self._actors:
            try:
                actor.shutdown.remote()  # type: ignore[union-attr]
            except Exception:
                # Best effort: one unreachable actor must not block stopping the rest.
                pass
        self._actors.clear()
        self._rr_index = 0
        self._started = False

    def _next_actor(self) -> Any:
        """Return the next actor handle in round-robin order.

        Raises:
            IndexError: if no actors are available (e.g. ``num_workers == 0``).
        """
        if not self._actors:
            raise IndexError("no agent actors available")
        actor = self._actors[self._rr_index % len(self._actors)]
        self._rr_index += 1
        return actor

    async def submit(
        self, payload: Dict[str, Any], timeout: float = 300.0
    ) -> Any:
        """Run *payload* on the next actor and return its result.

        Returns:
            The actor's result dict, or ``{"error": <message>}`` if the remote
            call failed or did not finish within *timeout* seconds.

        Raises:
            ImportError: if ray is not installed.
            IndexError: if no actors are available.
        """
        _require_ray()
        if not self._actors:
            await self.start()
        actor = self._next_actor()
        task_id = uuid.uuid4().hex[:16]
        result_ref = actor.execute.remote(  # type: ignore[union-attr]
            task_id, payload
        )
        try:
            # Await the ObjectRef rather than calling ray.get(): ray.get would
            # block the event-loop thread for the whole task duration.
            return await asyncio.wait_for(result_ref, timeout=timeout)
        except Exception as e:
            # asyncio timeouts stringify to "", so fall back to the type name.
            return {"error": str(e) or type(e).__name__}

    def is_running(self) -> bool:
        """Return True between a successful ``start()`` and ``stop()``."""
        return self._started

    def actor_count(self) -> int:
        """Return the number of actor handles currently held."""
        return len(self._actors)

306 

307 

def quick_start(num_workers: int = 4) -> DistSwarmCoordinator:
    """Create a coordinator for *num_workers* workers without starting it.

    Actors are only spawned by ``await coordinator.start()`` or lazily by the
    first ``await coordinator.submit(...)``; this function is synchronous and
    therefore cannot start them itself.
    """
    return DistSwarmCoordinator(num_workers=num_workers)