Coverage for agentos/observability/tracer.py: 59%
82 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2全链路追踪 — 每一步可追溯。
3基因来源: LangSmith + OpenAI Tracing
4"""
6from __future__ import annotations
8import time
9from contextlib import contextmanager
10from dataclasses import dataclass, field
11from typing import Any
14@dataclass
15class StepTrace:
16 """单步追踪记录。"""
17 name: str
18 start_time: float = 0.0
19 end_time: float = 0.0
20 duration_ms: float = 0.0
21 model: str = ""
22 tokens_in: int = 0
23 tokens_out: int = 0
24 tool_calls_count: int = 0
25 error: str | None = None
28@dataclass
29class TokenStats:
30 """Token 使用统计。"""
31 total_input: int = 0
32 total_output: int = 0
33 by_model: dict[str, dict[str, int]] = field(default_factory=dict)
36@dataclass
37class ObservabilityReport:
38 """可观测性报告。"""
39 session_id: str
40 total_duration_ms: float = 0.0
41 steps: list[StepTrace] = field(default_factory=list)
42 tokens: TokenStats = field(default_factory=TokenStats)
43 model_calls: int = 0
44 tool_calls_total: int = 0
46 def summary(self) -> str:
47 lines = [
48 f"Session: {self.session_id}",
49 f"Duration: {self.total_duration_ms:.0f}ms",
50 f"Model calls: {self.model_calls}",
51 f"Tool calls: {self.tool_calls_total}",
52 f"Tokens (in/out): {self.tokens.total_input}/{self.tokens.total_output}",
53 ]
54 if self.steps:
55 lines.append(f"Steps: {', '.join(s.name for s in self.steps)}")
56 return "\n".join(lines)
59class Tracer:
60 """全链路追踪器。每步记录耗时、token消耗、工具调用。"""
62 def __init__(self, session_id: str = ""):
63 self.session_id = session_id
64 self.steps: list[StepTrace] = []
65 self.token_stats = TokenStats()
66 self.start_time = time.time()
68 @classmethod
69 def noop(cls) -> "Tracer":
70 return NoopTracer()
72 @contextmanager
73 def step(self, name: str, model: str = ""):
74 trace = StepTrace(name=name, model=model, start_time=time.time())
75 try:
76 yield trace
77 except Exception as e:
78 trace.error = str(e)
79 raise
80 finally:
81 trace.end_time = time.time()
82 trace.duration_ms = (trace.end_time - trace.start_time) * 1000
83 self.steps.append(trace)
85 def track_tokens(self, model: str, input_tokens: int, output_tokens: int):
86 self.token_stats.total_input += input_tokens
87 self.token_stats.total_output += output_tokens
88 if model not in self.token_stats.by_model:
89 self.token_stats.by_model[model] = {"input": 0, "output": 0}
90 self.token_stats.by_model[model]["input"] += input_tokens
91 self.token_stats.by_model[model]["output"] += output_tokens
93 def track_tool_call(self):
94 if self.steps:
95 self.steps[-1].tool_calls_count += 1
97 def report(self) -> ObservabilityReport:
98 return ObservabilityReport(
99 session_id=self.session_id,
100 total_duration_ms=(time.time() - self.start_time) * 1000,
101 steps=self.steps,
102 tokens=self.token_stats,
103 model_calls=len(self.steps),
104 tool_calls_total=sum(s.tool_calls_count for s in self.steps),
105 )
107 def token_summary(self) -> dict[str, int]:
108 return {
109 "input": self.token_stats.total_input,
110 "output": self.token_stats.total_output,
111 }
114class NoopTracer(Tracer):
115 """空追踪器 — 生产环境中关闭追踪时使用。"""
117 def __init__(self):
118 pass
120 @contextmanager
121 def step(self, name: str, model: str = ""):
122 yield StepTrace(name=name)
124 def track_tokens(self, model: str, input_tokens: int, output_tokens: int):
125 pass
127 def track_tool_call(self):
128 pass
130 def report(self) -> ObservabilityReport:
131 return ObservabilityReport(session_id="noop")
133 def token_summary(self) -> dict[str, int]:
134 return {}