Coverage for src/tracekit/core/log_query.py: 98% (138 statements)

1"""Log query and export functionality.
3This module provides searchable log querying with filtering and export
4capabilities for analysis and reporting.
7Example:
8 >>> from tracekit.core.log_query import LogQuery
9 >>> query = LogQuery()
10 >>> # Query last hour of ERROR logs
11 >>> from datetime import datetime, UTC, timedelta
12 >>> results = query.query_logs(
13 ... start_time=datetime.now(UTC) - timedelta(hours=1),
14 ... level="ERROR"
15 ... )
16 >>> # Export to JSON
17 >>> query.export_logs(results, "errors.json", format="json")
19References:
20 LOG-010: Searchable Log Query and Export
21"""

from __future__ import annotations

import csv
import json
import re
from collections import Counter
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

from tracekit.core.logging import format_timestamp

if TYPE_CHECKING:
    from datetime import datetime


@dataclass
class LogRecord:
    """Structured log record for querying.

    Attributes:
        timestamp: ISO 8601 timestamp of the log entry.
        level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
        module: Logger name/module.
        message: Log message.
        correlation_id: Optional correlation ID for tracing.
        metadata: Additional metadata fields.
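
    Example:
        A minimal record (the module name here is illustrative);
        to_dict() normalizes missing metadata to an empty dict:

        >>> record = LogRecord(
        ...     timestamp="2025-12-21T10:00:00.000000Z",
        ...     level="INFO",
        ...     module="tracekit.core",
        ...     message="started",
        ... )
        >>> record.to_dict()["metadata"]
        {}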

    References:
        LOG-010: Searchable Log Query
    """

    timestamp: str
    level: str
    module: str
    message: str
    correlation_id: str | None = None
    metadata: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert log record to dictionary.

        Returns:
            Dictionary representation of the log record; a metadata value
            of None is normalized to an empty dict.
        """
        result = asdict(self)
        if result["metadata"] is None:
            result["metadata"] = {}
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> LogRecord:
        """Create log record from dictionary.

        Args:
            data: Dictionary containing log record data.

        Returns:
            LogRecord instance.
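
        Example:
            A sketch of a round trip; the optional fields fall back to
            None when absent from the dictionary:

            >>> data = {
            ...     "timestamp": "2025-12-21T10:00:00.000000Z",
            ...     "level": "INFO",
            ...     "module": "tracekit.core",
            ...     "message": "started",
            ... }
            >>> LogRecord.from_dict(data).message
            'started'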
81 """
82 return cls(
83 timestamp=data["timestamp"],
84 level=data["level"],
85 module=data["module"],
86 message=data["message"],
87 correlation_id=data.get("correlation_id"),
88 metadata=data.get("metadata"),
89 )


class LogQuery:
    """Query and filter log records from various sources.

    Provides structured querying of logs with filtering by timestamp,
    level, module, correlation ID, and message patterns. Supports
    pagination and multiple export formats.

    Example:
        >>> query = LogQuery()
        >>> # Load logs from file
        >>> query.load_from_file("tracekit.log")
        >>> # Query with filters
        >>> results = query.query_logs(
        ...     level="ERROR",
        ...     module_pattern="tracekit.loaders.*"
        ... )
        >>> # Export filtered results
        >>> query.export_logs(results, "filtered.csv", format="csv")

    References:
        LOG-010: Searchable Log Query and Export
    """

    def __init__(self) -> None:
        """Initialize log query engine."""
        self._records: list[LogRecord] = []

    def load_from_file(self, path: str, format: Literal["json", "text"] = "text") -> int:
        """Load log records from file.

        Args:
            path: Path to log file.
            format: File format ("json" for JSON Lines, "text" for plain text).

        Returns:
            Number of records loaded.

        Raises:
            FileNotFoundError: If log file does not exist.
            ValueError: If format is not supported.

        Example:
            >>> query = LogQuery()
            >>> count = query.load_from_file("logs.json", format="json")
            >>> print(f"Loaded {count} records")

        References:
            LOG-010: Searchable Log Query
        """
        path_obj = Path(path)
        if not path_obj.exists():
            raise FileNotFoundError(f"Log file not found: {path}")

        if format == "json":
            return self._load_json_lines(path_obj)
        elif format == "text":
            return self._load_text(path_obj)
        else:
            raise ValueError(f"Unsupported format: {format}")

    def add_record(self, record: LogRecord) -> None:
        """Add a log record to the query index.

        Args:
            record: LogRecord to add.

        Example:
            >>> query = LogQuery()
            >>> record = LogRecord(
            ...     timestamp="2025-12-21T10:00:00.000000Z",
            ...     level="INFO",
            ...     module="tracekit.test",
            ...     message="Test message"
            ... )
            >>> query.add_record(record)
        """
        self._records.append(record)

    def query_logs(
        self,
        *,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        level: str | None = None,
        module: str | None = None,
        module_pattern: str | None = None,
        correlation_id: str | None = None,
        message_pattern: str | None = None,
        limit: int | None = None,
        offset: int = 0,
    ) -> list[LogRecord]:
        """Query log records with filtering.

        Args:
            start_time: Return only logs at or after this time (UTC).
            end_time: Return only logs at or before this time (UTC).
            level: Filter by log level (upper-cased before matching, so
                "error" matches "ERROR").
            module: Filter by exact module name.
            module_pattern: Filter by module name pattern (glob-style,
                e.g., "tracekit.loaders.*").
            correlation_id: Filter by correlation ID.
            message_pattern: Filter by message regex pattern.
            limit: Maximum number of results to return.
            offset: Number of results to skip (for pagination).

        Returns:
            List of matching LogRecord objects.

        Example:
            >>> from datetime import datetime, UTC, timedelta
            >>> query = LogQuery()
            >>> # Last hour of errors
            >>> results = query.query_logs(
            ...     start_time=datetime.now(UTC) - timedelta(hours=1),
            ...     level="ERROR"
            ... )
            >>> # Specific module with pattern
            >>> results = query.query_logs(
            ...     module_pattern="tracekit.analyzers.*",
            ...     message_pattern="FFT.*failed"
            ... )
            >>> # Paginated results
            >>> page_1 = query.query_logs(limit=100, offset=0)
            >>> page_2 = query.query_logs(limit=100, offset=100)

        References:
            LOG-010: Searchable Log Query and Export
        """
        results = self._records.copy()
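
        # The timestamp filters below compare ISO 8601 strings directly:
        # for zero-padded UTC timestamps in one uniform format, lexicographic
        # order matches chronological order, so no per-record datetime
        # parsing is needed.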
        # Filter by timestamp range
        if start_time is not None:
            start_str = format_timestamp(start_time, format="iso8601")
            results = [r for r in results if r.timestamp >= start_str]

        if end_time is not None:
            end_str = format_timestamp(end_time, format="iso8601")
            results = [r for r in results if r.timestamp <= end_str]

        # Filter by log level
        if level is not None:
            results = [r for r in results if r.level == level.upper()]

        # Filter by module
        if module is not None:
            results = [r for r in results if r.module == module]

        # Filter by module pattern
        if module_pattern is not None:
            # Convert glob pattern to regex
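            # (e.g. "tracekit.loaders.*" becomes ^tracekit\.loaders\..*$)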
            pattern = module_pattern.replace(".", r"\.").replace("*", ".*")
            regex = re.compile(f"^{pattern}$")
            results = [r for r in results if regex.match(r.module)]

        # Filter by correlation ID
        if correlation_id is not None:
            results = [r for r in results if r.correlation_id == correlation_id]

        # Filter by message pattern
        if message_pattern is not None:
            regex = re.compile(message_pattern)
            results = [r for r in results if regex.search(r.message)]

        # Apply pagination: offset first, then limit
        if offset > 0:
            results = results[offset:]
        if limit is not None:
            results = results[:limit]

        return results

    def export_logs(
        self,
        records: list[LogRecord],
        path: str,
        format: Literal["json", "csv", "text"] = "json",
    ) -> None:
        """Export log records to file.

        Args:
            records: List of LogRecord objects to export.
            path: Output file path.
            format: Export format (json, csv, or text).

        Raises:
            ValueError: If format is not supported.

        Example:
            >>> query = LogQuery()
            >>> results = query.query_logs(level="ERROR")
            >>> query.export_logs(results, "errors.json", format="json")
            >>> query.export_logs(results, "errors.csv", format="csv")
            >>> query.export_logs(results, "errors.txt", format="text")

        References:
            LOG-010: Searchable Log Query and Export
        """
        path_obj = Path(path)
        path_obj.parent.mkdir(parents=True, exist_ok=True)

        if format == "json":
            self._export_json(records, path_obj)
        elif format == "csv":
            self._export_csv(records, path_obj)
        elif format == "text":
            self._export_text(records, path_obj)
        else:
            raise ValueError(f"Unsupported export format: {format}")

    def get_statistics(self) -> dict[str, Any]:
        """Get statistics about loaded log records.

        Returns:
            Dictionary with statistics:
            - total: Total number of records
            - by_level: Count by log level
            - by_module: Count by module (top 20 modules only)
            - time_range: Earliest and latest timestamps (None when no
              records are loaded)

        Example:
            >>> query = LogQuery()
            >>> query.load_from_file("logs.json")
            >>> stats = query.get_statistics()
            >>> print(f"Total logs: {stats['total']}")
            >>> print(f"Errors: {stats['by_level'].get('ERROR', 0)}")

        References:
            LOG-010: Log Query
        """
        if not self._records:
            return {
                "total": 0,
                "by_level": {},
                "by_module": {},
                "time_range": None,
            }

        level_counts = Counter(r.level for r in self._records)
        module_counts = Counter(r.module for r in self._records)

        timestamps = sorted(r.timestamp for r in self._records)
        time_range = {
            "earliest": timestamps[0],
            "latest": timestamps[-1],
        }

        return {
            "total": len(self._records),
            "by_level": dict(level_counts),
            "by_module": dict(module_counts.most_common(20)),
            "time_range": time_range,
        }

    def clear(self) -> None:
        """Clear all loaded log records.

        Example:
            >>> query = LogQuery()
            >>> query.clear()
        """
        self._records.clear()

    def _load_json_lines(self, path: Path) -> int:
        """Load JSON lines format logs.

        Malformed lines are skipped.

        Args:
            path: Path to JSON lines file.

        Returns:
            Number of records loaded.
        """
        count = 0
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    record = LogRecord(
                        timestamp=data.get("timestamp", ""),
                        level=data.get("level", "INFO"),
                        module=data.get("module", "unknown"),
                        message=data.get("message", ""),
                        correlation_id=data.get("correlation_id"),
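                        # Any keys beyond the known fields are preserved as
                        # metadata, so a hypothetical "duration_ms" key would
                        # end up in record.metadata["duration_ms"].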
                        metadata={
                            k: v
                            for k, v in data.items()
                            if k
                            not in ("timestamp", "level", "module", "message", "correlation_id")
                        },
                    )
                    self._records.append(record)
                    count += 1
                except (json.JSONDecodeError, KeyError):
                    # Skip malformed lines
                    continue
        return count

    def _load_text(self, path: Path) -> int:
        """Load plain text format logs.

        Attempts to parse the common "TIMESTAMP [LEVEL] MODULE: MESSAGE"
        layout; lines that do not match are skipped.

        Args:
            path: Path to text log file.

        Returns:
            Number of records loaded.
        """
        count = 0
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue

                # Try to parse common format: TIMESTAMP [LEVEL] MODULE: MESSAGE
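                # (e.g. "2025-12-21T10:00:00.000000Z [ERROR] tracekit.loaders.csv: failed",
                # the same layout that _export_text writes; the module name
                # here is illustrative)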
                match = re.match(
                    r"^(\S+)\s+\[(\w+)\]\s+(\S+):\s+(.*)$",
                    line,
                )
                if match:
                    timestamp, level, module, message = match.groups()
                    record = LogRecord(
                        timestamp=timestamp,
                        level=level,
                        module=module,
                        message=message,
                    )
                    self._records.append(record)
                    count += 1

        return count

    def _export_json(self, records: list[LogRecord], path: Path) -> None:
        """Export records as JSON.

        Args:
            records: Records to export.
            path: Output path.
        """
        with open(path, "w", encoding="utf-8") as f:
            json.dump(
                [r.to_dict() for r in records],
                f,
                indent=2,
                default=str,
            )

    def _export_csv(self, records: list[LogRecord], path: Path) -> None:
        """Export records as CSV.

        No file is created when records is empty.

        Args:
            records: Records to export.
            path: Output path.
        """
        if not records:
            return

        with open(path, "w", newline="", encoding="utf-8") as f:
            fieldnames = ["timestamp", "level", "module", "message", "correlation_id"]
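            # Only these fixed columns are written; any metadata fields on
            # the records are omitted from CSV output.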
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()

            for record in records:
                writer.writerow(
                    {
                        "timestamp": record.timestamp,
                        "level": record.level,
                        "module": record.module,
                        "message": record.message,
                        "correlation_id": record.correlation_id or "",
                    }
                )

    def _export_text(self, records: list[LogRecord], path: Path) -> None:
        """Export records as plain text.

        Args:
            records: Records to export.
            path: Output path.
        """
        with open(path, "w", encoding="utf-8") as f:
            for record in records:
                line = f"{record.timestamp} [{record.level}] {record.module}: {record.message}"
                if record.correlation_id:
                    line += f" [corr_id={record.correlation_id}]"
                f.write(line + "\n")


def query_logs(
    log_file: str,
    *,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    level: str | None = None,
    module: str | None = None,
    correlation_id: str | None = None,
    message_pattern: str | None = None,
    limit: int | None = None,
) -> list[LogRecord]:
495 """Convenience function to query logs from a file.
497 Args:
498 log_file: Path to log file.
499 start_time: Filter by start time (UTC).
500 end_time: Filter by end time (UTC).
501 level: Filter by log level.
502 module: Filter by module name.
503 correlation_id: Filter by correlation ID.
504 message_pattern: Filter by message regex pattern.
505 limit: Maximum number of results.
507 Returns:
508 List of matching LogRecord objects.
510 Example:
511 >>> from datetime import datetime, UTC, timedelta
512 >>> from tracekit.core.log_query import query_logs
513 >>> # Query last hour of errors
514 >>> results = query_logs(
515 ... "tracekit.log",
516 ... start_time=datetime.now(UTC) - timedelta(hours=1),
517 ... level="ERROR"
518 ... )
520 References:
521 LOG-010: Searchable Log Query and Export
522 """
    query = LogQuery()
    query.load_from_file(log_file, format="json" if log_file.endswith(".json") else "text")
    return query.query_logs(
        start_time=start_time,
        end_time=end_time,
        level=level,
        module=module,
        correlation_id=correlation_id,
        message_pattern=message_pattern,
        limit=limit,
    )


__all__ = [
    "LogQuery",
    "LogRecord",
    "query_logs",
]