Coverage for agentos/observability/tracer.py: 59%

82 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2全链路追踪 — 每一步可追溯。 

3基因来源: LangSmith + OpenAI Tracing 

4""" 

5 

6from __future__ import annotations 

7 

8import time 

9from contextlib import contextmanager 

10from dataclasses import dataclass, field 

11from typing import Any 

12 

13 

@dataclass
class StepTrace:
    """Trace record describing one named step.

    Only ``name`` is required; every other field starts at a neutral
    default and is filled in by whoever owns the record.
    """

    # Human-readable label of the step.
    name: str

    # --- timing ---------------------------------------------------------
    # Start / end timestamps as floats (0.0 until set).
    start_time: float = 0.0
    end_time: float = 0.0
    # Elapsed time of the step, in milliseconds.
    duration_ms: float = 0.0

    # --- model usage ----------------------------------------------------
    # Identifier of the model used by the step; empty when not applicable.
    model: str = ""
    # Token counters for the step (presumably prompt / completion tokens;
    # nothing in this class enforces a meaning).
    tokens_in: int = 0
    tokens_out: int = 0

    # --- tools and outcome ----------------------------------------------
    # Number of tool calls attributed to this step.
    tool_calls_count: int = 0
    # Error text when the step failed; None when no error was recorded.
    error: str | None = None

26 

27 

@dataclass
class TokenStats:
    """Aggregated token usage counters."""

    # Sum of input tokens across every recorded call.
    total_input: int = 0
    # Sum of output tokens across every recorded call.
    total_output: int = 0
    # Per-model breakdown: model name -> mapping of counter name to count.
    # A fresh dict is created per instance so instances never share state.
    by_model: dict[str, dict[str, int]] = field(default_factory=dict)

34 

35 

@dataclass
class ObservabilityReport:
    """Observability report for one session.

    Bundles the recorded steps, the token statistics and a few aggregate
    counters, and can render itself as a short plain-text summary.
    """

    # Identifier of the session this report belongs to.
    session_id: str
    # Overall duration covered by the report, in milliseconds.
    total_duration_ms: float = 0.0
    # Recorded step traces, in the order they were supplied.
    steps: list[StepTrace] = field(default_factory=list)
    # Token usage statistics.
    tokens: TokenStats = field(default_factory=TokenStats)
    # Aggregate counters.
    model_calls: int = 0
    tool_calls_total: int = 0

    def summary(self) -> str:
        """Return a newline-separated, human-readable summary of the report.

        The session, duration, call counters and token totals are always
        present; a trailing ``Steps:`` line listing the step names is added
        only when at least one step was recorded.
        """
        usage = self.tokens
        overview = "\n".join(
            (
                f"Session: {self.session_id}",
                f"Duration: {self.total_duration_ms:.0f}ms",
                f"Model calls: {self.model_calls}",
                f"Tool calls: {self.tool_calls_total}",
                f"Tokens (in/out): {usage.total_input}/{usage.total_output}",
            )
        )
        if not self.steps:
            return overview
        step_names = ", ".join(step.name for step in self.steps)
        return f"{overview}\nSteps: {step_names}"

57 

58 

class Tracer:
    """End-to-end tracer recording duration, token usage and tool calls per step.

    Steps are opened with the :meth:`step` context manager and land in
    ``self.steps`` once they finish. Token usage is accumulated separately
    through :meth:`track_tokens`.
    """

    def __init__(self, session_id: str = ""):
        """Create a tracer for ``session_id`` and start its session clock."""
        self.session_id = session_id
        self.steps: list[StepTrace] = []
        self.token_stats = TokenStats()
        self.start_time = time.time()
        # Steps whose ``with`` body is still executing, innermost last.
        # Needed because a step only reaches ``self.steps`` when it ends.
        self._active_steps: list[StepTrace] = []

    @classmethod
    def noop(cls) -> "Tracer":
        """Return a tracer that records nothing (a ``NoopTracer``)."""
        return NoopTracer()

    @contextmanager
    def step(self, name: str, model: str = ""):
        """Trace one step; yields the ``StepTrace`` so callers can fill it in.

        The trace is timed with ``time.time()`` and appended to
        ``self.steps`` when the ``with`` block exits, whether or not it
        raised. An ``Exception`` escaping the block is recorded in
        ``trace.error`` and then re-raised unchanged.
        """
        trace = StepTrace(name=name, model=model, start_time=time.time())
        self._active_steps.append(trace)
        try:
            yield trace
        except Exception as exc:
            # Exceptions raised without a message stringify to "", which is
            # falsy; fall back to the class name so a failure is never
            # mistaken for success by an ``if trace.error:`` check.
            trace.error = str(exc) or type(exc).__name__
            raise
        finally:
            trace.end_time = time.time()
            trace.duration_ms = (trace.end_time - trace.start_time) * 1000
            # Remove by identity: dataclass equality compares field values,
            # so ``list.remove`` could drop a different-but-equal trace.
            self._active_steps[:] = [
                active for active in self._active_steps if active is not trace
            ]
            self.steps.append(trace)

    def track_tokens(self, model: str, input_tokens: int, output_tokens: int):
        """Add token counts to the global totals and to ``model``'s bucket."""
        stats = self.token_stats
        stats.total_input += input_tokens
        stats.total_output += output_tokens
        per_model = stats.by_model.setdefault(model, {"input": 0, "output": 0})
        per_model["input"] += input_tokens
        per_model["output"] += output_tokens

    def track_tool_call(self):
        """Count one tool call against the most relevant step.

        A call made inside a running ``with tracer.step(...)`` block is
        credited to that (innermost) step. With no step running, it is
        credited to the most recently finished step. If no step exists at
        all the call is ignored.
        """
        if self._active_steps:
            target = self._active_steps[-1]
        elif self.steps:
            target = self.steps[-1]
        else:
            return
        target.tool_calls_count += 1

    def report(self) -> ObservabilityReport:
        """Build an ``ObservabilityReport`` snapshot of the finished steps.

        The step list and token statistics are copied so that steps or
        tokens recorded later do not alter a report that was already
        handed out. The ``StepTrace`` objects themselves are shared.
        """
        finished = list(self.steps)
        stats = self.token_stats
        return ObservabilityReport(
            session_id=self.session_id,
            total_duration_ms=(time.time() - self.start_time) * 1000,
            steps=finished,
            tokens=TokenStats(
                total_input=stats.total_input,
                total_output=stats.total_output,
                by_model={
                    name: dict(counts) for name, counts in stats.by_model.items()
                },
            ),
            model_calls=len(finished),
            tool_calls_total=sum(s.tool_calls_count for s in finished),
        )

    def token_summary(self) -> dict[str, int]:
        """Return the total input/output token counts as a small dict."""
        return {
            "input": self.token_stats.total_input,
            "output": self.token_stats.total_output,
        }

112 

113 

class NoopTracer(Tracer):
    """No-op tracer, used when tracing is switched off (e.g. in production).

    It exposes the same attributes and methods as ``Tracer`` but records
    nothing, so calling code never has to special-case disabled tracing.
    """

    def __init__(self):
        """Initialise the base attributes under the fixed session id "noop"."""
        # Run the base initialiser so attribute reads such as
        # ``tracer.steps`` or ``tracer.session_id`` work on the null object
        # instead of raising AttributeError.
        super().__init__(session_id="noop")

    @contextmanager
    def step(self, name: str, model: str = ""):
        """Yield a throwaway ``StepTrace``; nothing is timed or stored."""
        yield StepTrace(name=name, model=model)

    def track_tokens(self, model: str, input_tokens: int, output_tokens: int):
        """Ignore the token counts."""

    def track_tool_call(self):
        """Ignore the tool call."""

    def report(self) -> ObservabilityReport:
        """Return an empty report whose session id is "noop"."""
        return ObservabilityReport(session_id="noop")

    def token_summary(self) -> dict[str, int]:
        """Return zeroed totals with the same keys ``Tracer`` provides."""
        # Same shape as Tracer.token_summary() so ``summary["input"]`` does
        # not raise KeyError when tracing is disabled.
        return {"input": 0, "output": 0}