Coverage for agentos/observability/otel_bridge.py: 0%
298 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""AgentOS OpenTelemetry - OTLP/Jaeger/Zipkin trace/metric export (v1.14.6)."""
3import os
4import logging
5from contextlib import asynccontextmanager, contextmanager
6from dataclasses import dataclass, field
7from enum import Enum
8from functools import wraps
9from typing import Any, Callable, Dict, Optional, TYPE_CHECKING
11if TYPE_CHECKING:
12 from agentos.observability.metrics import MetricsCollector
14logger = logging.getLogger("agentos.otel")
17# ---- Enums ----
class OTelExporter(str, Enum):
    """Span exporter backends selectable through ``OtelConfig.exporter``.

    Members subclass ``str``, so each one compares equal to its raw value.
    """

    # OTLP exporter using the protobuf-over-HTTP transport.
    OTLP_HTTP = "otlp_http"
    # OTLP exporter using the gRPC transport.
    OTLP_GRPC = "otlp_grpc"
    # Console span exporter.
    CONSOLE = "console"
    # Zipkin exporter.
    ZIPKIN = "zipkin"
    # No exporter: the tracer provider gets no span processor.
    NONE = "none"
class OtelStatus(str, Enum):
    """Outcome recorded on a span via ``SpanHandle.set_status``.

    Members subclass ``str``, so each one compares equal to its raw value.
    """

    # The traced operation completed successfully.
    OK = "OK"
    # The traced operation failed.
    ERROR = "ERROR"
class SpanKind(str, Enum):
    """Role a span plays, following the OpenTelemetry span-kind vocabulary.

    Members subclass ``str``, so each one compares equal to its raw value.
    """

    # Work that stays inside the process (the default for new spans).
    INTERNAL = "internal"
    # Outgoing call to another service.
    CLIENT = "client"
    # Handling of an incoming call.
    SERVER = "server"
    # Message handed to a queue or broker.
    PRODUCER = "producer"
    # Message taken from a queue or broker.
    CONSUMER = "consumer"
43# ---- OtelConfig ----
@dataclass
class OtelConfig:
    """OpenTelemetry configuration shared by the tracer and the meter."""

    # Reported as the ``service.name`` resource attribute.
    service_name: str = "agentos"
    # Reported as the ``service.version`` resource attribute.
    service_version: str = ""
    # Which span exporter backend to build.
    exporter: OTelExporter = OTelExporter.CONSOLE
    # Endpoint handed to the OTLP span exporters (HTTP and gRPC).
    endpoint: str = "http://localhost:4318/v1/traces"
    # Endpoint handed to the OTLP metric exporter.
    metrics_endpoint: str = "http://localhost:4318/v1/metrics"
    # Extra resource attributes merged on top of service name/version.
    resource_attrs: Dict[str, str] = field(default_factory=dict)
    # Presumably the fraction of traces to keep (1.0 = all) - confirm against
    # the tracer setup.
    sample_rate: float = 1.0
    # Passed to the batch span processor as its schedule delay (milliseconds).
    batch_timeout_ms: int = 5000
    # Presumably a cap on attributes per span - TODO confirm where it is enforced.
    max_span_attributes: int = 128
    # When true, initialization skips the SDK setup entirely.
    disabled: bool = False
    # Presumably a credential for the collector - TODO confirm how it is sent.
    api_key: str = ""
    # Endpoint handed to the Zipkin exporter.
    zipkin_endpoint: str = "http://localhost:9411/api/v2/spans"

    def with_env_overrides(self) -> "OtelConfig":
        """Apply ``OTEL_*`` environment variables on top of this config.

        The instance is modified in place and returned, so calls can be
        chained.  Unset or empty variables leave the current value untouched.

        Recognized variables:
            OTEL_SERVICE_NAME             -> ``service_name``
            OTEL_EXPORTER_OTLP_ENDPOINT   -> ``endpoint``
            OTEL_EXPORTER_ZIPKIN_ENDPOINT -> ``zipkin_endpoint``
            OTEL_SDK_DISABLED             -> ``disabled`` (only ever set to True)

        Returns:
            This same ``OtelConfig`` instance.
        """
        if v := os.getenv("OTEL_SERVICE_NAME"):
            self.service_name = v
        if v := os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"):
            self.endpoint = v
        if v := os.getenv("OTEL_EXPORTER_ZIPKIN_ENDPOINT"):
            self.zipkin_endpoint = v
        # OTEL_SDK_DISABLED is a boolean variable: per the OpenTelemetry
        # specification only a case-insensitive "true" disables the SDK.
        # Testing mere presence would make OTEL_SDK_DISABLED=false turn
        # tracing off.
        if os.getenv("OTEL_SDK_DISABLED", "").strip().lower() == "true":
            self.disabled = True
        return self
74# ---- SpanHandle ----
76def _normalize_value(v: Any) -> Any:
77 if isinstance(v, (str, int, float, bool)):
78 return v
79 if isinstance(v, (list, tuple)):
80 return [_normalize_value(x) for x in v]
81 return str(v)
class SpanHandle:
    """Thin facade over an OTel span: attributes, events, exceptions, status.

    Attribute and event values are passed through ``_normalize_value`` before
    they reach the wrapped span.
    """

    def __init__(self, span: Any):
        # The wrapped span object; every method delegates to it.
        self._span = span

    def set_attribute(self, key: str, value: Any) -> None:
        """Set one attribute on the span after normalizing its value."""
        normalized = _normalize_value(value)
        self._span.set_attribute(key, normalized)

    def set_attributes(self, attrs: Dict[str, Any]) -> None:
        """Set every key/value pair of *attrs*, one ``set_attribute`` call each."""
        for attr_name, attr_value in attrs.items():
            self.set_attribute(attr_name, attr_value)

    def add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None) -> None:
        """Add a named event; a missing or empty *attributes* sends an empty dict."""
        payload = {}
        if attributes:
            for attr_name, attr_value in attributes.items():
                payload[attr_name] = _normalize_value(attr_value)
        self._span.add_event(name, attributes=payload)

    def record_exception(self, exception: Exception) -> None:
        """Forward *exception* to the span's ``record_exception``."""
        self._span.record_exception(exception)

    def set_status(self, status: OtelStatus, description: str = "") -> None:
        """Translate *status* to an OTel status code and set it on the span.

        ``OtelStatus.OK`` maps to ``StatusCode.OK``; any other value maps to
        ``StatusCode.ERROR``.  The opentelemetry import happens on each call.
        """
        from opentelemetry import trace as otel_trace

        if status == OtelStatus.OK:
            code = otel_trace.StatusCode.OK
        else:
            code = otel_trace.StatusCode.ERROR
        self._span.set_status(otel_trace.Status(code, description))
111# ---- OtelTracer ----
class OtelTracer:
    """OpenTelemetry tracer with span management and W3C context propagation.

    All state lives on the class, so the tracer behaves as a process-wide
    singleton.

    Usage:
        OtelTracer.init(OtelConfig(service_name="my-agent"))

        with OtelTracer.span("llm_call", kind=SpanKind.CLIENT) as span:
            span.set_attribute("model", "gpt-4")
            result = llm.generate(prompt)

        @OtelTracer.trace("process")
        async def process(input): ...
    """

    # Effective configuration captured by the last ``init`` call.
    _config: Optional[OtelConfig] = None
    # SDK tracer provider created by ``_init_sdk`` (None when not set up).
    _tracer_provider: Any = None
    # True once ``init`` has run, whether or not the SDK could be set up.
    _initialized: bool = False

    @classmethod
    def init(cls, config: Optional[OtelConfig] = None) -> None:
        """Initialize tracing from *config* (or defaults) plus env overrides.

        Never raises for a missing or broken SDK: failures are logged and the
        tracer is still marked initialized so that later calls do not retry.

        Args:
            config: Configuration to use; ``OtelConfig()`` when omitted.  The
                object is updated in place by ``with_env_overrides``.
        """
        if config is None:
            config = OtelConfig()
        config = config.with_env_overrides()

        cls._config = config
        if config.disabled:
            cls._initialized = True
            return

        try:
            cls._init_sdk(config)
            cls._initialized = True
            logger.info(
                "OtelTracer initialized: service=%s exporter=%s",
                config.service_name, config.exporter.value,
            )
        except ImportError:
            logger.warning("opentelemetry packages not installed - noop tracer")
            cls._initialized = True
        except Exception as e:
            logger.error("OTel init failed: %s - noop", e)
            cls._initialized = True

    @classmethod
    def _init_sdk(cls, config: OtelConfig):
        """Build the SDK tracer provider and install it as the global one.

        Raises:
            ImportError: If the opentelemetry SDK or exporter package is missing.
        """
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import BatchSpanProcessor
        from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
        from opentelemetry.trace import set_tracer_provider

        resource = Resource.create({
            SERVICE_NAME: config.service_name,
            SERVICE_VERSION: config.service_version,
            **config.resource_attrs,
        })

        provider_kwargs: Dict[str, Any] = {"resource": resource}
        # Honor ``sample_rate``.  At the default of 1.0 the SDK's own sampler
        # is left alone, so existing behavior is unchanged.
        if config.sample_rate < 1.0:
            from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

            # TraceIdRatioBased rejects rates outside [0, 1]; clamp negatives.
            ratio = max(0.0, config.sample_rate)
            provider_kwargs["sampler"] = ParentBased(TraceIdRatioBased(ratio))

        provider = TracerProvider(**provider_kwargs)
        exporter = cls._build_exporter(config)
        if exporter:
            provider.add_span_processor(
                BatchSpanProcessor(
                    exporter,
                    schedule_delay_millis=config.batch_timeout_ms,
                    max_export_batch_size=512,
                )
            )
        set_tracer_provider(provider)
        cls._tracer_provider = provider

    @classmethod
    def _build_exporter(cls, config: OtelConfig):
        """Return the span exporter selected by ``config.exporter``.

        Returns ``None`` for ``OTelExporter.NONE`` (or any unknown value), in
        which case no span processor is registered.  Exporter packages are
        imported lazily so only the selected backend has to be installed.
        """
        if config.exporter == OTelExporter.OTLP_HTTP:
            from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
                OTLPSpanExporter,
            )
            return OTLPSpanExporter(endpoint=config.endpoint)
        elif config.exporter == OTelExporter.OTLP_GRPC:
            from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
                OTLPSpanExporter,
            )
            # NOTE(review): TLS is always disabled for gRPC here - confirm
            # this is intended for non-local collectors.
            return OTLPSpanExporter(endpoint=config.endpoint, insecure=True)
        elif config.exporter == OTelExporter.CONSOLE:
            from opentelemetry.sdk.trace.export import ConsoleSpanExporter
            return ConsoleSpanExporter()
        elif config.exporter == OTelExporter.ZIPKIN:
            from opentelemetry.exporter.zipkin.proto.http import ZipkinExporter
            return ZipkinExporter(endpoint=config.zipkin_endpoint)
        return None

    @classmethod
    def get_tracer(cls, name: str = "agentos") -> Any:
        """Return an OTel tracer named *name*, initializing lazily if needed.

        NOTE(review): this imports ``opentelemetry`` unconditionally, so it
        raises ImportError when the API package is absent even though ``init``
        logs a "noop tracer" warning in that situation.
        """
        if not cls._initialized:
            cls.init()
        from opentelemetry import trace as otel_trace
        return otel_trace.get_tracer(name)

    @classmethod
    @contextmanager
    def span(
        cls,
        name: str,
        kind: SpanKind = SpanKind.INTERNAL,
        attributes: Optional[Dict[str, Any]] = None,
        parent: Any = None,
    ):
        """Open a span, make it the current span, and yield a ``SpanHandle``.

        The span is ended when the ``with`` block exits.  If the block raises
        an ``Exception`` the span status is set to ERROR and the exception
        propagates unchanged.

        Args:
            name: Span name.
            kind: Span kind; unknown values fall back to INTERNAL.
            attributes: Initial attributes; values are normalized first.
            parent: Optional explicit parent - a ``SpanHandle``, an OTel span,
                or an OTel ``Context``.  When omitted the currently active
                span (if any) becomes the parent.

        Yields:
            A ``SpanHandle`` wrapping the new span.
        """
        if not cls._initialized:
            cls.init()

        from opentelemetry import trace as otel_trace

        tracer = cls.get_tracer()

        # Map by member name rather than by integer.  The Python API enum is
        # INTERNAL=0, SERVER=1, CLIENT=2, PRODUCER=3, CONSUMER=4, so the
        # previous hard-coded numbers mislabelled every non-internal kind and
        # raised ValueError for CONSUMER.
        try:
            otel_kind = otel_trace.SpanKind[SpanKind(kind).name]
        except (ValueError, KeyError):
            otel_kind = otel_trace.SpanKind.INTERNAL

        # Resolve the explicit parent, if any, into an OTel Context.
        parent_ctx = None
        if parent is not None:
            parent_span = parent._span if isinstance(parent, SpanHandle) else parent
            if hasattr(parent_span, "get_span_context"):
                parent_ctx = otel_trace.set_span_in_context(parent_span)
            else:
                # Anything that is not a span is assumed to be a Context already.
                parent_ctx = parent_span

        span = tracer.start_span(
            name,
            context=parent_ctx,
            kind=otel_kind,
            attributes={
                k: _normalize_value(v) for k, v in (attributes or {}).items()
            },
        )

        # use_span attaches the span to the current context, so nested spans
        # are parented correctly and propagation helpers can see it.  Status
        # and lifetime stay under our control, hence the disabled flags.
        with otel_trace.use_span(
            span,
            end_on_exit=False,
            record_exception=False,
            set_status_on_exception=False,
        ):
            try:
                yield SpanHandle(span)
            except Exception:
                span.set_status(otel_trace.Status(otel_trace.StatusCode.ERROR))
                raise
            finally:
                span.end()

    @classmethod
    def trace(
        cls,
        name: str = "",
        kind: SpanKind = SpanKind.INTERNAL,
        extract_attrs: Optional[Callable] = None,
    ):
        """Decorator that runs the wrapped function inside a span.

        Works for both sync and ``async def`` functions.  On success the span
        gets ``status="ok"``; on an ``Exception`` the exception is recorded,
        the status is set to ERROR, and the exception is re-raised.

        Args:
            name: Span name; defaults to the wrapped function's ``__name__``.
            kind: Span kind for every span the decorator creates.
            extract_attrs: Optional callable invoked with the call's
                ``*args, **kwargs`` that returns the initial span attributes.
        """

        def decorator(func):
            import inspect

            # Resolve the name per decorated function.  Rebinding a shared
            # nonlocal would leak the first function's name into every later
            # use of the same decorator object.
            span_name = name or func.__name__

            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                attrs = extract_attrs(*args, **kwargs) if extract_attrs else {}
                with cls.span(span_name, kind=kind, attributes=attrs) as span:
                    try:
                        result = await func(*args, **kwargs)
                        span.set_attribute("status", "ok")
                        return result
                    except Exception as e:
                        span.record_exception(e)
                        span.set_status(OtelStatus.ERROR)
                        raise

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                attrs = extract_attrs(*args, **kwargs) if extract_attrs else {}
                with cls.span(span_name, kind=kind, attributes=attrs) as span:
                    try:
                        result = func(*args, **kwargs)
                        span.set_attribute("status", "ok")
                        return result
                    except Exception as e:
                        span.record_exception(e)
                        span.set_status(OtelStatus.ERROR)
                        raise

            # inspect.iscoroutinefunction replaces the deprecated asyncio alias.
            return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper

        return decorator

    @classmethod
    @asynccontextmanager
    async def async_span(cls, name: str, kind: SpanKind = SpanKind.INTERNAL, **attrs):
        """Async-context-manager form of ``span``; keyword args become attributes."""
        with cls.span(name, kind=kind, attributes=attrs) as span:
            yield span

    @classmethod
    def shutdown(cls):
        """Shut down the tracer provider (if any) and reset to uninitialized.

        Provider shutdown errors are logged rather than raised, so shutdown
        stays best-effort.
        """
        if cls._tracer_provider:
            try:
                cls._tracer_provider.shutdown()
            except Exception as e:
                logger.warning("OTel tracer provider shutdown failed: %s", e)
            cls._tracer_provider = None
        cls._initialized = False
313# ---- OtelMeter ----
class OtelMeter:
    """Bridge MetricsCollector to OpenTelemetry metrics.

    All state lives on the class.  Until ``init`` succeeds ``_meter`` is
    ``None`` and every ``record_*`` call is a silent no-op.
    """

    # OTel meter created by ``init``; None means metrics are inactive.
    _meter: Any = None
    # Instrument cache keyed by "<kind>_<metric name>".
    _instruments: Dict[str, Any] = {}
    # Last value reported per gauge series, used to turn readings into deltas.
    _gauge_values: Dict[Any, float] = {}

    @classmethod
    def init(cls, config: Optional["OtelConfig"] = None):
        """Create the OTLP/HTTP metric pipeline and the meter.

        Environment overrides are applied to caller-supplied configs too, the
        same way ``OtelTracer.init`` does.  A missing SDK or any setup error
        is logged and leaves the meter inactive.

        Args:
            config: Configuration to use; ``OtelConfig()`` when omitted.
        """
        if config is None:
            config = OtelConfig()
        config = config.with_env_overrides()
        if config.disabled:
            return
        try:
            from opentelemetry import metrics
            from opentelemetry.sdk.metrics import MeterProvider
            from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
            from opentelemetry.sdk.resources import Resource, SERVICE_NAME
            from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
                OTLPMetricExporter,
            )

            resource = Resource.create({SERVICE_NAME: config.service_name})
            exporter = OTLPMetricExporter(endpoint=config.metrics_endpoint)
            reader = PeriodicExportingMetricReader(exporter)
            provider = MeterProvider(resource=resource, metric_readers=[reader])
            metrics.set_meter_provider(provider)
            cls._meter = metrics.get_meter(config.service_name, config.service_version)
            # Instruments cached from an earlier meter belong to the old
            # provider; drop them so they are recreated on the new one.
            cls._instruments.clear()
            cls._gauge_values.clear()
            logger.info("OtelMeter initialized: endpoint=%s", config.metrics_endpoint)
        except ImportError:
            logger.warning("opentelemetry metrics packages not installed")
        except Exception as e:
            logger.error("OtelMeter init failed: %s", e)

    @classmethod
    def _instrument(cls, kind: str, name: str, factory: Any) -> Any:
        """Return the cached instrument for (*kind*, *name*), creating it once.

        The kind is part of the cache key so that, for example, a counter
        named ``hist_x`` cannot collide with a histogram named ``x``.
        """
        key = f"{kind}_{name}"
        instrument = cls._instruments.get(key)
        if instrument is None:
            instrument = factory(name, description="")
            cls._instruments[key] = instrument
        return instrument

    @classmethod
    def record_counter(
        cls, name: str, value: float, attrs: Optional[Dict[str, str]] = None
    ):
        """Add *value* to the counter *name*; no-op while the meter is inactive."""
        if cls._meter is None:
            return
        counter = cls._instrument("counter", name, cls._meter.create_counter)
        counter.add(value, attributes=attrs or {})

    @classmethod
    def record_histogram(
        cls, name: str, value: float, attrs: Optional[Dict[str, str]] = None
    ):
        """Record *value* in the histogram *name*; no-op while inactive."""
        if cls._meter is None:
            return
        histogram = cls._instrument("hist", name, cls._meter.create_histogram)
        histogram.record(value, attributes=attrs or {})

    @classmethod
    def record_gauge(
        cls, name: str, value: float, attrs: Optional[Dict[str, str]] = None
    ):
        """Report *value* as the current reading of gauge *name*.

        The gauge is backed by an up-down counter, which only accepts
        increments.  The difference to the previous reading of the same
        series (name + attributes) is therefore added, so the instrument's
        running total equals the latest reading rather than the sum of all
        readings.  No-op while the meter is inactive.
        """
        if cls._meter is None:
            return
        attributes = attrs or {}
        gauge = cls._instrument("gauge", name, cls._meter.create_up_down_counter)
        # repr() keeps the series key hashable even for unhashable values.
        series = (name, tuple(sorted((k, repr(v)) for k, v in attributes.items())))
        previous = cls._gauge_values.get(series, 0)
        cls._gauge_values[series] = value
        gauge.add(value - previous, attributes=attributes)

    @classmethod
    def bridge(cls, collector: "MetricsCollector"):
        """Wire MetricsCollector snapshots to OtelMeter on flush.

        ``collector.flush`` is replaced by a wrapper that calls the original,
        forwards each returned snapshot to the matching ``record_*`` method
        according to ``snap.type`` ("counter", "histogram" or "gauge"), and
        returns the snapshots unchanged.  Bridging the same collector twice
        has no further effect.
        """
        original_flush = collector.flush
        # A second wrap would forward every snapshot twice.
        if getattr(original_flush, "_otel_bridged", False):
            return

        recorders = {
            "counter": cls.record_counter,
            "histogram": cls.record_histogram,
            "gauge": cls.record_gauge,
        }

        def _hooked_flush():
            snapshots = original_flush()
            for snap in snapshots or ():
                # Tolerate snapshots whose tags are missing or None.
                attrs = dict(getattr(snap, "tags", None) or {})
                recorder = recorders.get(snap.type)
                if recorder is not None:
                    recorder(snap.name, snap.value, attrs)
            return snapshots

        _hooked_flush._otel_bridged = True  # type: ignore[attr-defined]
        collector.flush = _hooked_flush  # type: ignore[method-assign]
402# ---- OtelMiddleware ----
class OtelMiddleware:
    """W3C TraceContext propagation for multi-agent pipelines.

    Every helper is best-effort: if opentelemetry is missing or a call fails,
    the failure is logged at debug level and a neutral value is returned.
    """

    @staticmethod
    def inject_context(headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Write the current trace context into *headers* and return it.

        Args:
            headers: Carrier dict to fill in place; a new dict when omitted.

        Returns:
            The same carrier dict, left unmodified if injection failed.
        """
        if headers is None:
            headers = {}
        try:
            from opentelemetry import propagate
            propagate.inject(headers)
        except Exception:
            logging.getLogger("agentos.otel").debug(
                "trace context injection skipped", exc_info=True
            )
        return headers

    @staticmethod
    def extract_context(headers: Optional[Dict[str, str]] = None) -> Any:
        """Extract a trace context from *headers* and attach it as current.

        Args:
            headers: Incoming carrier dict; ``None`` makes this a no-op.

        Returns:
            The token returned by ``opentelemetry.context.attach``, which the
            caller can pass to ``opentelemetry.context.detach`` to restore the
            previous context.  ``None`` when nothing was attached.
        """
        if headers is None:
            return None
        try:
            from opentelemetry import propagate, context
            ctx = propagate.extract(headers)
            # Hand the token back: discarding it makes the attach irreversible.
            return context.attach(ctx)
        except Exception:
            logging.getLogger("agentos.otel").debug(
                "trace context extraction skipped", exc_info=True
            )
            return None

    @staticmethod
    def _current_span_context() -> Any:
        """Return the current span's context if it is valid, else ``None``."""
        try:
            from opentelemetry import trace as otel_trace
            ctx = otel_trace.get_current_span().get_span_context()
            if ctx.is_valid:
                return ctx
        except Exception:
            logging.getLogger("agentos.otel").debug(
                "current span context unavailable", exc_info=True
            )
        return None

    @staticmethod
    def get_trace_id() -> str:
        """Return the current trace id as 32 lowercase hex digits, or ``""``."""
        ctx = OtelMiddleware._current_span_context()
        return format(ctx.trace_id, "032x") if ctx is not None else ""

    @staticmethod
    def get_span_id() -> str:
        """Return the current span id as 16 lowercase hex digits, or ``""``."""
        ctx = OtelMiddleware._current_span_context()
        return format(ctx.span_id, "016x") if ctx is not None else ""