Coverage for agentos/dashboard/tracker.py: 0%

101 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

Tracking state manager — records agent run history, sessions, and steps.

Data is stored under ~/.agentos/tracker/ and written by appending JSONL.

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import time 

11import os 

12from dataclasses import dataclass, field, asdict 

13from pathlib import Path 

14from typing import Any 

15 

16 

# Directory that holds the tracker's data files, located under the current
# user's home directory.
TRACKER_DIR = Path.home() / ".agentos" / "tracker"

18 

19 

@dataclass
class StepRecord:
    """Record of a single executed step within a session."""

    step_index: int  # position of the step within its session's step list
    step_type: str  # "thinking" | "tool_call" | "tool_result" | "final_answer"
    detail: str  # summary of the step's content
    duration_ms: float  # duration of the step, in milliseconds
    tokens: int = 0  # token count attributed to this step

28 

29 

@dataclass
class SessionRecord:
    """Complete record of a single session."""

    session_id: str
    task: str
    model: str
    provider: str
    # Creation timestamp, taken from time.time() when the record is built.
    started_at: float = field(default_factory=time.time)
    # Stays 0.0 until a finish time is assigned.
    finished_at: float = 0.0
    status: str = "running"  # "running" | "completed" | "error" | "cancelled"
    # A fresh list per instance, so records never share step storage.
    steps: list[StepRecord] = field(default_factory=list)
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    error: str = ""  # error message; empty by default

44 

45 

class Tracker:
    """Session tracker with JSONL persistence and event publishing.

    Running sessions are held in memory. When a session finishes it is
    appended, as one JSON object per line, to ``sessions.jsonl`` inside
    ``TRACKER_DIR``.

    Callbacks registered with :meth:`subscribe` receive a ``"step"`` event
    from :meth:`add_step` and a ``"session_done"`` event from
    :meth:`finish_session`.

    NOTE(review): this class was previously described as thread-safe with a
    file-level lock, but it performs no locking. Callers that share one
    instance across threads must provide their own synchronisation.
    """

    _instance: Tracker | None = None

    def __init__(self) -> None:
        """Create the tracker directory if needed and start with no sessions."""
        TRACKER_DIR.mkdir(parents=True, exist_ok=True)
        self._sessions_file = TRACKER_DIR / "sessions.jsonl"
        self._active: dict[str, SessionRecord] = {}
        self._subscribers: list = []  # callbacks taking (event_type, data)

    @classmethod
    def get(cls) -> Tracker:
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def start_session(self, session_id: str, task: str, model: str = "", provider: str = "") -> SessionRecord:
        """Register a new active session and return its record.

        An already-active session with the same ``session_id`` is replaced.
        """
        rec = SessionRecord(session_id=session_id, task=task, model=model, provider=provider)
        self._active[session_id] = rec
        return rec

    def add_step(self, session_id: str, step_type: str, detail: str, duration_ms: float = 0.0, tokens: int = 0):
        """Append a step to an active session and publish a ``"step"`` event.

        Does nothing when ``session_id`` is not an active session.
        """
        rec = self._active.get(session_id)
        if rec is None:
            return
        step = StepRecord(
            step_index=len(rec.steps),
            step_type=step_type,
            detail=detail,
            duration_ms=duration_ms,
            tokens=tokens,
        )
        rec.steps.append(step)
        rec.total_tokens += tokens
        self._notify(
            "step",
            {
                "session_id": session_id,
                "step_type": step_type,
                "detail": detail,
                "duration_ms": duration_ms,
                "tokens": tokens,
            },
        )

    def finish_session(self, session_id: str, status: str = "completed", error: str = "", total_cost: float = 0.0):
        """Close an active session, persist it, and publish ``"session_done"``.

        Does nothing when ``session_id`` is not an active session.
        """
        rec = self._active.pop(session_id, None)
        if rec is None:
            return
        rec.finished_at = time.time()
        rec.status = status
        rec.error = error
        rec.total_cost_usd = total_cost
        payload = asdict(rec)
        # The JSON is written with ensure_ascii=False, so the file encoding
        # must be pinned: the platform default may be unable to encode
        # non-ASCII text.
        with open(self._sessions_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(payload, ensure_ascii=False) + "\n")
        self._notify("session_done", payload)

    def subscribe(self, callback):
        """Subscribe to live events; ``callback`` receives ``(event_type: str, data: dict)``."""
        self._subscribers.append(callback)

    def unsubscribe(self, callback):
        """Remove ``callback``; unknown callbacks are ignored."""
        try:
            self._subscribers.remove(callback)
        except ValueError:
            pass

    def _notify(self, event_type: str, data: dict):
        """Deliver an event to every subscriber on a best-effort basis."""
        # Iterate over a snapshot: a callback may subscribe or unsubscribe
        # while being notified, and mutating the live list would skip entries.
        for cb in list(self._subscribers):
            try:
                cb(event_type, data)
            except Exception:
                # Deliberately swallowed so that one failing subscriber cannot
                # break tracking or starve the remaining subscribers.
                pass

    def _iter_persisted(self):
        """Yield each persisted session dict in file order.

        Yields nothing when the file does not exist. Blank lines, lines that
        are not valid JSON (for example a truncated append), and JSON values
        that are not objects are skipped.
        """
        try:
            # errors="replace" keeps files written under a different default
            # encoding from raising while being read.
            handle = open(self._sessions_file, "r", encoding="utf-8", errors="replace")
        except FileNotFoundError:
            return
        with handle:
            for line in handle:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if isinstance(record, dict):
                    yield record

    def list_sessions(self, limit: int = 50) -> list[dict]:
        """Return up to ``limit`` persisted sessions, most recently written first."""
        sessions = list(self._iter_persisted())
        # The file is append-only, so reversing puts the newest entries first.
        sessions.reverse()
        return sessions[:limit]

    def get_session(self, session_id: str) -> dict | None:
        """Return a session as a dict, or ``None`` when it is unknown.

        Active sessions take precedence; otherwise the first matching entry
        in the sessions file is returned.
        """
        rec = self._active.get(session_id)
        if rec is not None:
            return asdict(rec)
        records = self._iter_persisted()
        try:
            return next((d for d in records if d.get("session_id") == session_id), None)
        finally:
            # Close the generator so the file handle is released promptly
            # even when the search stops at an early match.
            records.close()

    def clear(self):
        """Drop all active sessions and delete the sessions file if present."""
        self._active.clear()
        self._sessions_file.unlink(missing_ok=True)

145 self._sessions_file.unlink()