Coverage for session_buddy / utils / runtime_snapshots.py: 79.44%
97 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1from __future__ import annotations
3import json
4import typing as t
5from dataclasses import dataclass, field
6from datetime import UTC, datetime
8from mcp_common import MCPServerSettings
9from mcp_common.cli.health import (
10 RuntimeHealthSnapshot,
11 load_runtime_health,
12 write_runtime_health,
13)
15if t.TYPE_CHECKING:
16 from pathlib import Path
@dataclass
class RuntimeTelemetrySnapshot:
    """Plain record of the telemetry values persisted for a running server.

    Every scalar field defaults to ``None`` and ``counters`` defaults to a
    fresh empty dict, so ``RuntimeTelemetrySnapshot()`` is a valid "empty"
    snapshot.
    """

    orchestrator_pid: int | None = None
    started_at: str | None = None
    updated_at: str | None = None
    uptime_seconds: float | None = None
    counters: dict[str, int] = field(default_factory=dict)

    def as_dict(self) -> dict[str, t.Any]:
        """Return the snapshot as a new dict suitable for JSON serialisation.

        The ``counters`` entry is a shallow copy, so mutating the returned
        mapping does not affect this instance.
        """
        scalar_fields = (
            "orchestrator_pid",
            "started_at",
            "updated_at",
            "uptime_seconds",
        )
        payload: dict[str, t.Any] = {key: getattr(self, key) for key in scalar_fields}
        payload["counters"] = self.counters.copy()
        return payload
37@dataclass
38class RuntimeSnapshotManager:
39 settings: MCPServerSettings
40 started_at: datetime = field(default_factory=lambda: datetime.now(UTC))
41 counters: dict[str, int] = field(default_factory=dict)
43 @classmethod
44 def for_server(cls, server_name: str) -> RuntimeSnapshotManager:
45 return cls(settings=MCPServerSettings.load(server_name))
47 def record(self, name: str, amount: int = 1) -> None:
48 self.counters[name] = self.counters.get(name, 0) + amount
50 def write_health_snapshot(
51 self,
52 pid: int | None,
53 health_state: dict[str, t.Any] | None = None,
54 watchers_running: bool = True,
55 ) -> RuntimeHealthSnapshot:
56 snapshot = load_runtime_health(self.settings.health_snapshot_path())
57 snapshot.orchestrator_pid = pid
58 snapshot.watchers_running = watchers_running
59 if health_state is not None: 59 ↛ 61line 59 didn't jump to line 61 because the condition on line 59 was always true
60 snapshot.activity_state = {"health": health_state}
61 write_runtime_health(self.settings.health_snapshot_path(), snapshot)
62 return snapshot
64 def write_telemetry_snapshot(self, pid: int | None) -> RuntimeTelemetrySnapshot:
65 uptime_seconds = (datetime.now(UTC) - self.started_at).total_seconds()
66 snapshot = RuntimeTelemetrySnapshot(
67 orchestrator_pid=pid,
68 started_at=self.started_at.isoformat(),
69 uptime_seconds=uptime_seconds,
70 counters=self.counters.copy(),
71 )
72 write_runtime_telemetry(self.settings.telemetry_snapshot_path(), snapshot)
73 return snapshot
def load_runtime_telemetry(path: Path) -> RuntimeTelemetrySnapshot:
    """Read a telemetry snapshot from ``path``, tolerating a bad or absent file.

    Args:
        path: Location of the JSON telemetry file.

    Returns:
        The snapshot stored at ``path``. An empty ``RuntimeTelemetrySnapshot``
        is returned when the file is missing or unreadable, is not valid
        UTF-8, does not contain valid JSON, or its top-level JSON value is not
        an object. A ``counters`` entry that is not a JSON object is replaced
        by an empty dict.
    """
    try:
        # A missing file raises FileNotFoundError (an OSError), so no separate
        # exists() pre-check is needed. The encoding is pinned so the result
        # does not depend on the platform's locale.
        raw = path.read_text(encoding="utf-8")
        data = json.loads(raw)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised when the file holds undecodable bytes.
        return RuntimeTelemetrySnapshot()

    if not isinstance(data, dict):
        return RuntimeTelemetrySnapshot()

    counters = data.get("counters")
    return RuntimeTelemetrySnapshot(
        orchestrator_pid=data.get("orchestrator_pid"),
        started_at=data.get("started_at"),
        updated_at=data.get("updated_at"),
        uptime_seconds=data.get("uptime_seconds"),
        counters=counters if isinstance(counters, dict) else {},
    )
98def write_runtime_telemetry(path: Path, snapshot: RuntimeTelemetrySnapshot) -> None:
99 path.parent.mkdir(parents=True, exist_ok=True, mode=0o700)
100 snapshot.updated_at = datetime.now(UTC).isoformat()
102 tmp = path.with_suffix(".tmp")
103 try:
104 tmp.write_text(json.dumps(snapshot.as_dict(), indent=2))
105 tmp.chmod(0o600)
106 tmp.replace(path)
107 except OSError:
108 tmp.unlink(missing_ok=True)
109 raise
def update_telemetry_counter(
    settings: MCPServerSettings,
    name: str,
    amount: int = 1,
    pid: int | None = None,
) -> RuntimeTelemetrySnapshot:
    """Increment one counter in the persisted telemetry snapshot.

    The snapshot at ``settings.telemetry_snapshot_path()`` is loaded, updated
    and written back. If the stored ``started_at`` is missing or cannot be
    parsed, it is set to the current UTC time. ``orchestrator_pid`` is always
    overwritten with ``pid`` (including ``None``), and ``uptime_seconds`` is
    recomputed relative to ``started_at``.

    Returns:
        The updated snapshot that was written.
    """
    snapshot_path = settings.telemetry_snapshot_path()
    telemetry = load_runtime_telemetry(snapshot_path)
    moment = datetime.now(UTC)

    origin = _parse_iso_datetime(telemetry.started_at)
    if origin is None:
        # No usable start time on disk: treat this call as the start.
        origin = moment
        telemetry.started_at = origin.isoformat()

    telemetry.orchestrator_pid = pid
    elapsed = moment - origin
    telemetry.uptime_seconds = elapsed.total_seconds()
    previous = telemetry.counters.get(name, 0)
    telemetry.counters[name] = previous + amount
    write_runtime_telemetry(snapshot_path, telemetry)
    return telemetry
async def run_snapshot_loop(
    manager: RuntimeSnapshotManager,
    pid: int | None,
    interval_seconds: float,
) -> None:
    """Write health and telemetry snapshots forever, pausing between rounds.

    Each round bumps the ``"snapshot_updates"`` counter, writes the health
    snapshot, writes the telemetry snapshot, and then waits
    ``interval_seconds``. The loop has no exit condition of its own; it ends
    only when an exception propagates (for example on task cancellation).
    """

    def _publish_once() -> None:
        manager.record("snapshot_updates")
        manager.write_health_snapshot(pid=pid)
        manager.write_telemetry_snapshot(pid=pid)

    while True:
        _publish_once()
        await _sleep(interval_seconds)
146async def _sleep(interval_seconds: float) -> None:
147 import asyncio
149 await asyncio.sleep(interval_seconds)
152def _parse_iso_datetime(value: str | None) -> datetime | None:
153 if value is None: 153 ↛ 155line 153 didn't jump to line 155 because the condition on line 153 was always true
154 return None
155 try:
156 return datetime.fromisoformat(value)
157 except (TypeError, ValueError):
158 return None
# Names exported by ``from <module> import *``; helpers prefixed with an
# underscore are intentionally left out.
__all__ = [
    "RuntimeSnapshotManager",
    "RuntimeTelemetrySnapshot",
    "load_runtime_telemetry",
    "run_snapshot_loop",
    "update_telemetry_counter",
    "write_runtime_telemetry",
]