Coverage for agentos/orchestration/distributed.py: 47%
179 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
"""
AgentOS v1.14.2 — Distributed Orchestration (Ray-based Agent Swarm).

Inspired by Ray Serve / Ray Core, this module adds a distributed orchestration
layer to AgentOS. Agents are no longer confined to a single process: they can
form a Swarm across multiple machines, with automatic load balancing, fault
recovery, and cross-node communication.

Core features:
- RayAgentActor: an Agent instance wrapped in a Ray Actor
- DistSwarmCoordinator: distributed Swarm coordinator
- AgentPlacementStrategy: smart Agent placement (CPU/GPU/memory aware)
- DistTaskQueue: distributed task queue (Ray-native)
- CrossNodeBus: cross-node message bus
- FaultTolerance: Actor restart / state recovery

Architecture:
    DistSwarmCoordinator (head node)
    ├── RayAgentActor[0] (worker node 1)
    │   ├── ToolAgent instance
    │   └── Local memory store
    ├── RayAgentActor[1] (worker node 2)
    ├── ...
    └── DistTaskQueue → automatic load balancing

Usage (inside a coroutine; ``start``, ``submit`` and ``stop`` are async):
    coordinator = DistSwarmCoordinator(num_workers=4)
    await coordinator.start()
    result = await coordinator.submit({"task": "Summarize all PDFs in /data/"})
    await coordinator.stop()
"""
from __future__ import annotations

import asyncio
import inspect
import json
import logging
import os
import queue
import time
import uuid
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import (
    Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union,
)
45# ── Ray Optional Import ────────────────────
# Ray is an optional dependency: importing this module must succeed without
# it, so a missing install is recorded in _HAS_RAY instead of propagating.
try:
    import ray
except ImportError:
    ray = None  # type: ignore
    _HAS_RAY = False
else:
    _HAS_RAY = True
def _require_ray():
    """Fail fast with an actionable message when Ray is unavailable.

    Returns silently when the module-level ``_HAS_RAY`` flag is true.

    Raises:
        ImportError: If ``_HAS_RAY`` is false, i.e. ``import ray`` failed when
            this module was loaded.
    """
    if _HAS_RAY:
        return
    install_hint = (
        "The distributed orchestration module requires 'ray'. "
        "Install it with: pip install ray"
    )
    raise ImportError(install_hint)
64# ── Data Models ─────────────────────────────
@dataclass
class AgentPlacementSpec:
    """Resource and locality request describing where an agent should run.

    This is a plain value object; the field order below is also the order of
    the generated positional constructor.
    """

    # Requested CPU amount (fractional values are allowed by the type).
    cpu: float = 1.0
    # Requested GPU amount; the default asks for none.
    gpu: float = 0.0
    # Requested memory, expressed in megabytes per the field name.
    memory_mb: int = 512
    # Optional identifier of a preferred node; None means no preference.
    node_affinity: Optional[str] = None
    # Placement policy name: spread | pack | custom
    strategy: str = "spread"
class DistTaskStatus(str, Enum):
    """Lifecycle states of a distributed task.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string values.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class DistAgentStatus(str, Enum):
    """Availability states reported by an agent actor.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string values.
    """

    IDLE = "idle"
    BUSY = "busy"
    OFFLINE = "offline"
class PlacementStrategy(str, Enum):
    """Named policies for placing worker actors.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string values.
    """

    SPREAD = "spread"
    PACK = "pack"
    RANDOM = "random"
    CUSTOM = "custom"
@dataclass
class DistSwarmConfig:
    """Settings consumed by the distributed swarm coordinator.

    This is a plain value object; the field order below is also the order of
    the generated positional constructor.
    """

    # Number of worker actors to create.
    num_workers: int = 4
    # Per-worker resource figures.
    # NOTE(review): confirm where these are handed to the scheduler.
    cpus_per_worker: float = 1.0
    gpus_per_worker: float = 0.0
    memory_per_worker_mb: int = 1024
    # Policy used to place workers.
    placement: PlacementStrategy = PlacementStrategy.SPREAD
    # Presumably seconds between liveness checks — TODO confirm against the consumer.
    heartbeat_interval: float = 5.0
    # Presumably the per-task time limit in seconds — TODO confirm against the consumer.
    task_timeout: float = 300.0
@dataclass
class DistTaskRecord:
    """Bookkeeping entry describing one distributed task.

    Only ``task_id`` is required; every other field has a default so a record
    can be created as soon as a task is accepted and filled in later.
    """

    # Identifier of the task this record describes.
    task_id: str
    # Current lifecycle state; new records start as PENDING.
    status: DistTaskStatus = DistTaskStatus.PENDING
    # Name of the actor handling the task, when one has been chosen.
    assigned_actor: Optional[str] = None
    # Timestamps as floats; presumably epoch seconds — TODO confirm with writers.
    created_at: float = 0.0
    started_at: Optional[float] = None
    completed_at: Optional[float] = None
    # Outcome of the task: a result of any type, or an error description.
    result: Any = None
    error: Optional[str] = None
class CrossNodeMailbox:
    """FIFO mailbox holding dictionary messages for one recipient.

    Messages are kept in this object's memory and handed out in the order
    they were sent. ``receive`` honours its ``timeout``: when the mailbox is
    empty it waits for a ``send`` instead of returning immediately.
    """

    def __init__(self, mailbox_id: str = ""):
        """Create an empty mailbox.

        Args:
            mailbox_id: Identifier for the mailbox. An empty value is replaced
                by a random 8-character hex id.
        """
        self.mailbox_id = mailbox_id or uuid.uuid4().hex[:8]
        # deque pops from the head in O(1); list.pop(0) shifts every element.
        self._messages: deque[Dict[str, Any]] = deque()
        # Futures of receive() calls that are currently waiting for a message.
        # They are created per call on the running loop, so the mailbox itself
        # is not tied to any particular event loop.
        self._waiters: List[asyncio.Future] = []

    async def send(self, message: Dict[str, Any]) -> None:
        """Append ``message`` to the mailbox and wake any waiting receivers."""
        self._messages.append(message)
        # Wake every waiter; each re-checks the queue, so none can miss a
        # message even if another receiver takes this one first.
        waiters, self._waiters = self._waiters, []
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(None)

    async def receive(self, timeout: float = 5.0) -> Optional[Dict[str, Any]]:
        """Return the oldest message, waiting up to ``timeout`` seconds for one.

        Args:
            timeout: Maximum time to wait when the mailbox is empty. ``None``
                or a non-positive value means "do not wait".

        Returns:
            The oldest pending message, or ``None`` if none arrived in time.
        """
        if self._messages:
            return self._messages.popleft()
        if timeout is None or timeout <= 0:
            return None

        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while not self._messages:
            remaining = deadline - loop.time()
            if remaining <= 0:
                return None
            waiter = loop.create_future()
            self._waiters.append(waiter)
            try:
                await asyncio.wait_for(waiter, remaining)
            except asyncio.TimeoutError:
                return None
            finally:
                # Drop our waiter on timeout or cancellation so abandoned
                # futures do not pile up while nobody is sending.
                if waiter in self._waiters:
                    self._waiters.remove(waiter)
        return self._messages.popleft()

    async def receive_all(self) -> List[Dict[str, Any]]:
        """Remove and return every pending message, oldest first, without waiting."""
        msgs = list(self._messages)
        self._messages.clear()
        return msgs
class CrossNodeBus:
    """Message bus offering named mailboxes and topic-based callbacks.

    Mailboxes and subscriptions are held in dictionaries on this object.
    """

    def __init__(self, bus_id: str = ""):
        """Create an empty bus.

        Args:
            bus_id: Identifier for the bus. An empty value is replaced by a
                random 8-character hex id.
        """
        self.bus_id = bus_id or uuid.uuid4().hex[:8]
        self._mailboxes: Dict[str, CrossNodeMailbox] = {}
        self._subscribers: Dict[str, List[Callable]] = {}

    def create_mailbox(self, name: str = "") -> CrossNodeMailbox:
        """Create a mailbox, register it under its id, and return it.

        Args:
            name: Desired mailbox id; empty lets the mailbox pick a random one.

        Note:
            Registering a name that is already in use replaces the earlier
            mailbox in the registry.
        """
        mbox = CrossNodeMailbox(name)
        self._mailboxes[mbox.mailbox_id] = mbox
        return mbox

    def get_mailbox(self, mailbox_id: str) -> Optional[CrossNodeMailbox]:
        """Return the mailbox registered under ``mailbox_id``, or ``None``."""
        return self._mailboxes.get(mailbox_id)

    async def broadcast(self, topic: str, payload: Dict[str, Any]) -> None:
        """Deliver ``payload`` to every callback subscribed to ``topic``.

        Callbacks run one after another in subscription order. Both plain
        functions and coroutine functions are supported. A failing callback is
        logged and does not prevent delivery to the remaining subscribers.

        Args:
            topic: Topic whose subscribers should be notified.
            payload: Message passed as the single argument to each callback.
        """
        # Iterate over a snapshot: a callback may subscribe to this topic
        # while we are suspended in an await below.
        for cb in tuple(self._subscribers.get(topic, ())):
            try:
                outcome = cb(payload)
                # Only await what is awaitable, so synchronous callbacks do
                # not depend on a swallowed TypeError to "work".
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception:
                # Delivery stays best-effort, but failures are now visible.
                logging.getLogger(__name__).exception(
                    "CrossNodeBus %s: subscriber %r raised while handling topic %r",
                    self.bus_id, cb, topic,
                )

    def subscribe(self, topic: str, callback: Callable) -> None:
        """Register ``callback`` to be invoked for every broadcast on ``topic``."""
        self._subscribers.setdefault(topic, []).append(callback)
168# ── Ray Agent Actor (only if ray is available) ──
if not _HAS_RAY:

    class RayAgentActor:
        """Stand-in used when Ray is not installed.

        Instantiating it calls ``_require_ray()``, which raises ImportError.
        """

        def __init__(self, *args, **kwargs):
            _require_ray()

else:

    @ray.remote
    class RayAgentActor:
        """Ray actor that executes tasks and tracks simple run statistics."""

        def __init__(self, actor_name: str = "", node_id: str = ""):
            """Initialise the actor as idle with zeroed counters.

            Args:
                actor_name: Name of the actor; empty picks a random
                    8-character hex name.
                node_id: Node identifier; empty asks the Ray runtime context
                    for the id of the node hosting this actor.
            """
            if not actor_name:
                actor_name = uuid.uuid4().hex[:8]
            self.name = actor_name
            if node_id:
                self.node_id = node_id
            else:
                self.node_id = ray.get_runtime_context().get_node_id()
            self.status = DistAgentStatus.IDLE
            self.tasks_completed: int = 0
            self.tasks_failed: int = 0
            self._shutdown: bool = False

        def get_status(self) -> Dict[str, Any]:
            """Return a snapshot of the actor's identity, state and counters."""
            return dict(
                name=self.name,
                node_id=self.node_id,
                status=self.status.value,
                tasks_completed=self.tasks_completed,
                tasks_failed=self.tasks_failed,
            )

        async def execute(
            self, task_id: str, payload: Dict[str, Any]
        ) -> Dict[str, Any]:
            """Run one task and report its outcome as a dictionary.

            The actor is marked BUSY for the duration of the call and IDLE
            again afterwards, whatever the outcome. The current implementation
            returns ``payload`` unchanged under the ``"data"`` key.

            Args:
                task_id: Identifier echoed back in the outcome.
                payload: Task input.

            Returns:
                ``{"task_id", "status": "ok", "data"}`` on success, or
                ``{"task_id", "status": "error", "error"}`` if an exception
                was raised.
            """
            self.status = DistAgentStatus.BUSY
            try:
                outcome = {"task_id": task_id, "status": "ok", "data": payload}
                self.tasks_completed += 1
            except Exception as exc:
                self.tasks_failed += 1
                outcome = {"task_id": task_id, "status": "error", "error": str(exc)}
            finally:
                self.status = DistAgentStatus.IDLE
            return outcome

        def shutdown(self) -> None:
            """Flag the actor as shut down and report it as OFFLINE."""
            self.status = DistAgentStatus.OFFLINE
            self._shutdown = True
217# ── Distributed Task Queue ──────────────────
class DistTaskQueue:
    """Bookkeeping for submitted tasks: records, payloads and results.

    All state lives in this object's own lists and dictionaries. The queue
    does not dispatch work itself; callers read a task's payload with
    ``get_payload`` and report the outcome with ``mark_complete`` or
    ``mark_failed``.
    """

    def __init__(self, max_size: int = 1000):
        """Create an empty queue.

        Args:
            max_size: Maximum number of PENDING tasks accepted at once. A
                value of zero or less disables the limit.
        """
        self.max_size = max_size
        self._queue: List[DistTaskRecord] = []
        self._results: Dict[str, Any] = {}
        # Index for O(1) lookup when a task finishes.
        self._records_by_id: Dict[str, DistTaskRecord] = {}
        # Payloads of unfinished tasks, so the work to do is not lost.
        self._payloads: Dict[str, Dict[str, Any]] = {}

    def submit(
        self, payload: Dict[str, Any], timeout: float = 300.0
    ) -> DistTaskRecord:
        """Register a new task and return its record.

        Args:
            payload: Task input; kept until the task completes or fails.
            timeout: Accepted for interface compatibility; not used by the
                queue itself.

        Returns:
            The new PENDING record, with a random 16-character hex id.

        Raises:
            queue.Full: If ``max_size`` PENDING tasks are already queued.
        """
        if self.max_size > 0 and len(self.list_pending()) >= self.max_size:
            raise queue.Full(
                f"task queue is full ({self.max_size} pending tasks)"
            )
        task_id = uuid.uuid4().hex[:16]
        record = DistTaskRecord(
            task_id=task_id,
            created_at=time.time(),
        )
        self._queue.append(record)
        self._records_by_id[task_id] = record
        self._payloads[task_id] = payload
        return record

    def get_payload(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the payload of an unfinished task, or ``None`` if unknown."""
        return self._payloads.get(task_id)

    def get_result(self, task_id: str, timeout: float = 30.0) -> Any:
        """Return the stored result for ``task_id`` without waiting.

        Args:
            task_id: Identifier returned by ``submit``.
            timeout: Accepted for interface compatibility; this call never
                blocks.

        Returns:
            The result passed to ``mark_complete``, or ``None`` if there is
            none yet.
        """
        return self._results.get(task_id)

    def mark_complete(self, task_id: str, result: Any) -> None:
        """Store ``result`` for ``task_id`` and mark its record COMPLETED.

        The result is stored even when no record with that id was submitted
        through this queue.
        """
        self._results[task_id] = result
        self._payloads.pop(task_id, None)
        record = self._records_by_id.get(task_id)
        if record is not None:
            record.status = DistTaskStatus.COMPLETED
            record.result = result
            record.completed_at = time.time()

    def mark_failed(self, task_id: str, error: str) -> None:
        """Mark the record for ``task_id`` FAILED and remember ``error``.

        Unknown task ids are ignored.
        """
        self._payloads.pop(task_id, None)
        record = self._records_by_id.get(task_id)
        if record is not None:
            record.status = DistTaskStatus.FAILED
            record.error = error
            record.completed_at = time.time()

    def list_pending(self) -> List[DistTaskRecord]:
        """Return the records still in PENDING state, in submission order."""
        return [r for r in self._queue if r.status == DistTaskStatus.PENDING]
252# ── Distributed Swarm Coordinator ───────────
class DistSwarmCoordinator:
    """Creates a pool of ``RayAgentActor`` workers and routes tasks to them.

    Usage (inside a coroutine)::

        coordinator = DistSwarmCoordinator(num_workers=4)
        await coordinator.start()
        result = await coordinator.submit({"task": "..."})
        await coordinator.stop()
    """

    def __init__(
        self,
        config: Optional[DistSwarmConfig] = None,
        num_workers: int = 4,
    ):
        """Prepare a coordinator; no actors are created until ``start``.

        Args:
            config: Full configuration. When given, ``num_workers`` is ignored.
            num_workers: Worker count used to build a default configuration.
        """
        self.config = config or DistSwarmConfig(num_workers=num_workers)
        self._actors: List[Any] = []
        self.bus = CrossNodeBus()
        self._started: bool = False
        # Index of the actor that receives the next task (round-robin cursor).
        self._next_actor: int = 0

    async def start(self) -> None:
        """Initialise Ray if needed and create the worker actors.

        Calling ``start`` on an already started coordinator is a no-op, so
        repeated calls do not spawn duplicate workers.

        Raises:
            ImportError: If Ray is not installed.
        """
        _require_ray()
        if self._started:
            return
        if not ray.is_initialized():
            ray.init(ignore_reinit_error=True)
        for i in range(self.config.num_workers):
            actor = RayAgentActor.remote(  # type: ignore[union-attr]
                actor_name=f"worker-{i}",
            )
            self._actors.append(actor)
        self._started = True

    async def stop(self) -> None:
        """Ask every actor to shut down and forget the actor handles.

        Shutdown is best-effort: a failure to notify one actor is logged and
        does not stop the remaining actors from being notified.
        """
        for actor in self._actors:
            try:
                actor.shutdown.remote()  # type: ignore[union-attr]
            except Exception:
                logging.getLogger(__name__).warning(
                    "Failed to request shutdown of actor %r", actor,
                    exc_info=True,
                )
        self._actors.clear()
        self._started = False
        self._next_actor = 0

    async def submit(
        self, payload: Dict[str, Any], timeout: float = 300.0
    ) -> Any:
        """Run ``payload`` on the next worker and return its outcome.

        Workers are chosen in round-robin order. The coordinator is started
        on demand if it has no actors yet.

        Args:
            payload: Task input forwarded to the actor's ``execute`` method.
            timeout: Seconds to wait for the result.

        Returns:
            The dictionary produced by the actor, or ``{"error": message}``
            if no worker is available or retrieving the result failed.

        Raises:
            ImportError: If Ray is not installed.
        """
        _require_ray()
        if not self._actors:
            await self.start()
        if not self._actors:
            # Configured with zero workers: report it like other failures
            # instead of failing with an IndexError.
            return {"error": "no worker actors available"}

        actor = self._actors[self._next_actor % len(self._actors)]
        self._next_actor += 1
        result_ref = actor.execute.remote(  # type: ignore[union-attr]
            uuid.uuid4().hex[:16], payload
        )
        loop = asyncio.get_running_loop()
        try:
            # ray.get blocks the calling thread; run it in the default
            # executor so the event loop keeps serving other coroutines.
            return await loop.run_in_executor(
                None, lambda: ray.get(result_ref, timeout=timeout)
            )
        except Exception as e:
            return {"error": str(e)}

    def is_running(self) -> bool:
        """Return True between a successful ``start`` and the next ``stop``."""
        return self._started

    def actor_count(self) -> int:
        """Return the number of worker actors currently held."""
        return len(self._actors)
def quick_start(num_workers: int = 4) -> DistSwarmCoordinator:
    """Build a coordinator configured for ``num_workers`` workers.

    The coordinator is only constructed here; this function does not call
    its ``start`` method.

    Args:
        num_workers: Number of workers passed to ``DistSwarmCoordinator``.

    Returns:
        The newly created coordinator.
    """
    coordinator = DistSwarmCoordinator(num_workers=num_workers)
    return coordinator