Coverage for src / tracekit / core / logging_advanced.py: 93%
438 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Advanced logging features for TraceKit.
3This module provides advanced logging capabilities including log aggregation,
4analysis, alerting, sampling, and external system integration.
5"""
7from __future__ import annotations
import gzip
import hashlib
import json
import logging
import os
import queue
import random
import re
import threading
import time
import uuid
from collections import Counter, deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum, auto
from typing import TYPE_CHECKING, Any
24if TYPE_CHECKING:
25 from collections.abc import Callable
27logger = logging.getLogger(__name__)
30# =============================================================================
31# =============================================================================
@dataclass
class AggregatedLogEntry:
    """A group of similar log messages collapsed into one record.

    Attributes:
        key: Normalized message text used as the aggregation key.
        count: Number of occurrences observed.
        first_seen: Timestamp of the first occurrence.
        last_seen: Timestamp of the most recent occurrence.
        sample_message: Verbatim text of the first matching message.
        levels: Per-level occurrence counts (level name -> count).
        sources: Names of the loggers that emitted matching records.

    References:
        LOG-009: Log Aggregation
    """

    key: str
    count: int = 0
    first_seen: datetime = field(default_factory=datetime.now)
    last_seen: datetime = field(default_factory=datetime.now)
    sample_message: str = ""
    levels: Counter[str] = field(default_factory=Counter)
    sources: set[str] = field(default_factory=set)
class LogAggregator:
    """Aggregates log messages by pattern.

    Groups similar log messages together to reduce noise and
    identify patterns.

    Example:
        >>> aggregator = LogAggregator()
        >>> aggregator.add(record)
        >>> summary = aggregator.get_summary()

    References:
        LOG-009: Log Aggregation
    """

    # Compiled once; substitution ORDER matters — see _normalize_message.
    _UUID_RE = re.compile(
        r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
        re.IGNORECASE,
    )
    _PATH_RE = re.compile(r"[/\\][\w./\\-]+")
    _NUM_RE = re.compile(r"\d+")

    def __init__(self, window_seconds: int = 60, min_count: int = 2):
        """Initialize aggregator.

        Args:
            window_seconds: Aggregation window size in seconds.
            min_count: Minimum occurrences for an entry to appear in the summary.
        """
        self.window_seconds = window_seconds
        self.min_count = min_count
        self._entries: dict[str, AggregatedLogEntry] = {}
        self._lock = threading.Lock()

    def _normalize_message(self, message: str) -> str:
        """Normalize message for grouping.

        Replaces variable parts (UUIDs, file paths, numbers) with
        placeholders so messages differing only in those values share a key.

        Args:
            message: Log message to normalize.

        Returns:
            Normalized message with placeholders.
        """
        # UUIDs and paths must be replaced BEFORE bare digits: both contain
        # digits, and substituting digits first corrupts them so their own
        # patterns can never match (e.g. "123e4567-…" -> "<NUM>e<NUM>-…").
        normalized = self._UUID_RE.sub("<UUID>", message)
        normalized = self._PATH_RE.sub("<PATH>", normalized)
        normalized = self._NUM_RE.sub("<NUM>", normalized)
        return normalized

    def add(self, record: logging.LogRecord) -> None:
        """Add log record to aggregator.

        Args:
            record: Log record to fold into the aggregate.
        """
        key = self._normalize_message(record.getMessage())
        now = datetime.now()

        with self._lock:
            if key not in self._entries:
                # First sighting: keep the verbatim message as the sample.
                self._entries[key] = AggregatedLogEntry(
                    key=key, first_seen=now, sample_message=record.getMessage()
                )

            entry = self._entries[key]
            entry.count += 1
            entry.last_seen = now
            entry.levels[record.levelname] += 1
            entry.sources.add(record.name)

    def get_summary(self) -> list[AggregatedLogEntry]:
        """Get aggregation summary.

        Returns:
            Entries that occurred at least ``min_count`` times.
        """
        with self._lock:
            return [entry for entry in self._entries.values() if entry.count >= self.min_count]

    def cleanup_old(self) -> None:
        """Drop entries whose last occurrence fell outside the window."""
        cutoff = datetime.now() - timedelta(seconds=self.window_seconds)
        with self._lock:
            self._entries = {k: v for k, v in self._entries.items() if v.last_seen >= cutoff}
148# =============================================================================
149# =============================================================================
@dataclass
class LogPattern:
    """Detected log pattern.

    References:
        LOG-010: Log Analysis and Patterns
    """

    pattern: str  # normalized message text (digit runs collapsed to "<N>")
    count: int  # number of history entries matching the pattern
    severity_distribution: dict[str, int]  # level name -> occurrence count
    time_distribution: dict[int, int]  # hour of day (0-23) -> occurrence count
    example: str  # verbatim message of the first matching entry
class LogAnalyzer:
    """Analyzes log patterns and trends.

    Keeps a bounded in-memory history of records and derives message
    patterns, error rates, and an error-trend estimate from it.

    References:
        LOG-010: Log Analysis and Patterns
    """

    def __init__(self, max_history: int = 10000):
        """Initialize analyzer.

        Args:
            max_history: Maximum records retained; older entries are evicted.
        """
        # Bounded history: deque(maxlen=...) drops oldest entries automatically.
        self._history: deque[dict[str, Any]] = deque(maxlen=max_history)
        self._patterns: dict[str, LogPattern] = {}

    def add(self, record: logging.LogRecord) -> None:
        """Add record to analysis history."""
        self._history.append(
            {
                "message": record.getMessage(),
                "level": record.levelname,
                "time": datetime.now(),
                "logger": record.name,
            }
        )

    def analyze_patterns(self) -> list[LogPattern]:
        """Analyze log patterns.

        Returns:
            Up to the 20 most common patterns, each with severity and
            hour-of-day distributions.
        """
        pattern_counts: Counter[str] = Counter()
        pattern_levels: dict[str, Counter[str]] = {}
        pattern_hours: dict[str, Counter[int]] = {}
        pattern_examples: dict[str, str] = {}

        for entry in self._history:
            # Collapse digit runs so variable values group into one pattern.
            normalized = re.sub(r"\d+", "<N>", entry["message"])
            pattern_counts[normalized] += 1

            if normalized not in pattern_levels:
                pattern_levels[normalized] = Counter()
                pattern_hours[normalized] = Counter()
                pattern_examples[normalized] = entry["message"]

            pattern_levels[normalized][entry["level"]] += 1
            pattern_hours[normalized][entry["time"].hour] += 1

        return [
            LogPattern(
                pattern=pattern,
                count=count,
                severity_distribution=dict(pattern_levels.get(pattern, {})),
                time_distribution=dict(pattern_hours.get(pattern, {})),
                example=pattern_examples.get(pattern, ""),
            )
            for pattern, count in pattern_counts.most_common(20)
        ]

    def get_error_rate(self, window_minutes: int = 60) -> float:
        """Get error rate over window.

        Args:
            window_minutes: Window size in minutes.

        Returns:
            Fraction of ERROR/CRITICAL records (0.0 to 1.0); 0.0 when the
            window holds no records.
        """
        cutoff = datetime.now() - timedelta(minutes=window_minutes)
        recent = [e for e in self._history if e["time"] >= cutoff]

        if not recent:
            return 0.0

        errors = sum(1 for e in recent if e["level"] in ("ERROR", "CRITICAL"))
        return errors / len(recent)

    def get_trend(self) -> str:
        """Get error trend: "increasing", "stable", "decreasing", or
        "insufficient_data" when fewer than 100 records are held."""
        if len(self._history) < 100:
            return "insufficient_data"

        # Compare the older half's error rate to the newer half's, with a
        # +/-20% dead band so the verdict doesn't flap on noise.
        mid = len(self._history) // 2
        first_half = list(self._history)[:mid]
        second_half = list(self._history)[mid:]

        first_errors = sum(1 for e in first_half if e["level"] in ("ERROR", "CRITICAL"))
        second_errors = sum(1 for e in second_half if e["level"] in ("ERROR", "CRITICAL"))

        first_rate = first_errors / len(first_half)
        second_rate = second_errors / len(second_half)

        if second_rate > first_rate * 1.2:
            return "increasing"
        if second_rate < first_rate * 0.8:
            return "decreasing"
        return "stable"
265# =============================================================================
266# =============================================================================
class AlertSeverity(Enum):
    """Alert severity levels, ordered from least to most severe."""

    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4
@dataclass
class LogAlert:
    """Log alert definition.

    References:
        LOG-012: Log Alerting
    """

    id: str  # unique alert id (uuid4 string assigned by LogAlerter.add_alert)
    name: str  # human-readable alert name
    condition: Callable[[logging.LogRecord], bool]  # predicate deciding whether a record triggers
    severity: AlertSeverity = AlertSeverity.WARNING  # severity attached to triggers
    cooldown_seconds: int = 300  # minimum seconds between consecutive triggers
    last_triggered: datetime | None = None  # updated by LogAlerter.check()
    enabled: bool = True  # disabled alerts are skipped by check()
@dataclass
class TriggeredAlert:
    """Triggered alert instance: one firing of an alert against one record."""

    alert: LogAlert  # the alert definition that fired
    record: logging.LogRecord  # the record that satisfied the condition
    timestamp: datetime  # when the condition was evaluated
class LogAlerter:
    """Log alerting system.

    Monitors logs and triggers alerts based on conditions.

    Example:
        >>> alerter = LogAlerter()
        >>> alerter.add_alert("error_burst", lambda r: r.levelno >= logging.ERROR)
        >>> alerter.on_alert(lambda a: send_notification(a))

    References:
        LOG-012: Log Alerting
    """

    def __init__(self) -> None:
        self._alerts: dict[str, LogAlert] = {}
        self._handlers: list[Callable[[TriggeredAlert], None]] = []
        self._lock = threading.Lock()

    def add_alert(
        self,
        name: str,
        condition: Callable[[logging.LogRecord], bool],
        severity: AlertSeverity = AlertSeverity.WARNING,
        cooldown_seconds: int = 300,
    ) -> str:
        """Add alert definition.

        Args:
            name: Alert name
            condition: Condition function
            severity: Alert severity
            cooldown_seconds: Minimum time between triggers

        Returns:
            Alert ID
        """
        alert_id = str(uuid.uuid4())
        alert = LogAlert(
            id=alert_id,
            name=name,
            condition=condition,
            severity=severity,
            cooldown_seconds=cooldown_seconds,
        )
        # Mutate under the lock: check() iterates _alerts while holding it,
        # so an unlocked insert could race with that iteration.
        with self._lock:
            self._alerts[alert_id] = alert
        return alert_id

    def check(self, record: logging.LogRecord) -> list[TriggeredAlert]:
        """Check record against all alerts.

        Args:
            record: Log record to check

        Returns:
            List of triggered alerts
        """
        triggered = []
        now = datetime.now()

        with self._lock:
            for alert in self._alerts.values():
                if not alert.enabled:
                    continue

                # Respect the per-alert cooldown.
                if alert.last_triggered:
                    elapsed = (now - alert.last_triggered).total_seconds()
                    if elapsed < alert.cooldown_seconds:
                        continue

                # User-supplied conditions may raise; never let that escape.
                try:
                    matched = alert.condition(record)
                except Exception as e:
                    logger.warning("Alert condition check failed: %s", e)
                    continue

                if matched:
                    alert.last_triggered = now
                    triggered.append(TriggeredAlert(alert=alert, record=record, timestamp=now))

        # Notify OUTSIDE the lock so a handler that re-enters the alerter
        # (e.g. by logging or adding alerts) cannot deadlock.
        for triggered_alert in triggered:
            self._notify(triggered_alert)

        return triggered

    def on_alert(self, handler: Callable[[TriggeredAlert], None]) -> None:
        """Register alert handler."""
        self._handlers.append(handler)

    def _notify(self, alert: TriggeredAlert) -> None:
        """Notify handlers of triggered alert; handler failures are logged."""
        for handler in self._handlers:
            try:
                handler(alert)
            except Exception as e:
                logger.warning("Alert handler failed: %s", e)
403# =============================================================================
404# =============================================================================
class SamplingStrategy(Enum):
    """Strategies for deciding which records a LogSampler keeps."""

    RANDOM = 1
    RATE_LIMIT = 2
    ADAPTIVE = 3
class LogSampler:
    """Samples log messages for high-volume scenarios.

    ERROR and above always pass; lower-severity records are admitted
    according to the configured strategy.

    References:
        LOG-015: Log Sampling for High-Volume
    """

    def __init__(
        self,
        strategy: SamplingStrategy = SamplingStrategy.RATE_LIMIT,
        rate: float = 0.1,  # fraction kept under RANDOM (mutated by ADAPTIVE)
        max_per_second: int = 100,  # admission cap under RATE_LIMIT
    ):
        self.strategy = strategy
        self.rate = rate
        self.max_per_second = max_per_second
        self._count_this_second = 0  # records counted within _last_second
        self._last_second = 0  # epoch second the counter refers to
        self._lock = threading.Lock()

    def should_log(self, record: logging.LogRecord) -> bool:
        """Determine if record should be logged.

        Args:
            record: Log record

        Returns:
            True if should log
        """
        # Errors and above are never sampled away.
        if record.levelno >= logging.ERROR:
            return True

        if self.strategy == SamplingStrategy.RANDOM:
            return random.random() < self.rate

        if self.strategy == SamplingStrategy.RATE_LIMIT:
            with self._lock:
                current_second = int(time.time())
                if current_second != self._last_second:
                    # New wall-clock second: reset the admission counter.
                    self._last_second = current_second
                    self._count_this_second = 0

                if self._count_this_second < self.max_per_second:
                    self._count_this_second += 1
                    return True
                return False

        if self.strategy == SamplingStrategy.ADAPTIVE:
            # Reduce the sampling rate as per-second volume increases.
            with self._lock:
                current_second = int(time.time())
                if current_second != self._last_second:
                    volume = self._count_this_second
                    self._last_second = current_second
                    self._count_this_second = 0

                    # Re-tune rate from the previous second's volume.
                    if volume > 1000:
                        self.rate = 0.01
                    elif volume > 100:
                        self.rate = 0.1
                    else:
                        self.rate = 1.0

                self._count_this_second += 1
            return random.random() < self.rate

        return True  # unknown strategy: fail open
490# =============================================================================
491# =============================================================================
class LogBuffer:
    """Buffers log messages for batch writing.

    References:
        LOG-016: Log Buffer for Batch Writing
    """

    def __init__(self, max_size: int = 1000, flush_interval_seconds: float = 5.0):
        """Initialize buffer.

        Args:
            max_size: Maximum records held before a forced flush.
            flush_interval_seconds: Period of the auto-flush thread.
        """
        self.max_size = max_size
        self.flush_interval = flush_interval_seconds
        self._buffer: queue.Queue[logging.LogRecord] = queue.Queue(maxsize=max_size)
        self._handlers: list[Callable[[list[logging.LogRecord]], None]] = []
        self._flush_thread: threading.Thread | None = None
        self._running = False

    def add(self, record: logging.LogRecord) -> None:
        """Add record to buffer, force-flushing first if it is full."""
        try:
            self._buffer.put_nowait(record)
        except queue.Full:
            # Buffer full: flush, then retry once. If another producer refilled
            # the queue in between, drop the record instead of raising out of
            # the logging path.
            self.flush()
            try:
                self._buffer.put_nowait(record)
            except queue.Full:
                logger.warning("Log buffer still full after flush; dropping record")

    def flush(self) -> None:
        """Drain the buffer and hand the batch to every flush handler."""
        records = []
        while not self._buffer.empty():
            try:
                records.append(self._buffer.get_nowait())
            except queue.Empty:
                # Another consumer drained it between empty() and get_nowait().
                break

        if records:
            for handler in self._handlers:
                try:
                    handler(records)
                except Exception as e:
                    logger.warning("Buffer flush handler failed: %s", e)

    def on_flush(self, handler: Callable[[list[logging.LogRecord]], None]) -> None:
        """Register flush handler."""
        self._handlers.append(handler)

    def start_auto_flush(self) -> None:
        """Start automatic flush thread (idempotent)."""
        if self._running and self._flush_thread is not None and self._flush_thread.is_alive():
            return  # already running; don't leak a second flush thread
        self._running = True
        self._flush_thread = threading.Thread(target=self._flush_loop, daemon=True)
        self._flush_thread.start()

    def stop_auto_flush(self) -> None:
        """Stop automatic flush thread and flush any remaining records."""
        self._running = False
        if self._flush_thread:
            self._flush_thread.join(timeout=2)
        self.flush()

    def _flush_loop(self) -> None:
        """Periodic flush loop (runs in the daemon thread)."""
        while self._running:
            time.sleep(self.flush_interval)
            self.flush()
558# =============================================================================
559# =============================================================================
class CompressedLogHandler(logging.Handler):
    """Handler that writes gzip-compressed, size-rotated logs.

    Records are appended to ``<filename>.gz``; when a write would push the
    file past ``max_bytes`` the file rotates to ``<filename>.1.gz``, with
    older backups shifting to ``.2.gz`` ... ``.<backup_count>.gz``.

    References:
        LOG-017: Log Compression
    """

    def __init__(
        self,
        filename: str,
        max_bytes: int = 10_000_000,
        backup_count: int = 5,
        compression_level: int = 9,
    ):
        """Initialize handler.

        Args:
            filename: Base log path; ".gz" is appended for the active file.
            max_bytes: Rotation threshold in bytes.
            backup_count: Number of rotated backup files to keep.
            compression_level: gzip level (1 fastest .. 9 smallest).
        """
        super().__init__()
        self.filename = filename
        self.max_bytes = max_bytes
        self.backup_count = backup_count
        self.compression_level = compression_level
        self._current_file: Any = None  # open gzip file object, lazily created
        self._current_size = 0  # byte counter compared against max_bytes
        # Private lock (separate from logging.Handler's own self.lock) guarding
        # the file object and size counter.
        self._lock = threading.Lock()

    def emit(self, record: logging.LogRecord) -> None:
        """Append the formatted record, rotating first if it would exceed
        ``max_bytes``."""
        try:
            msg = self.format(record) + "\n"
            msg_bytes = msg.encode("utf-8")

            with self._lock:
                if self._current_file is None:
                    self._open_file()

                if self._current_size + len(msg_bytes) > self.max_bytes:
                    self._rotate()

                self._current_file.write(msg_bytes)
                self._current_size += len(msg_bytes)

        except Exception:
            # Per logging convention, never raise out of emit().
            self.handleError(record)

    def _open_file(self) -> None:
        """Open (or create) the active ``.gz`` file in append mode."""
        self._current_file = gzip.open(  # noqa: SIM115
            f"{self.filename}.gz", "ab", compresslevel=self.compression_level
        )
        try:
            # NOTE(review): the counter is seeded with *compressed* on-disk
            # bytes here but incremented with *uncompressed* message lengths
            # in emit(), so rotation tends to trigger before the on-disk file
            # actually reaches max_bytes — confirm whether that is intended.
            self._current_size = os.path.getsize(f"{self.filename}.gz")  # noqa: PTH202
        except OSError:
            # File does not exist yet (or is unreadable): start counting at 0.
            self._current_size = 0

    def _rotate(self) -> None:
        """Rotate log files: .1.gz -> .2.gz, ..., dropping the oldest."""
        if self._current_file:
            self._current_file.close()

        # Shift existing backups, highest index first so nothing is clobbered.
        for i in range(self.backup_count - 1, 0, -1):
            src = f"{self.filename}.{i}.gz"
            dst = f"{self.filename}.{i + 1}.gz"
            if os.path.exists(src):
                os.rename(src, dst)  # noqa: PTH104

        # Move current to .1
        if os.path.exists(f"{self.filename}.gz"):
            os.rename(f"{self.filename}.gz", f"{self.filename}.1.gz")  # noqa: PTH104

        self._open_file()

    def close(self) -> None:
        """Close the active file and the handler."""
        with self._lock:
            if self._current_file:
                self._current_file.close()
                self._current_file = None
        super().close()
641# =============================================================================
642# =============================================================================
class EncryptedLogHandler(logging.Handler):
    """Handler that writes encrypted logs.

    Uses simple XOR encryption for demonstration.
    In production, use proper encryption (AES, etc.).

    References:
        LOG-018: Log Encryption
    """

    def __init__(self, filename: str, key: str):
        super().__init__()
        self.filename = filename
        # Derive a fixed 32-byte key from the passphrase.
        self._key = hashlib.sha256(key.encode()).digest()
        self._file: Any = None
        self._lock = threading.Lock()

    def emit(self, record: logging.LogRecord) -> None:
        """Encrypt the formatted record and append it to the log file."""
        try:
            payload = self._encrypt((self.format(record) + "\n").encode("utf-8"))

            with self._lock:
                if self._file is None:
                    self._file = open(self.filename, "ab")  # noqa: SIM115

                # Length-prefix each message so records can be framed on read.
                self._file.write(len(payload).to_bytes(4, "big") + payload)
                self._file.flush()

        except Exception:
            self.handleError(record)

    def _encrypt(self, data: bytes) -> bytes:
        """XOR *data* against the repeating 32-byte key (involution)."""
        key = self._key
        return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))

    def close(self) -> None:
        """Close the underlying file, then the handler itself."""
        with self._lock:
            if self._file:
                self._file.close()
                self._file = None
        super().close()
696# =============================================================================
697# =============================================================================
class LogForwarderProtocol(Enum):
    """Transport protocols supported by LogForwarder."""

    SYSLOG = 1
    HTTP = 2
    TCP = 3
    UDP = 4
@dataclass
class ForwardingConfig:
    """Log forwarding configuration.

    References:
        LOG-019: Log Forwarding
    """

    protocol: LogForwarderProtocol  # transport used by LogForwarder._flush
    host: str  # destination host
    port: int  # destination port
    timeout: float = 5.0  # connect/read timeout in seconds (HTTP/TCP)
    batch_size: int = 100  # entries buffered before a flush is attempted
    tls: bool = False  # HTTP only: use the https:// scheme when True
class LogForwarder:
    """Forwards logs to external systems in batches.

    Records are buffered and delivered once ``config.batch_size`` entries
    have accumulated. On delivery failure the batch is returned to the
    buffer so a later flush can retry it.

    References:
        LOG-019: Log Forwarding
    """

    def __init__(self, config: ForwardingConfig):
        self.config = config
        self._buffer: list[dict[str, Any]] = []
        self._lock = threading.Lock()

    def forward(self, record: logging.LogRecord) -> None:
        """Forward log record.

        Args:
            record: Log record
        """
        entry = {
            "timestamp": datetime.now().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "hostname": os.uname().nodename if hasattr(os, "uname") else "unknown",
        }

        with self._lock:
            self._buffer.append(entry)
            if len(self._buffer) >= self.config.batch_size:
                # NOTE: the network send runs while the lock is held, so a
                # slow destination blocks concurrent forward() callers.
                self._flush()

    def _flush(self) -> None:
        """Send all buffered entries using the configured protocol."""
        if not self._buffer:
            return

        entries = self._buffer.copy()
        self._buffer.clear()

        try:
            if self.config.protocol == LogForwarderProtocol.HTTP:
                self._send_http(entries)
            elif self.config.protocol == LogForwarderProtocol.SYSLOG:
                self._send_syslog(entries)
            elif self.config.protocol == LogForwarderProtocol.TCP:
                self._send_tcp(entries)
            elif self.config.protocol == LogForwarderProtocol.UDP:
                self._send_udp(entries)
        except Exception as e:
            logger.warning("Log forwarding failed: %s", e)
            # Put entries back in buffer so the next flush retries them.
            self._buffer.extend(entries)

    def _send_http(self, entries: list[dict[str, Any]]) -> None:
        """Send all entries as one JSON array via HTTP POST."""
        import urllib.request

        data = json.dumps(entries).encode("utf-8")
        req = urllib.request.Request(
            f"{'https' if self.config.tls else 'http'}://{self.config.host}:{self.config.port}/logs",
            data=data,
            headers={"Content-Type": "application/json"},
        )
        # Close the response explicitly so the connection is not leaked.
        with urllib.request.urlopen(req, timeout=self.config.timeout):
            pass

    def _send_syslog(self, entries: list[dict[str, Any]]) -> None:
        """Send entries as UDP syslog messages (facility/severity PRI 14)."""
        import socket

        # Context manager guarantees the socket closes even if sendto raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            for entry in entries:
                msg = f"<14>{entry['timestamp']} {entry['logger']}: {entry['message']}"
                sock.sendto(msg.encode(), (self.config.host, self.config.port))

    def _send_tcp(self, entries: list[dict[str, Any]]) -> None:
        """Send newline-delimited JSON entries over one TCP connection."""
        import socket

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(self.config.timeout)
            sock.connect((self.config.host, self.config.port))
            for entry in entries:
                msg = json.dumps(entry) + "\n"
                # sendall (unlike send) guarantees the whole message is written.
                sock.sendall(msg.encode())

    def _send_udp(self, entries: list[dict[str, Any]]) -> None:
        """Send one JSON datagram per entry via UDP."""
        import socket

        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            for entry in entries:
                msg = json.dumps(entry)
                sock.sendto(msg.encode(), (self.config.host, self.config.port))
823# =============================================================================
824# =============================================================================
@dataclass
class DashboardMetrics:
    """Metrics for log visualization dashboard.

    References:
        LOG-020: Log Visualization Dashboard Data
    """

    total_logs: int = 0  # number of records in the collection window
    logs_by_level: dict[str, int] = field(default_factory=dict)  # level name -> count
    logs_by_logger: dict[str, int] = field(default_factory=dict)  # logger name -> count (top 10)
    logs_per_minute: list[int] = field(default_factory=list)  # per-minute counts, oldest first
    error_rate: float = 0.0  # fraction of ERROR/CRITICAL records (0.0-1.0)
    top_patterns: list[tuple[str, int]] = field(default_factory=list)  # (pattern, count) pairs, top 10
    recent_errors: list[dict[str, Any]] = field(default_factory=list)  # last 10 ERROR/CRITICAL entries
class LogDashboardCollector:
    """Collects rolling-window metrics for log visualization.

    References:
        LOG-020: Log Visualization Dashboard Data
    """

    def __init__(self, window_minutes: int = 60):
        """Initialize collector.

        Args:
            window_minutes: Size of the rolling window, in minutes.
        """
        self.window_minutes = window_minutes
        self._logs: deque[dict[str, Any]] = deque()
        self._lock = threading.Lock()

    def add(self, record: logging.LogRecord) -> None:
        """Add a log record and evict entries older than the window."""
        entry = {
            "timestamp": datetime.now(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }

        with self._lock:
            self._logs.append(entry)
            # Entries are appended in time order, so trimming from the left
            # removes exactly the expired ones.
            cutoff = datetime.now() - timedelta(minutes=self.window_minutes)
            while self._logs and self._logs[0]["timestamp"] < cutoff:
                self._logs.popleft()

    def get_metrics(self) -> DashboardMetrics:
        """Get current dashboard metrics.

        Returns:
            Snapshot of metrics for the current window.
        """
        # Snapshot under the lock, then compute without blocking writers.
        with self._lock:
            logs = list(self._logs)

        if not logs:
            return DashboardMetrics()

        level_counts = Counter(log["level"] for log in logs)
        logger_counts = Counter(log["logger"] for log in logs)

        # Per-minute counts across the configured window (previously a
        # hard-coded 60 buckets, which disagreed with non-default
        # window_minutes). Oldest minute first after the reverse.
        now = datetime.now()
        per_minute = []
        for i in range(self.window_minutes):
            minute_start = now - timedelta(minutes=i + 1)
            minute_end = now - timedelta(minutes=i)
            count = sum(1 for log in logs if minute_start <= log["timestamp"] < minute_end)
            per_minute.append(count)
        per_minute.reverse()

        error_count = sum(1 for log in logs if log["level"] in ("ERROR", "CRITICAL"))
        error_rate = error_count / len(logs)

        # Collapse digit runs so variable values group into one pattern.
        patterns: Counter[str] = Counter()
        for log in logs:
            patterns[re.sub(r"\d+", "<N>", log["message"])] += 1

        recent_errors = [log for log in logs if log["level"] in ("ERROR", "CRITICAL")][-10:]

        return DashboardMetrics(
            total_logs=len(logs),
            logs_by_level=dict(level_counts),
            logs_by_logger=dict(logger_counts.most_common(10)),
            logs_per_minute=per_minute,
            error_rate=error_rate,
            top_patterns=patterns.most_common(10),
            recent_errors=recent_errors,
        )
# Explicit public API of this module, grouped by feature/requirement ID.
__all__ = [
    # Aggregation (LOG-009)
    "AggregatedLogEntry",
    # Alerting (LOG-012)
    "AlertSeverity",
    # Compression (LOG-017)
    "CompressedLogHandler",
    # Dashboard (LOG-020)
    "DashboardMetrics",
    # Encryption (LOG-018)
    "EncryptedLogHandler",
    # Forwarding (LOG-019)
    "ForwardingConfig",
    "LogAggregator",
    "LogAlert",
    "LogAlerter",
    # Analysis (LOG-010)
    "LogAnalyzer",
    # Buffer (LOG-016)
    "LogBuffer",
    "LogDashboardCollector",
    "LogForwarder",
    "LogForwarderProtocol",
    "LogPattern",
    # Sampling (LOG-015)
    "LogSampler",
    "SamplingStrategy",
    "TriggeredAlert",
]