# Source code for jeevesagent.observability.tracing

"""Telemetry adapters.

* :class:`NoTelemetry` — no-op default. Both methods do as little work as
  possible so wrapping every loop step in ``async with telemetry.trace(...)``
  has effectively zero cost when telemetry isn't configured.
* :class:`OTelTelemetry` — OpenTelemetry-backed. Lazy ``opentelemetry``
  imports inside ``__init__``. Spans go to whatever ``TracerProvider`` is
  configured; metrics go to whatever ``MeterProvider`` is configured. Tests
  pass in-memory providers; production users wire up their exporters once
  at startup and the adapter picks them up.

Metric type dispatch is by suffix:

* names ending in ``_ms``, ``_seconds``, or ``_bytes`` -> histogram
* everything else -> counter

That keeps the public interface a single :meth:`emit_metric` while still
producing the right OTel instrument under the hood.
"""

from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

from ..core.types import Span

# Metric-name suffixes that make ``OTelTelemetry.emit_metric`` record into a
# histogram; a name with any other ending is recorded on a counter.
_HISTOGRAM_SUFFIXES = ("_ms", "_seconds", "_bytes")


def _clean(attrs: dict[str, Any]) -> dict[str, Any]:
    """Drop None values; OTel attribute APIs reject them."""
    return {k: v for k, v in attrs.items() if v is not None}


class NoTelemetry:
    """No-op telemetry. Very cheap; safe to call on every loop step."""

    @asynccontextmanager
    async def trace(self, name: str, **attrs: Any) -> AsyncIterator[Span]:
        """Yield a placeholder span without recording anything.

        Args:
            name: Name stored on the yielded span.
            **attrs: Attributes stored on the yielded span as given.

        Yields:
            A :class:`Span` whose ``trace_id`` and ``span_id`` are empty
            strings.
        """
        # ``**attrs`` is already a fresh dict built for this call, so it is
        # handed over directly instead of being copied a second time.
        yield Span(name=name, trace_id="", span_id="", attributes=attrs)

    async def emit_metric(self, name: str, value: float, **attrs: Any) -> None:
        """Discard the metric; all arguments are ignored."""
        return None
class OTelTelemetry:
    """OpenTelemetry-backed :class:`~jeevesagent.core.protocols.Telemetry`.

    Spans are started through an OpenTelemetry tracer and metrics are
    recorded through an OpenTelemetry meter. Instruments are created the
    first time a metric name is seen and cached per name afterwards.
    """

    def __init__(
        self,
        *,
        tracer_provider: Any | None = None,
        meter_provider: Any | None = None,
        instrumentation_name: str = "jeevesagent",
    ) -> None:
        """Resolve the tracer and meter used by this adapter.

        Args:
            tracer_provider: Provider forwarded to ``trace.get_tracer``;
                ``None`` is forwarded unchanged.
            meter_provider: Provider whose ``get_meter`` supplies the meter.
                When ``None``, ``metrics.get_meter`` is used instead.
            instrumentation_name: Name passed to both lookups.

        Raises:
            ImportError: If the ``opentelemetry`` package cannot be imported.
        """
        # Imported here rather than at module level so the module can be
        # imported without the optional dependency installed.
        try:
            from opentelemetry import metrics, trace
        except ImportError as exc:  # pragma: no cover — depends on user env
            raise ImportError(
                "opentelemetry is not installed. "
                "Install with: pip install 'jeevesagent[otel]'"
            ) from exc
        self._tracer = trace.get_tracer(
            instrumentation_name,
            tracer_provider=tracer_provider,
        )
        if meter_provider is not None:
            self._meter = meter_provider.get_meter(instrumentation_name)
        else:
            self._meter = metrics.get_meter(instrumentation_name)
        # Instrument caches keyed by metric name.
        self._counters: dict[str, Any] = {}
        self._histograms: dict[str, Any] = {}

    @asynccontextmanager
    async def trace(self, name: str, **attrs: Any) -> AsyncIterator[Span]:
        """Run the enclosed block inside a current OpenTelemetry span.

        Args:
            name: Span name.
            **attrs: Span attributes; entries whose value is ``None`` are
                dropped before the span is started.

        Yields:
            A :class:`Span` carrying the name, the cleaned attributes and the
            OTel trace/span ids as zero-padded lowercase hex (32 and 16
            digits).

        Raises:
            Exception: Any exception raised by the enclosed block is recorded
                on the OTel span, the span status is set to ``ERROR`` with
                ``str(exc)`` as description, and the exception is re-raised.
        """
        clean_attrs = _clean(attrs)
        # ``start_as_current_span`` records exceptions and sets an ERROR
        # status on its own by default. The handler below already does both,
        # so the automatic behaviour is switched off; otherwise every failure
        # is recorded twice and the status description set here is replaced.
        with self._tracer.start_as_current_span(
            name,
            attributes=clean_attrs,
            record_exception=False,
            set_status_on_exception=False,
        ) as otel_span:
            ctx = otel_span.get_span_context()
            our_span = Span(
                name=name,
                trace_id=format(ctx.trace_id, "032x"),
                span_id=format(ctx.span_id, "016x"),
                # Copy so the yielded span does not share the dict that was
                # handed to the OTel span.
                attributes=dict(clean_attrs),
            )
            try:
                yield our_span
            except Exception as exc:
                otel_span.record_exception(exc)
                from opentelemetry.trace import Status, StatusCode

                otel_span.set_status(Status(StatusCode.ERROR, str(exc)))
                raise

    async def emit_metric(
        self, name: str, value: float, **attrs: Any
    ) -> None:
        """Record ``value`` on the instrument selected by the metric name.

        Names ending in one of ``_HISTOGRAM_SUFFIXES`` are recorded on a
        histogram; every other name is added to a counter. The instrument is
        created on first use and reused for later calls with the same name.

        Args:
            name: Metric name; its suffix selects the instrument type.
            value: Measurement to record or amount to add.
            **attrs: Measurement attributes; entries whose value is ``None``
                are dropped.
        """
        clean_attrs = _clean(attrs)
        if name.endswith(_HISTOGRAM_SUFFIXES):
            histo = self._histograms.get(name)
            if histo is None:
                histo = self._meter.create_histogram(name)
                self._histograms[name] = histo
            histo.record(value, attributes=clean_attrs)
        else:
            counter = self._counters.get(name)
            if counter is None:
                counter = self._meter.create_counter(name)
                self._counters[name] = counter
            counter.add(value, attributes=clean_attrs)