Coverage for src/tracekit/batch/logging.py: 97% (153 statements)
coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Aggregate logging for batch processing operations.
3This module provides consolidated logging for parallel batch workers
4with job-level summaries and per-file tracking.
7Example:
8 >>> from tracekit.batch.logging import BatchLogger
9 >>> logger = BatchLogger(batch_id="job-001")
10 >>> with logger.file_context("capture1.wfm") as file_log:
11 ... file_log.info("Processing file")
12 ... result = analyze(file)
13 >>> logger.summary()
15References:
16 LOG-011,
17"""
19from __future__ import annotations
21import logging
22import threading
23import time
24import uuid
25from collections import defaultdict
26from contextlib import contextmanager
27from dataclasses import dataclass, field
28from datetime import UTC, datetime
29from typing import TYPE_CHECKING, Any
31if TYPE_CHECKING:
32 from collections.abc import Iterator
34from tracekit.core.logging import format_timestamp, get_logger
37@dataclass
38class FileLogEntry:
39 """Log entry for a single file in a batch job.
41 Attributes:
42 file_id: Unique identifier for this file.
43 filename: Path to the file.
44 start_time: Processing start time.
45 end_time: Processing end time.
46 status: Processing status (pending, processing, success, error).
47 error_message: Error message if status is 'error'.
48 log_messages: List of log messages for this file.
50 References:
51 LOG-011: Aggregate Logging for Batch Processing
52 """
54 file_id: str
55 filename: str
56 start_time: float | None = None
57 end_time: float | None = None
58 status: str = "pending"
59 error_message: str | None = None
60 log_messages: list[dict[str, Any]] = field(default_factory=list)
62 @property
63 def duration(self) -> float | None:
64 """Get processing duration in seconds."""
65 if self.start_time is not None and self.end_time is not None:
66 return self.end_time - self.start_time
67 return None
69 def to_dict(self) -> dict[str, Any]:
70 """Convert to dictionary representation."""
71 return {
72 "file_id": self.file_id,
73 "filename": self.filename,
74 "start_time": format_timestamp(datetime.fromtimestamp(self.start_time, tz=UTC))
75 if self.start_time
76 else None,
77 "end_time": format_timestamp(datetime.fromtimestamp(self.end_time, tz=UTC))
78 if self.end_time
79 else None,
80 "duration_seconds": self.duration,
81 "status": self.status,
82 "error_message": self.error_message,
83 "log_count": len(self.log_messages),
84 }
@dataclass
class BatchSummary:
    """Summary of batch processing results.

    Attributes:
        batch_id: Unique identifier for the batch job.
        total_files: Total number of files in the batch.
        success_count: Number of successfully processed files.
        error_count: Number of files that failed.
        total_duration: Total processing time in seconds.
        start_time: Formatted batch start timestamp.
        end_time: Formatted batch end timestamp.
        errors_by_type: Count of errors grouped by type.
        files_per_second: Throughput over the whole batch.
        average_duration_per_file: Mean per-file processing time.

    References:
        LOG-011: Aggregate Logging for Batch Processing
    """

    batch_id: str
    total_files: int
    success_count: int
    error_count: int
    total_duration: float
    start_time: str
    end_time: str
    errors_by_type: dict[str, int] = field(default_factory=dict)
    files_per_second: float = 0.0
    average_duration_per_file: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary representation."""
        # Guard against division by zero for an empty batch.
        if self.total_files > 0:
            success_rate = self.success_count / self.total_files
        else:
            success_rate = 0.0
        return {
            "batch_id": self.batch_id,
            "total_files": self.total_files,
            "success_count": self.success_count,
            "error_count": self.error_count,
            "success_rate": success_rate,
            "total_duration_seconds": self.total_duration,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "files_per_second": self.files_per_second,
            "average_duration_per_file": self.average_duration_per_file,
            "errors_by_type": self.errors_by_type,
        }
class FileLogger:
    """Logger for individual file processing within a batch.

    Provides level-specific logging methods that automatically tag every
    record with the batch_id and file_id so logs can be aggregated later.

    References:
        LOG-011: Aggregate Logging for Batch Processing
    """

    def __init__(
        self,
        entry: FileLogEntry,
        batch_id: str,
        parent_logger: logging.Logger,
    ):
        """Initialize file logger.

        Args:
            entry: FileLogEntry for this file.
            batch_id: Batch job identifier.
            parent_logger: Parent logger for output.
        """
        self._entry = entry
        self._batch_id = batch_id
        self._logger = parent_logger

    def _log(self, level: int, message: str, **kwargs: Any) -> None:
        """Record the message on the file entry and forward it to the parent logger."""
        # Stored record keeps a fixed key order: core fields first, then
        # any caller-supplied extras.
        record: dict[str, Any] = {
            "timestamp": format_timestamp(),
            "level": logging.getLevelName(level),
            "message": message,
            "batch_id": self._batch_id,
            "file_id": self._entry.file_id,
            "filename": self._entry.filename,
        }
        record.update(kwargs)
        self._entry.log_messages.append(record)

        # The parent logger only needs the correlation ids plus extras.
        extra: dict[str, Any] = {
            "batch_id": self._batch_id,
            "file_id": self._entry.file_id,
        }
        extra.update(kwargs)
        self._logger.log(level, message, extra=extra)

    def debug(self, message: str, **kwargs: Any) -> None:
        """Log debug message."""
        self._log(logging.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs: Any) -> None:
        """Log info message."""
        self._log(logging.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs: Any) -> None:
        """Log warning message."""
        self._log(logging.WARNING, message, **kwargs)

    def error(self, message: str, **kwargs: Any) -> None:
        """Log error message."""
        self._log(logging.ERROR, message, **kwargs)
class BatchLogger:
    """Aggregate logger for batch processing operations.

    Consolidates logs from parallel batch workers with job-level
    summaries and per-file tracking. All shared state is guarded by a
    lock so worker threads can register and update files concurrently.

    Example:
        >>> logger = BatchLogger(batch_id="job-001")
        >>> with logger.file_context("capture1.wfm") as file_log:
        ...     file_log.info("Loading file")
        ...     result = analyze(file)
        ...     file_log.info("Analysis complete", result=result)
        >>> summary = logger.summary()

    References:
        LOG-011: Aggregate Logging for Batch Processing
        LOG-013: Batch Job Correlation ID and Lineage
    """

    def __init__(
        self,
        batch_id: str | None = None,
        logger_name: str = "tracekit.batch",
    ):
        """Initialize batch logger.

        Args:
            batch_id: Unique batch job identifier. Auto-generated if None.
            logger_name: Name for the underlying logger.
        """
        self.batch_id = batch_id or str(uuid.uuid4())
        self._logger = get_logger(logger_name)
        self._files: dict[str, FileLogEntry] = {}
        self._lock = threading.Lock()
        self._start_time: float | None = None
        self._end_time: float | None = None
        self._error_types: dict[str, int] = defaultdict(int)

    def start(self) -> None:
        """Mark batch job as started.

        Records the start time for duration calculation.
        """
        self._start_time = time.time()
        self._logger.info(
            "Batch job started",
            extra={"batch_id": self.batch_id},
        )

    def finish(self) -> None:
        """Mark batch job as finished.

        Records the end time and logs the completion summary.
        """
        self._end_time = time.time()
        summary = self.summary()
        self._logger.info(
            "Batch job completed",
            extra={
                "batch_id": self.batch_id,
                "total_files": summary.total_files,
                "success_count": summary.success_count,
                "error_count": summary.error_count,
                "duration": summary.total_duration,
            },
        )

    def register_file(self, filename: str) -> str:
        """Register a file for processing.

        Args:
            filename: Path to the file.

        Returns:
            Unique file_id for this file.
        """
        file_id = str(uuid.uuid4())
        with self._lock:
            self._files[file_id] = FileLogEntry(
                file_id=file_id,
                filename=filename,
            )
        return file_id

    @contextmanager
    def file_context(self, filename: str) -> Iterator[FileLogger]:
        """Context manager for file processing.

        Automatically tracks start/end time and status.

        Args:
            filename: Path to the file being processed.

        Yields:
            FileLogger for logging within this file's context.

        Raises:
            Exception: Re-raises any exception from the processing context.

        Example:
            >>> with batch_logger.file_context("data.wfm") as log:
            ...     log.info("Processing started")
            ...     result = process_file("data.wfm")
        """
        file_id = self.register_file(filename)
        entry = self._files[file_id]
        entry.start_time = time.time()
        entry.status = "processing"

        file_logger = FileLogger(entry, self.batch_id, self._logger)

        try:
            yield file_logger
            entry.status = "success"
        except Exception as e:
            entry.status = "error"
            entry.error_message = str(e)
            error_type = type(e).__name__
            with self._lock:
                self._error_types[error_type] += 1
            # Bug fix: FileLogger.error() accepts only (message, **kwargs);
            # the old %-style call passed the exception as an extra positional
            # argument, raising TypeError and masking the original error.
            file_logger.error(f"Processing failed: {e}", exception_type=error_type)
            raise
        finally:
            entry.end_time = time.time()

    def mark_success(self, file_id: str) -> None:
        """Mark a file as successfully processed.

        Args:
            file_id: File identifier from register_file.
        """
        with self._lock:
            if file_id in self._files:
                self._files[file_id].status = "success"
                self._files[file_id].end_time = time.time()

    def mark_error(self, file_id: str, error: str, error_type: str = "Unknown") -> None:
        """Mark a file as failed.

        Args:
            file_id: File identifier from register_file.
            error: Error message.
            error_type: Type of error for aggregation.
        """
        with self._lock:
            if file_id in self._files:
                self._files[file_id].status = "error"
                self._files[file_id].error_message = error
                self._files[file_id].end_time = time.time()
                self._error_types[error_type] += 1

    def summary(self) -> BatchSummary:
        """Generate batch processing summary.

        Returns:
            BatchSummary with aggregated statistics.

        References:
            LOG-011: Aggregate Logging for Batch Processing
        """
        with self._lock:
            files = list(self._files.values())
            errors_by_type = dict(self._error_types)

        total_files = len(files)
        success_count = sum(1 for f in files if f.status == "success")
        error_count = sum(1 for f in files if f.status == "error")

        # Fall back to per-file timestamps when start()/finish() were not
        # called; 0 acts as the "unknown" sentinel below.
        start_time = self._start_time or (
            min((f.start_time for f in files if f.start_time), default=0)
        )
        end_time = self._end_time or (max((f.end_time for f in files if f.end_time), default=0))
        total_duration = end_time - start_time if start_time and end_time else 0.0

        # Per-file metrics (only files with both timestamps contribute).
        durations = [f.duration for f in files if f.duration is not None]
        avg_duration = sum(durations) / len(durations) if durations else 0.0
        files_per_second = total_files / total_duration if total_duration > 0 else 0.0

        return BatchSummary(
            batch_id=self.batch_id,
            total_files=total_files,
            success_count=success_count,
            error_count=error_count,
            total_duration=total_duration,
            start_time=format_timestamp(datetime.fromtimestamp(start_time, tz=UTC))
            if start_time
            else "",
            end_time=format_timestamp(datetime.fromtimestamp(end_time, tz=UTC)) if end_time else "",
            errors_by_type=errors_by_type,
            files_per_second=files_per_second,
            average_duration_per_file=avg_duration,
        )

    def get_file_logs(self, file_id: str) -> list[dict[str, Any]]:
        """Get all log messages for a specific file.

        Args:
            file_id: File identifier.

        Returns:
            List of log message dictionaries (a copy; safe to mutate).
        """
        with self._lock:
            if file_id in self._files:
                return list(self._files[file_id].log_messages)
            return []

    def get_all_files(self) -> list[dict[str, Any]]:
        """Get summary information for all files.

        Returns:
            List of file summary dictionaries.
        """
        with self._lock:
            return [f.to_dict() for f in self._files.values()]

    def get_errors(self) -> list[dict[str, Any]]:
        """Get all files that encountered errors.

        Returns:
            List of error file dictionaries with error details and logs.
        """
        with self._lock:
            return [
                {
                    **f.to_dict(),
                    "logs": f.log_messages,
                }
                for f in self._files.values()
                if f.status == "error"
            ]
def aggregate_batch_logs(
    batch_loggers: list[BatchLogger],
) -> dict[str, Any]:
    """Aggregate logs from multiple batch loggers.

    Combines summaries from multiple batch jobs into a single
    aggregate report.

    Args:
        batch_loggers: List of BatchLogger instances.

    Returns:
        Aggregated summary dictionary.

    References:
        LOG-011: Aggregate Logging for Batch Processing
    """
    # One summary() call per logger; serialized dicts preserve input order.
    summaries = [bl.summary() for bl in batch_loggers]
    serialized = [s.to_dict() for s in summaries]

    total_files = sum(s.total_files for s in summaries)
    total_success = sum(s.success_count for s in summaries)
    total_errors = sum(s.error_count for s in summaries)
    total_duration = sum((s.total_duration for s in summaries), 0.0)

    combined_errors: dict[str, int] = defaultdict(int)
    for s in summaries:
        for error_type, count in s.errors_by_type.items():
            combined_errors[error_type] += count

    return {
        "aggregate": {
            "total_batches": len(batch_loggers),
            "total_files": total_files,
            "total_success": total_success,
            "total_errors": total_errors,
            "total_duration_seconds": total_duration,
            "overall_success_rate": total_success / total_files if total_files > 0 else 0.0,
            "errors_by_type": dict(combined_errors),
        },
        "batches": serialized,
    }
# Explicit public API of this module (used by `import *` and doc tools).
__all__ = [
    "BatchLogger",
    "BatchSummary",
    "FileLogEntry",
    "FileLogger",
    "aggregate_batch_logs",
]