Coverage for session_buddy / utils / runtime_snapshots.py: 79.44%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1from __future__ import annotations 

2 

3import json 

4import typing as t 

5from dataclasses import dataclass, field 

6from datetime import UTC, datetime 

7 

8from mcp_common import MCPServerSettings 

9from mcp_common.cli.health import ( 

10 RuntimeHealthSnapshot, 

11 load_runtime_health, 

12 write_runtime_health, 

13) 

14 

15if t.TYPE_CHECKING: 

16 from pathlib import Path 

17 

18 

@dataclass
class RuntimeTelemetrySnapshot:
    """Telemetry record that is serialized to JSON via :meth:`as_dict`.

    Every scalar field defaults to ``None`` and ``counters`` defaults to a
    fresh empty dict, so a bare instance represents "no data recorded".
    """

    orchestrator_pid: int | None = None
    started_at: str | None = None
    updated_at: str | None = None
    uptime_seconds: float | None = None
    counters: dict[str, int] = field(default_factory=dict)

    def as_dict(self) -> dict[str, t.Any]:
        """Return the fields as a plain dict.

        ``counters`` is shallow-copied so callers can mutate the result
        without touching this instance.
        """
        scalar_fields = (
            "orchestrator_pid",
            "started_at",
            "updated_at",
            "uptime_seconds",
        )
        exported: dict[str, t.Any] = {
            key: getattr(self, key) for key in scalar_fields
        }
        exported["counters"] = self.counters.copy()
        return exported

35 

36 

37@dataclass 

38class RuntimeSnapshotManager: 

39 settings: MCPServerSettings 

40 started_at: datetime = field(default_factory=lambda: datetime.now(UTC)) 

41 counters: dict[str, int] = field(default_factory=dict) 

42 

43 @classmethod 

44 def for_server(cls, server_name: str) -> RuntimeSnapshotManager: 

45 return cls(settings=MCPServerSettings.load(server_name)) 

46 

47 def record(self, name: str, amount: int = 1) -> None: 

48 self.counters[name] = self.counters.get(name, 0) + amount 

49 

50 def write_health_snapshot( 

51 self, 

52 pid: int | None, 

53 health_state: dict[str, t.Any] | None = None, 

54 watchers_running: bool = True, 

55 ) -> RuntimeHealthSnapshot: 

56 snapshot = load_runtime_health(self.settings.health_snapshot_path()) 

57 snapshot.orchestrator_pid = pid 

58 snapshot.watchers_running = watchers_running 

59 if health_state is not None: 59 ↛ 61line 59 didn't jump to line 61 because the condition on line 59 was always true

60 snapshot.activity_state = {"health": health_state} 

61 write_runtime_health(self.settings.health_snapshot_path(), snapshot) 

62 return snapshot 

63 

64 def write_telemetry_snapshot(self, pid: int | None) -> RuntimeTelemetrySnapshot: 

65 uptime_seconds = (datetime.now(UTC) - self.started_at).total_seconds() 

66 snapshot = RuntimeTelemetrySnapshot( 

67 orchestrator_pid=pid, 

68 started_at=self.started_at.isoformat(), 

69 uptime_seconds=uptime_seconds, 

70 counters=self.counters.copy(), 

71 ) 

72 write_runtime_telemetry(self.settings.telemetry_snapshot_path(), snapshot) 

73 return snapshot 

74 

75 

def load_runtime_telemetry(path: Path) -> RuntimeTelemetrySnapshot:
    """Read a telemetry snapshot from ``path``, tolerating missing or bad files.

    An empty ``RuntimeTelemetrySnapshot`` is returned when the file is
    missing or cannot be read, when its bytes are not valid UTF-8, when the
    text is not valid JSON, or when the top-level JSON value is not an
    object. Otherwise the known keys are copied onto a new snapshot without
    type validation; ``counters`` falls back to ``{}`` unless it is a dict.
    """
    # Read directly instead of checking ``exists()`` first: a missing file
    # raises FileNotFoundError (an OSError), so the pre-check only added a
    # window in which the file could vanish between the check and the read.
    try:
        raw = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError is a ValueError, not an OSError, so a file
        # holding invalid bytes would otherwise escape as an exception.
        return RuntimeTelemetrySnapshot()

    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return RuntimeTelemetrySnapshot()

    if not isinstance(data, dict):
        return RuntimeTelemetrySnapshot()

    snapshot = RuntimeTelemetrySnapshot()
    snapshot.orchestrator_pid = data.get("orchestrator_pid")
    snapshot.started_at = data.get("started_at")
    snapshot.updated_at = data.get("updated_at")
    snapshot.uptime_seconds = data.get("uptime_seconds")
    counters = data.get("counters")
    snapshot.counters = counters if isinstance(counters, dict) else {}
    return snapshot

96 

97 

98def write_runtime_telemetry(path: Path, snapshot: RuntimeTelemetrySnapshot) -> None: 

99 path.parent.mkdir(parents=True, exist_ok=True, mode=0o700) 

100 snapshot.updated_at = datetime.now(UTC).isoformat() 

101 

102 tmp = path.with_suffix(".tmp") 

103 try: 

104 tmp.write_text(json.dumps(snapshot.as_dict(), indent=2)) 

105 tmp.chmod(0o600) 

106 tmp.replace(path) 

107 except OSError: 

108 tmp.unlink(missing_ok=True) 

109 raise 

110 

111 

def update_telemetry_counter(
    settings: MCPServerSettings,
    name: str,
    amount: int = 1,
    pid: int | None = None,
) -> RuntimeTelemetrySnapshot:
    """Load the persisted telemetry, bump one counter, and write it back.

    The snapshot's ``started_at`` is kept when it parses as a datetime;
    otherwise it is reset to the current UTC time. ``orchestrator_pid`` is
    always overwritten with ``pid`` (including the default ``None``) and
    ``uptime_seconds`` is recomputed from ``started_at``. Returns the
    snapshot that was written.
    """
    target = settings.telemetry_snapshot_path()
    telemetry = load_runtime_telemetry(target)
    current_time = datetime.now(UTC)

    origin = _parse_iso_datetime(telemetry.started_at)
    if origin is None:
        # No usable start time on disk: treat this update as the start.
        origin = current_time
        telemetry.started_at = origin.isoformat()

    telemetry.orchestrator_pid = pid
    elapsed = current_time - origin
    telemetry.uptime_seconds = elapsed.total_seconds()

    previous = telemetry.counters.get(name, 0)
    telemetry.counters[name] = previous + amount

    write_runtime_telemetry(target, telemetry)
    return telemetry

132 

133 

def _emit_snapshots(manager: RuntimeSnapshotManager, pid: int | None) -> None:
    """Count one update, then write the health and telemetry snapshots."""
    manager.record("snapshot_updates")
    manager.write_health_snapshot(pid=pid)
    manager.write_telemetry_snapshot(pid=pid)


async def run_snapshot_loop(
    manager: RuntimeSnapshotManager,
    pid: int | None,
    interval_seconds: float,
) -> None:
    """Write snapshots forever, pausing ``interval_seconds`` between rounds.

    The first round runs immediately. The coroutine never returns on its
    own; it ends only when an exception propagates out of a round or the
    sleep.
    """
    while True:
        _emit_snapshots(manager, pid)
        await _sleep(interval_seconds)

144 

145 

async def _sleep(interval_seconds: float) -> None:
    """Suspend the current task for ``interval_seconds`` via ``asyncio.sleep``."""
    # asyncio is imported at call time, inside the function.
    from asyncio import sleep as pause

    await pause(interval_seconds)

150 

151 

152def _parse_iso_datetime(value: str | None) -> datetime | None: 

153 if value is None: 153 ↛ 155line 153 didn't jump to line 155 because the condition on line 153 was always true

154 return None 

155 try: 

156 return datetime.fromisoformat(value) 

157 except (TypeError, ValueError): 

158 return None 

159 

160 

# Names exported by ``from <module> import *``; underscore-prefixed helpers
# are deliberately left out.
__all__ = [
    "RuntimeSnapshotManager",
    "RuntimeTelemetrySnapshot",
    "load_runtime_telemetry",
    "run_snapshot_loop",
    "update_telemetry_counter",
    "write_runtime_telemetry",
]