Coverage for agentos/observability/otel_bridge.py: 0%

298 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1"""AgentOS OpenTelemetry - OTLP/Jaeger/Zipkin trace/metric export (v1.14.6).""" 

2 

3import os 

4import logging 

5from contextlib import asynccontextmanager, contextmanager 

6from dataclasses import dataclass, field 

7from enum import Enum 

8from functools import wraps 

9from typing import Any, Callable, Dict, Optional, TYPE_CHECKING 

10 

11if TYPE_CHECKING: 

12 from agentos.observability.metrics import MetricsCollector 

13 

14logger = logging.getLogger("agentos.otel") 

15 

16 

17# ---- Enums ---- 

18 

class OTelExporter(str, Enum):
    """Span exporter backend selected by ``OtelConfig.exporter``.

    ``NONE`` means no exporter is attached to the tracer provider.
    """

    OTLP_HTTP = "otlp_http"  # OTLP over HTTP/protobuf
    OTLP_GRPC = "otlp_grpc"  # OTLP over gRPC
    CONSOLE = "console"      # OpenTelemetry ConsoleSpanExporter
    ZIPKIN = "zipkin"        # Zipkin HTTP exporter
    NONE = "none"            # build no exporter

26 

27 

class OtelStatus(str, Enum):
    """Outcome that can be recorded on a span."""

    OK = "OK"        # operation succeeded
    ERROR = "ERROR"  # operation failed

32 

33 

class SpanKind(str, Enum):
    """Role of a span relative to its caller, per OpenTelemetry conventions.

    Values are lowercase strings; the tracer translates them into the
    OpenTelemetry API's own span-kind enum when a span is started.
    """

    INTERNAL = "internal"  # work inside this process
    CLIENT = "client"      # outgoing request to a remote service
    SERVER = "server"      # handling an incoming request
    PRODUCER = "producer"  # enqueueing a message
    CONSUMER = "consumer"  # processing a received message

41 

42 

43# ---- OtelConfig ---- 

44 

@dataclass
class OtelConfig:
    """OpenTelemetry configuration shared by the tracer and meter bridges.

    Attributes:
        service_name: value for the ``service.name`` resource attribute.
        service_version: value for the ``service.version`` resource attribute.
        exporter: which span exporter backend to build.
        endpoint: span export endpoint (OTLP over HTTP or gRPC).
        metrics_endpoint: OTLP/HTTP metric export endpoint.
        resource_attrs: extra resource attributes merged into the resource.
        sample_rate: intended trace sampling ratio, expected in ``[0, 1]``.
        batch_timeout_ms: schedule delay for the batch span processor, in ms.
        max_span_attributes: intended per-span attribute limit.
        disabled: when True, no OpenTelemetry SDK is installed.
        api_key: backend credential. NOTE(review): no code in this module
            reads it; confirm whether an exporter is meant to send it.
        zipkin_endpoint: Zipkin v2 spans endpoint.
    """

    service_name: str = "agentos"
    service_version: str = ""
    exporter: OTelExporter = OTelExporter.CONSOLE
    endpoint: str = "http://localhost:4318/v1/traces"
    metrics_endpoint: str = "http://localhost:4318/v1/metrics"
    resource_attrs: Dict[str, str] = field(default_factory=dict)
    sample_rate: float = 1.0
    batch_timeout_ms: int = 5000
    max_span_attributes: int = 128
    disabled: bool = False
    api_key: str = ""
    zipkin_endpoint: str = "http://localhost:9411/api/v2/spans"

    def with_env_overrides(self) -> "OtelConfig":
        """Apply standard ``OTEL_*`` environment variables to this config in place.

        Recognised variables (unset or empty ones are ignored):

        * ``OTEL_SERVICE_NAME`` -> ``service_name``
        * ``OTEL_EXPORTER_OTLP_ENDPOINT`` -> ``endpoint`` (used verbatim)
        * ``OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`` -> ``endpoint``; it is
          signal-specific, so it wins over the generic variable
        * ``OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`` -> ``metrics_endpoint``
        * ``OTEL_EXPORTER_ZIPKIN_ENDPOINT`` -> ``zipkin_endpoint``
        * ``OTEL_SDK_DISABLED`` -> ``disabled = True`` only for a true-like
          value (``true``/``1``/``yes``/``on``, case-insensitive).
          Values such as ``false`` or ``0`` no longer disable the SDK.
          A programmatic ``disabled=True`` is never cleared.

        Returns:
            ``self``, so the call can be chained.
        """
        if v := os.getenv("OTEL_SERVICE_NAME"):
            self.service_name = v
        if v := os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"):
            self.endpoint = v
        # Applied after the generic variable so the signal-specific one takes precedence.
        if v := os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"):
            self.endpoint = v
        if v := os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"):
            self.metrics_endpoint = v
        if v := os.getenv("OTEL_EXPORTER_ZIPKIN_ENDPOINT"):
            self.zipkin_endpoint = v
        # The spec defines OTEL_SDK_DISABLED as a boolean. Treating any
        # non-empty string as "disabled" made OTEL_SDK_DISABLED=false turn
        # tracing off.
        if os.getenv("OTEL_SDK_DISABLED", "").strip().lower() in ("true", "1", "yes", "on"):
            self.disabled = True
        return self

72 

73 

74# ---- SpanHandle ---- 

75 

# Primitive attribute types OpenTelemetry accepts. bool is listed before int
# because bool subclasses int but is a distinct OTel attribute type.
_OTEL_PRIMITIVES = (bool, int, float, str)


def _primitive_kind(value: Any) -> type:
    """Return which entry of ``_OTEL_PRIMITIVES`` *value* is an instance of.

    Only called with values already known to be primitives.
    """
    return next(kind for kind in _OTEL_PRIMITIVES if isinstance(value, kind))


def _normalize_value(v: Any) -> Any:
    """Coerce *v* into a value OpenTelemetry accepts as an attribute.

    * ``str``/``int``/``float``/``bool`` are returned unchanged.
    * ``list``/``tuple`` become a list of a single primitive type.
      Non-primitive elements (``None``, nested sequences, arbitrary objects)
      are converted with ``str()``. If the remaining element types are
      still mixed (e.g. ``[1, "a"]`` or ``[True, 1]``), every element is
      converted with ``str()``. OpenTelemetry requires homogeneous,
      non-nested arrays and would otherwise drop the whole attribute.
    * Anything else is converted with ``str()``.
    """
    if isinstance(v, _OTEL_PRIMITIVES):
        return v
    if isinstance(v, (list, tuple)):
        items = [x if isinstance(x, _OTEL_PRIMITIVES) else str(x) for x in v]
        if len({_primitive_kind(x) for x in items}) > 1:
            return [str(x) for x in items]
        return items
    return str(v)

82 

83 

class SpanHandle:
    """Thin wrapper around an OpenTelemetry span.

    Attribute and event values are passed through ``_normalize_value`` before
    reaching the wrapped span, so callers may pass arbitrary Python objects.
    """

    def __init__(self, span: Any):
        # The underlying OpenTelemetry span object being wrapped.
        self._span = span

    def set_attribute(self, key: str, value: Any) -> None:
        """Set a single attribute after normalising *value*."""
        self._span.set_attribute(key, _normalize_value(value))

    def set_attributes(self, attrs: Optional[Dict[str, Any]]) -> None:
        """Set several attributes; ``None`` is treated as an empty mapping."""
        for k, v in (attrs or {}).items():
            self.set_attribute(k, v)

    def add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None) -> None:
        """Add a named event with normalised *attributes* to the span."""
        self._span.add_event(name, attributes={
            k: _normalize_value(v) for k, v in (attributes or {}).items()
        })

    def record_exception(self, exception: BaseException) -> None:
        """Record *exception* on the span (delegates to the span's own method)."""
        self._span.record_exception(exception)

    def set_status(self, status: OtelStatus, description: str = "") -> None:
        """Set the span status.

        Args:
            status: ``OtelStatus.OK`` or ``OtelStatus.ERROR``; anything other
                than OK is treated as ERROR.
            description: human-readable failure reason. OpenTelemetry only
                accepts a description together with ERROR and logs a warning
                otherwise, so it is ignored for OK and omitted when empty.

        Raises:
            ImportError: if the ``opentelemetry`` package is not installed.
        """
        from opentelemetry import trace as otel_trace

        if status == OtelStatus.OK:
            self._span.set_status(otel_trace.Status(otel_trace.StatusCode.OK))
        else:
            self._span.set_status(
                otel_trace.Status(otel_trace.StatusCode.ERROR, description or None)
            )

109 

110 

111# ---- OtelTracer ---- 

112 

class _NoopSpanHandle:
    """Do-nothing stand-in for ``SpanHandle``.

    ``OtelTracer.span`` yields this when the ``opentelemetry`` package cannot
    be imported. Instrumented code, including the ``trace`` decorator's error
    path, then runs unchanged instead of raising ``ImportError``.
    """

    def set_attribute(self, key: str, value: Any) -> None:
        """Discard the attribute."""

    def set_attributes(self, attrs: Optional[Dict[str, Any]]) -> None:
        """Discard the attributes."""

    def add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None) -> None:
        """Discard the event."""

    def record_exception(self, exception: BaseException) -> None:
        """Discard the exception."""

    def set_status(self, status: Any, description: str = "") -> None:
        """Discard the status."""


class OtelTracer:
    """OpenTelemetry tracer with span management and W3C context propagation.

    Usage:
        OtelTracer.init(OtelConfig(service_name="my-agent"))

        with OtelTracer.span("llm_call", kind=SpanKind.CLIENT) as span:
            span.set_attribute("model", "gpt-4")
            result = llm.generate(prompt)

        @OtelTracer.trace("process")
        async def process(input): ...

    All state is class-level. A span opened by ``span``/``trace``/
    ``async_span`` becomes the *current* span while it is open, so nested
    spans get the right parent and ``opentelemetry.propagate`` sees it.
    Without the ``opentelemetry`` package, spans degrade to no-ops.
    """

    _config: Optional[OtelConfig] = None
    # SDK TracerProvider built by _init_sdk, or None when tracing is a no-op.
    _tracer_provider: Any = None
    _initialized: bool = False

    @classmethod
    def init(cls, config: Optional[OtelConfig] = None) -> None:
        """Configure tracing for this process.

        ``config`` (or a default ``OtelConfig``) has ``OTEL_*`` environment
        overrides applied to it in place. A previously built provider is shut
        down before being replaced. Missing SDK packages and SDK setup errors
        are logged rather than raised; tracing then stays a no-op.
        """
        config = (OtelConfig() if config is None else config).with_env_overrides()

        # Re-initialising used to leak the previous provider together with
        # its batch export worker.
        if cls._tracer_provider is not None:
            cls.shutdown()

        cls._config = config
        if config.disabled:
            cls._initialized = True
            return

        try:
            cls._init_sdk(config)
            logger.info(
                "OtelTracer initialized: service=%s exporter=%s",
                config.service_name, config.exporter.value,
            )
        except ImportError:
            logger.warning("opentelemetry packages not installed - noop tracer")
        except Exception as e:
            logger.error("OTel init failed: %s - noop", e, exc_info=True)
        cls._initialized = True

    @classmethod
    def _init_sdk(cls, config: OtelConfig) -> None:
        """Build the SDK TracerProvider from *config* and register it.

        Applies ``sample_rate`` as a parent-based trace-ID-ratio sampler (only
        below 1.0, so an ``OTEL_TRACES_SAMPLER`` environment choice still
        applies otherwise). Also applies ``max_span_attributes`` as the span
        attribute limit.

        Raises:
            ImportError: if the SDK or the selected exporter is not installed.
        """
        from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
        from opentelemetry.sdk.trace import SpanLimits, TracerProvider
        from opentelemetry.sdk.trace.export import BatchSpanProcessor
        from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased
        from opentelemetry.trace import set_tracer_provider

        resource = Resource.create({
            SERVICE_NAME: config.service_name,
            SERVICE_VERSION: config.service_version,
            **config.resource_attrs,
        })

        provider_kwargs: Dict[str, Any] = {
            "resource": resource,
            "span_limits": SpanLimits(max_span_attributes=config.max_span_attributes),
        }
        rate = min(max(float(config.sample_rate), 0.0), 1.0)
        if rate < 1.0:
            provider_kwargs["sampler"] = ParentBased(TraceIdRatioBased(rate))
        provider = TracerProvider(**provider_kwargs)

        exporter = cls._build_exporter(config)
        if exporter:
            provider.add_span_processor(
                BatchSpanProcessor(
                    exporter,
                    schedule_delay_millis=config.batch_timeout_ms,
                    max_export_batch_size=512,
                )
            )
        # The global provider can only be set once per process; get_tracer
        # therefore reads cls._tracer_provider directly.
        set_tracer_provider(provider)
        cls._tracer_provider = provider

    @classmethod
    def _build_exporter(cls, config: OtelConfig):
        """Return the span exporter selected by ``config.exporter``, or None.

        NOTE(review): ``config.api_key`` is not forwarded to any exporter, and
        the gRPC exporter reuses ``config.endpoint``, whose default is the
        OTLP/HTTP URL. Confirm both against deployment.
        """
        if config.exporter == OTelExporter.OTLP_HTTP:
            from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
                OTLPSpanExporter,
            )
            return OTLPSpanExporter(endpoint=config.endpoint)
        if config.exporter == OTelExporter.OTLP_GRPC:
            from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
                OTLPSpanExporter,
            )
            return OTLPSpanExporter(endpoint=config.endpoint, insecure=True)
        if config.exporter == OTelExporter.CONSOLE:
            from opentelemetry.sdk.trace.export import ConsoleSpanExporter
            return ConsoleSpanExporter()
        if config.exporter == OTelExporter.ZIPKIN:
            from opentelemetry.exporter.zipkin.proto.http import ZipkinExporter
            return ZipkinExporter(endpoint=config.zipkin_endpoint)
        return None

    @classmethod
    def get_tracer(cls, name: str = "agentos") -> Any:
        """Return a tracer named *name*, initialising with defaults if needed.

        Uses the provider built by ``init`` when there is one, so a re-init
        takes effect even though the global provider cannot be replaced.

        Raises:
            ImportError: if ``opentelemetry`` is not installed.
        """
        if not cls._initialized:
            cls.init()
        if cls._tracer_provider is not None:
            return cls._tracer_provider.get_tracer(name)
        from opentelemetry import trace as otel_trace
        return otel_trace.get_tracer(name)

    @staticmethod
    def _to_otel_kind(kind: Any, otel_trace: Any) -> Any:
        """Map our ``SpanKind`` (or its string value) to ``opentelemetry.trace.SpanKind``.

        Mapping is by member name. The previous integer table used OTLP proto
        numbers, so it picked the wrong kind and raised for CONSUMER.
        Unknown values fall back to INTERNAL, as before.
        """
        try:
            return otel_trace.SpanKind[SpanKind(kind).name]
        except (ValueError, KeyError, TypeError):
            return otel_trace.SpanKind.INTERNAL

    @staticmethod
    def _parent_context(parent: Any, otel_trace: Any) -> Any:
        """Turn *parent* into an OpenTelemetry Context for ``start_span``.

        Accepts ``None`` (use the current context), a ``SpanHandle``, an
        OpenTelemetry ``Span``, or an object assumed to already be a Context.
        """
        if parent is None:
            return None
        if isinstance(parent, SpanHandle):
            parent = parent._span
        if isinstance(parent, otel_trace.Span):
            return otel_trace.set_span_in_context(parent)
        return parent

    @classmethod
    @contextmanager
    def span(
        cls,
        name: str,
        kind: SpanKind = SpanKind.INTERNAL,
        attributes: Optional[Dict[str, Any]] = None,
        parent: Any = None,
    ):
        """Open a span named *name* as the current span and yield a handle.

        Args:
            name: span name.
            kind: span kind (``SpanKind`` or its string value).
            attributes: initial attributes; values are normalised.
            parent: optional explicit parent (``SpanHandle``, OTel ``Span``
                or ``Context``); defaults to the current span.

        An ``Exception`` escaping the body marks the span ERROR (with the
        exception as description) and is re-raised. The span is always
        ended. Yields a no-op handle when ``opentelemetry`` is not installed.
        """
        if not cls._initialized:
            cls.init()

        try:
            from opentelemetry import trace as otel_trace
        except ImportError:
            otel_trace = None
        if otel_trace is None:
            # Yield outside the except clause so body exceptions are not
            # chained onto the ImportError.
            yield _NoopSpanHandle()
            return

        tracer = cls.get_tracer()
        span = tracer.start_span(
            name,
            context=cls._parent_context(parent, otel_trace),
            kind=cls._to_otel_kind(kind, otel_trace),
            attributes={
                k: _normalize_value(v) for k, v in (attributes or {}).items()
            },
        )

        try:
            # Make the span current so children are parented to it and
            # propagate.inject()/get_current_span() see it. Recording and
            # status are handled here, not by use_span.
            with otel_trace.use_span(
                span,
                end_on_exit=False,
                record_exception=False,
                set_status_on_exception=False,
            ):
                yield SpanHandle(span)
        except Exception as exc:
            span.set_status(
                otel_trace.Status(
                    otel_trace.StatusCode.ERROR, f"{type(exc).__name__}: {exc}"
                )
            )
            raise
        finally:
            span.end()

    @classmethod
    def trace(
        cls,
        name: str = "",
        kind: SpanKind = SpanKind.INTERNAL,
        extract_attrs: Optional[Callable] = None,
    ):
        """Decorator that wraps each call of a sync or async function in a span.

        Args:
            name: span name; defaults to the decorated function's ``__name__``.
                It is resolved per decorated function, so one decorator object
                can be reused safely.
            kind: span kind for every call.
            extract_attrs: optional callable receiving the call's arguments
                and returning initial span attributes.

        On success the span gets attribute ``status="ok"``. On an exception,
        the exception is recorded, the status is set to ERROR and the
        exception is re-raised.
        """
        import inspect

        def decorator(func):
            span_name = name or func.__name__

            if inspect.iscoroutinefunction(func):
                @wraps(func)
                async def async_wrapper(*args, **kwargs):
                    attrs = extract_attrs(*args, **kwargs) if extract_attrs else {}
                    with cls.span(span_name, kind=kind, attributes=attrs) as span:
                        try:
                            result = await func(*args, **kwargs)
                        except Exception as exc:
                            span.record_exception(exc)
                            span.set_status(OtelStatus.ERROR, f"{type(exc).__name__}: {exc}")
                            raise
                        span.set_attribute("status", "ok")
                        return result

                return async_wrapper

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                attrs = extract_attrs(*args, **kwargs) if extract_attrs else {}
                with cls.span(span_name, kind=kind, attributes=attrs) as span:
                    try:
                        result = func(*args, **kwargs)
                    except Exception as exc:
                        span.record_exception(exc)
                        span.set_status(OtelStatus.ERROR, f"{type(exc).__name__}: {exc}")
                        raise
                    span.set_attribute("status", "ok")
                    return result

            return sync_wrapper

        return decorator

    @classmethod
    @asynccontextmanager
    async def async_span(cls, name: str, kind: SpanKind = SpanKind.INTERNAL, **attrs):
        """Async-context-manager form of ``span``; keyword args become attributes."""
        with cls.span(name, kind=kind, attributes=attrs) as span:
            yield span

    @classmethod
    def shutdown(cls) -> None:
        """Shut down the SDK provider (if any) and mark the tracer uninitialised.

        Errors from the provider's shutdown are logged, not raised.
        """
        provider, cls._tracer_provider = cls._tracer_provider, None
        cls._initialized = False
        if provider is not None:
            try:
                provider.shutdown()
            except Exception as e:
                logger.warning("OTel tracer provider shutdown failed: %s", e)

311 

312 

313# ---- OtelMeter ---- 

314 

class OtelMeter:
    """Bridge MetricsCollector snapshots to OpenTelemetry metric instruments.

    All state is class-level. ``init`` builds a MeterProvider with an
    OTLP/HTTP periodic exporter. The ``record_*`` methods are no-ops until
    ``init`` has succeeded.
    """

    _meter: Any = None
    # MeterProvider created by the last successful init (shut down on re-init).
    _provider: Any = None
    # Instrument cache keyed by (instrument kind, metric name), so a counter
    # named "hist_x" and a histogram named "x" can no longer collide.
    _instruments: Dict[Any, Any] = {}
    # Last absolute value per (gauge name, sorted attribute items). Used to
    # turn gauge readings into up/down-counter deltas.
    _gauge_values: Dict[Any, float] = {}

    @classmethod
    def init(cls, config: Optional["OtelConfig"] = None) -> None:
        """Install an OTLP/HTTP MeterProvider and create the bridge meter.

        As in ``OtelTracer.init``, ``OTEL_*`` environment overrides are
        applied to *config* (in place) before use. Missing packages and setup
        errors are logged, not raised.
        """
        config = (OtelConfig() if config is None else config).with_env_overrides()
        if config.disabled:
            return
        try:
            from opentelemetry import metrics
            from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
                OTLPMetricExporter,
            )
            from opentelemetry.sdk.metrics import MeterProvider
            from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
            from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION

            resource = Resource.create({
                SERVICE_NAME: config.service_name,
                SERVICE_VERSION: config.service_version,
                **config.resource_attrs,
            })
            reader = PeriodicExportingMetricReader(
                OTLPMetricExporter(endpoint=config.metrics_endpoint)
            )
            provider = MeterProvider(resource=resource, metric_readers=[reader])
            metrics.set_meter_provider(provider)
            # The global provider cannot be replaced once set, so take the
            # meter from *this* provider rather than from the global one.
            previous, cls._provider = cls._provider, provider
            cls._meter = provider.get_meter(config.service_name, config.service_version)
            # Cached instruments belong to the old meter; start fresh.
            cls._instruments = {}
            cls._gauge_values = {}
            logger.info("OtelMeter initialized: endpoint=%s", config.metrics_endpoint)
        except ImportError:
            logger.warning("opentelemetry metrics packages not installed")
            return
        except Exception as e:
            logger.error("OtelMeter init failed: %s", e)
            return

        if previous is not None:
            try:
                previous.shutdown()
            except Exception as e:
                logger.warning("previous MeterProvider shutdown failed: %s", e)

    @classmethod
    def _instrument(cls, kind: str, name: str) -> Any:
        """Return the cached instrument of *kind* for *name*, creating it on first use.

        *kind* is the suffix of the meter factory method: ``"counter"``,
        ``"histogram"`` or ``"up_down_counter"``.
        """
        key = (kind, name)
        inst = cls._instruments.get(key)
        if inst is None:
            inst = getattr(cls._meter, f"create_{kind}")(name, description="")
            cls._instruments[key] = inst
        return inst

    @classmethod
    def record_counter(
        cls, name: str, value: float, attrs: Optional[Dict[str, str]] = None
    ):
        """Add *value* to counter *name*. No-op before ``init``.

        NOTE(review): this assumes counter snapshot values are per-flush
        increments; if MetricsCollector reports running totals they will be
        double counted. Confirm.
        """
        if cls._meter is None:
            return
        cls._instrument("counter", name).add(value, attributes=attrs or {})

    @classmethod
    def record_histogram(
        cls, name: str, value: float, attrs: Optional[Dict[str, str]] = None
    ):
        """Record one *value* observation in histogram *name*. No-op before ``init``."""
        if cls._meter is None:
            return
        cls._instrument("histogram", name).record(value, attributes=attrs or {})

    @classmethod
    def record_gauge(
        cls, name: str, value: float, attrs: Optional[Dict[str, str]] = None
    ):
        """Report absolute gauge reading *value* for *name*. No-op before ``init``.

        The gauge is exported as an up/down counter. Adding the raw reading
        every flush made the exported value grow without bound, so only the
        change since the previous reading of the same (name, attrs) series
        is added.
        """
        if cls._meter is None:
            return
        attributes = dict(attrs or {})
        series = (name, tuple(sorted(attributes.items())))
        delta = value - cls._gauge_values.get(series, 0.0)
        cls._gauge_values[series] = value
        instrument = cls._instrument("up_down_counter", name)
        if delta:
            instrument.add(delta, attributes=attributes)

    @classmethod
    def bridge(cls, collector: "MetricsCollector") -> None:
        """Wrap ``collector.flush`` so each flushed snapshot is also sent to OTel.

        Snapshots are read through ``.name``, ``.type`` ("counter",
        "histogram" or "gauge"; others are skipped), ``.value`` and optional
        ``.tags``. Arguments to ``flush`` are passed through unchanged and its
        return value is returned unchanged. A failure while recording one
        snapshot is logged and does not break the flush.

        NOTE(review): calling ``bridge`` twice on one collector wraps it twice
        and records every snapshot twice.
        """
        original_flush = collector.flush
        recorders = {
            "counter": cls.record_counter,
            "histogram": cls.record_histogram,
            "gauge": cls.record_gauge,
        }

        def _hooked_flush(*args, **kwargs):
            snapshots = original_flush(*args, **kwargs)
            for snap in snapshots or ():
                record = recorders.get(snap.type)
                if record is None:
                    continue
                # Treat missing or None tags as "no attributes".
                attrs = dict(getattr(snap, "tags", None) or {})
                try:
                    record(snap.name, snap.value, attrs)
                except Exception as e:
                    logger.warning("OtelMeter failed to record %s: %s", snap.name, e)
            return snapshots

        collector.flush = _hooked_flush  # type: ignore[method-assign]

400 

401 

402# ---- OtelMiddleware ---- 

403 

class OtelMiddleware:
    """W3C TraceContext propagation helpers for multi-agent pipelines.

    Every method is best-effort. If ``opentelemetry`` is unavailable or the
    propagator fails, the failure is logged at DEBUG and a neutral value is
    returned instead of raising.
    """

    # Looked up by name, so this is the same logger object as the
    # module-level "agentos.otel" logger.
    _log = logging.getLogger("agentos.otel")

    @staticmethod
    def inject_context(headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Write the current trace context into *headers* and return it.

        *headers* is mutated in place. A new dict is created when it is None.
        """
        if headers is None:
            headers = {}
        try:
            from opentelemetry import propagate
            propagate.inject(headers)
        except Exception as e:
            OtelMiddleware._log.debug("trace context injection skipped: %s", e)
        return headers

    @staticmethod
    def extract_context(headers: Optional[Dict[str, str]] = None) -> Optional[object]:
        """Make the trace context carried in *headers* the current context.

        Returns:
            The token from ``opentelemetry.context.attach``. Pass it to
            ``opentelemetry.context.detach`` when the work is done, so the
            remote context does not stay attached indefinitely. (The
            previous version discarded the token, so it could never be
            detached.) Returns None when *headers* is None or on failure.
        """
        if headers is None:
            return None
        try:
            from opentelemetry import context, propagate
            return context.attach(propagate.extract(headers))
        except Exception as e:
            OtelMiddleware._log.debug("trace context extraction skipped: %s", e)
            return None

    @staticmethod
    def _current_span_context() -> Any:
        """Return the current span's SpanContext when it is valid, else None."""
        try:
            from opentelemetry import trace as otel_trace
            ctx = otel_trace.get_current_span().get_span_context()
            return ctx if ctx.is_valid else None
        except Exception as e:
            OtelMiddleware._log.debug("no current span context: %s", e)
            return None

    @staticmethod
    def get_trace_id() -> str:
        """Return the current trace ID as 32 lowercase hex chars, or ``""``."""
        ctx = OtelMiddleware._current_span_context()
        return format(ctx.trace_id, "032x") if ctx is not None else ""

    @staticmethod
    def get_span_id() -> str:
        """Return the current span ID as 16 lowercase hex chars, or ``""``."""
        ctx = OtelMiddleware._current_span_context()
        return format(ctx.span_id, "016x") if ctx is not None else ""