Coverage for src / tracekit / core / logging_advanced.py: 93%

438 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Advanced logging features for TraceKit. 

2 

3This module provides advanced logging capabilities including log aggregation, 

4analysis, alerting, sampling, and external system integration. 

5""" 

6 

7from __future__ import annotations 

8 

9import gzip 

10import hashlib 

11import json 

12import logging 

13import os 

14import queue 

15import re 

16import threading 

17import time 

18from collections import Counter, deque 

19from dataclasses import dataclass, field 

20from datetime import datetime, timedelta 

21from enum import Enum, auto 

22from typing import TYPE_CHECKING, Any 

23 

24if TYPE_CHECKING: 

25 from collections.abc import Callable 

26 

27logger = logging.getLogger(__name__) 

28 

29 

30# ============================================================================= 

31# ============================================================================= 

32 

33 

34@dataclass 

35class AggregatedLogEntry: 

36 """Aggregated log entry. 

37 

38 Attributes: 

39 key: Aggregation key 

40 count: Number of occurrences 

41 first_seen: First occurrence timestamp 

42 last_seen: Last occurrence timestamp 

43 sample_message: Sample message 

44 levels: Counter of log levels 

45 sources: Set of source loggers 

46 

47 References: 

48 LOG-009: Log Aggregation 

49 """ 

50 

51 key: str 

52 count: int = 0 

53 first_seen: datetime = field(default_factory=datetime.now) 

54 last_seen: datetime = field(default_factory=datetime.now) 

55 sample_message: str = "" 

56 levels: Counter = field(default_factory=Counter) # type: ignore[type-arg] 

57 sources: set[str] = field(default_factory=set) 

58 

59 

60class LogAggregator: 

61 """Aggregates log messages by pattern. 

62 

63 Groups similar log messages together to reduce noise and 

64 identify patterns. 

65 

66 Example: 

67 >>> aggregator = LogAggregator() 

68 >>> aggregator.add(record) 

69 >>> summary = aggregator.get_summary() 

70 

71 References: 

72 LOG-009: Log Aggregation 

73 """ 

74 

75 def __init__(self, window_seconds: int = 60, min_count: int = 2): 

76 """Initialize aggregator. 

77 

78 Args: 

79 window_seconds: Aggregation window size 

80 min_count: Minimum occurrences to report 

81 """ 

82 self.window_seconds = window_seconds 

83 self.min_count = min_count 

84 self._entries: dict[str, AggregatedLogEntry] = {} 

85 self._lock = threading.Lock() 

86 

87 def _normalize_message(self, message: str) -> str: 

88 """Normalize message for grouping. 

89 

90 Replaces variable parts with placeholders. 

91 

92 Args: 

93 message: Log message to normalize. 

94 

95 Returns: 

96 Normalized message with placeholders. 

97 """ 

98 # Replace numbers 

99 normalized = re.sub(r"\d+", "<NUM>", message) 

100 # Replace UUIDs 

101 normalized = re.sub( 

102 r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", 

103 "<UUID>", 

104 normalized, 

105 flags=re.IGNORECASE, 

106 ) 

107 # Replace file paths 

108 normalized = re.sub(r"[/\\][\w./\\-]+", "<PATH>", normalized) 

109 return normalized 

110 

111 def add(self, record: logging.LogRecord) -> None: 

112 """Add log record to aggregator. 

113 

114 Args: 

115 record: Log record 

116 """ 

117 key = self._normalize_message(record.getMessage()) 

118 now = datetime.now() 

119 

120 with self._lock: 

121 if key not in self._entries: 

122 self._entries[key] = AggregatedLogEntry( 

123 key=key, first_seen=now, sample_message=record.getMessage() 

124 ) 

125 

126 entry = self._entries[key] 

127 entry.count += 1 

128 entry.last_seen = now 

129 entry.levels[record.levelname] += 1 

130 entry.sources.add(record.name) 

131 

132 def get_summary(self) -> list[AggregatedLogEntry]: 

133 """Get aggregation summary. 

134 

135 Returns: 

136 List of aggregated entries meeting threshold 

137 """ 

138 with self._lock: 

139 return [entry for entry in self._entries.values() if entry.count >= self.min_count] 

140 

141 def cleanup_old(self) -> None: 

142 """Remove entries outside window.""" 

143 cutoff = datetime.now() - timedelta(seconds=self.window_seconds) 

144 with self._lock: 

145 self._entries = {k: v for k, v in self._entries.items() if v.last_seen >= cutoff} 

146 

147 

148# ============================================================================= 

149# ============================================================================= 

150 

151 

152@dataclass 

153class LogPattern: 

154 """Detected log pattern. 

155 

156 References: 

157 LOG-010: Log Analysis and Patterns 

158 """ 

159 

160 pattern: str 

161 count: int 

162 severity_distribution: dict[str, int] 

163 time_distribution: dict[int, int] # Hour -> count 

164 example: str 

165 

166 

167class LogAnalyzer: 

168 """Analyzes log patterns and trends. 

169 

170 References: 

171 LOG-010: Log Analysis and Patterns 

172 """ 

173 

174 def __init__(self, max_history: int = 10000): 

175 self._history: deque = deque(maxlen=max_history) # type: ignore[type-arg] 

176 self._patterns: dict[str, LogPattern] = {} 

177 

178 def add(self, record: logging.LogRecord) -> None: 

179 """Add record to analysis history.""" 

180 self._history.append( 

181 { 

182 "message": record.getMessage(), 

183 "level": record.levelname, 

184 "time": datetime.now(), 

185 "logger": record.name, 

186 } 

187 ) 

188 

189 def analyze_patterns(self) -> list[LogPattern]: 

190 """Analyze log patterns. 

191 

192 Returns: 

193 List of detected patterns 

194 """ 

195 pattern_counts: Counter = Counter() # type: ignore[type-arg] 

196 pattern_levels: dict[str, Counter] = {} # type: ignore[type-arg] 

197 pattern_hours: dict[str, Counter] = {} # type: ignore[type-arg] 

198 pattern_examples: dict[str, str] = {} 

199 

200 for entry in self._history: 

201 # Normalize message 

202 normalized = re.sub(r"\d+", "<N>", entry["message"]) 

203 pattern_counts[normalized] += 1 

204 

205 if normalized not in pattern_levels: 

206 pattern_levels[normalized] = Counter() 

207 pattern_hours[normalized] = Counter() 

208 pattern_examples[normalized] = entry["message"] 

209 

210 pattern_levels[normalized][entry["level"]] += 1 

211 pattern_hours[normalized][entry["time"].hour] += 1 

212 

213 return [ 

214 LogPattern( 

215 pattern=pattern, 

216 count=count, 

217 severity_distribution=dict(pattern_levels.get(pattern, {})), 

218 time_distribution=dict(pattern_hours.get(pattern, {})), 

219 example=pattern_examples.get(pattern, ""), 

220 ) 

221 for pattern, count in pattern_counts.most_common(20) 

222 ] 

223 

224 def get_error_rate(self, window_minutes: int = 60) -> float: 

225 """Get error rate over window. 

226 

227 Args: 

228 window_minutes: Window size in minutes 

229 

230 Returns: 

231 Error rate (0.0 to 1.0) 

232 """ 

233 cutoff = datetime.now() - timedelta(minutes=window_minutes) 

234 recent = [e for e in self._history if e["time"] >= cutoff] 

235 

236 if not recent: 

237 return 0.0 

238 

239 errors = sum(1 for e in recent if e["level"] in ("ERROR", "CRITICAL")) 

240 return errors / len(recent) 

241 

242 def get_trend(self) -> str: 

243 """Get error trend (increasing, stable, decreasing).""" 

244 if len(self._history) < 100: 

245 return "insufficient_data" 

246 

247 # Compare first half to second half 

248 mid = len(self._history) // 2 

249 first_half = list(self._history)[:mid] 

250 second_half = list(self._history)[mid:] 

251 

252 first_errors = sum(1 for e in first_half if e["level"] in ("ERROR", "CRITICAL")) 

253 second_errors = sum(1 for e in second_half if e["level"] in ("ERROR", "CRITICAL")) 

254 

255 first_rate = first_errors / len(first_half) 

256 second_rate = second_errors / len(second_half) 

257 

258 if second_rate > first_rate * 1.2: 

259 return "increasing" 

260 elif second_rate < first_rate * 0.8: 

261 return "decreasing" 

262 return "stable" 

263 

264 

265# ============================================================================= 

266# ============================================================================= 

267 

268 

269class AlertSeverity(Enum): 

270 """Alert severity levels.""" 

271 

272 INFO = auto() 

273 WARNING = auto() 

274 ERROR = auto() 

275 CRITICAL = auto() 

276 

277 

278@dataclass 

279class LogAlert: 

280 """Log alert definition. 

281 

282 References: 

283 LOG-012: Log Alerting 

284 """ 

285 

286 id: str 

287 name: str 

288 condition: Callable[[logging.LogRecord], bool] 

289 severity: AlertSeverity = AlertSeverity.WARNING 

290 cooldown_seconds: int = 300 

291 last_triggered: datetime | None = None 

292 enabled: bool = True 

293 

294 

295@dataclass 

296class TriggeredAlert: 

297 """Triggered alert instance.""" 

298 

299 alert: LogAlert 

300 record: logging.LogRecord 

301 timestamp: datetime 

302 

303 

304class LogAlerter: 

305 """Log alerting system. 

306 

307 Monitors logs and triggers alerts based on conditions. 

308 

309 Example: 

310 >>> alerter = LogAlerter() 

311 >>> alerter.add_alert("error_burst", lambda r: r.levelno >= logging.ERROR) 

312 >>> alerter.on_alert(lambda a: send_notification(a)) 

313 

314 References: 

315 LOG-012: Log Alerting 

316 """ 

317 

318 def __init__(self) -> None: 

319 self._alerts: dict[str, LogAlert] = {} 

320 self._handlers: list[Callable[[TriggeredAlert], None]] = [] 

321 self._lock = threading.Lock() 

322 

323 def add_alert( 

324 self, 

325 name: str, 

326 condition: Callable[[logging.LogRecord], bool], 

327 severity: AlertSeverity = AlertSeverity.WARNING, 

328 cooldown_seconds: int = 300, 

329 ) -> str: 

330 """Add alert definition. 

331 

332 Args: 

333 name: Alert name 

334 condition: Condition function 

335 severity: Alert severity 

336 cooldown_seconds: Minimum time between triggers 

337 

338 Returns: 

339 Alert ID 

340 """ 

341 import uuid 

342 

343 alert_id = str(uuid.uuid4()) 

344 

345 alert = LogAlert( 

346 id=alert_id, 

347 name=name, 

348 condition=condition, 

349 severity=severity, 

350 cooldown_seconds=cooldown_seconds, 

351 ) 

352 self._alerts[alert_id] = alert 

353 return alert_id 

354 

355 def check(self, record: logging.LogRecord) -> list[TriggeredAlert]: 

356 """Check record against all alerts. 

357 

358 Args: 

359 record: Log record to check 

360 

361 Returns: 

362 List of triggered alerts 

363 """ 

364 triggered = [] 

365 now = datetime.now() 

366 

367 with self._lock: 

368 for alert in self._alerts.values(): 

369 if not alert.enabled: 

370 continue 

371 

372 # Check cooldown 

373 if alert.last_triggered: 

374 elapsed = (now - alert.last_triggered).total_seconds() 

375 if elapsed < alert.cooldown_seconds: 

376 continue 

377 

378 # Check condition 

379 try: 

380 if alert.condition(record): 

381 alert.last_triggered = now 

382 triggered_alert = TriggeredAlert(alert=alert, record=record, timestamp=now) 

383 triggered.append(triggered_alert) 

384 self._notify(triggered_alert) 

385 except Exception as e: 

386 logger.warning(f"Alert condition check failed: {e}") 

387 

388 return triggered 

389 

390 def on_alert(self, handler: Callable[[TriggeredAlert], None]) -> None: 

391 """Register alert handler.""" 

392 self._handlers.append(handler) 

393 

394 def _notify(self, alert: TriggeredAlert) -> None: 

395 """Notify handlers of triggered alert.""" 

396 for handler in self._handlers: 

397 try: 

398 handler(alert) 

399 except Exception as e: 

400 logger.warning(f"Alert handler failed: {e}") 

401 

402 

403# ============================================================================= 

404# ============================================================================= 

405 

406 

407class SamplingStrategy(Enum): 

408 """Sampling strategy.""" 

409 

410 RANDOM = auto() 

411 RATE_LIMIT = auto() 

412 ADAPTIVE = auto() 

413 

414 

415class LogSampler: 

416 """Samples log messages for high-volume scenarios. 

417 

418 References: 

419 LOG-015: Log Sampling for High-Volume 

420 """ 

421 

422 def __init__( 

423 self, 

424 strategy: SamplingStrategy = SamplingStrategy.RATE_LIMIT, 

425 rate: float = 0.1, # 10% for random 

426 max_per_second: int = 100, # for rate limit 

427 ): 

428 self.strategy = strategy 

429 self.rate = rate 

430 self.max_per_second = max_per_second 

431 self._count_this_second = 0 

432 self._last_second = 0 

433 self._lock = threading.Lock() 

434 

435 def should_log(self, record: logging.LogRecord) -> bool: 

436 """Determine if record should be logged. 

437 

438 Args: 

439 record: Log record 

440 

441 Returns: 

442 True if should log 

443 """ 

444 # Always log errors and above 

445 if record.levelno >= logging.ERROR: 

446 return True 

447 

448 if self.strategy == SamplingStrategy.RANDOM: 

449 import random 

450 

451 return random.random() < self.rate 

452 

453 elif self.strategy == SamplingStrategy.RATE_LIMIT: 

454 with self._lock: 

455 current_second = int(time.time()) 

456 if current_second != self._last_second: 

457 self._last_second = current_second 

458 self._count_this_second = 0 

459 

460 if self._count_this_second < self.max_per_second: 

461 self._count_this_second += 1 

462 return True 

463 return False 

464 

465 elif self.strategy == SamplingStrategy.ADAPTIVE: 465 ↛ 487line 465 didn't jump to line 487 because the condition on line 465 was always true

466 # Reduce sampling as volume increases 

467 with self._lock: 

468 current_second = int(time.time()) 

469 if current_second != self._last_second: 469 ↛ 482line 469 didn't jump to line 482 because the condition on line 469 was always true

470 volume = self._count_this_second 

471 self._last_second = current_second 

472 self._count_this_second = 0 

473 

474 # Adjust rate based on volume 

475 if volume > 1000: 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true

476 self.rate = 0.01 

477 elif volume > 100: 477 ↛ 478line 477 didn't jump to line 478 because the condition on line 477 was never true

478 self.rate = 0.1 

479 else: 

480 self.rate = 1.0 

481 

482 self._count_this_second += 1 

483 import random 

484 

485 return random.random() < self.rate 

486 

487 return True # type: ignore[unreachable] 

488 

489 

490# ============================================================================= 

491# ============================================================================= 

492 

493 

494class LogBuffer: 

495 """Buffers log messages for batch writing. 

496 

497 References: 

498 LOG-016: Log Buffer for Batch Writing 

499 """ 

500 

501 def __init__(self, max_size: int = 1000, flush_interval_seconds: float = 5.0): 

502 self.max_size = max_size 

503 self.flush_interval = flush_interval_seconds 

504 self._buffer: queue.Queue = queue.Queue(maxsize=max_size) # type: ignore[type-arg] 

505 self._handlers: list[Callable[[list[logging.LogRecord]], None]] = [] 

506 self._flush_thread: threading.Thread | None = None 

507 self._running = False 

508 

509 def add(self, record: logging.LogRecord) -> None: 

510 """Add record to buffer.""" 

511 try: 

512 self._buffer.put_nowait(record) 

513 except queue.Full: 

514 # Buffer full, force flush 

515 self.flush() 

516 self._buffer.put_nowait(record) 

517 

518 def flush(self) -> None: 

519 """Flush buffer to handlers.""" 

520 records = [] 

521 while not self._buffer.empty(): 

522 try: 

523 records.append(self._buffer.get_nowait()) 

524 except queue.Empty: 

525 break 

526 

527 if records: 

528 for handler in self._handlers: 

529 try: 

530 handler(records) 

531 except Exception as e: 

532 logger.warning(f"Buffer flush handler failed: {e}") 

533 

534 def on_flush(self, handler: Callable[[list[logging.LogRecord]], None]) -> None: 

535 """Register flush handler.""" 

536 self._handlers.append(handler) 

537 

538 def start_auto_flush(self) -> None: 

539 """Start automatic flush thread.""" 

540 self._running = True 

541 self._flush_thread = threading.Thread(target=self._flush_loop, daemon=True) 

542 self._flush_thread.start() 

543 

544 def stop_auto_flush(self) -> None: 

545 """Stop automatic flush thread.""" 

546 self._running = False 

547 if self._flush_thread: 547 ↛ 549line 547 didn't jump to line 549 because the condition on line 547 was always true

548 self._flush_thread.join(timeout=2) 

549 self.flush() 

550 

551 def _flush_loop(self) -> None: 

552 """Periodic flush loop.""" 

553 while self._running: 

554 time.sleep(self.flush_interval) 

555 self.flush() 

556 

557 

558# ============================================================================= 

559# ============================================================================= 

560 

561 

562class CompressedLogHandler(logging.Handler): 

563 """Handler that writes compressed logs. 

564 

565 References: 

566 LOG-017: Log Compression 

567 """ 

568 

569 def __init__( 

570 self, 

571 filename: str, 

572 max_bytes: int = 10_000_000, 

573 backup_count: int = 5, 

574 compression_level: int = 9, 

575 ): 

576 super().__init__() 

577 self.filename = filename 

578 self.max_bytes = max_bytes 

579 self.backup_count = backup_count 

580 self.compression_level = compression_level 

581 self._current_file: Any = None 

582 self._current_size = 0 

583 self._lock = threading.Lock() 

584 

585 def emit(self, record: logging.LogRecord) -> None: 

586 """Emit log record.""" 

587 try: 

588 msg = self.format(record) + "\n" 

589 msg_bytes = msg.encode("utf-8") 

590 

591 with self._lock: 

592 if self._current_file is None: 

593 self._open_file() 

594 

595 if self._current_size + len(msg_bytes) > self.max_bytes: 

596 self._rotate() 

597 

598 self._current_file.write(msg_bytes) 

599 self._current_size += len(msg_bytes) 

600 

601 except Exception: 

602 self.handleError(record) 

603 

604 def _open_file(self) -> None: 

605 """Open current log file.""" 

606 self._current_file = gzip.open( # noqa: SIM115 

607 f"{self.filename}.gz", "ab", compresslevel=self.compression_level 

608 ) 

609 try: 

610 self._current_size = os.path.getsize(f"{self.filename}.gz") # noqa: PTH202 

611 except OSError: 

612 self._current_size = 0 

613 

614 def _rotate(self) -> None: 

615 """Rotate log files.""" 

616 if self._current_file: 616 ↛ 620line 616 didn't jump to line 620 because the condition on line 616 was always true

617 self._current_file.close() 

618 

619 # Shift existing backups 

620 for i in range(self.backup_count - 1, 0, -1): 

621 src = f"{self.filename}.{i}.gz" 

622 dst = f"{self.filename}.{i + 1}.gz" 

623 if os.path.exists(src): 

624 os.rename(src, dst) # noqa: PTH104 

625 

626 # Move current to .1 

627 if os.path.exists(f"{self.filename}.gz"): 627 ↛ 630line 627 didn't jump to line 630 because the condition on line 627 was always true

628 os.rename(f"{self.filename}.gz", f"{self.filename}.1.gz") # noqa: PTH104 

629 

630 self._open_file() 

631 

632 def close(self) -> None: 

633 """Close handler.""" 

634 with self._lock: 

635 if self._current_file: 635 ↛ 638line 635 didn't jump to line 638

636 self._current_file.close() 

637 self._current_file = None 

638 super().close() 

639 

640 

641# ============================================================================= 

642# ============================================================================= 

643 

644 

645class EncryptedLogHandler(logging.Handler): 

646 """Handler that writes encrypted logs. 

647 

648 Uses simple XOR encryption for demonstration. 

649 In production, use proper encryption (AES, etc.). 

650 

651 References: 

652 LOG-018: Log Encryption 

653 """ 

654 

655 def __init__(self, filename: str, key: str): 

656 super().__init__() 

657 self.filename = filename 

658 self._key = hashlib.sha256(key.encode()).digest() 

659 self._file: Any = None 

660 self._lock = threading.Lock() 

661 

662 def emit(self, record: logging.LogRecord) -> None: 

663 """Emit encrypted log record.""" 

664 try: 

665 msg = self.format(record) + "\n" 

666 encrypted = self._encrypt(msg.encode("utf-8")) 

667 

668 with self._lock: 

669 if self._file is None: 669 ↛ 673line 669 didn't jump to line 673 because the condition on line 669 was always true

670 self._file = open(self.filename, "ab") # noqa: SIM115 

671 

672 # Write length-prefixed encrypted message 

673 length = len(encrypted).to_bytes(4, "big") 

674 self._file.write(length + encrypted) 

675 self._file.flush() 

676 

677 except Exception: 

678 self.handleError(record) 

679 

680 def _encrypt(self, data: bytes) -> bytes: 

681 """Encrypt data with XOR.""" 

682 encrypted = bytearray() 

683 for i, byte in enumerate(data): 

684 encrypted.append(byte ^ self._key[i % len(self._key)]) 

685 return bytes(encrypted) 

686 

687 def close(self) -> None: 

688 """Close handler.""" 

689 with self._lock: 

690 if self._file: 690 ↛ 693line 690 didn't jump to line 693

691 self._file.close() 

692 self._file = None 

693 super().close() 

694 

695 

696# ============================================================================= 

697# ============================================================================= 

698 

699 

700class LogForwarderProtocol(Enum): 

701 """Log forwarding protocols.""" 

702 

703 SYSLOG = auto() 

704 HTTP = auto() 

705 TCP = auto() 

706 UDP = auto() 

707 

708 

709@dataclass 

710class ForwardingConfig: 

711 """Log forwarding configuration. 

712 

713 References: 

714 LOG-019: Log Forwarding 

715 """ 

716 

717 protocol: LogForwarderProtocol 

718 host: str 

719 port: int 

720 timeout: float = 5.0 

721 batch_size: int = 100 

722 tls: bool = False 

723 

724 

725class LogForwarder: 

726 """Forwards logs to external systems. 

727 

728 References: 

729 LOG-019: Log Forwarding 

730 """ 

731 

732 def __init__(self, config: ForwardingConfig): 

733 self.config = config 

734 self._buffer: list[dict[str, Any]] = [] 

735 self._lock = threading.Lock() 

736 

737 def forward(self, record: logging.LogRecord) -> None: 

738 """Forward log record. 

739 

740 Args: 

741 record: Log record 

742 """ 

743 entry = { 

744 "timestamp": datetime.now().isoformat(), 

745 "level": record.levelname, 

746 "logger": record.name, 

747 "message": record.getMessage(), 

748 "hostname": os.uname().nodename if hasattr(os, "uname") else "unknown", 

749 } 

750 

751 with self._lock: 

752 self._buffer.append(entry) 

753 if len(self._buffer) >= self.config.batch_size: 

754 self._flush() 

755 

756 def _flush(self) -> None: 

757 """Flush buffer to destination.""" 

758 if not self._buffer: 758 ↛ 759line 758 didn't jump to line 759 because the condition on line 758 was never true

759 return 

760 

761 entries = self._buffer.copy() 

762 self._buffer.clear() 

763 

764 try: 

765 if self.config.protocol == LogForwarderProtocol.HTTP: 765 ↛ 767line 765 didn't jump to line 767 because the condition on line 765 was always true

766 self._send_http(entries) 

767 elif self.config.protocol == LogForwarderProtocol.SYSLOG: 

768 self._send_syslog(entries) 

769 elif self.config.protocol == LogForwarderProtocol.TCP: 

770 self._send_tcp(entries) 

771 elif self.config.protocol == LogForwarderProtocol.UDP: 

772 self._send_udp(entries) 

773 except Exception as e: 

774 logger.warning(f"Log forwarding failed: {e}") 

775 # Put entries back in buffer 

776 self._buffer.extend(entries) 

777 

778 def _send_http(self, entries: list[dict[str, Any]]) -> None: 

779 """Send via HTTP.""" 

780 import urllib.request 

781 

782 data = json.dumps(entries).encode("utf-8") 

783 req = urllib.request.Request( 

784 f"{'https' if self.config.tls else 'http'}://{self.config.host}:{self.config.port}/logs", 

785 data=data, 

786 headers={"Content-Type": "application/json"}, 

787 ) 

788 urllib.request.urlopen(req, timeout=self.config.timeout) 

789 

790 def _send_syslog(self, entries: list[dict[str, Any]]) -> None: 

791 """Send via syslog.""" 

792 import socket 

793 

794 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 

795 for entry in entries: 

796 msg = f"<14>{entry['timestamp']} {entry['logger']}: {entry['message']}" 

797 sock.sendto(msg.encode(), (self.config.host, self.config.port)) 

798 sock.close() 

799 

800 def _send_tcp(self, entries: list[dict[str, Any]]) -> None: 

801 """Send via TCP.""" 

802 import socket 

803 

804 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

805 sock.settimeout(self.config.timeout) 

806 sock.connect((self.config.host, self.config.port)) 

807 for entry in entries: 

808 msg = json.dumps(entry) + "\n" 

809 sock.send(msg.encode()) 

810 sock.close() 

811 

812 def _send_udp(self, entries: list[dict[str, Any]]) -> None: 

813 """Send via UDP.""" 

814 import socket 

815 

816 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 

817 for entry in entries: 

818 msg = json.dumps(entry) 

819 sock.sendto(msg.encode(), (self.config.host, self.config.port)) 

820 sock.close() 

821 

822 

823# ============================================================================= 

824# ============================================================================= 

825 

826 

827@dataclass 

828class DashboardMetrics: 

829 """Metrics for log visualization dashboard. 

830 

831 References: 

832 LOG-020: Log Visualization Dashboard Data 

833 """ 

834 

835 total_logs: int = 0 

836 logs_by_level: dict[str, int] = field(default_factory=dict) 

837 logs_by_logger: dict[str, int] = field(default_factory=dict) 

838 logs_per_minute: list[int] = field(default_factory=list) 

839 error_rate: float = 0.0 

840 top_patterns: list[tuple[str, int]] = field(default_factory=list) 

841 recent_errors: list[dict[str, Any]] = field(default_factory=list) 

842 

843 

844class LogDashboardCollector: 

845 """Collects metrics for log visualization. 

846 

847 References: 

848 LOG-020: Log Visualization Dashboard Data 

849 """ 

850 

851 def __init__(self, window_minutes: int = 60): 

852 self.window_minutes = window_minutes 

853 self._logs: deque = deque() # type: ignore[type-arg] 

854 self._lock = threading.Lock() 

855 

856 def add(self, record: logging.LogRecord) -> None: 

857 """Add log record to metrics.""" 

858 entry = { 

859 "timestamp": datetime.now(), 

860 "level": record.levelname, 

861 "logger": record.name, 

862 "message": record.getMessage(), 

863 } 

864 

865 with self._lock: 

866 self._logs.append(entry) 

867 # Trim old entries 

868 cutoff = datetime.now() - timedelta(minutes=self.window_minutes) 

869 while self._logs and self._logs[0]["timestamp"] < cutoff: 

870 self._logs.popleft() 

871 

872 def get_metrics(self) -> DashboardMetrics: 

873 """Get current dashboard metrics. 

874 

875 Returns: 

876 Dashboard metrics 

877 """ 

878 with self._lock: 

879 logs = list(self._logs) 

880 

881 if not logs: 

882 return DashboardMetrics() 

883 

884 # Count by level 

885 level_counts = Counter(log["level"] for log in logs) 

886 

887 # Count by logger 

888 logger_counts = Counter(log["logger"] for log in logs) 

889 

890 # Logs per minute 

891 now = datetime.now() 

892 per_minute = [] 

893 for i in range(60): 

894 minute_start = now - timedelta(minutes=i + 1) 

895 minute_end = now - timedelta(minutes=i) 

896 count = sum(1 for log in logs if minute_start <= log["timestamp"] < minute_end) 

897 per_minute.append(count) 

898 per_minute.reverse() 

899 

900 # Error rate 

901 error_count = sum(1 for log in logs if log["level"] in ("ERROR", "CRITICAL")) 

902 error_rate = error_count / len(logs) if logs else 0.0 

903 

904 # Top patterns 

905 patterns: Counter[str] = Counter() 

906 for log in logs: 

907 normalized = re.sub(r"\d+", "<N>", log["message"]) 

908 patterns[normalized] += 1 

909 

910 # Recent errors 

911 recent_errors = [log for log in logs if log["level"] in ("ERROR", "CRITICAL")][-10:] 

912 

913 return DashboardMetrics( 

914 total_logs=len(logs), 

915 logs_by_level=dict(level_counts), 

916 logs_by_logger=dict(logger_counts.most_common(10)), 

917 logs_per_minute=per_minute, 

918 error_rate=error_rate, 

919 top_patterns=patterns.most_common(10), 

920 recent_errors=recent_errors, 

921 ) 

922 

923 

924__all__ = [ 

925 # Aggregation (LOG-009) 

926 "AggregatedLogEntry", 

927 # Alerting (LOG-012) 

928 "AlertSeverity", 

929 # Compression (LOG-017) 

930 "CompressedLogHandler", 

931 # Dashboard (LOG-020) 

932 "DashboardMetrics", 

933 # Encryption (LOG-018) 

934 "EncryptedLogHandler", 

935 # Forwarding (LOG-019) 

936 "ForwardingConfig", 

937 "LogAggregator", 

938 "LogAlert", 

939 "LogAlerter", 

940 # Analysis (LOG-010) 

941 "LogAnalyzer", 

942 # Buffer (LOG-016) 

943 "LogBuffer", 

944 "LogDashboardCollector", 

945 "LogForwarder", 

946 "LogForwarderProtocol", 

947 "LogPattern", 

948 # Sampling (LOG-015) 

949 "LogSampler", 

950 "SamplingStrategy", 

951 "TriggeredAlert", 

952]