Coverage for src / mysingle / core / metrics / collector.py: 0%

146 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1"""Enhanced metrics collector with performance optimizations.""" 

2 

3import asyncio 

4import statistics 

5import time 

6from collections import defaultdict, deque 

7from dataclasses import dataclass, field 

8from typing import Any 

9 

10from ..logging import get_structured_logger 

11 

12logger = get_structured_logger(__name__) 

13 

14 

@dataclass
class MetricsConfig:
    """Tunable settings for metrics collection.

    All fields have defaults, so ``MetricsConfig()`` yields a usable
    configuration. Field order is part of the positional constructor
    signature and must not be rearranged.
    """

    # Sizing limits for per-route duration samples and histogram buckets.
    max_duration_samples: int = 1_000
    max_histogram_buckets: int = 20

    # Switches for the optional derived statistics.
    enable_percentiles: bool = True
    enable_histogram: bool = True

    # Housekeeping timings, expressed in seconds.
    retention_period_seconds: int = 60 * 60  # one hour
    cleanup_interval_seconds: int = 5 * 60  # five minutes

25 

26 

def _new_duration_window() -> deque[float]:
    """Build the default bounded buffer of response-time samples."""
    return deque(maxlen=1000)


def _new_status_counter() -> defaultdict[int, int]:
    """Build the default per-status-code counter (missing codes read as 0)."""
    return defaultdict(int)


@dataclass
class RouteMetrics:
    """Counters and recent duration samples kept for a single route.

    Every instance receives its own ``durations`` buffer and
    ``status_codes`` counter; ``last_accessed`` defaults to the wall-clock
    time at which the instance is created.
    """

    request_count: int = 0
    error_count: int = 0
    # Bounded buffer: once full, appending drops the oldest sample.
    durations: deque[float] = field(default_factory=_new_duration_window)
    last_accessed: float = field(default_factory=time.time)
    status_codes: defaultdict[int, int] = field(default_factory=_new_status_counter)

38 

39 

class MetricsCollector:
    """Enhanced in-memory metrics collector with performance optimizations.

    Tracks global request/error counters plus per-route counters, status
    code tallies and a bounded window of recent response times. Routes are
    keyed as ``"<method>:<path>"``. When constructed while an event loop is
    running, a background task periodically evicts routes that have not
    been accessed within ``config.retention_period_seconds``.
    """

    def __init__(self, service_name: str, config: MetricsConfig | None = None):
        """Create a collector.

        Args:
            service_name: Name reported in summaries and used as the metric
                name prefix in the Prometheus output.
            config: Collection settings; a default ``MetricsConfig`` is used
                when omitted.
        """
        self.service_name = service_name
        self.config = config or MetricsConfig()
        self.start_time = time.time()

        # Per-route metrics, keyed by "<method>:<path>".
        self.routes: dict[str, RouteMetrics] = {}
        # Created for callers' use; no method of this class acquires it.
        self._lock = asyncio.Lock() if hasattr(asyncio, "current_task") else None

        # Global counters.
        self.total_requests = 0
        self.total_errors = 0

        # Background cleanup task (only created when a loop is running).
        self._cleanup_task: asyncio.Task | None = None
        self._start_cleanup_task()

    def _start_cleanup_task(self) -> None:
        """Start the background cleanup task if an event loop is running."""
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop running: periodic cleanup is simply not scheduled.
            return
        self._cleanup_task = asyncio.create_task(self._periodic_cleanup())

    async def _periodic_cleanup(self) -> None:
        """Periodically clean up old metrics data until cancelled."""
        while True:
            try:
                await asyncio.sleep(self.config.cleanup_interval_seconds)
                await self._cleanup_old_metrics()
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Best effort: a failed pass must not stop future passes.
                logger.warning(f"Error in metrics cleanup: {e}")

    async def _cleanup_old_metrics(self) -> None:
        """Remove routes not accessed within the retention period."""
        cutoff_time = time.time() - self.config.retention_period_seconds

        # Collect keys first so the dict is not mutated while iterating it.
        routes_to_remove = [
            route_key
            for route_key, metrics in self.routes.items()
            if metrics.last_accessed < cutoff_time
        ]

        for route_key in routes_to_remove:
            del self.routes[route_key]

        if routes_to_remove:
            logger.debug(f"Cleaned up {len(routes_to_remove)} old route metrics")

    def _record(
        self, method: str, path: str, status_code: int, duration: float
    ) -> None:
        """Apply one request observation to the global and per-route state.

        Shared by ``record_request`` and ``record_request_sync`` so both
        entry points stay in agreement.
        """
        route_key = f"{method}:{path}"
        is_error = status_code >= 400

        # Update global counters.
        self.total_requests += 1
        if is_error:
            self.total_errors += 1

        # Update per-route metrics, creating the entry on first sight.
        route_metrics = self.routes.get(route_key)
        if route_metrics is None:
            # Size the sample window from the configuration rather than
            # relying on RouteMetrics' built-in default window.
            route_metrics = RouteMetrics(
                durations=deque(maxlen=self.config.max_duration_samples)
            )
            self.routes[route_key] = route_metrics

        route_metrics.request_count += 1
        route_metrics.last_accessed = time.time()
        route_metrics.status_codes[status_code] += 1
        if is_error:
            route_metrics.error_count += 1

        # Bounded deque: the oldest sample is dropped once the window is full.
        route_metrics.durations.append(duration)

    async def record_request(
        self, method: str, path: str, status_code: int, duration: float
    ) -> None:
        """Record a request metric asynchronously.

        Args:
            method: HTTP method of the request.
            path: Request path; combined with ``method`` to form the route key.
            status_code: Response status; values ``>= 400`` count as errors.
            duration: Response time sample to add to the route's window.
        """
        self._record(method, path, status_code, duration)

    def record_request_sync(
        self, method: str, path: str, status_code: int, duration: float
    ) -> None:
        """Synchronous version of record_request for compatibility.

        Unlike ``record_request``, any exception raised while recording is
        logged as a warning instead of propagating to the caller.
        """
        try:
            self._record(method, path, status_code, duration)
        except Exception as e:
            logger.warning(f"Error recording metrics: {e}")

    def _calculate_percentiles(self, durations: deque[float]) -> dict[str, float]:
        """Calculate p50/p90/p95/p99 for response times.

        Returns an empty dict when there are no samples or percentiles are
        disabled. When there are too few samples for a given quantile split
        (fewer than 10, 20 or 100), the largest sample is reported instead.
        """
        if not durations or not self.config.enable_percentiles:
            return {}

        sorted_durations = sorted(durations)
        sample_count = len(sorted_durations)
        largest = sorted_durations[-1]

        def quantile_or_largest(n: int) -> float:
            # The last of the n-1 cut points is the (n-1)/n quantile.
            if sample_count >= n:
                return statistics.quantiles(sorted_durations, n=n)[n - 2]
            return largest

        return {
            "p50": statistics.median(sorted_durations),
            "p90": quantile_or_largest(10),
            "p95": quantile_or_largest(20),
            "p99": quantile_or_largest(100),
        }

    def _calculate_histogram(self, durations: deque[float]) -> dict[str, Any]:
        """Calculate an equal-width histogram for response times.

        The range ``[0, max(durations)]`` is split into at most
        ``config.max_histogram_buckets`` buckets. Each bucket key is
        ``le_<upper edge>``, so every sample counted under a key is less
        than or equal to the value in that key. Counts are per bucket, not
        cumulative.

        Returns:
            An empty dict when there are no samples or histograms are
            disabled; otherwise a dict with ``buckets``, ``bucket_size``
            and ``total_samples``.
        """
        if not durations or not self.config.enable_histogram:
            return {}

        # Guard against a zero/negative bucket setting (division by zero).
        bucket_count = max(1, self.config.max_histogram_buckets)
        max_duration = max(durations)
        bucket_size = max_duration / bucket_count

        buckets: dict[str, int] = {}
        for duration in durations:
            if bucket_size > 0:
                # Clamp so the maximum sample falls in the last bucket rather
                # than opening an extra one past the configured limit.
                index = max(0, min(int(duration / bucket_size), bucket_count - 1))
            else:
                index = 0
            bucket_key = f"le_{(index + 1) * bucket_size:.3f}"
            buckets[bucket_key] = buckets.get(bucket_key, 0) + 1

        return {
            "buckets": buckets,
            "bucket_size": bucket_size,
            "total_samples": len(durations),
        }

    def get_metrics(self) -> dict[str, Any]:
        """Get comprehensive metrics summary.

        Returns:
            A dict with service-level totals and rates, a subset of the
            configuration, and a ``routes`` mapping of per-route details.
            Response-time statistics are only present for routes that have
            at least one duration sample.
        """
        uptime = time.time() - self.start_time

        # Detailed metrics per route.
        routes_metrics: dict[str, dict[str, Any]] = {}
        for route_key, route_metrics in self.routes.items():
            durations = route_metrics.durations

            route_data: dict[str, Any] = {
                "request_count": route_metrics.request_count,
                "error_count": route_metrics.error_count,
                "error_rate": (
                    route_metrics.error_count / route_metrics.request_count
                    if route_metrics.request_count > 0
                    else 0
                ),
                "status_codes": dict(route_metrics.status_codes),
                "last_accessed": route_metrics.last_accessed,
            }

            # Response-time statistics.
            if durations:
                route_data.update(
                    {
                        "avg_response_time": sum(durations) / len(durations),
                        "min_response_time": min(durations),
                        "max_response_time": max(durations),
                        "total_samples": len(durations),
                    }
                )

                # Percentiles (empty when disabled).
                route_data.update(self._calculate_percentiles(durations))

                # Histogram (omitted when disabled).
                histogram = self._calculate_histogram(durations)
                if histogram:
                    route_data["histogram"] = histogram

            routes_metrics[route_key] = route_data

        return {
            "service": self.service_name,
            "timestamp": time.time(),
            "uptime_seconds": uptime,
            "total_requests": self.total_requests,
            "total_errors": self.total_errors,
            "error_rate": (
                self.total_errors / self.total_requests
                if self.total_requests > 0
                else 0
            ),
            "requests_per_second": self.total_requests / uptime if uptime > 0 else 0,
            "active_routes": len(self.routes),
            "config": {
                "max_duration_samples": self.config.max_duration_samples,
                "enable_percentiles": self.config.enable_percentiles,
                "enable_histogram": self.config.enable_histogram,
                "retention_period_seconds": self.config.retention_period_seconds,
            },
            "routes": routes_metrics,
        }

    @staticmethod
    def _escape_label_value(value: str) -> str:
        """Escape backslash, double quote and newline for a label value."""
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
        )

    @staticmethod
    def _metric_family(
        name: str,
        help_text: str,
        metric_type: str,
        samples: list[tuple[str, str]],
    ) -> list[str]:
        """Render one metric family: a single HELP/TYPE header, then samples.

        ``samples`` holds ``(labels, value)`` pairs. Returns no lines at all
        when there are no samples, so empty families are not advertised.
        """
        if not samples:
            return []
        lines = [
            f"# HELP {name} {help_text}",
            f"# TYPE {name} {metric_type}",
        ]
        lines.extend(f"{name}{{{labels}}} {value}" for labels, value in samples)
        lines.append("")
        return lines

    def get_prometheus_metrics(self) -> str:
        """Generate Prometheus-formatted metrics.

        Service-level metrics come first, followed by one family per
        per-route metric. HELP/TYPE headers are written once per metric name
        with every route's sample beneath them, and label values (which
        derive from request data) are escaped.
        """
        metrics = self.get_metrics()
        service_name = self.service_name.replace("-", "_").replace(".", "_")

        # Service-level metrics.
        lines = [
            f"# HELP {service_name}_uptime_seconds Service uptime in seconds",
            f"# TYPE {service_name}_uptime_seconds gauge",
            f"{service_name}_uptime_seconds {metrics['uptime_seconds']:.2f}",
            "",
            f"# HELP {service_name}_requests_total Total number of requests",
            f"# TYPE {service_name}_requests_total counter",
            f"{service_name}_requests_total {metrics['total_requests']}",
            "",
            f"# HELP {service_name}_errors_total Total number of errors",
            f"# TYPE {service_name}_errors_total counter",
            f"{service_name}_errors_total {metrics['total_errors']}",
            "",
            f"# HELP {service_name}_requests_per_second Current requests per second",
            f"# TYPE {service_name}_requests_per_second gauge",
            f"{service_name}_requests_per_second {metrics['requests_per_second']:.2f}",
            "",
        ]

        # Pre-compute the label set of every route once.
        escape = self._escape_label_value
        labelled_routes: list[tuple[str, dict[str, Any]]] = []
        for route, route_data in metrics["routes"].items():
            method, _, path = route.partition(":")
            labels = f'method="{escape(method)}",path="{escape(path)}"'
            labelled_routes.append((labels, route_data))

        # Per-route metrics, grouped by metric name.
        lines.extend(
            self._metric_family(
                f"{service_name}_route_requests_total",
                "Total requests per route",
                "counter",
                [(labels, str(data["request_count"])) for labels, data in labelled_routes],
            )
        )
        lines.extend(
            self._metric_family(
                f"{service_name}_route_errors_total",
                "Total errors per route",
                "counter",
                [(labels, str(data["error_count"])) for labels, data in labelled_routes],
            )
        )
        lines.extend(
            self._metric_family(
                f"{service_name}_route_duration_seconds",
                "Average response time per route",
                "gauge",
                [
                    (labels, f"{data['avg_response_time']:.4f}")
                    for labels, data in labelled_routes
                    if "avg_response_time" in data
                ],
            )
        )

        # Percentile metrics (present only for routes with samples).
        for percentile in ("p50", "p90", "p95", "p99"):
            lines.extend(
                self._metric_family(
                    f"{service_name}_route_duration_{percentile}_seconds",
                    f"{percentile.upper()} response time per route",
                    "gauge",
                    [
                        (labels, f"{data[percentile]:.4f}")
                        for labels, data in labelled_routes
                        if percentile in data
                    ],
                )
            )

        return "\n".join(lines)

    def reset_metrics(self) -> None:
        """Reset all metrics (useful for testing)."""
        self.routes.clear()
        self.total_requests = 0
        self.total_errors = 0
        self.start_time = time.time()

    def __del__(self) -> None:
        """Cancel the cleanup task when the collector is destroyed.

        NOTE(review): the cleanup coroutine is a bound method, so a pending
        task presumably keeps this object alive; confirm whether this
        finalizer can run before the task has finished.
        """
        # getattr: __init__ may have failed before the attribute was set.
        task = getattr(self, "_cleanup_task", None)
        if task is not None and not task.done():
            try:
                task.cancel()
            except RuntimeError:
                # The event loop is already closed; nothing left to cancel.
                pass