Coverage for src / mysingle / core / metrics / collector.py: 0%
146 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1"""Enhanced metrics collector with performance optimizations."""
3import asyncio
4import statistics
5import time
6from collections import defaultdict, deque
7from dataclasses import dataclass, field
8from typing import Any
10from ..logging import get_structured_logger
# Module-level structured logger, used by the collector's cleanup and error paths.
logger = get_structured_logger(__name__)
@dataclass
class MetricsConfig:
    """Tunable settings for the metrics collector.

    Every field has a default, so ``MetricsConfig()`` is a complete
    configuration; override individual values by keyword or position.
    """

    # Desired per-route cap on stored response-duration samples.
    max_duration_samples: int = field(default=1000)
    # Number of equal-width buckets used for response-time histograms.
    max_histogram_buckets: int = field(default=20)
    # Whether p50/p90/p95/p99 response times are computed.
    enable_percentiles: bool = field(default=True)
    # Whether a response-time histogram is computed.
    enable_histogram: bool = field(default=True)
    # Routes not accessed within this many seconds are purged (3600 = 1 hour).
    retention_period_seconds: int = field(default=3600)
    # Delay in seconds between cleanup passes (300 = 5 minutes).
    cleanup_interval_seconds: int = field(default=300)
@dataclass
class RouteMetrics:
    """Mutable counters for one route plus a bounded window of recent durations."""

    # Number of requests recorded for this route.
    request_count: int = field(default=0)
    # Number of recorded requests the collector counted as errors.
    error_count: int = field(default=0)
    # Most recent response durations; holds at most 1000, oldest evicted first.
    durations: deque[float] = field(default_factory=lambda: deque([], 1000))
    # ``time.time()`` of creation, later refreshed on each recorded request.
    last_accessed: float = field(default_factory=time.time)
    # status code -> occurrences; unseen codes read as 0.
    status_codes: defaultdict[int, int] = field(
        default_factory=lambda: defaultdict(int)
    )
class MetricsCollector:
    """In-memory HTTP request metrics collector.

    Keeps service-wide request/error counters plus per-route statistics keyed
    by ``"METHOD:path"``. Each route stores at most
    ``config.max_duration_samples`` recent durations.

    Routes idle for longer than ``config.retention_period_seconds`` are purged
    by two mechanisms:

    * a background task, started only when the collector is constructed
      inside a running event loop;
    * as a fallback, an opportunistic purge from the record methods, run at
      most once every ``config.cleanup_interval_seconds``.
    """

    def __init__(self, service_name: str, config: MetricsConfig | None = None):
        """Create a collector.

        Args:
            service_name: Service identifier. A sanitized form is also used as
                the Prometheus metric-name prefix.
            config: Collection settings. A default ``MetricsConfig`` is used
                when omitted.
        """
        self.service_name = service_name
        self.config = config or MetricsConfig()
        self.start_time = time.time()

        # Per-route metrics keyed by "METHOD:path".
        self.routes: dict[str, RouteMetrics] = {}
        # Not used by this class's own methods; kept so existing references to
        # ``_lock`` keep working.
        self._lock: asyncio.Lock | None = asyncio.Lock()

        # Service-wide counters.
        self.total_requests = 0
        self.total_errors = 0

        # When stale routes were last purged (drives opportunistic cleanup).
        self._last_cleanup = self.start_time

        # Background cleanup task; stays None when no event loop is running.
        self._cleanup_task: asyncio.Task | None = None
        self._start_cleanup_task()

    def _start_cleanup_task(self) -> None:
        """Start the periodic cleanup task if an event loop is running.

        Outside a running loop this is a no-op. Stale routes are then purged
        opportunistically by the record methods instead.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running event loop (e.g. collector created at import time).
            return
        self._cleanup_task = asyncio.create_task(self._periodic_cleanup())

    async def _periodic_cleanup(self) -> None:
        """Purge stale routes every ``cleanup_interval_seconds`` until cancelled."""
        while True:
            try:
                await asyncio.sleep(self.config.cleanup_interval_seconds)
                await self._cleanup_old_metrics()
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Keep the loop alive; one failed pass must not stop cleanup.
                logger.warning(f"Error in metrics cleanup: {e}")

    def _purge_stale_routes(self, now: float) -> int:
        """Drop routes not accessed within the retention period.

        Args:
            now: Current ``time.time()`` value.

        Returns:
            The number of routes removed.
        """
        cutoff = now - self.config.retention_period_seconds
        # Collect first, then delete: a dict must not change size while iterated.
        stale = [key for key, rm in self.routes.items() if rm.last_accessed < cutoff]
        for key in stale:
            del self.routes[key]
        self._last_cleanup = now
        return len(stale)

    async def _cleanup_old_metrics(self) -> None:
        """Remove old metrics data to prevent memory bloat."""
        removed = self._purge_stale_routes(time.time())
        if removed:
            logger.debug(f"Cleaned up {removed} old route metrics")

    def _new_route_metrics(self) -> RouteMetrics:
        """Create route metrics whose duration window honours the config."""
        window = max(0, self.config.max_duration_samples)
        return RouteMetrics(durations=deque(maxlen=window))

    def _record(
        self, method: str, path: str, status_code: int, duration: float
    ) -> None:
        """Update global and per-route metrics for a single request.

        This is shared by ``record_request`` and ``record_request_sync``.
        Status codes >= 400 are counted as errors.
        """
        route_key = f"{method}:{path}"
        now = time.time()
        is_error = status_code >= 400

        self.total_requests += 1
        if is_error:
            self.total_errors += 1

        route_metrics = self.routes.get(route_key)
        if route_metrics is None:
            route_metrics = self.routes[route_key] = self._new_route_metrics()
        route_metrics.request_count += 1
        route_metrics.last_accessed = now
        route_metrics.status_codes[status_code] += 1
        if is_error:
            route_metrics.error_count += 1
        # Bounded deque: the oldest sample is evicted once the window is full.
        route_metrics.durations.append(duration)

        # Fallback retention enforcement, so collectors without a background
        # task (created outside a running loop) do not grow without bound.
        if now - self._last_cleanup >= self.config.cleanup_interval_seconds:
            removed = self._purge_stale_routes(now)
            if removed:
                logger.debug(f"Cleaned up {removed} old route metrics")

    async def record_request(
        self, method: str, path: str, status_code: int, duration: float
    ) -> None:
        """Record one request.

        Args:
            method: HTTP method, e.g. ``"GET"``.
            path: Request path; combined with ``method`` into the route key.
            status_code: HTTP status code; >= 400 counts as an error.
            duration: Response time sample to store for the route.
        """
        self._record(method, path, status_code, duration)

    def record_request_sync(
        self, method: str, path: str, status_code: int, duration: float
    ) -> None:
        """Synchronous variant of ``record_request``.

        Any exception is logged as a warning instead of propagating.
        """
        try:
            self._record(method, path, status_code, duration)
        except Exception as e:
            logger.warning(f"Error recording metrics: {e}")

    def _calculate_percentiles(self, durations: deque[float]) -> dict[str, float]:
        """Return p50/p90/p95/p99 response times.

        Returns ``{}`` when percentiles are disabled or there are no samples.

        Each pXX is the last cut point of ``statistics.quantiles(..., n=k)``,
        with k = 10, 20 and 100. With fewer than k samples the maximum
        observed value is reported instead, because the default "exclusive"
        method can extrapolate past the largest sample for small inputs.
        """
        if not durations or not self.config.enable_percentiles:
            return {}
        ordered = sorted(durations)
        result: dict[str, float] = {"p50": statistics.median(ordered)}
        for label, k in (("p90", 10), ("p95", 20), ("p99", 100)):
            # quantiles(n=k) yields k-1 cut points; index k-2 is the (k-1)/k point.
            result[label] = (
                statistics.quantiles(ordered, n=k)[k - 2]
                if len(ordered) >= k
                else ordered[-1]
            )
        return result

    def _calculate_histogram(self, durations: deque[float]) -> dict[str, Any]:
        """Bucket response times into at most ``max_histogram_buckets`` bins.

        Buckets are equal-width over ``[0, max(durations)]``. Each key is
        ``"le_<upper edge>"`` (3 decimals), so every sample counted in a bucket
        is <= the value in its label. The maximum sample lands in the last
        bucket, and keys are inserted in ascending order.

        Returns ``{}`` when histograms are disabled or there are no samples.
        Otherwise returns a dict with ``buckets``, ``bucket_size`` and
        ``total_samples``.
        """
        if not durations or not self.config.enable_histogram:
            return {}

        bucket_count = max(1, self.config.max_histogram_buckets)
        max_duration = max(durations)
        bucket_size = max_duration / bucket_count

        buckets: dict[str, int] = {}
        if bucket_size <= 0:
            # All samples are <= 0: a single bucket bounded by the maximum.
            buckets[f"le_{max_duration:.3f}"] = len(durations)
        else:
            counts: dict[int, int] = {}
            for duration in durations:
                # Clamp: the maximum (and float rounding) stays in the last
                # bucket; negative outliers go to the first.
                index = min(max(int(duration / bucket_size), 0), bucket_count - 1)
                counts[index] = counts.get(index, 0) + 1
            for index in sorted(counts):
                # Accumulate: tiny bucket sizes can round to the same 3-decimal label.
                key = f"le_{(index + 1) * bucket_size:.3f}"
                buckets[key] = buckets.get(key, 0) + counts[index]

        return {
            "buckets": buckets,
            "bucket_size": bucket_size,
            "total_samples": len(durations),
        }

    def get_metrics(self) -> dict[str, Any]:
        """Return a snapshot of all collected metrics.

        The snapshot holds:

        * service-wide totals and error rate;
        * requests per second averaged since ``start_time``;
        * the reported configuration;
        * a ``routes`` mapping with per-route counters and response-time
          statistics. Percentiles and the histogram are included only when
          enabled and samples exist.
        """
        uptime = time.time() - self.start_time

        routes_metrics: dict[str, dict[str, Any]] = {}
        for route_key, route_metrics in self.routes.items():
            durations = route_metrics.durations
            requests = route_metrics.request_count
            route_data: dict[str, Any] = {
                "request_count": requests,
                "error_count": route_metrics.error_count,
                "error_rate": route_metrics.error_count / requests if requests > 0 else 0,
                "status_codes": dict(route_metrics.status_codes),
                "last_accessed": route_metrics.last_accessed,
            }

            if durations:
                route_data.update(
                    {
                        "avg_response_time": sum(durations) / len(durations),
                        "min_response_time": min(durations),
                        "max_response_time": max(durations),
                        "total_samples": len(durations),
                    }
                )
                route_data.update(self._calculate_percentiles(durations))
                histogram = self._calculate_histogram(durations)
                if histogram:
                    route_data["histogram"] = histogram

            routes_metrics[route_key] = route_data

        return {
            "service": self.service_name,
            "timestamp": time.time(),
            "uptime_seconds": uptime,
            "total_requests": self.total_requests,
            "total_errors": self.total_errors,
            "error_rate": (
                self.total_errors / self.total_requests
                if self.total_requests > 0
                else 0
            ),
            "requests_per_second": self.total_requests / uptime if uptime > 0 else 0,
            "active_routes": len(self.routes),
            "config": {
                "max_duration_samples": self.config.max_duration_samples,
                "enable_percentiles": self.config.enable_percentiles,
                "enable_histogram": self.config.enable_histogram,
                "retention_period_seconds": self.config.retention_period_seconds,
            },
            "routes": routes_metrics,
        }

    @staticmethod
    def _sanitize_metric_name(name: str) -> str:
        """Map *name* onto the Prometheus metric-name charset ``[a-zA-Z0-9_:]``.

        Invalid characters (including ``-`` and ``.``) become ``_``. A leading
        ``_`` is added if the result would start with a digit.
        """
        cleaned = "".join(
            ch if (ch.isascii() and ch.isalnum()) or ch in "_:" else "_"
            for ch in name
        )
        if cleaned[:1].isdigit():
            cleaned = f"_{cleaned}"
        return cleaned

    @staticmethod
    def _escape_label_value(value: str) -> str:
        """Escape ``\\``, ``"`` and newlines per the Prometheus text format."""
        return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")

    def get_prometheus_metrics(self) -> str:
        """Render metrics in the Prometheus text exposition format.

        Each metric family gets exactly one ``# HELP`` / ``# TYPE`` header,
        followed by all of its samples (one per route for route-level
        families), as the format requires. Label values are escaped because
        ``path`` comes straight from incoming requests. The output ends with a
        newline.
        """
        metrics = self.get_metrics()
        prefix = self._sanitize_metric_name(self.service_name)
        lines: list[str] = []

        def emit(
            suffix: str, help_text: str, metric_type: str, samples: list[tuple[str, str]]
        ) -> None:
            # One family: header once, then every (labels, value) sample.
            if not samples:
                return
            name = f"{prefix}_{suffix}"
            lines.append(f"# HELP {name} {help_text}")
            lines.append(f"# TYPE {name} {metric_type}")
            lines.extend(f"{name}{labels} {value}" for labels, value in samples)
            lines.append("")

        # Service-level metrics.
        emit(
            "uptime_seconds",
            "Service uptime in seconds",
            "gauge",
            [("", f"{metrics['uptime_seconds']:.2f}")],
        )
        emit(
            "requests_total",
            "Total number of requests",
            "counter",
            [("", str(metrics["total_requests"]))],
        )
        emit(
            "errors_total",
            "Total number of errors",
            "counter",
            [("", str(metrics["total_errors"]))],
        )
        emit(
            "requests_per_second",
            "Current requests per second",
            "gauge",
            [("", f"{metrics['requests_per_second']:.2f}")],
        )

        # Route-level metrics: gather samples per family, then emit each family once.
        requests: list[tuple[str, str]] = []
        errors: list[tuple[str, str]] = []
        averages: list[tuple[str, str]] = []
        percentiles: dict[str, list[tuple[str, str]]] = {
            p: [] for p in ("p50", "p90", "p95", "p99")
        }
        for route, route_data in metrics["routes"].items():
            method, path = route.split(":", 1)
            method_label = self._escape_label_value(method)
            path_label = self._escape_label_value(path)
            labels = f'{{method="{method_label}",path="{path_label}"}}'

            requests.append((labels, str(route_data["request_count"])))
            errors.append((labels, str(route_data["error_count"])))
            if "avg_response_time" in route_data:
                averages.append((labels, f"{route_data['avg_response_time']:.4f}"))
            for percentile, samples in percentiles.items():
                if percentile in route_data:
                    samples.append((labels, f"{route_data[percentile]:.4f}"))

        emit("route_requests_total", "Total requests per route", "counter", requests)
        emit("route_errors_total", "Total errors per route", "counter", errors)
        emit(
            "route_duration_seconds",
            "Average response time per route",
            "gauge",
            averages,
        )
        for percentile, samples in percentiles.items():
            emit(
                f"route_duration_{percentile}_seconds",
                f"{percentile.upper()} response time per route",
                "gauge",
                samples,
            )

        return "\n".join(lines)

    def reset_metrics(self) -> None:
        """Reset all metrics (useful for testing)."""
        self.routes.clear()
        self.total_requests = 0
        self.total_errors = 0
        self.start_time = time.time()
        self._last_cleanup = self.start_time

    def __del__(self) -> None:
        """Best-effort cancellation of the background cleanup task.

        The task's coroutine is a bound method that references ``self``, so
        this may not run while the task is still alive. Prefer cancelling the
        task explicitly at shutdown.
        """
        # getattr: __init__ may have failed before the attribute was set.
        task = getattr(self, "_cleanup_task", None)
        if task is not None and not task.done():
            try:
                task.cancel()
            except RuntimeError:
                # The owning event loop is already closed; nothing to cancel.
                pass