Coverage for agentos/observability/__init__.py: 42%

353 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v1.14.3 — Observability Platform (OpenTelemetry Integration). 

3 

4Production-grade observability for AgentOS agent pipelines: 

5- Distributed tracing (OpenTelemetry spans) 

6- Metrics collection (Prometheus-compatible counters, histograms, gauges) 

7- Structured logging with correlation IDs 

8- Agent lifecycle events instrumentation 

9- Cost tracking dashboard 

10- Latency breakdown by pipeline stage 

11 

12Architecture: 

13 AgentOS Pipeline 

14 ├── OTel Tracer (auto-instrumented) 

15 │ ├── Agent invocation span 

16 │ │ ├── LLM call span 

17 │ │ ├── Tool call span 

18 │ │ └── Memory retrieval span 

19 ├── MetricsExporter (Prometheus) 

20 └── StructuredLogger (JSON) 

21 

22Inspired by: LangSmith, Weave, Arize Phoenix 

23""" 

24 

25from __future__ import annotations 

26 

27import json 

28import time 

29import uuid 

30from collections import defaultdict 

31from contextlib import contextmanager 

32from dataclasses import dataclass, field 

33from enum import Enum 

34from typing import ( 

35 Any, Callable, Dict, Iterator, List, Optional, Set, Tuple, Union, 

36) 

37 

38 

39# ── Span & Trace Types ────────────────────── 

40 

41 

class SpanKind(str, Enum):
    """Category of work that a span represents.

    Members are also ``str`` instances, so they compare equal to their raw
    string value and serialize without extra conversion.
    """

    # Top-level agent invocation.
    AGENT = "agent"
    # Language-model call.
    LLM = "llm"
    # Tool invocation.
    TOOL = "tool"
    # Memory access.
    MEMORY = "memory"
    # Retrieval step.
    RETRIEVAL = "retrieval"
    # Chain step.
    CHAIN = "chain"
    # Embedding computation.
    EMBEDDING = "embedding"

50 

51 

class SpanStatus(str, Enum):
    """Outcome of a span; string-valued so it serializes directly."""

    # Default status of a newly created span.
    OK = "ok"
    # The span was marked as failed.
    ERROR = "error"
    # The span timed out.
    TIMEOUT = "timeout"

56 

57 

@dataclass
class SpanEvent:
    """A named, timestamped event recorded inside a span."""

    # Event name (required).
    name: str
    # Wall-clock creation time from time.time(), captured per instance.
    timestamp: float = field(default_factory=time.time)
    # Free-form key/value payload; each event gets its own dict.
    attributes: Dict[str, Any] = field(default_factory=dict)

64 

65 

@dataclass
class Span:
    """OpenTelemetry-style span: one timed unit of work inside a trace.

    A span carries identity (``span_id``/``parent_id``/``trace_id``), timing,
    a status, free-form attributes, recorded events, child spans, and
    LLM usage figures (model name, token counts, cost).
    """

    # Identity. IDs are random hex suffixes generated per instance.
    span_id: str = field(default_factory=lambda: f"span-{uuid.uuid4().hex[:12]}")
    parent_id: Optional[str] = None
    trace_id: str = field(default_factory=lambda: f"trace-{uuid.uuid4().hex[:16]}")
    name: str = ""
    kind: SpanKind = SpanKind.AGENT

    # Timing and outcome. ``end_time`` stays None until finish() is called.
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    status: SpanStatus = SpanStatus.OK

    # Payload.
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[SpanEvent] = field(default_factory=list)
    children: List["Span"] = field(default_factory=list)

    # Agent-specific usage figures.
    model_name: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
    cost_usd: float = 0.0

    @property
    def duration_ms(self) -> float:
        """Return elapsed milliseconds; uses the current time while unfinished."""
        # Compare against None explicitly: a legitimate end_time of 0.0 is
        # falsy and would otherwise be mistaken for "still running".
        end = time.time() if self.end_time is None else self.end_time
        return (end - self.start_time) * 1000

    def set_attribute(self, key: str, value: Any) -> None:
        """Set a single attribute on the span."""
        self.attributes[key] = value

    def add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None) -> SpanEvent:
        """Append a new event to the span and return it."""
        event = SpanEvent(name=name, attributes=attributes or {})
        self.events.append(event)
        return event

    def set_error(self, error: str) -> None:
        """Mark the span as failed and store the message under ``attributes["error"]``."""
        self.status = SpanStatus.ERROR
        self.attributes["error"] = error

    def finish(self) -> None:
        """Stamp the span's end time with the current time."""
        self.end_time = time.time()

    def to_dict(self) -> dict:
        """Return a flat dict summary of the span (events and children are not included)."""
        return {
            "span_id": self.span_id,
            "parent_id": self.parent_id,
            "trace_id": self.trace_id,
            "name": self.name,
            "kind": self.kind.value,
            "duration_ms": self.duration_ms,
            "status": self.status.value,
            "attributes": self.attributes,
            "model_name": self.model_name,
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
            "cost_usd": self.cost_usd,
        }

122 

123 

@dataclass
class Trace:
    """One complete agent call chain: a root span plus every span recorded for it."""

    trace_id: str = field(default_factory=lambda: f"trace-{uuid.uuid4().hex[:16]}")
    root_span: Optional[Span] = None
    spans: List[Span] = field(default_factory=list)

    @property
    def total_duration_ms(self) -> float:
        """Return the root span's duration, or 0.0 when there is no root span."""
        root = self.root_span
        return root.duration_ms if root else 0.0

    @property
    def total_cost_usd(self) -> float:
        """Return the summed ``cost_usd`` of all spans in the trace."""
        total = 0
        for span in self.spans:
            total += span.cost_usd
        return total

    @property
    def total_tokens(self) -> int:
        """Return the summed input and output tokens of all spans in the trace."""
        total = 0
        for span in self.spans:
            total += span.input_tokens + span.output_tokens
        return total

    def to_dict(self) -> dict:
        """Return a dict with the trace totals and each span's ``to_dict()``."""
        span_dicts = [span.to_dict() for span in self.spans]
        return dict(
            trace_id=self.trace_id,
            total_duration_ms=self.total_duration_ms,
            total_cost_usd=self.total_cost_usd,
            total_tokens=self.total_tokens,
            span_count=len(self.spans),
            spans=span_dicts,
        )

155 

156 

157# ── Tracer ────────────────────────────────── 

158 

159 

class Tracer:
    """AgentOS tracer.

    Creates spans for Agent/LLM/Tool/Memory work through explicit
    ``start_span`` calls and groups them into traces via ``start_trace``.

    Usage:
        tracer = Tracer()
        with tracer.start_trace():
            with tracer.start_span("my_agent", SpanKind.AGENT) as span:
                span.attributes["user_id"] = "123"
                # ... agent logic ...
    """

    def __init__(self, service_name: str = "agentos"):
        self._service_name = service_name
        self._active_trace: Optional[Trace] = None
        # Stack of currently open spans; the last element is the innermost.
        self._span_stack: List[Span] = []
        self._exporters: List[Callable] = []
        self._trace_count: int = 0

    def add_exporter(self, exporter: Callable[[Trace], None]) -> None:
        """Register a callable that receives every finished trace."""
        self._exporters.append(exporter)

    @contextmanager
    def start_trace(self, name: str = "") -> Iterator[Trace]:
        """Open a new trace and make it the active one for the ``with`` body.

        On exit the first recorded span (if any) becomes ``root_span``, the
        previously active trace is restored, the trace counter is bumped, and
        the trace is passed to every registered exporter.

        Args:
            name: Accepted for interface compatibility; not used here.

        Yields:
            The newly created Trace.
        """
        # Trace() already generates a fresh "trace-<hex>" id by default.
        trace = Trace()

        previous = self._active_trace
        self._active_trace = trace

        try:
            yield trace
        finally:
            trace.root_span = trace.spans[0] if trace.spans else None
            self._active_trace = previous
            self._trace_count += 1

            # Export is best-effort: one failing exporter must not break the
            # traced code or the other exporters, but the failure is logged
            # rather than silently discarded.
            for exporter in self._exporters:
                try:
                    exporter(trace)
                except Exception:
                    import logging

                    logging.getLogger(__name__).exception(
                        "Trace exporter %r failed", exporter
                    )

    @contextmanager
    def start_span(
        self,
        name: str,
        kind: SpanKind = SpanKind.AGENT,
        attributes: Optional[Dict[str, Any]] = None,
    ) -> Iterator[Span]:
        """Open a span, nested under the innermost open span if there is one.

        The span is added to the active trace when one exists; otherwise its
        ``trace_id`` is the empty string. An exception raised inside the
        ``with`` body marks the span as an error and is re-raised. The span is
        always finished and popped from the stack on exit.

        Args:
            name: Span name.
            kind: Category of work the span represents.
            attributes: Optional initial attributes copied onto the span.

        Yields:
            The newly created Span.
        """
        span = Span(
            name=name,
            kind=kind,
            trace_id=self._active_trace.trace_id if self._active_trace else "",
        )

        if self._span_stack:
            parent = self._span_stack[-1]
            span.parent_id = parent.span_id
            parent.children.append(span)

        if attributes:
            span.attributes.update(attributes)

        self._span_stack.append(span)

        if self._active_trace:
            self._active_trace.spans.append(span)

        try:
            yield span
        except Exception as e:
            # Exceptions raised without a message stringify to ""; fall back
            # to the exception type so the error attribute is never blank.
            span.set_error(str(e) or type(e).__name__)
            raise
        finally:
            span.finish()
            self._span_stack.pop()

    def record_llm_call(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        cost_usd: float,
        duration_ms: float,
    ) -> None:
        """Store LLM usage figures on the innermost open span.

        Does nothing when no span is open. Values overwrite whatever the span
        already holds. ``duration_ms`` is accepted for interface compatibility
        but is not stored.
        """
        if self._span_stack:
            span = self._span_stack[-1]
            span.model_name = model
            span.input_tokens = input_tokens
            span.output_tokens = output_tokens
            span.cost_usd = cost_usd

    def record_tool_call(self, tool_name: str, success: bool, duration_ms: float) -> None:
        """Add a ``tool_call`` event to the innermost open span, if any."""
        if self._span_stack:
            span = self._span_stack[-1]
            span.add_event("tool_call", {
                "tool_name": tool_name,
                "success": success,
                "duration_ms": duration_ms,
            })

266 

267 

268# ── Metrics ───────────────────────────────── 

269 

270 

class MetricType(str, Enum):
    """Kind of metric held by the registry; string-valued for serialization."""

    # Counter metric.
    COUNTER = "counter"
    # Gauge metric.
    GAUGE = "gauge"
    # Histogram metric.
    HISTOGRAM = "histogram"

275 

276 

@dataclass
class Metric:
    """Descriptor record for a single metric."""

    # Metric name (required).
    name: str
    # Metric kind (required).
    type: MetricType
    # Human-readable help text.
    description: str = ""
    # Label key/value pairs; each instance gets its own dict.
    labels: Dict[str, str] = field(default_factory=dict)
    # Numeric value slot, 0.0 by default.
    value: float = 0.0
    # Creation time from time.time(), captured per instance.
    timestamp: float = field(default_factory=time.time)

286 

287 

class MetricsRegistry:
    """Prometheus-style metrics registry.

    Metrics are created on first request and returned from cache afterwards,
    keyed by name within each metric kind.

    Usage:
        registry = MetricsRegistry()
        counter = registry.counter("agent_invocations_total", "Total agent calls")
        counter.inc()

        histogram = registry.histogram("llm_latency_ms", "LLM call latency")
        histogram.observe(1234.5)
    """

    def __init__(self):
        # name -> descriptor (type and description) for every registered metric.
        self._metrics: Dict[str, Metric] = {}
        self._counters: Dict[str, "Counter"] = {}
        self._histograms: Dict[str, "Histogram"] = {}
        self._gauges: Dict[str, "Gauge"] = {}

    def counter(self, name: str, description: str = "") -> "Counter":
        """Return the counter called ``name``, creating it on first use."""
        if name not in self._counters:
            self._counters[name] = Counter(name, description)
            self._metrics[name] = Metric(
                name=name, type=MetricType.COUNTER, description=description
            )
        return self._counters[name]

    def histogram(self, name: str, description: str = "",
                  buckets: Optional[List[float]] = None) -> "Histogram":
        """Return the histogram called ``name``, creating it on first use.

        ``buckets`` is only honoured when the histogram is first created.
        """
        if name not in self._histograms:
            self._histograms[name] = Histogram(name, description, buckets)
            self._metrics[name] = Metric(
                name=name, type=MetricType.HISTOGRAM, description=description
            )
        return self._histograms[name]

    def gauge(self, name: str, description: str = "") -> "Gauge":
        """Return the gauge called ``name``, creating it on first use."""
        if name not in self._gauges:
            self._gauges[name] = Gauge(name, description)
            self._metrics[name] = Metric(
                name=name, type=MetricType.GAUGE, description=description
            )
        return self._gauges[name]

    def collect(self) -> List[dict]:
        """Return a snapshot of every metric: counters, then histograms, then gauges.

        All entries share one timestamp. Histogram ``buckets`` are copied
        as stored, i.e. per-bucket (non-cumulative) counts.
        """
        results = []
        now = time.time()

        for name, counter in self._counters.items():
            results.append({
                "name": name,
                "type": "counter",
                "value": counter.value,
                "timestamp": now,
            })

        for name, histogram in self._histograms.items():
            results.append({
                "name": name,
                "type": "histogram",
                "count": histogram.count,
                "sum": histogram.sum,
                "buckets": dict(histogram.buckets),
                "timestamp": now,
            })

        for name, gauge in self._gauges.items():
            results.append({
                "name": name,
                "type": "gauge",
                "value": gauge.value,
                "timestamp": now,
            })

        return results

    def to_prometheus_text(self) -> str:
        """Render all metrics in the Prometheus text exposition format.

        HELP lines use the description given at registration, falling back to
        "AgentOS metric" when none was provided. Histogram buckets are emitted
        cumulatively in ascending ``le`` order and closed with a ``+Inf``
        bucket equal to the observation count. Returns an empty string when
        nothing is registered; otherwise the output ends with a newline.
        """
        lines: List[str] = []
        for metric in self.collect():
            name = metric["name"]
            descriptor = self._metrics.get(name)
            help_text = getattr(descriptor, "description", "") or "AgentOS metric"
            lines.append(f"# HELP {name} {help_text}")
            lines.append(f"# TYPE {name} {metric['type']}")

            if metric["type"] == "histogram":
                # Stored counts are per-bucket, so accumulate them here: the
                # exposition format expects each bucket to include all
                # observations at or below its upper bound.
                buckets = metric.get("buckets", {})
                running = 0
                for bound in sorted(buckets):
                    running += buckets[bound]
                    lines.append(f'{name}_bucket{{le="{bound}"}} {running}')
                lines.append(f'{name}_bucket{{le="+Inf"}} {metric["count"]}')
                lines.append(f"{name}_sum {metric['sum']}")
                lines.append(f"{name}_count {metric['count']}")
            else:
                lines.append(f"{name} {metric['value']}")

        if not lines:
            return ""
        return "\n".join(lines) + "\n"

383 

384 

class Counter:
    """Monotonically increasing counter metric."""

    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._value: float = 0.0

    def inc(self, amount: float = 1.0) -> None:
        """Increase the counter by ``amount`` (default 1.0).

        Raises:
            ValueError: If ``amount`` is negative. A counter must never go
                down; use a gauge for values that can decrease.
        """
        if amount < 0:
            raise ValueError(
                "Counters can only be incremented by non-negative amounts."
            )
        self._value += amount

    @property
    def value(self) -> float:
        """Return the current counter value."""
        return self._value

397 

398 

class Gauge:
    """Metric holding a single value that can be set to anything at any time."""

    def __init__(self, name: str, description: str = ""):
        # Reading starts at 0.0 until set() is called.
        self._value: float = 0.0
        self.description = description
        self.name = name

    @property
    def value(self) -> float:
        """Return the most recently set value."""
        return self._value

    def set(self, value: float) -> None:
        """Replace the gauge's value with ``value``."""
        self._value = value

411 

412 

class Histogram:
    """Histogram metric that tallies observations into upper-bound buckets.

    Each observation increments only the smallest bucket whose bound is
    greater than or equal to the value, so bucket counts are per-bucket
    rather than cumulative. Observations above the largest bound are
    reflected in ``count`` and ``sum`` only.
    """

    DEFAULT_BUCKETS = [1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000]

    def __init__(self, name: str, description: str = "",
                 buckets: Optional[List[float]] = None):
        self.name = name
        self.description = description
        # A missing or empty bucket list falls back to the class defaults.
        bounds = buckets or self.DEFAULT_BUCKETS
        self.buckets: Dict[float, int] = {bound: 0 for bound in bounds}
        self._count: int = 0
        self._sum: float = 0.0

    def observe(self, value: float) -> None:
        """Record one observation."""
        self._count += 1
        self._sum += value
        # Smallest bound that can hold the value, or None when it exceeds all.
        target = next(
            (bound for bound in sorted(self.buckets) if value <= bound),
            None,
        )
        if target is not None:
            self.buckets[target] += 1

    @property
    def count(self) -> int:
        """Return the number of observations recorded."""
        return self._count

    @property
    def sum(self) -> float:
        """Return the sum of all observed values."""
        return self._sum

441 

442 

443# ── Structured Logger ─────────────────────── 

444 

445 

class LogLevel(str, Enum):
    """Log severity levels; string-valued so they serialize directly."""

    # Listed from least to most severe.
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

452 

453 

class StructuredLogger:
    """Structured logger that builds dict entries and injects trace context.

    Each entry is a dict with ``timestamp``, ``level``, ``message`` and any
    extra keyword fields. When a tracer is attached, the active trace's
    ``trace_id`` and the innermost open span's ``span_id`` are added. Entries
    below the configured level (INFO by default) are dropped.

    Usage:
        logger = StructuredLogger(tracer)
        logger.info("Agent started", agent_name="ToolAgent", user_id="123")
    """

    # Severity rank per level value, used to apply the configured threshold.
    _SEVERITY = {
        "debug": 10,
        "info": 20,
        "warning": 30,
        "error": 40,
        "critical": 50,
    }

    def __init__(self, tracer: Optional[Tracer] = None):
        self._tracer = tracer
        self._handlers: List[Callable] = []
        self._level = LogLevel.INFO

    @classmethod
    def _severity(cls, level) -> int:
        """Return the numeric rank of ``level``; unknown levels rank 0."""
        return cls._SEVERITY.get(getattr(level, "value", level), 0)

    def add_handler(self, handler: Callable[[dict], None]) -> None:
        """Register a callable that receives every emitted entry dict."""
        self._handlers.append(handler)

    def set_level(self, level: LogLevel) -> None:
        """Set the minimum severity an entry needs to reach the handlers."""
        self._level = level

    def log(self, level: LogLevel, message: str, **kwargs) -> None:
        """Build an entry and pass it to every handler.

        Args:
            level: Severity of the entry; entries below the configured
                threshold are discarded.
            message: Log message.
            **kwargs: Extra fields merged into the entry.
        """
        # Honour set_level(): without this check the threshold has no effect.
        if self._severity(level) < self._severity(self._level):
            return

        entry = {
            "timestamp": time.time(),
            "level": level.value,
            "message": message,
            **kwargs,
        }

        # Inject trace context.
        if self._tracer:
            trace = self._tracer._active_trace
            if trace:
                entry["trace_id"] = trace.trace_id
            if self._tracer._span_stack:
                entry["span_id"] = self._tracer._span_stack[-1].span_id

        # Delivery is best-effort: a failing handler must not break the
        # caller or the remaining handlers, but the failure is reported.
        for handler in self._handlers:
            try:
                handler(entry)
            except Exception:
                import logging

                logging.getLogger(__name__).exception(
                    "Log handler %r failed", handler
                )

    def debug(self, message: str, **kwargs) -> None:
        """Log ``message`` at DEBUG level."""
        self.log(LogLevel.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs) -> None:
        """Log ``message`` at INFO level."""
        self.log(LogLevel.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs) -> None:
        """Log ``message`` at WARNING level."""
        self.log(LogLevel.WARNING, message, **kwargs)

    def error(self, message: str, **kwargs) -> None:
        """Log ``message`` at ERROR level."""
        self.log(LogLevel.ERROR, message, **kwargs)

    def critical(self, message: str, **kwargs) -> None:
        """Log ``message`` at CRITICAL level."""
        self.log(LogLevel.CRITICAL, message, **kwargs)

509 

510 

511# ── Dashboard Data ────────────────────────── 

512 

513 

class ObservabilityDashboard:
    """Aggregates recorded traces and metrics into dashboard-ready summaries.

    Usage:
        dashboard = ObservabilityDashboard(tracer, metrics)
        summary = dashboard.get_summary()
    """

    def __init__(self, tracer: Tracer, metrics: MetricsRegistry):
        self._tracer = tracer
        self._metrics = metrics
        self._trace_buffer: List[Trace] = []
        # Upper bound on retained traces; older ones are discarded.
        self._max_buffer = 1000

    def record_trace(self, trace: Trace) -> None:
        """Store a trace, keeping only the most recent ``_max_buffer`` entries."""
        self._trace_buffer.append(trace)
        if len(self._trace_buffer) > self._max_buffer:
            self._trace_buffer = self._trace_buffer[-self._max_buffer:]

    @staticmethod
    def _percentile(sorted_values: List[float], fraction: float) -> float:
        """Return the value at ``fraction`` of an ascending list, or 0 if empty."""
        if not sorted_values:
            return 0
        # Clamp so a single sample (or a fraction near 1) stays in range.
        index = min(int(len(sorted_values) * fraction), len(sorted_values) - 1)
        return sorted_values[index]

    def get_summary(self) -> dict:
        """Summarize the last 100 recorded traces.

        Returns:
            ``{"message": "No traces recorded"}`` when the buffer is empty.
            Otherwise a dict with trace counts, span error rate, duration
            percentiles and average, cost totals, span-kind distribution, and
            the current metrics snapshot.
        """
        traces = self._trace_buffer[-100:]  # Last 100 traces

        if not traces:
            return {"message": "No traces recorded"}

        durations = sorted(t.total_duration_ms for t in traces)
        costs = [t.total_cost_usd for t in traces]

        # Span kind distribution and error tally across all spans.
        kind_counts: Dict[str, int] = defaultdict(int)
        error_count = 0
        for t in traces:
            for s in t.spans:
                kind_counts[s.kind.value] += 1
                if s.status == SpanStatus.ERROR:
                    error_count += 1

        return {
            "trace_count": len(traces),
            "total_traces": self._tracer._trace_count,
            # max(..., 1) avoids division by zero when no spans were recorded.
            "error_rate": error_count / max(sum(kind_counts.values()), 1),
            "duration": {
                # With one trace every percentile is that trace's duration,
                # so p95/p99 can never fall below p50.
                "p50_ms": self._percentile(durations, 0.5),
                "p95_ms": self._percentile(durations, 0.95),
                "p99_ms": self._percentile(durations, 0.99),
                "avg_ms": sum(durations) / len(durations),
            },
            "cost": {
                "total_usd": sum(costs),
                "avg_per_trace_usd": sum(costs) / len(costs),
            },
            "span_distribution": dict(kind_counts),
            "metrics": self._metrics.collect(),
        }

    def get_latency_breakdown(self) -> List[dict]:
        """Return per-span-kind latency statistics over the last 100 traces.

        Each entry holds the stage (span kind value), the span count, and the
        average, p50 and p95 span durations in milliseconds.
        """
        breakdown: Dict[str, List[float]] = defaultdict(list)

        for trace in self._trace_buffer[-100:]:
            for span in trace.spans:
                breakdown[span.kind.value].append(span.duration_ms)

        result = []
        for kind, values in breakdown.items():
            values.sort()
            n = len(values)
            result.append({
                "stage": kind,
                "count": n,
                "avg_ms": sum(values) / n if n else 0,
                "p50_ms": self._percentile(values, 0.5),
                "p95_ms": self._percentile(values, 0.95),
            })

        return result

593 

594 

595# ── Quick Start ───────────────────────────── 

596 

597 

def create_observability_stack(service_name: str = "agentos"):
    """Build and wire a complete observability stack in one call.

    The dashboard is registered as a trace exporter on the tracer, and a
    default set of metrics is pre-registered.

    Args:
        service_name: Name passed to the Tracer.

    Returns:
        A ``(tracer, metrics, logger, dashboard)`` tuple.
    """
    tracer = Tracer(service_name=service_name)
    metrics = MetricsRegistry()
    logger = StructuredLogger(tracer)
    dashboard = ObservabilityDashboard(tracer, metrics)

    # Every finished trace is fed into the dashboard buffer.
    tracer.add_exporter(dashboard.record_trace)

    # Default metrics, registered in this exact order.
    default_metrics = (
        (metrics.counter, "agent_invocations_total", "Total agent invocations"),
        (metrics.histogram, "agent_latency_ms", "Agent end-to-end latency"),
        (metrics.histogram, "llm_latency_ms", "LLM call latency"),
        (metrics.counter, "tool_calls_total", "Total tool calls"),
        (metrics.counter, "tool_errors_total", "Total tool errors"),
        (metrics.gauge, "active_agents", "Currently active agents"),
    )
    for register, metric_name, description in default_metrics:
        register(metric_name, description)

    return tracer, metrics, logger, dashboard

617 

618 

619# ── Missing compat classes ─────────────────── 

620 

class MetricsCollector(MetricsRegistry):
    """Compatibility alias for MetricsRegistry with no added behavior.

    NOTE(review): the original note says agentos/__init__.py requires this
    name; confirm against that module before removing it.
    """

624 

625 

class _NoopSpan(Span):
    """Span that can also be used directly as a context manager.

    Lets ``NoopTracer.start_span`` keep returning a Span while supporting
    the ``with tracer.start_span(...) as span:`` form used with Tracer.
    """

    def __enter__(self) -> "_NoopSpan":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Never suppress exceptions raised inside the with-block.
        return False


class NoopTracer(Tracer):
    """Tracer replacement whose spans are placeholders and are not recorded.

    ``start_span`` ignores its arguments and returns a fresh span named
    "noop" that is not pushed on the span stack or added to any trace. The
    returned object works both as a plain Span and as a context manager.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def start_span(self, *args, **kwargs):
        """Return a placeholder span usable directly or in a ``with`` statement."""
        return _NoopSpan(name="noop")

    def mark_complete(self, span_id: str = ""):
        """Do nothing."""
        pass

637 

638 

@dataclass
class CostAnalytics:
    """Running totals of LLM cost and token usage."""

    # Accumulated cost of all recorded calls.
    total_cost: float = 0.0
    # Accumulated prompt + completion tokens of all recorded calls.
    total_tokens: int = 0
    prompt_tokens: int = 0
    completion_tokens: int = 0
    # Cost of each individual call, in recording order.
    cost_per_call: list[float] = field(default_factory=list)

    def record_call(self, prompt_tokens: int, completion_tokens: int, cost: float):
        """Add one LLM call's token counts and cost to the running totals."""
        self.prompt_tokens += prompt_tokens
        self.completion_tokens += completion_tokens
        # Keep the total_tokens field in step with the per-kind counters;
        # it previously stayed at its initial value forever.
        self.total_tokens += prompt_tokens + completion_tokens
        self.total_cost += cost
        self.cost_per_call.append(cost)

    def summary(self) -> dict:
        """Return total cost, total tokens and average cost per call.

        ``avg_cost_per_call`` is 0 when no calls have been recorded.
        """
        calls = self.cost_per_call
        return {
            "total_cost": self.total_cost,
            "total_tokens": self.prompt_tokens + self.completion_tokens,
            "avg_cost_per_call": sum(calls) / len(calls) if calls else 0,
        }

661 

662 

@dataclass
class BudgetAlert:
    """Cost budget alert based on the share of the budget already spent."""

    # Budget ceiling; a value of 0 or less disables the alert.
    budget_limit: float = 0.0
    # Amount spent so far.
    current_spend: float = 0.0
    # Percentage of the budget at which the alert fires.
    threshold_pct: float = 80.0

    def check(self) -> Optional[str]:
        """Return an alert message once spending reaches the threshold, else None."""
        if self.budget_limit > 0:
            spent_pct = (self.current_spend / self.budget_limit) * 100
            if spent_pct >= self.threshold_pct:
                return f"Budget alert: {spent_pct:.1f}% of ${self.budget_limit:.2f} spent"
        return None