Coverage for agentos/core/async_loop.py: 42%
97 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Async agent execution loop with concurrency support.
4Provides async/await versions of the core agent loop for high-throughput
5scenarios where multiple agents run concurrently.
6"""
8from __future__ import annotations
10import asyncio
11import time
12from dataclasses import dataclass, field
13from typing import Any, AsyncIterator, Awaitable, Callable, Optional
15from agentos.core.context import AgentContext
16from agentos.core.streaming import StreamChunk
@dataclass
class AsyncLoopConfig:
    """Configuration for async agent execution loop.

    Values are validated on construction so that a bad config fails fast
    instead of producing a loop that hangs or silently never runs.

    Raises:
        ValueError: If ``max_concurrency < 1``, ``timeout_seconds <= 0``
            (when not None), or ``max_retries < 0``.
    """

    # Max concurrent agent invocations (used as the loop's semaphore size;
    # 0 would block every invocation forever).
    max_concurrency: int = 10
    # Per-invocation timeout in seconds, applied to each attempt.
    timeout_seconds: float = 300.0
    # Whether to retry timed-out invocations.
    retry_on_timeout: bool = True
    # Max retries on transient failures (total attempts = max_retries + 1).
    max_retries: int = 3
    # Whether to collect timing metrics.
    collect_metrics: bool = True

    def __post_init__(self) -> None:
        """Reject values that would deadlock or disable the loop."""
        if self.max_concurrency < 1:
            raise ValueError(
                f"max_concurrency must be >= 1, got {self.max_concurrency!r}"
            )
        # None is tolerated (asyncio.wait_for treats it as "no timeout");
        # non-positive values would time out every attempt immediately.
        if self.timeout_seconds is not None and self.timeout_seconds <= 0:
            raise ValueError(
                f"timeout_seconds must be > 0, got {self.timeout_seconds!r}"
            )
        if self.max_retries < 0:
            raise ValueError(f"max_retries must be >= 0, got {self.max_retries!r}")
@dataclass
class AsyncInvocationResult:
    """Outcome record for one agent invocation run through the async loop.

    Only ``agent_id`` and ``success`` must be supplied; every other field
    defaults to "no output, no error, zero latency, zero retries".
    """

    # Identifier of the invocation this result belongs to.
    agent_id: str
    # Whether the invocation completed successfully.
    success: bool
    # Value produced by the invocation, if any.
    output: Any = field(default=None)
    # Human-readable failure description, if any.
    error: Optional[str] = field(default=None)
    # Elapsed wall-clock time in milliseconds.
    latency_ms: float = field(default=0.0)
    # Retry count reported for this invocation.
    retries: int = field(default=0)
class AsyncAgentLoop:
    """
    Async execution loop for agent invocations.

    Supports:
    - Concurrent multi-agent execution with semaphore-based throttling
      (at most ``config.max_concurrency`` invocations/streams at once)
    - Per-attempt timeouts via asyncio.wait_for
    - Automatic retry, with optional capped exponential backoff between
      attempts (off by default; enable via ``backoff_base_seconds``)
    - Streaming output via async generators
    - Metrics collection (nearest-rank p50/p95/p99 latency)

    Example::

        loop = AsyncAgentLoop(config=AsyncLoopConfig(max_concurrency=5))
        results = await loop.run_all([task1, task2, task3])
    """

    def __init__(
        self,
        config: Optional[AsyncLoopConfig] = None,
        *,
        backoff_base_seconds: float = 0.0,
        backoff_max_seconds: float = 30.0,
    ):
        """
        Args:
            config: Loop configuration; a default ``AsyncLoopConfig`` is
                used when None.
            backoff_base_seconds: Delay before the first retry; doubled for
                each later retry. 0 (the default) retries immediately.
            backoff_max_seconds: Upper bound on any single backoff delay.

        Raises:
            ValueError: If either backoff value is negative.
        """
        if backoff_base_seconds < 0 or backoff_max_seconds < 0:
            raise ValueError("backoff delays must be non-negative")
        self.config = config if config is not None else AsyncLoopConfig()
        self._semaphore = asyncio.Semaphore(self.config.max_concurrency)
        self._metrics: list[float] = []
        self._backoff_base = backoff_base_seconds
        self._backoff_max = backoff_max_seconds

    async def run_single(
        self,
        agent_id: str,
        fn: Callable[..., Awaitable[Any]],
        *args: Any,
        **kwargs: Any,
    ) -> AsyncInvocationResult:
        """
        Run a single agent invocation with timeout and retry.

        Ordinary exceptions raised by ``fn`` are reported in the result
        rather than raised; a concurrency slot is held for the whole run,
        including retries and backoff sleeps.

        Args:
            agent_id: Identifier for the agent invocation.
            fn: Async callable to execute.
            *args: Positional args for fn.
            **kwargs: Keyword args for fn (cannot be named ``agent_id``/``fn``).

        Returns:
            AsyncInvocationResult with success/failure details.
        """
        async with self._semaphore:
            return await self._execute_with_retry(agent_id, fn, args, kwargs)

    def _backoff_delay(self, retry: int) -> float:
        """Seconds to sleep before 1-based retry *retry*: base * 2**(retry-1), capped."""
        return min(self._backoff_max, self._backoff_base * (2 ** (retry - 1)))

    def _record_latency(self, t0: float) -> float:
        """Return milliseconds elapsed since *t0*, storing it if metrics are on."""
        latency = (time.perf_counter() - t0) * 1000
        if self.config.collect_metrics:
            self._metrics.append(latency)
        return latency

    async def _execute_with_retry(
        self,
        agent_id: str,
        fn: Callable[..., Awaitable[Any]],
        args: tuple,
        kwargs: dict,
    ) -> AsyncInvocationResult:
        """
        Call ``fn`` until it succeeds or the retry budget is exhausted.

        Latency is measured from the first attempt, so it includes failed
        attempts and backoff sleeps. ``retries`` in the result is the number
        of retries actually performed (0 means only the first attempt ran).
        """
        last_error: Optional[str] = None
        attempt = 0
        t0 = time.perf_counter()
        # Clamp so fn always runs at least once, even with a negative budget.
        total_attempts = max(self.config.max_retries, 0) + 1
        for attempt in range(total_attempts):
            if attempt > 0 and self._backoff_base > 0:
                # Note: the semaphore slot remains held while sleeping.
                await asyncio.sleep(self._backoff_delay(attempt))
            try:
                result = await asyncio.wait_for(
                    fn(*args, **kwargs),
                    timeout=self.config.timeout_seconds,
                )
            except asyncio.TimeoutError:
                last_error = f"Timeout after {self.config.timeout_seconds}s"
                if not self.config.retry_on_timeout:
                    break
            except Exception as exc:
                # Any ordinary exception is treated as transient and retried;
                # cancellation (a BaseException on Python 3.8+) propagates.
                last_error = f"{type(exc).__name__}: {exc}"
            else:
                return AsyncInvocationResult(
                    agent_id=agent_id,
                    success=True,
                    output=result,
                    latency_ms=self._record_latency(t0),
                    retries=attempt,
                )

        return AsyncInvocationResult(
            agent_id=agent_id,
            success=False,
            error=last_error,
            latency_ms=self._record_latency(t0),
            # Retries actually performed, not the configured maximum.
            retries=attempt,
        )

    async def run_all(
        self,
        tasks: list[tuple[str, Callable[..., Awaitable[Any]], tuple, dict]],
    ) -> list[AsyncInvocationResult]:
        """
        Run multiple agent invocations concurrently.

        Concurrency is bounded by the shared semaphore. Because ``run_single``
        turns ordinary exceptions into failed results, one failing task does
        not abort the others.

        Args:
            tasks: List of (agent_id, async_fn, args, kwargs) tuples.

        Returns:
            List of results in the same order as input tasks.
        """
        coros = [
            self.run_single(agent_id, fn, *args, **kwargs)
            for agent_id, fn, args, kwargs in tasks
        ]
        return list(await asyncio.gather(*coros))

    async def run_streaming(
        self,
        agent_id: str,
        stream_fn: Callable[[], AsyncIterator[StreamChunk]],
    ) -> AsyncIterator[StreamChunk]:
        """
        Run an agent and yield streaming output chunks.

        A concurrency slot is held until the stream finishes or this
        generator is closed. The inner stream is closed explicitly (via its
        ``aclose`` when present) if the consumer stops early or an error
        occurs, so its cleanup runs immediately rather than at GC time.
        Latency is recorded only when the stream is fully consumed.

        Args:
            agent_id: Identifier for the agent.
            stream_fn: Zero-arg callable returning an async iterator of
                StreamChunk objects (typically an async generator function).

        Yields:
            StreamChunk as they become available.
        """
        async with self._semaphore:
            t0 = time.perf_counter()
            stream = stream_fn()
            try:
                async for chunk in stream:
                    yield chunk
            finally:
                aclose = getattr(stream, "aclose", None)
                if aclose is not None:
                    await aclose()
            self._record_latency(t0)

    def get_latency_stats(self) -> dict[str, float]:
        """
        Compute p50/p95/p99 latency from collected metrics.

        Percentiles use the nearest-rank method: the P-th percentile is the
        smallest sample such that at least P% of samples are <= it.

        Returns:
            Dict with keys p50_ms, p95_ms, p99_ms, mean_ms, count
            (all zero when no metrics have been collected).
        """
        import math

        if not self._metrics:
            return {"p50_ms": 0.0, "p95_ms": 0.0, "p99_ms": 0.0, "mean_ms": 0.0, "count": 0}
        sorted_ms = sorted(self._metrics)
        n = len(sorted_ms)

        def percentile(p: float) -> float:
            # Nearest-rank is 1-based; clamp into [1, n] before indexing.
            rank = math.ceil(n * p / 100)
            return sorted_ms[min(max(rank, 1), n) - 1]

        return {
            "p50_ms": percentile(50),
            "p95_ms": percentile(95),
            "p99_ms": percentile(99),
            "mean_ms": sum(sorted_ms) / n,
            "count": n,
        }

    def reset_metrics(self) -> None:
        """Clear accumulated latency metrics."""
        self._metrics.clear()
class AsyncContextManager:
    """
    Async, lock-serialized accessor for a shared ``AgentContext``.

    Every operation acquires the same per-instance ``asyncio.Lock`` before
    touching the wrapped context, so concurrent coroutines access it one at
    a time. Despite its name, this class defines no ``__aenter__`` /
    ``__aexit__`` and is not itself usable in ``async with``.
    """

    def __init__(self, context: AgentContext):
        """Wrap *context* behind a freshly created lock."""
        self._context = context
        self._lock = asyncio.Lock()

    async def get(self, key: str, default: Any = None) -> Any:
        """Return ``context.get(key, default)``, read while holding the lock."""
        async with self._lock:
            found = self._context.get(key, default)
        return found

    async def set(self, key: str, value: Any) -> None:
        """Assign ``context[key] = value`` while holding the lock."""
        async with self._lock:
            target = self._context
            target[key] = value

    async def update(self, mapping: dict[str, Any]) -> None:
        """Merge *mapping* into the context via ``update`` under the lock."""
        async with self._lock:
            target = self._context
            target.update(mapping)

    async def snapshot(self) -> dict[str, Any]:
        """Return a shallow ``dict`` copy of the context taken under the lock."""
        async with self._lock:
            copied = dict(self._context)
        return copied