Coverage for src/tracekit/core/log_query.py: 98%

138 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Log query and export functionality. 

2 

3This module provides searchable log querying with filtering and export 

4capabilities for analysis and reporting. 

5 

6 

7Example: 

8 >>> from tracekit.core.log_query import LogQuery 

9 >>> query = LogQuery() 

10 >>> # Query last hour of ERROR logs 

11 >>> from datetime import datetime, UTC, timedelta 

12 >>> results = query.query_logs( 

13 ... start_time=datetime.now(UTC) - timedelta(hours=1), 

14 ... level="ERROR" 

15 ... ) 

16 >>> # Export to JSON 

17 >>> query.export_logs(results, "errors.json", format="json") 

18 

19References: 

20 LOG-010: Searchable Log Query and Export 

21""" 

22 

from __future__ import annotations

import csv
import json
import re
from collections import Counter
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

from tracekit.core.logging import format_timestamp

33 

34if TYPE_CHECKING: 

35 from datetime import datetime 

36 

37 

@dataclass
class LogRecord:
    """Structured log record for querying.

    Attributes:
        timestamp: ISO 8601 timestamp of the log entry.
        level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
        module: Logger name/module.
        message: Log message.
        correlation_id: Optional correlation ID for tracing.
        metadata: Additional metadata fields.

    References:
        LOG-010: Searchable Log Query
    """

    timestamp: str
    level: str
    module: str
    message: str
    correlation_id: str | None = None
    metadata: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize this record to a plain dictionary.

        A ``None`` metadata field is normalized to an empty dict so
        consumers can iterate it without a null check.

        Returns:
            Dictionary representation of the log record.
        """
        payload = asdict(self)
        payload["metadata"] = {} if payload["metadata"] is None else payload["metadata"]
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> LogRecord:
        """Build a record from a dictionary.

        Required keys: ``timestamp``, ``level``, ``module``, ``message``.
        ``correlation_id`` and ``metadata`` default to ``None`` when absent.

        Args:
            data: Dictionary containing log record data.

        Returns:
            LogRecord instance.
        """
        optional = {key: data.get(key) for key in ("correlation_id", "metadata")}
        return cls(
            data["timestamp"],
            data["level"],
            data["module"],
            data["message"],
            **optional,
        )

90 

91 

class LogQuery:
    """Query and filter log records from various sources.

    Provides structured querying of logs with filtering by timestamp,
    level, module, correlation ID, and message patterns. Supports
    pagination and multiple export formats.

    Example:
        >>> query = LogQuery()
        >>> # Load logs from file
        >>> query.load_from_file("tracekit.log")
        >>> # Query with filters
        >>> results = query.query_logs(
        ...     level="ERROR",
        ...     module_pattern="tracekit.loaders.*"
        ... )
        >>> # Export filtered results
        >>> query.export_logs(results, "filtered.csv", format="csv")

    References:
        LOG-010: Searchable Log Query and Export
    """

    def __init__(self) -> None:
        """Initialize an empty log query engine."""
        # Records are kept in load/insertion order; queries filter this list.
        self._records: list[LogRecord] = []

    def load_from_file(self, path: str, format: Literal["json", "text"] = "text") -> int:
        """Load log records from file.

        Args:
            path: Path to log file.
            format: File format (json for JSON lines, text for plain text).

        Returns:
            Number of records loaded.

        Raises:
            FileNotFoundError: If log file does not exist.
            ValueError: If format is not supported.

        Example:
            >>> query = LogQuery()
            >>> count = query.load_from_file("logs.json", format="json")
            >>> print(f"Loaded {count} records")

        References:
            LOG-010: Searchable Log Query
        """
        path_obj = Path(path)
        if not path_obj.exists():
            raise FileNotFoundError(f"Log file not found: {path}")

        if format == "json":
            return self._load_json_lines(path_obj)
        if format == "text":
            return self._load_text(path_obj)
        raise ValueError(f"Unsupported format: {format}")

    def add_record(self, record: LogRecord) -> None:
        """Add a log record to the query index.

        Args:
            record: LogRecord to add.

        Example:
            >>> query = LogQuery()
            >>> record = LogRecord(
            ...     timestamp="2025-12-21T10:00:00.000000Z",
            ...     level="INFO",
            ...     module="tracekit.test",
            ...     message="Test message"
            ... )
            >>> query.add_record(record)
        """
        self._records.append(record)

    def query_logs(
        self,
        *,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        level: str | None = None,
        module: str | None = None,
        module_pattern: str | None = None,
        correlation_id: str | None = None,
        message_pattern: str | None = None,
        limit: int | None = None,
        offset: int = 0,
    ) -> list[LogRecord]:
        """Query log records with filtering.

        Args:
            start_time: Return only logs after this time (UTC).
            end_time: Return only logs before this time (UTC).
            level: Filter by exact log level (case-insensitive; compared upper-cased).
            module: Filter by exact module name.
            module_pattern: Filter by module name pattern (glob-style, e.g., "tracekit.loaders.*").
            correlation_id: Filter by correlation ID.
            message_pattern: Filter by message regex pattern.
            limit: Maximum number of results to return.
            offset: Number of results to skip (for pagination).

        Returns:
            List of matching LogRecord objects.

        Example:
            >>> from datetime import datetime, UTC, timedelta
            >>> query = LogQuery()
            >>> # Last hour of errors
            >>> results = query.query_logs(
            ...     start_time=datetime.now(UTC) - timedelta(hours=1),
            ...     level="ERROR"
            ... )
            >>> # Specific module with pattern
            >>> results = query.query_logs(
            ...     module_pattern="tracekit.analyzers.*",
            ...     message_pattern="FFT.*failed"
            ... )
            >>> # Paginated results
            >>> page_1 = query.query_logs(limit=100, offset=0)
            >>> page_2 = query.query_logs(limit=100, offset=100)

        References:
            LOG-010: Searchable Log Query and Export
        """
        results = self._records.copy()

        # Timestamp filters compare ISO 8601 strings; lexicographic order
        # matches chronological order for this fixed-width format.
        if start_time is not None:
            start_str = format_timestamp(start_time, format="iso8601")
            results = [r for r in results if r.timestamp >= start_str]

        if end_time is not None:
            end_str = format_timestamp(end_time, format="iso8601")
            results = [r for r in results if r.timestamp <= end_str]

        # Filter by log level (stored levels are upper-case).
        if level is not None:
            results = [r for r in results if r.level == level.upper()]

        # Filter by exact module name
        if module is not None:
            results = [r for r in results if r.module == module]

        # Filter by module glob pattern, converted to an anchored regex
        if module_pattern is not None:
            pattern = module_pattern.replace(".", r"\.").replace("*", ".*")
            regex = re.compile(f"^{pattern}$")
            results = [r for r in results if regex.match(r.module)]

        # Filter by correlation ID
        if correlation_id is not None:
            results = [r for r in results if r.correlation_id == correlation_id]

        # Filter by message regex (substring search, not anchored)
        if message_pattern is not None:
            regex = re.compile(message_pattern)
            results = [r for r in results if regex.search(r.message)]

        # Pagination: skip `offset` rows, then cap at `limit`.
        if offset > 0:
            results = results[offset:]
        if limit is not None:
            results = results[:limit]

        return results

    def export_logs(
        self,
        records: list[LogRecord],
        path: str,
        format: Literal["json", "csv", "text"] = "json",
    ) -> None:
        """Export log records to file.

        Parent directories are created as needed.

        Args:
            records: List of LogRecord objects to export.
            path: Output file path.
            format: Export format (json, csv, or text).

        Raises:
            ValueError: If format is not supported.

        Example:
            >>> query = LogQuery()
            >>> results = query.query_logs(level="ERROR")
            >>> query.export_logs(results, "errors.json", format="json")
            >>> query.export_logs(results, "errors.csv", format="csv")
            >>> query.export_logs(results, "errors.txt", format="text")

        References:
            LOG-010: Searchable Log Query and Export
        """
        path_obj = Path(path)
        path_obj.parent.mkdir(parents=True, exist_ok=True)

        if format == "json":
            self._export_json(records, path_obj)
        elif format == "csv":
            self._export_csv(records, path_obj)
        elif format == "text":
            self._export_text(records, path_obj)
        else:
            raise ValueError(f"Unsupported export format: {format}")

    def get_statistics(self) -> dict[str, Any]:
        """Get statistics about loaded log records.

        Returns:
            Dictionary with statistics:
            - total: Total number of records
            - by_level: Count by log level
            - by_module: Count by module (top 20 modules only)
            - time_range: Earliest and latest timestamps (None when empty)

        Example:
            >>> query = LogQuery()
            >>> query.load_from_file("logs.json")
            >>> stats = query.get_statistics()
            >>> print(f"Total logs: {stats['total']}")
            >>> print(f"Errors: {stats['by_level'].get('ERROR', 0)}")

        References:
            LOG-010: Log Query
        """
        if not self._records:
            return {
                "total": 0,
                "by_level": {},
                "by_module": {},
                "time_range": None,
            }

        level_counts = Counter(r.level for r in self._records)
        module_counts = Counter(r.module for r in self._records)

        # min/max is O(n); no need to sort every timestamp for the extremes.
        timestamps = [r.timestamp for r in self._records]
        time_range = {
            "earliest": min(timestamps),
            "latest": max(timestamps),
        }

        return {
            "total": len(self._records),
            "by_level": dict(level_counts),
            "by_module": dict(module_counts.most_common(20)),
            "time_range": time_range,
        }

    def clear(self) -> None:
        """Clear all loaded log records.

        Example:
            >>> query = LogQuery()
            >>> query.clear()
        """
        self._records.clear()

    def _load_json_lines(self, path: Path) -> int:
        """Load JSON lines format logs.

        Malformed lines are skipped silently (best-effort parsing).

        Args:
            path: Path to JSON lines file.

        Returns:
            Number of records loaded.
        """
        count = 0
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    record = LogRecord(
                        timestamp=data.get("timestamp", ""),
                        level=data.get("level", "INFO"),
                        module=data.get("module", "unknown"),
                        message=data.get("message", ""),
                        correlation_id=data.get("correlation_id"),
                        # Everything that is not a core field goes into metadata.
                        metadata={
                            k: v
                            for k, v in data.items()
                            if k
                            not in ("timestamp", "level", "module", "message", "correlation_id")
                        },
                    )
                    self._records.append(record)
                    count += 1
                except (json.JSONDecodeError, KeyError):
                    # Skip malformed lines
                    continue
        return count

    def _load_text(self, path: Path) -> int:
        """Load plain text format logs.

        Attempts to parse the common format
        ``TIMESTAMP [LEVEL] MODULE: MESSAGE``; non-matching lines are skipped.

        Args:
            path: Path to text log file.

        Returns:
            Number of records loaded.
        """
        count = 0
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue

                # Try to parse common format: TIMESTAMP [LEVEL] MODULE: MESSAGE
                match = re.match(
                    r"^(\S+)\s+\[(\w+)\]\s+(\S+):\s+(.*)$",
                    line,
                )
                if match:
                    timestamp, level, module, message = match.groups()
                    record = LogRecord(
                        timestamp=timestamp,
                        level=level,
                        module=module,
                        message=message,
                    )
                    self._records.append(record)
                    count += 1

        return count

    def _export_json(self, records: list[LogRecord], path: Path) -> None:
        """Export records as a JSON array.

        Args:
            records: Records to export.
            path: Output path.
        """
        with open(path, "w", encoding="utf-8") as f:
            json.dump(
                [r.to_dict() for r in records],
                f,
                indent=2,
                # Stringify anything json can't serialize natively (metadata values).
                default=str,
            )

    def _export_csv(self, records: list[LogRecord], path: Path) -> None:
        """Export records as CSV (metadata is omitted).

        No file is written when ``records`` is empty.

        Args:
            records: Records to export.
            path: Output path.
        """
        if not records:
            return

        with open(path, "w", newline="", encoding="utf-8") as f:
            fieldnames = ["timestamp", "level", "module", "message", "correlation_id"]
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()

            for record in records:
                writer.writerow(
                    {
                        "timestamp": record.timestamp,
                        "level": record.level,
                        "module": record.module,
                        "message": record.message,
                        "correlation_id": record.correlation_id or "",
                    }
                )

    def _export_text(self, records: list[LogRecord], path: Path) -> None:
        """Export records as plain text, one line per record.

        Args:
            records: Records to export.
            path: Output path.
        """
        with open(path, "w", encoding="utf-8") as f:
            for record in records:
                line = f"{record.timestamp} [{record.level}] {record.module}: {record.message}"
                if record.correlation_id:
                    line += f" [corr_id={record.correlation_id}]"
                f.write(line + "\n")

482 

483 

def query_logs(
    log_file: str,
    *,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    level: str | None = None,
    module: str | None = None,
    module_pattern: str | None = None,
    correlation_id: str | None = None,
    message_pattern: str | None = None,
    limit: int | None = None,
    offset: int = 0,
) -> list[LogRecord]:
    """Convenience function to query logs from a file.

    Files ending in ``.json`` or ``.jsonl`` are parsed as JSON lines;
    anything else is parsed as plain text.

    Args:
        log_file: Path to log file.
        start_time: Filter by start time (UTC).
        end_time: Filter by end time (UTC).
        level: Filter by log level.
        module: Filter by exact module name.
        module_pattern: Filter by module name glob pattern (e.g., "tracekit.loaders.*").
        correlation_id: Filter by correlation ID.
        message_pattern: Filter by message regex pattern.
        limit: Maximum number of results.
        offset: Number of results to skip (for pagination).

    Returns:
        List of matching LogRecord objects.

    Example:
        >>> from datetime import datetime, UTC, timedelta
        >>> from tracekit.core.log_query import query_logs
        >>> # Query last hour of errors
        >>> results = query_logs(
        ...     "tracekit.log",
        ...     start_time=datetime.now(UTC) - timedelta(hours=1),
        ...     level="ERROR"
        ... )

    References:
        LOG-010: Searchable Log Query and Export
    """
    # .jsonl is the conventional extension for JSON lines; treat it like .json.
    fmt: Literal["json", "text"] = (
        "json" if log_file.endswith((".json", ".jsonl")) else "text"
    )
    query = LogQuery()  # type: ignore[no-untyped-call]
    query.load_from_file(log_file, format=fmt)
    return query.query_logs(
        start_time=start_time,
        end_time=end_time,
        level=level,
        module=module,
        module_pattern=module_pattern,
        correlation_id=correlation_id,
        message_pattern=message_pattern,
        limit=limit,
        offset=offset,
    )

534 

535 

# Public API of this module (LOG-010): the query engine, its record type,
# and the file-level convenience function.
__all__ = [
    "LogQuery",
    "LogRecord",
    "query_logs",
]