Coverage for agentos/core/async_loop.py: 42%

97 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Async agent execution loop with concurrency support. 

3 

4Provides async/await versions of the core agent loop for high-throughput 

5scenarios where multiple agents run concurrently. 

6""" 

7 

8from __future__ import annotations 

9 

10import asyncio 

11import time 

12from dataclasses import dataclass, field 

13from typing import Any, AsyncIterator, Awaitable, Callable, Optional 

14 

15from agentos.core.context import AgentContext 

16from agentos.core.streaming import StreamChunk 

17 

18 

@dataclass
class AsyncLoopConfig:
    """Configuration for async agent execution loop.

    Raises:
        ValueError: From ``__post_init__`` when ``max_concurrency < 1`` (a
            semaphore with zero slots would block every invocation forever)
            or when ``max_retries < 0`` (no attempt would ever be made).
    """

    # Max concurrent agent invocations; used as the semaphore size, must be >= 1.
    max_concurrency: int = 10

    # Per-invocation timeout in seconds (passed to asyncio.wait_for).
    timeout_seconds: float = 300.0

    # Whether to retry timed-out invocations.
    retry_on_timeout: bool = True

    # Max retries on failure (total attempts = max_retries + 1); must be >= 0.
    max_retries: int = 3

    # Whether to collect timing metrics.
    collect_metrics: bool = True

    def __post_init__(self) -> None:
        """Reject settings that would make the execution loop hang or no-op."""
        if self.max_concurrency < 1:
            raise ValueError(
                f"max_concurrency must be >= 1, got {self.max_concurrency}"
            )
        if self.max_retries < 0:
            raise ValueError(f"max_retries must be >= 0, got {self.max_retries}")

37 

38 

@dataclass
class AsyncInvocationResult:
    """Outcome record for one async agent invocation, successful or not."""

    # Caller-supplied identifier for the invocation.
    agent_id: str
    # True when the invocation produced a result.
    success: bool
    # Value returned by the invoked callable, if any.
    output: Any = None
    # Human-readable failure description, if any.
    error: Optional[str] = None
    # Elapsed time in milliseconds.
    latency_ms: float = 0.0
    # Retry count reported for this invocation.
    retries: int = 0

49 

50 

class AsyncAgentLoop:
    """
    Async execution loop for agent invocations.

    Supports:
    - Concurrent multi-agent execution with semaphore-based throttling
    - Per-invocation timeouts via asyncio.wait_for
    - Automatic retry, with optional exponential backoff between attempts
      (``backoff_base_seconds``; disabled by default)
    - Streaming output via async generators
    - Metrics collection (p50/p95/p99 latency)

    Example::

        loop = AsyncAgentLoop(config=AsyncLoopConfig(max_concurrency=5))
        results = await loop.run_all([task1, task2, task3])
    """

    def __init__(
        self,
        config: Optional[AsyncLoopConfig] = None,
        *,
        backoff_base_seconds: float = 0.0,
    ):
        """
        Create a loop.

        Args:
            config: Loop configuration; ``AsyncLoopConfig()`` is used when
                omitted.
            backoff_base_seconds: Delay before the first retry. Each later
                retry doubles it (``backoff_base_seconds * 2**attempt``).
                The default of 0 retries immediately.

        Raises:
            ValueError: If ``backoff_base_seconds`` is negative.
        """
        if backoff_base_seconds < 0:
            raise ValueError(
                f"backoff_base_seconds must be >= 0, got {backoff_base_seconds}"
            )
        self.config = config or AsyncLoopConfig()
        self.backoff_base_seconds = backoff_base_seconds
        # One semaphore shared by run_single/run_all/run_streaming caps the
        # number of in-flight invocations.
        self._semaphore = asyncio.Semaphore(self.config.max_concurrency)
        # Latency samples in milliseconds.
        self._metrics: list[float] = []

    async def run_single(
        self,
        agent_id: str,
        fn: Callable[..., Awaitable[Any]],
        *args: Any,
        **kwargs: Any,
    ) -> AsyncInvocationResult:
        """
        Run a single agent invocation with timeout and retry.

        The concurrency slot is held for the whole retry sequence, including
        any backoff sleeps between attempts.

        Args:
            agent_id: Identifier for the agent invocation.
            fn: Async callable to execute.
            *args: Positional args for fn.
            **kwargs: Keyword args for fn.

        Returns:
            AsyncInvocationResult with success/failure details.
        """
        async with self._semaphore:
            return await self._execute_with_retry(agent_id, fn, args, kwargs)

    async def _execute_with_retry(
        self,
        agent_id: str,
        fn: Callable[..., Awaitable[Any]],
        args: tuple,
        kwargs: dict,
    ) -> AsyncInvocationResult:
        """
        Call ``fn`` up to ``max_retries + 1`` times and summarise the outcome.

        Timeouts are retried only when ``config.retry_on_timeout`` is set;
        any other ``Exception`` is retried until attempts run out. Such
        failures are returned as an unsuccessful result, not raised.
        """
        last_error: Optional[str] = None
        # Index of the last attempt made == number of retries actually used.
        retries_done = 0
        t0 = time.perf_counter()

        for attempt in range(self.config.max_retries + 1):
            retries_done = attempt
            try:
                result = await asyncio.wait_for(
                    fn(*args, **kwargs),
                    timeout=self.config.timeout_seconds,
                )
            except asyncio.TimeoutError:
                last_error = f"Timeout after {self.config.timeout_seconds}s"
                if not self.config.retry_on_timeout:
                    break
            except Exception as exc:
                last_error = f"{type(exc).__name__}: {exc}"
            else:
                return AsyncInvocationResult(
                    agent_id=agent_id,
                    success=True,
                    output=result,
                    latency_ms=self._record_latency(t0),
                    retries=attempt,
                )
            # Only back off when another attempt will actually follow.
            if attempt < self.config.max_retries:
                await self._sleep_before_retry(attempt)

        return AsyncInvocationResult(
            agent_id=agent_id,
            success=False,
            error=last_error,
            latency_ms=self._record_latency(t0),
            retries=retries_done,
        )

    async def _sleep_before_retry(self, attempt: int) -> None:
        """Sleep ``backoff_base_seconds * 2**attempt`` seconds; no-op when 0."""
        delay = self.backoff_base_seconds * (2 ** attempt)
        if delay > 0:
            await asyncio.sleep(delay)

    def _record_latency(self, t0: float) -> float:
        """Return ms elapsed since ``t0``; store it if metrics are enabled."""
        latency = (time.perf_counter() - t0) * 1000
        if self.config.collect_metrics:
            self._metrics.append(latency)
        return latency

    async def run_all(
        self,
        tasks: list[tuple[str, Callable[..., Awaitable[Any]], tuple, dict]],
    ) -> list[AsyncInvocationResult]:
        """
        Run multiple agent invocations concurrently.

        Concurrency is bounded by the shared semaphore (``max_concurrency``).

        Args:
            tasks: List of (agent_id, async_fn, args, kwargs) tuples.

        Returns:
            List of results in the same order as input tasks.
        """
        coros = [
            self.run_single(agent_id, fn, *args, **kwargs)
            for agent_id, fn, args, kwargs in tasks
        ]
        return list(await asyncio.gather(*coros))

    async def run_streaming(
        self,
        agent_id: str,
        stream_fn: Callable[[], AsyncIterator[StreamChunk]],
    ) -> AsyncIterator[StreamChunk]:
        """
        Run an agent and yield streaming output chunks.

        The concurrency slot is held until the stream is exhausted, raises,
        or this generator is closed. No timeout or retry is applied.

        Args:
            agent_id: Identifier for the agent (currently not recorded).
            stream_fn: Async generator yielding StreamChunk objects.

        Yields:
            StreamChunk as they become available.
        """
        async with self._semaphore:
            t0 = time.perf_counter()
            try:
                async for chunk in stream_fn():
                    yield chunk
            finally:
                # Time failed/closed streams too, as failed run_single calls are.
                self._record_latency(t0)

    def get_latency_stats(self) -> dict[str, float]:
        """
        Compute p50/p95/p99 latency from collected metrics.

        Percentiles use the nearest-rank method: the p-th percentile is the
        smallest sample such that at least p% of samples are <= it.

        Returns:
            Dict with keys p50_ms, p95_ms, p99_ms, mean_ms, count; all zero
            when no samples have been collected.
        """
        if not self._metrics:
            return {"p50_ms": 0, "p95_ms": 0, "p99_ms": 0, "mean_ms": 0, "count": 0}
        sorted_ms = sorted(self._metrics)
        n = len(sorted_ms)

        def percentile(p: int) -> float:
            # 1-based rank = ceil(n * p / 100), via exact integer ceiling division.
            rank = (n * p + 99) // 100
            return sorted_ms[max(rank, 1) - 1]

        return {
            "p50_ms": percentile(50),
            "p95_ms": percentile(95),
            "p99_ms": percentile(99),
            "mean_ms": sum(sorted_ms) / n,
            "count": n,
        }

    def reset_metrics(self) -> None:
        """Clear accumulated latency metrics."""
        self._metrics.clear()

212 

213 

class AsyncContextManager:
    """
    Lock-guarded async accessor for a shared ``AgentContext``.

    Every read and write of the wrapped context is performed while holding
    one ``asyncio.Lock``, so coroutines sharing an instance access it one at
    a time. Despite the name, this class does not itself support
    ``async with`` (it defines no ``__aenter__``/``__aexit__``).

    NOTE(review): assumes ``AgentContext`` is mapping-like — it must support
    ``.get``, item assignment, ``.update`` and ``dict(...)`` conversion.
    """

    def __init__(self, context: AgentContext):
        self._context = context
        self._lock = asyncio.Lock()

    async def get(self, key: str, default: Any = None) -> Any:
        """Return ``context.get(key, default)`` under the lock."""
        async with self._lock:
            value = self._context.get(key, default)
        return value

    async def set(self, key: str, value: Any) -> None:
        """Assign ``context[key] = value`` under the lock."""
        async with self._lock:
            self._context[key] = value

    async def update(self, mapping: dict[str, Any]) -> None:
        """Merge ``mapping`` into the context under the lock."""
        async with self._lock:
            self._context.update(mapping)

    async def snapshot(self) -> dict[str, Any]:
        """Return a shallow ``dict`` copy of the context taken under the lock."""
        async with self._lock:
            copied = dict(self._context)
        return copied