1"""Aggregate logging for batch processing operations. 

2 

3This module provides consolidated logging for parallel batch workers 

4with job-level summaries and per-file tracking. 

5 

6 

7Example: 

8 >>> from tracekit.batch.logging import BatchLogger 

9 >>> logger = BatchLogger(batch_id="job-001") 

10 >>> with logger.file_context("capture1.wfm") as file_log: 

11 ... file_log.info("Processing file") 

12 ... result = analyze(file) 

13 >>> logger.summary() 

14 

15References: 

16 LOG-011, 

17""" 

from __future__ import annotations

import logging
import threading
import time
import uuid
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from collections.abc import Iterator

from tracekit.core.logging import format_timestamp, get_logger


@dataclass
class FileLogEntry:
    """Log entry for a single file in a batch job.

    Attributes:
        file_id: Unique identifier for this file.
        filename: Path to the file.
        start_time: Processing start time (Unix timestamp).
        end_time: Processing end time (Unix timestamp).
        status: Processing status (pending, processing, success, error).
        error_message: Error message if status is 'error'.
        log_messages: List of log messages for this file.

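    Example:
        A minimal sketch; the timestamps here are arbitrary floats,
        normally filled in by BatchLogger.file_context():

        >>> entry = FileLogEntry(file_id="f-1", filename="capture1.wfm")
        >>> entry.start_time, entry.end_time = 100.0, 102.5
        >>> entry.duration
        2.5
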
    References:
        LOG-011: Aggregate Logging for Batch Processing
    """

    file_id: str
    filename: str
    start_time: float | None = None
    end_time: float | None = None
    status: str = "pending"
    error_message: str | None = None
    log_messages: list[dict[str, Any]] = field(default_factory=list)

    @property
    def duration(self) -> float | None:
        """Get processing duration in seconds."""
        if self.start_time is not None and self.end_time is not None:
            return self.end_time - self.start_time
        return None

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary representation."""
        return {
            "file_id": self.file_id,
            "filename": self.filename,
            "start_time": format_timestamp(datetime.fromtimestamp(self.start_time, tz=UTC))
            if self.start_time
            else None,
            "end_time": format_timestamp(datetime.fromtimestamp(self.end_time, tz=UTC))
            if self.end_time
            else None,
            "duration_seconds": self.duration,
            "status": self.status,
            "error_message": self.error_message,
            "log_count": len(self.log_messages),
        }


@dataclass
class BatchSummary:
    """Summary of batch processing results.

    Attributes:
        batch_id: Unique identifier for the batch job.
        total_files: Total number of files in the batch.
        success_count: Number of successfully processed files.
        error_count: Number of files that failed.
        total_duration: Total processing time in seconds.
        start_time: Job start time as a formatted timestamp.
        end_time: Job end time as a formatted timestamp.
        errors_by_type: Count of errors grouped by type.
        files_per_second: Throughput over the whole job.
        average_duration_per_file: Mean per-file processing time in seconds.

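    Example:
        A minimal sketch with hand-filled values; in practice this object
        is produced by BatchLogger.summary() rather than built directly:

        >>> s = BatchSummary(
        ...     batch_id="job-001",
        ...     total_files=4,
        ...     success_count=3,
        ...     error_count=1,
        ...     total_duration=2.0,
        ...     start_time="2026-01-11T23:00:00Z",
        ...     end_time="2026-01-11T23:00:02Z",
        ... )
        >>> s.to_dict()["success_rate"]
        0.75
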
    References:
        LOG-011: Aggregate Logging for Batch Processing
    """

    batch_id: str
    total_files: int
    success_count: int
    error_count: int
    total_duration: float
    start_time: str
    end_time: str
    errors_by_type: dict[str, int] = field(default_factory=dict)
    files_per_second: float = 0.0
    average_duration_per_file: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary representation."""
        return {
            "batch_id": self.batch_id,
            "total_files": self.total_files,
            "success_count": self.success_count,
            "error_count": self.error_count,
            "success_rate": self.success_count / self.total_files if self.total_files > 0 else 0.0,
            "total_duration_seconds": self.total_duration,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "files_per_second": self.files_per_second,
            "average_duration_per_file": self.average_duration_per_file,
            "errors_by_type": self.errors_by_type,
        }


class FileLogger:
    """Logger for individual file processing within a batch.

    Provides logging methods that automatically tag logs with
    batch_id and file_id for aggregation.

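    Example:
        Normally obtained from BatchLogger.file_context() rather than
        constructed directly; extra keyword arguments become structured
        fields on the recorded log entry (a sketch, with an arbitrary
        field name):

        >>> with BatchLogger(batch_id="job-001").file_context("a.wfm") as log:
        ...     log.info("Parsed header", channel_count=4)
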
    References:
        LOG-011: Aggregate Logging for Batch Processing
    """

    def __init__(
        self,
        entry: FileLogEntry,
        batch_id: str,
        parent_logger: logging.Logger,
    ):
        """Initialize file logger.

        Args:
            entry: FileLogEntry for this file.
            batch_id: Batch job identifier.
            parent_logger: Parent logger for output.
        """
        self._entry = entry
        self._batch_id = batch_id
        self._logger = parent_logger

    def _log(self, level: int, message: str, **kwargs: Any) -> None:
        """Log a message with batch/file context."""
        log_entry = {
            "timestamp": format_timestamp(),
            "level": logging.getLevelName(level),
            "message": message,
            "batch_id": self._batch_id,
            "file_id": self._entry.file_id,
            "filename": self._entry.filename,
            **kwargs,
        }
        self._entry.log_messages.append(log_entry)
        self._logger.log(
            level,
            message,
            extra={
                "batch_id": self._batch_id,
                "file_id": self._entry.file_id,
                **kwargs,
            },
        )

    def debug(self, message: str, **kwargs: Any) -> None:
        """Log debug message."""
        self._log(logging.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs: Any) -> None:
        """Log info message."""
        self._log(logging.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs: Any) -> None:
        """Log warning message."""
        self._log(logging.WARNING, message, **kwargs)

    def error(self, message: str, **kwargs: Any) -> None:
        """Log error message."""
        self._log(logging.ERROR, message, **kwargs)


class BatchLogger:
    """Aggregate logger for batch processing operations.

    Consolidates logs from parallel batch workers with job-level
    summaries and per-file tracking.

    Example:
        >>> logger = BatchLogger(batch_id="job-001")
        >>> with logger.file_context("capture1.wfm") as file_log:
        ...     file_log.info("Loading file")
        ...     result = analyze("capture1.wfm")
        ...     file_log.info("Analysis complete", result=result)
        >>> summary = logger.summary()

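    Example:
        Designed for parallel workers; shared state such as registration
        and error counts is guarded by an internal lock, so file_context()
        can be used from multiple threads (a sketch):

        >>> from concurrent.futures import ThreadPoolExecutor
        >>> def worker(path: str) -> None:
        ...     with logger.file_context(path) as log:
        ...         log.info("Started")
        >>> with ThreadPoolExecutor(max_workers=4) as pool:
        ...     _ = list(pool.map(worker, ["a.wfm", "b.wfm"]))
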
    References:
        LOG-011: Aggregate Logging for Batch Processing
        LOG-013: Batch Job Correlation ID and Lineage
    """

    def __init__(
        self,
        batch_id: str | None = None,
        logger_name: str = "tracekit.batch",
    ):
        """Initialize batch logger.

        Args:
            batch_id: Unique batch job identifier. Auto-generated if None.
            logger_name: Name for the underlying logger.
        """
        self.batch_id = batch_id or str(uuid.uuid4())
        self._logger = get_logger(logger_name)
        self._files: dict[str, FileLogEntry] = {}
        self._lock = threading.Lock()
        self._start_time: float | None = None
        self._end_time: float | None = None
        self._error_types: dict[str, int] = defaultdict(int)

    def start(self) -> None:
        """Mark batch job as started.

        Records the start time for duration calculation.

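        Example:
            Typically paired with finish() around the whole job
            (a sketch; paths is a caller-supplied list):

            >>> batch_logger.start()
            >>> for path in paths:
            ...     with batch_logger.file_context(path) as log:
            ...         log.info("Processing")
            >>> batch_logger.finish()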
        """
        self._start_time = time.time()
        self._logger.info(
            "Batch job started",
            extra={"batch_id": self.batch_id},
        )

    def finish(self) -> None:
        """Mark batch job as finished.

        Records the end time and logs the completion summary.
        """
        self._end_time = time.time()
        summary = self.summary()
        self._logger.info(
            "Batch job completed",
            extra={
                "batch_id": self.batch_id,
                "total_files": summary.total_files,
                "success_count": summary.success_count,
                "error_count": summary.error_count,
                "duration": summary.total_duration,
            },
        )

    def register_file(self, filename: str) -> str:
        """Register a file for processing.

        Args:
            filename: Path to the file.

        Returns:
            Unique file_id for this file.

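        Example:
            Manual tracking for workers that cannot use file_context()
            (a sketch; process is a stand-in for real work):

            >>> file_id = batch_logger.register_file("capture1.wfm")
            >>> try:
            ...     process("capture1.wfm")
            ...     batch_logger.mark_success(file_id)
            ... except ValueError as e:
            ...     batch_logger.mark_error(file_id, str(e), error_type="ValueError")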
        """
        file_id = str(uuid.uuid4())
        with self._lock:
            self._files[file_id] = FileLogEntry(
                file_id=file_id,
                filename=filename,
            )
        return file_id

    @contextmanager
    def file_context(self, filename: str) -> Iterator[FileLogger]:
        """Context manager for file processing.

        Automatically tracks start/end time and status.

        Args:
            filename: Path to the file being processed.

        Yields:
            FileLogger for logging within this file's context.

        Raises:
            Exception: Re-raises any exception from the processing context.

        Example:
            >>> with batch_logger.file_context("data.wfm") as log:
            ...     log.info("Processing started")
            ...     result = process_file("data.wfm")
        """
        file_id = self.register_file(filename)
        entry = self._files[file_id]

        entry.start_time = time.time()
        entry.status = "processing"

        file_logger = FileLogger(entry, self.batch_id, self._logger)

        try:
            yield file_logger
            entry.status = "success"
        except Exception as e:
            entry.status = "error"
            entry.error_message = str(e)
            error_type = type(e).__name__
            with self._lock:
                self._error_types[error_type] += 1
            # FileLogger.error() accepts only a message plus keyword fields,
            # so format the exception into the message directly.
            file_logger.error(f"Processing failed: {e}", exception_type=error_type)
            raise
        finally:
            entry.end_time = time.time()

    def mark_success(self, file_id: str) -> None:
        """Mark a file as successfully processed.

        Args:
            file_id: File identifier from register_file.
        """
        with self._lock:
            if file_id in self._files:
                self._files[file_id].status = "success"
                self._files[file_id].end_time = time.time()

    def mark_error(self, file_id: str, error: str, error_type: str = "Unknown") -> None:
        """Mark a file as failed.

        Args:
            file_id: File identifier from register_file.
            error: Error message.
            error_type: Type of error for aggregation.
        """
        with self._lock:
            if file_id in self._files:
                self._files[file_id].status = "error"
                self._files[file_id].error_message = error
                self._files[file_id].end_time = time.time()
                self._error_types[error_type] += 1

    def summary(self) -> BatchSummary:
        """Generate batch processing summary.

        Returns:
            BatchSummary with aggregated statistics.

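        Example:
            Can be called at any point; statistics reflect the files
            registered so far (a sketch):

            >>> s = batch_logger.summary()
            >>> print(f"{s.success_count}/{s.total_files} files succeeded")
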
        References:
            LOG-011: Aggregate Logging for Batch Processing
        """
        with self._lock:
            files = list(self._files.values())
            errors_by_type = dict(self._error_types)

        total_files = len(files)
        success_count = sum(1 for f in files if f.status == "success")
        error_count = sum(1 for f in files if f.status == "error")

        # Calculate timing
        start_time = self._start_time or (
            min((f.start_time for f in files if f.start_time), default=0)
        )
        end_time = self._end_time or (max((f.end_time for f in files if f.end_time), default=0))
        total_duration = end_time - start_time if start_time and end_time else 0.0

        # Calculate per-file metrics
        durations = [f.duration for f in files if f.duration is not None]
        avg_duration = sum(durations) / len(durations) if durations else 0.0
        files_per_second = total_files / total_duration if total_duration > 0 else 0.0

        return BatchSummary(
            batch_id=self.batch_id,
            total_files=total_files,
            success_count=success_count,
            error_count=error_count,
            total_duration=total_duration,
            start_time=format_timestamp(datetime.fromtimestamp(start_time, tz=UTC))
            if start_time
            else "",
            end_time=format_timestamp(datetime.fromtimestamp(end_time, tz=UTC)) if end_time else "",
            errors_by_type=errors_by_type,
            files_per_second=files_per_second,
            average_duration_per_file=avg_duration,
        )

    def get_file_logs(self, file_id: str) -> list[dict[str, Any]]:
        """Get all log messages for a specific file.

        Args:
            file_id: File identifier.

        Returns:
            List of log message dictionaries.

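        Example:
            Each entry carries a timestamp, level, message, and the
            batch/file identifiers (a sketch):

            >>> for m in batch_logger.get_file_logs(file_id):
            ...     print(m["level"], m["message"])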
        """
        with self._lock:
            if file_id in self._files:
                return list(self._files[file_id].log_messages)
            return []

    def get_all_files(self) -> list[dict[str, Any]]:
        """Get summary information for all files.

        Returns:
            List of file summary dictionaries.
        """
        with self._lock:
            return [f.to_dict() for f in self._files.values()]

    def get_errors(self) -> list[dict[str, Any]]:
        """Get all files that encountered errors.

        Returns:
            List of error file dictionaries with error details.

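        Example:
            A sketch of post-run triage:

            >>> for failed in batch_logger.get_errors():
            ...     print(failed["filename"], failed["error_message"])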
        """
        with self._lock:
            return [
                {
                    **f.to_dict(),
                    "logs": f.log_messages,
                }
                for f in self._files.values()
                if f.status == "error"
            ]


def aggregate_batch_logs(
    batch_loggers: list[BatchLogger],
) -> dict[str, Any]:
    """Aggregate logs from multiple batch loggers.

    Combines summaries from multiple batch jobs into a single
    aggregate report.

    Args:
        batch_loggers: List of BatchLogger instances.

    Returns:
        Aggregated summary dictionary.

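    Example:
        Combining two completed jobs (a sketch; logger_a and logger_b
        are BatchLogger instances):

        >>> report = aggregate_batch_logs([logger_a, logger_b])
        >>> report["aggregate"]["total_batches"]
        2
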
    References:
        LOG-011: Aggregate Logging for Batch Processing
    """
    total_files = 0
    total_success = 0
    total_errors = 0
    total_duration = 0.0
    all_errors_by_type: dict[str, int] = defaultdict(int)
    batch_summaries = []

    for logger in batch_loggers:
        summary = logger.summary()
        batch_summaries.append(summary.to_dict())
        total_files += summary.total_files
        total_success += summary.success_count
        total_errors += summary.error_count
        total_duration += summary.total_duration
        for error_type, count in summary.errors_by_type.items():
            all_errors_by_type[error_type] += count

    return {
        "aggregate": {
            "total_batches": len(batch_loggers),
            "total_files": total_files,
            "total_success": total_success,
            "total_errors": total_errors,
            "total_duration_seconds": total_duration,
            "overall_success_rate": total_success / total_files if total_files > 0 else 0.0,
            "errors_by_type": dict(all_errors_by_type),
        },
        "batches": batch_summaries,
    }


__all__ = [
    "BatchLogger",
    "BatchSummary",
    "FileLogEntry",
    "FileLogger",
    "aggregate_batch_logs",
]