Coverage for agentos/dashboard/tracker.py: 0%
101 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2追踪状态管理器 — 记录 Agent 运行历史、会话、步骤。
4数据存储在 ~/.agentos/tracker/ 下,以 JSONL 追加写入。
5"""
7from __future__ import annotations
9import json
10import time
11import os
12from dataclasses import dataclass, field, asdict
13from pathlib import Path
14from typing import Any
17TRACKER_DIR = Path.home() / ".agentos" / "tracker"
20@dataclass
21class StepRecord:
22 """单步执行记录。"""
23 step_index: int
24 step_type: str # "thinking" | "tool_call" | "tool_result" | "final_answer"
25 detail: str # 步骤内容摘要
26 duration_ms: float
27 tokens: int = 0
30@dataclass
31class SessionRecord:
32 """单次会话完整记录。"""
33 session_id: str
34 task: str
35 model: str
36 provider: str
37 started_at: float = field(default_factory=time.time)
38 finished_at: float = 0.0
39 status: str = "running" # "running" | "completed" | "error" | "cancelled"
40 steps: list[StepRecord] = field(default_factory=list)
41 total_tokens: int = 0
42 total_cost_usd: float = 0.0
43 error: str = ""
46class Tracker:
47 """线程安全的追踪记录器(文件级锁)+ 事件发布。
49 支持订阅者模式:外部可 subscribe 回调,在 add_step/finish_session 时收到实时事件推送。
50 """
52 _instance: Tracker | None = None
54 def __init__(self):
55 TRACKER_DIR.mkdir(parents=True, exist_ok=True)
56 self._sessions_file = TRACKER_DIR / "sessions.jsonl"
57 self._active: dict[str, SessionRecord] = {}
58 self._subscribers: list = [] # 回调列表
60 @classmethod
61 def get(cls) -> Tracker:
62 if cls._instance is None:
63 cls._instance = cls()
64 return cls._instance
66 def start_session(self, session_id: str, task: str, model: str = "", provider: str = "") -> SessionRecord:
67 rec = SessionRecord(session_id=session_id, task=task, model=model, provider=provider)
68 self._active[session_id] = rec
69 return rec
71 def add_step(self, session_id: str, step_type: str, detail: str, duration_ms: float = 0.0, tokens: int = 0):
72 rec = self._active.get(session_id)
73 if rec is None:
74 return
75 step = StepRecord(
76 step_index=len(rec.steps),
77 step_type=step_type,
78 detail=detail,
79 duration_ms=duration_ms,
80 tokens=tokens,
81 )
82 rec.steps.append(step)
83 rec.total_tokens += tokens
84 self._notify("step", {"session_id": session_id, "step_type": step_type, "detail": detail, "duration_ms": duration_ms, "tokens": tokens})
86 def finish_session(self, session_id: str, status: str = "completed", error: str = "", total_cost: float = 0.0):
87 rec = self._active.pop(session_id, None)
88 if rec is None:
89 return
90 rec.finished_at = time.time()
91 rec.status = status
92 rec.error = error
93 rec.total_cost_usd = total_cost
94 with open(self._sessions_file, "a") as f:
95 f.write(json.dumps(asdict(rec), ensure_ascii=False) + "\n")
96 self._notify("session_done", asdict(rec))
98 def subscribe(self, callback):
99 """订阅实时事件。callback 接收 (event_type: str, data: dict)。"""
100 self._subscribers.append(callback)
102 def unsubscribe(self, callback):
103 """取消订阅。"""
104 try:
105 self._subscribers.remove(callback)
106 except ValueError:
107 pass
109 def _notify(self, event_type: str, data: dict):
110 for cb in self._subscribers:
111 try:
112 cb(event_type, data)
113 except Exception:
114 pass
116 def list_sessions(self, limit: int = 50) -> list[dict]:
117 sessions = []
118 if self._sessions_file.exists():
119 with open(self._sessions_file, "r") as f:
120 for line in f:
121 if line.strip():
122 sessions.append(json.loads(line))
123 # 倒序,最新的在前
124 sessions.reverse()
125 return sessions[:limit]
127 def get_session(self, session_id: str) -> dict | None:
128 # 先在 active 中找
129 rec = self._active.get(session_id)
130 if rec:
131 return asdict(rec)
132 # 再从文件中找
133 if self._sessions_file.exists():
134 with open(self._sessions_file, "r") as f:
135 for line in f:
136 if line.strip():
137 d = json.loads(line)
138 if d.get("session_id") == session_id:
139 return d
140 return None
142 def clear(self):
143 self._active.clear()
144 if self._sessions_file.exists():
145 self._sessions_file.unlink()