Coverage for agentos/observability/__init__.py: 42%
353 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v1.14.3 — Observability Platform (OpenTelemetry Integration).
4Production-grade observability for AgentOS agent pipelines:
5- Distributed tracing (OpenTelemetry spans)
6- Metrics collection (Prometheus-compatible counters, histograms, gauges)
7- Structured logging with correlation IDs
8- Agent lifecycle events instrumentation
9- Cost tracking dashboard
10- Latency breakdown by pipeline stage
12Architecture:
13 AgentOS Pipeline
14 ├── OTel Tracer (auto-instrumented)
15 │ ├── Agent invocation span
16 │ │ ├── LLM call span
17 │ │ ├── Tool call span
18 │ │ └── Memory retrieval span
19 ├── MetricsExporter (Prometheus)
20 └── StructuredLogger (JSON)
22Inspired by: LangSmith, Weave, Arize Phoenix
23"""
25from __future__ import annotations
27import json
28import time
29import uuid
30from collections import defaultdict
31from contextlib import contextmanager
32from dataclasses import dataclass, field
33from enum import Enum
34from typing import (
35 Any, Callable, Dict, Iterator, List, Optional, Set, Tuple, Union,
36)
39# ── Span & Trace Types ──────────────────────
class SpanKind(str, Enum):
    """Category of work a span represents within an agent pipeline."""

    AGENT = "agent"
    LLM = "llm"
    TOOL = "tool"
    MEMORY = "memory"
    RETRIEVAL = "retrieval"
    CHAIN = "chain"
    EMBEDDING = "embedding"


class SpanStatus(str, Enum):
    """Final outcome recorded on a span."""

    OK = "ok"
    ERROR = "error"
    TIMEOUT = "timeout"


@dataclass
class SpanEvent:
    """A timestamped, named event recorded inside a span."""

    name: str
    timestamp: float = field(default_factory=time.time)
    attributes: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return a plain-dict view of the event (attributes are copied)."""
        return {
            "name": self.name,
            "timestamp": self.timestamp,
            "attributes": dict(self.attributes),
        }


@dataclass
class Span:
    """An OpenTelemetry-style span: one timed unit of work inside a trace.

    ``events`` holds point-in-time annotations (e.g. tool calls) and
    ``children`` holds spans nested under this one.
    """

    span_id: str = field(default_factory=lambda: f"span-{uuid.uuid4().hex[:12]}")
    parent_id: Optional[str] = None
    trace_id: str = field(default_factory=lambda: f"trace-{uuid.uuid4().hex[:16]}")
    name: str = ""
    kind: SpanKind = SpanKind.AGENT

    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    status: SpanStatus = SpanStatus.OK

    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[SpanEvent] = field(default_factory=list)
    children: List["Span"] = field(default_factory=list)

    # Agent-specific LLM accounting.
    model_name: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
    cost_usd: float = 0.0

    @property
    def duration_ms(self) -> float:
        """Elapsed time in milliseconds, measured up to now while the span is open."""
        # Compare against None explicitly: an end_time of 0.0 is a real value.
        end = self.end_time if self.end_time is not None else time.time()
        return (end - self.start_time) * 1000

    def set_attribute(self, key: str, value: Any) -> None:
        """Set one attribute on the span (OpenTelemetry ``SetAttribute`` style)."""
        self.attributes[key] = value

    def add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None) -> SpanEvent:
        """Append a new event to the span and return it."""
        event = SpanEvent(name=name, attributes=attributes or {})
        self.events.append(event)
        return event

    def set_error(self, error: str) -> None:
        """Mark the span as failed and store the error text under ``"error"``."""
        self.status = SpanStatus.ERROR
        self.attributes["error"] = error

    def finish(self) -> None:
        """Record the current time as the span's end time."""
        self.end_time = time.time()

    def to_dict(self) -> dict:
        """Return a plain-dict view of the span, including its events.

        Child spans are not nested here; they are exported separately as
        entries of the owning trace.
        """
        return {
            "span_id": self.span_id,
            "parent_id": self.parent_id,
            "trace_id": self.trace_id,
            "name": self.name,
            "kind": self.kind.value,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration_ms": self.duration_ms,
            "status": self.status.value,
            "attributes": dict(self.attributes),
            "events": [event.to_dict() for event in self.events],
            "model_name": self.model_name,
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
            "cost_usd": self.cost_usd,
        }


@dataclass
class Trace:
    """A complete agent invocation: every span recorded under one trace id."""

    trace_id: str = field(default_factory=lambda: f"trace-{uuid.uuid4().hex[:16]}")
    root_span: Optional[Span] = None
    spans: List[Span] = field(default_factory=list)

    @property
    def total_duration_ms(self) -> float:
        """Duration of the root span, or 0.0 when no root span is set."""
        if not self.root_span:
            return 0.0
        return self.root_span.duration_ms

    @property
    def total_cost_usd(self) -> float:
        """Sum of ``cost_usd`` over all spans."""
        return sum(s.cost_usd for s in self.spans)

    @property
    def total_tokens(self) -> int:
        """Sum of input and output tokens over all spans."""
        return sum(s.input_tokens + s.output_tokens for s in self.spans)

    def to_dict(self) -> dict:
        """Return a plain-dict view of the trace and all of its spans."""
        return {
            "trace_id": self.trace_id,
            "total_duration_ms": self.total_duration_ms,
            "total_cost_usd": self.total_cost_usd,
            "total_tokens": self.total_tokens,
            "span_count": len(self.spans),
            "spans": [s.to_dict() for s in self.spans],
        }
157# ── Tracer ──────────────────────────────────
class Tracer:
    """AgentOS tracer.

    Creates spans for agent / LLM / tool / memory work and groups them into
    traces; every finished trace is passed to each registered exporter.

    The active trace and span stack are plain instance attributes with no
    locking, so one Tracer should not be shared by concurrently running
    threads or asyncio tasks without external coordination.

    Usage:
        tracer = Tracer()
        with tracer.start_trace("request"):
            with tracer.start_span("my_agent", SpanKind.AGENT) as span:
                span.attributes["user_id"] = "123"
                # ... agent logic ...
    """

    def __init__(self, service_name: str = "agentos"):
        self._service_name = service_name
        self._active_trace: Optional[Trace] = None
        self._span_stack: List[Span] = []
        self._exporters: List[Callable[[Trace], None]] = []
        self._trace_count: int = 0

    def add_exporter(self, exporter: Callable[[Trace], None]) -> None:
        """Register ``exporter`` to be called with every finished Trace."""
        self._exporters.append(exporter)

    @contextmanager
    def start_trace(self, name: str = "") -> Iterator[Trace]:
        """Open a new trace for the duration of the ``with`` block.

        Args:
            name: Accepted for API compatibility; not currently recorded.

        Yields:
            The new Trace. Spans opened inside the block are appended to it.

        On exit, the first recorded span becomes ``root_span``. The previous
        trace and span stack are restored, and the trace is passed to every
        exporter. This also happens when the block raises.
        """
        trace = Trace()  # the default factory already assigns a fresh trace_id

        outer_trace = self._active_trace
        outer_stack = self._span_stack
        self._active_trace = trace
        # Fresh stack: spans of this trace must never get a parent that belongs
        # to an enclosing, different trace.
        self._span_stack = []

        try:
            yield trace
        finally:
            trace.root_span = trace.spans[0] if trace.spans else None
            self._active_trace = outer_trace
            self._span_stack = outer_stack
            self._trace_count += 1
            self._export(trace)

    def _export(self, trace: Trace) -> None:
        """Hand ``trace`` to every exporter, logging (not raising) failures."""
        import logging

        for exporter in self._exporters:
            try:
                exporter(trace)
            except Exception:
                # Best-effort: a broken exporter must not break the traced code,
                # but its failure should not be invisible either.
                logging.getLogger(__name__).exception(
                    "Trace exporter %r failed", exporter
                )

    @contextmanager
    def start_span(
        self,
        name: str,
        kind: SpanKind = SpanKind.AGENT,
        attributes: Optional[Dict[str, Any]] = None,
    ) -> Iterator[Span]:
        """Open a span for the duration of the ``with`` block.

        The span gets the active trace's id (``""`` when no trace is open) and
        is nested under the innermost open span. It is also appended to the
        active trace, if there is one.

        If the block raises, the span is marked ERROR with the exception text
        and the exception is re-raised. The span is always finished on exit.
        """
        span = Span(
            name=name,
            kind=kind,
            trace_id=self._active_trace.trace_id if self._active_trace else "",
        )

        if self._span_stack:
            parent = self._span_stack[-1]
            span.parent_id = parent.span_id
            parent.children.append(span)

        if attributes:
            span.attributes.update(attributes)

        self._span_stack.append(span)

        if self._active_trace:
            self._active_trace.spans.append(span)

        try:
            yield span
        except Exception as e:
            span.set_error(str(e))
            raise
        finally:
            span.finish()
            self._span_stack.pop()

    def record_llm_call(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        cost_usd: float,
        duration_ms: float,
    ) -> None:
        """Store LLM call details on the innermost open span.

        Overwrites that span's model name, token counts and cost, and does
        nothing when no span is open. ``duration_ms`` is accepted but not
        currently stored.
        """
        if self._span_stack:
            span = self._span_stack[-1]
            span.model_name = model
            span.input_tokens = input_tokens
            span.output_tokens = output_tokens
            span.cost_usd = cost_usd

    def record_tool_call(self, tool_name: str, success: bool, duration_ms: float) -> None:
        """Add a ``tool_call`` event to the innermost open span (no-op if none)."""
        if self._span_stack:
            span = self._span_stack[-1]
            span.add_event("tool_call", {
                "tool_name": tool_name,
                "success": success,
                "duration_ms": duration_ms,
            })
268# ── Metrics ─────────────────────────────────
class MetricType(str, Enum):
    """Kinds of metric supported by MetricsRegistry."""

    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"


@dataclass
class Metric:
    """Registry metadata for one registered metric."""

    name: str
    type: MetricType
    description: str = ""
    labels: Dict[str, str] = field(default_factory=dict)
    value: float = 0.0
    timestamp: float = field(default_factory=time.time)


class MetricsRegistry:
    """Prometheus-style metric registry.

    Asking for an already-registered name returns the existing instance.

    Usage:
        registry = MetricsRegistry()
        counter = registry.counter("agent_invocations_total", "Total agent calls")
        counter.inc()

        histogram = registry.histogram("llm_latency_ms", "LLM call latency")
        histogram.observe(1234.5)
    """

    def __init__(self):
        self._metrics: Dict[str, Metric] = {}
        self._counters: Dict[str, "Counter"] = {}
        self._histograms: Dict[str, "Histogram"] = {}
        self._gauges: Dict[str, "Gauge"] = {}

    def counter(self, name: str, description: str = "") -> "Counter":
        """Return the counter called ``name``, creating it on first use."""
        if name not in self._counters:
            self._counters[name] = Counter(name, description)
            self._metrics[name] = Metric(
                name=name, type=MetricType.COUNTER, description=description
            )
        return self._counters[name]

    def histogram(self, name: str, description: str = "",
                  buckets: Optional[List[float]] = None) -> "Histogram":
        """Return the histogram called ``name``, creating it on first use.

        ``buckets`` only takes effect when the histogram is first created.
        """
        if name not in self._histograms:
            self._histograms[name] = Histogram(name, description, buckets)
            self._metrics[name] = Metric(
                name=name, type=MetricType.HISTOGRAM, description=description
            )
        return self._histograms[name]

    def gauge(self, name: str, description: str = "") -> "Gauge":
        """Return the gauge called ``name``, creating it on first use."""
        if name not in self._gauges:
            self._gauges[name] = Gauge(name, description)
            self._metrics[name] = Metric(
                name=name, type=MetricType.GAUGE, description=description
            )
        return self._gauges[name]

    def collect(self) -> List[dict]:
        """Snapshot all metrics as dicts: counters, then histograms, then gauges."""
        results = []
        now = time.time()

        for name, counter in self._counters.items():
            results.append({
                "name": name,
                "type": "counter",
                "value": counter.value,
                "timestamp": now,
            })

        for name, histogram in self._histograms.items():
            results.append({
                "name": name,
                "type": "histogram",
                "count": histogram.count,
                "sum": histogram.sum,
                "buckets": dict(histogram.buckets),
                "timestamp": now,
            })

        for name, gauge in self._gauges.items():
            results.append({
                "name": name,
                "type": "gauge",
                "value": gauge.value,
                "timestamp": now,
            })

        return results

    @staticmethod
    def _escape_help(text: str) -> str:
        """Escape HELP text as the Prometheus text format requires."""
        return text.replace("\\", "\\\\").replace("\n", "\\n")

    def to_prometheus_text(self) -> str:
        """Render all metrics in the Prometheus text exposition format.

        HELP uses the registered description, with "AgentOS metric" as a
        fallback. Histograms emit cumulative ``_bucket`` lines, including
        ``le="+Inf"``, followed by ``_sum`` and ``_count``. Non-empty output
        ends with a newline, as the format requires.
        """
        lines: List[str] = []
        for metric in self.collect():
            name = metric["name"]
            meta = self._metrics.get(name)
            help_text = meta.description if meta and meta.description else "AgentOS metric"
            lines.append(f"# HELP {name} {self._escape_help(help_text)}")
            lines.append(f"# TYPE {name} {metric['type']}")

            if metric["type"] == "histogram":
                for bound, count in metric.get("buckets", {}).items():
                    lines.append(f'{name}_bucket{{le="{bound}"}} {count}')
                # The +Inf bucket always equals the total observation count.
                lines.append(f'{name}_bucket{{le="+Inf"}} {metric["count"]}')
                lines.append(f"{name}_sum {metric['sum']}")
                lines.append(f"{name}_count {metric['count']}")
            else:
                lines.append(f"{name} {metric['value']}")

        return ("\n".join(lines) + "\n") if lines else ""


class Counter:
    """Monotonically increasing counter."""

    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._value: float = 0.0

    def inc(self, amount: float = 1.0) -> None:
        """Increase the counter by ``amount``.

        Raises:
            ValueError: if ``amount`` is negative (counters cannot go down).
        """
        if amount < 0:
            raise ValueError("Counters can only be incremented by non-negative amounts")
        self._value += amount

    @property
    def value(self) -> float:
        return self._value


class Gauge:
    """Value that can be set, or moved up and down."""

    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._value: float = 0.0

    def set(self, value: float) -> None:
        """Replace the current value."""
        self._value = value

    def inc(self, amount: float = 1.0) -> None:
        """Increase the gauge by ``amount``."""
        self._value += amount

    def dec(self, amount: float = 1.0) -> None:
        """Decrease the gauge by ``amount``."""
        self._value -= amount

    @property
    def value(self) -> float:
        return self._value


class Histogram:
    """Histogram with cumulative, Prometheus-style ``le`` buckets.

    ``buckets`` maps each upper bound, in ascending order, to the number of
    observations less than or equal to that bound. Observations above the
    largest bound are reflected only in ``count`` and ``sum``.
    """

    DEFAULT_BUCKETS = [1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000]

    def __init__(self, name: str, description: str = "",
                 buckets: Optional[List[float]] = None):
        self.name = name
        self.description = description
        # Sorted so that iteration and export happen in ascending-bound order.
        self.buckets: Dict[float, int] = {
            bound: 0 for bound in sorted(buckets or self.DEFAULT_BUCKETS)
        }
        self._count: int = 0
        self._sum: float = 0.0

    def observe(self, value: float) -> None:
        """Record one observation in every bucket whose bound is >= ``value``."""
        self._count += 1
        self._sum += value
        for bound in self.buckets:
            if value <= bound:
                self.buckets[bound] += 1

    @property
    def count(self) -> int:
        return self._count

    @property
    def sum(self) -> float:
        return self._sum
443# ── Structured Logger ───────────────────────
class LogLevel(str, Enum):
    """Log severities, declared from least to most severe."""

    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"


class StructuredLogger:
    """Structured logger that passes dict entries to registered handlers.

    When a tracer is attached, the active trace id and innermost span id are
    added to each entry. Entries below the configured level (default INFO)
    are dropped.

    Usage:
        logger = StructuredLogger(tracer)
        logger.info("Agent started", agent_name="ToolAgent", user_id="123")
    """

    # Severity rank per level, taken from declaration order (higher = more severe).
    _SEVERITY = {level: rank for rank, level in enumerate(LogLevel)}

    def __init__(self, tracer: Optional[Tracer] = None):
        self._tracer = tracer
        self._handlers: List[Callable[[dict], None]] = []
        self._level = LogLevel.INFO

    def add_handler(self, handler: Callable[[dict], None]) -> None:
        """Register a callable that receives every emitted entry dict."""
        self._handlers.append(handler)

    def set_level(self, level: Union[LogLevel, str]) -> None:
        """Set the minimum level that is emitted.

        Accepts a LogLevel or its string value (e.g. ``"debug"``).

        Raises:
            ValueError: if ``level`` is not a valid LogLevel value.
        """
        self._level = LogLevel(level)

    def is_enabled_for(self, level: Union[LogLevel, str]) -> bool:
        """Return True if entries at ``level`` pass the current level filter."""
        return self._SEVERITY[LogLevel(level)] >= self._SEVERITY[self._level]

    def log(self, level: Union[LogLevel, str], message: str, **kwargs) -> None:
        """Build an entry and pass it to every handler, if ``level`` is enabled."""
        level = LogLevel(level)
        if not self.is_enabled_for(level):
            return

        entry = {
            "timestamp": time.time(),
            "level": level.value,
            "message": message,
            **kwargs,
        }

        # Inject trace context from the attached tracer.
        if self._tracer:
            trace = self._tracer._active_trace
            if trace:
                entry["trace_id"] = trace.trace_id
            if self._tracer._span_stack:
                entry["span_id"] = self._tracer._span_stack[-1].span_id

        for handler in self._handlers:
            try:
                handler(entry)
            except Exception:
                # Deliberate best-effort: a failing handler must never make
                # the caller's logging call raise.
                pass

    def debug(self, message: str, **kwargs) -> None:
        self.log(LogLevel.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs) -> None:
        self.log(LogLevel.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs) -> None:
        self.log(LogLevel.WARNING, message, **kwargs)

    def error(self, message: str, **kwargs) -> None:
        self.log(LogLevel.ERROR, message, **kwargs)

    def critical(self, message: str, **kwargs) -> None:
        self.log(LogLevel.CRITICAL, message, **kwargs)
511# ── Dashboard Data ──────────────────────────
class ObservabilityDashboard:
    """Aggregates recent traces into summary data for Grafana or custom dashboards.

    Keeps a bounded buffer of finished traces (oldest dropped first). Summaries
    are computed over the most recent ``window`` traces.

    Usage:
        dashboard = ObservabilityDashboard(tracer, metrics)
        summary = dashboard.get_summary()
    """

    def __init__(self, tracer: Tracer, metrics: MetricsRegistry,
                 max_buffer: int = 1000, window: int = 100):
        """
        Args:
            tracer: Tracer whose ``_trace_count`` is reported as ``total_traces``.
            metrics: Registry whose ``collect()`` output is embedded in summaries.
            max_buffer: Maximum number of traces retained.
            window: Number of most recent traces that summaries consider.

        Raises:
            ValueError: if ``max_buffer`` or ``window`` is less than 1.
        """
        if max_buffer < 1 or window < 1:
            raise ValueError("max_buffer and window must be positive")
        self._tracer = tracer
        self._metrics = metrics
        self._trace_buffer: List[Trace] = []
        self._max_buffer = max_buffer
        self._window = window

    def record_trace(self, trace: Trace) -> None:
        """Buffer a finished trace, evicting the oldest beyond ``max_buffer``."""
        self._trace_buffer.append(trace)
        if len(self._trace_buffer) > self._max_buffer:
            del self._trace_buffer[:-self._max_buffer]

    def _recent(self) -> List[Trace]:
        """Return the most recent ``window`` buffered traces."""
        return self._trace_buffer[-self._window:]

    @staticmethod
    def _percentile(sorted_values: List[float], q: float) -> float:
        """Return the element at index ``int(n * q)`` of an ascending list.

        The index is clamped to the last element, so a single sample is its
        own p50/p95/p99. Returns 0 for an empty list.
        """
        if not sorted_values:
            return 0
        index = min(int(len(sorted_values) * q), len(sorted_values) - 1)
        return sorted_values[index]

    def get_summary(self) -> dict:
        """Return a combined summary of the recent traces.

        ``error_rate`` is the fraction of spans (not traces) with ERROR status.
        """
        traces = self._recent()

        if not traces:
            return {"message": "No traces recorded"}

        durations = sorted(t.total_duration_ms for t in traces)
        costs = [t.total_cost_usd for t in traces]

        # Span kind distribution and error count.
        kind_counts: Dict[str, int] = defaultdict(int)
        error_count = 0
        for t in traces:
            for s in t.spans:
                kind_counts[s.kind.value] += 1
                if s.status == SpanStatus.ERROR:
                    error_count += 1

        total_spans = sum(kind_counts.values())
        return {
            "trace_count": len(traces),
            "total_traces": self._tracer._trace_count,
            "error_rate": error_count / max(total_spans, 1),
            "duration": {
                "p50_ms": self._percentile(durations, 0.5),
                "p95_ms": self._percentile(durations, 0.95),
                "p99_ms": self._percentile(durations, 0.99),
                "avg_ms": sum(durations) / len(durations),
            },
            "cost": {
                "total_usd": sum(costs),
                "avg_per_trace_usd": sum(costs) / len(costs),
            },
            "span_distribution": dict(kind_counts),
            "metrics": self._metrics.collect(),
        }

    def get_latency_breakdown(self) -> List[dict]:
        """Return per-span-kind latency stats (count, avg, p50, p95) for recent traces."""
        breakdown: Dict[str, List[float]] = defaultdict(list)

        for trace in self._recent():
            for span in trace.spans:
                breakdown[span.kind.value].append(span.duration_ms)

        result = []
        for kind, values in breakdown.items():
            values.sort()
            n = len(values)
            result.append({
                "stage": kind,
                "count": n,
                "avg_ms": sum(values) / n if n else 0,
                "p50_ms": self._percentile(values, 0.5),
                "p95_ms": self._percentile(values, 0.95),
            })

        return result
595# ── Quick Start ─────────────────────────────
def create_observability_stack(service_name: str = "agentos"):
    """Build a wired-up tracer, metrics registry, logger and dashboard.

    The dashboard is registered as a trace exporter, and a standard set of
    agent / LLM / tool metrics is pre-registered on the registry.

    Returns:
        Tuple ``(tracer, metrics, logger, dashboard)``.
    """
    # (registry factory method, metric name, description), in registration order.
    default_metrics = (
        ("counter", "agent_invocations_total", "Total agent invocations"),
        ("histogram", "agent_latency_ms", "Agent end-to-end latency"),
        ("histogram", "llm_latency_ms", "LLM call latency"),
        ("counter", "tool_calls_total", "Total tool calls"),
        ("counter", "tool_errors_total", "Total tool errors"),
        ("gauge", "active_agents", "Currently active agents"),
    )

    tracer = Tracer(service_name=service_name)
    registry = MetricsRegistry()
    logger = StructuredLogger(tracer)
    dashboard = ObservabilityDashboard(tracer, registry)

    # Finished traces flow straight into the dashboard's buffer.
    tracer.add_exporter(dashboard.record_trace)

    for factory_name, metric_name, help_text in default_metrics:
        getattr(registry, factory_name)(metric_name, help_text)

    return tracer, registry, logger, dashboard
619# ── Missing compat classes ───────────────────
class MetricsCollector(MetricsRegistry):
    """Backward-compatible alias of MetricsRegistry.

    Kept because agentos/__init__.py requires this name; it adds no behavior.
    """
class NoopTracer(Tracer):
    """Tracer stand-in whose ``start_span`` records nothing.

    ``start_span`` returns a throwaway span that also works as a context
    manager. Both the Tracer-style ``with tracer.start_span(...) as span:``
    and a direct ``span = tracer.start_span(...)`` therefore work.
    """

    class _NoopSpan(Span):
        """Span usable in a ``with`` statement; entering and exiting do nothing."""

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            # Never suppress exceptions raised inside the block.
            return False

    def start_span(self, *args, **kwargs):
        """Return a detached no-op span; all arguments are ignored."""
        return self._NoopSpan(name="noop")

    def mark_complete(self, span_id: str = ""):
        """No-op; accepted for API compatibility."""
@dataclass
class CostAnalytics:
    """Running totals of LLM token usage and cost."""

    total_cost: float = 0.0
    total_tokens: int = 0
    prompt_tokens: int = 0
    completion_tokens: int = 0
    cost_per_call: list[float] = field(default_factory=list)

    def record_call(self, prompt_tokens: int, completion_tokens: int, cost: float) -> None:
        """Accumulate one LLM call's token counts and cost."""
        self.prompt_tokens += prompt_tokens
        self.completion_tokens += completion_tokens
        # Keep the combined counter in step with the per-kind counters.
        self.total_tokens += prompt_tokens + completion_tokens
        self.total_cost += cost
        self.cost_per_call.append(cost)

    def summary(self) -> dict:
        """Return total cost, total tokens and average cost per recorded call."""
        return {
            "total_cost": self.total_cost,
            "total_tokens": self.prompt_tokens + self.completion_tokens,
            "avg_cost_per_call": sum(self.cost_per_call) / len(self.cost_per_call) if self.cost_per_call else 0,
        }
@dataclass
class BudgetAlert:
    """Spend-versus-budget check that yields an alert message past a threshold.

    ``threshold_pct`` is a percentage of ``budget_limit`` (80.0 means 80%).
    A non-positive ``budget_limit`` disables the check.
    """

    budget_limit: float = 0.0
    current_spend: float = 0.0
    threshold_pct: float = 80.0

    def check(self) -> Optional[str]:
        """Return an alert message once spend reaches the threshold, else None."""
        if self.budget_limit <= 0:
            return None  # no budget configured

        spent_pct = (self.current_spend / self.budget_limit) * 100
        over_threshold = spent_pct >= self.threshold_pct
        return (
            f"Budget alert: {spent_pct:.1f}% of ${self.budget_limit:.2f} spent"
            if over_threshold
            else None
        )