Coverage for src / tracekit / loaders / validation.py: 55%
228 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Packet validation and integrity checking.

This module provides comprehensive packet validation including sync markers,
sequence numbers, checksums, and structural integrity verification.

Example:
    >>> from tracekit.loaders.validation import PacketValidator
    >>> validator = PacketValidator(sync_marker=0xFA, checksum_type="crc16")
    >>> result = validator.validate_packet(packet, packet_data)
    >>> if result.is_valid:
    ...     print("Packet valid")
    >>> stats = validator.get_statistics()
    >>> print(f"Pass rate: {stats.pass_rate:.1%}")
"""
from __future__ import annotations

import binascii
import logging
from dataclasses import dataclass, field
from typing import Any
# Module-scoped logger; named after this module so handlers and levels can be
# configured per package by the application.
logger = logging.getLogger(__name__)
@dataclass
class SequenceGap:
    """Record of one break in an otherwise consecutive sequence run.

    Attributes:
        position: Index of the packet at which the break was observed.
        expected: Sequence number that should have appeared there.
        got: Sequence number that actually appeared.
        gap_size: Number of packets presumed missing between the two.
    """

    position: int
    expected: int
    got: int
    gap_size: int
@dataclass
class SequenceValidation:
    """Outcome of checking sequence numbers over a batch of packets.

    Attributes:
        total_packets: How many packets were examined.
        sequence_gaps: One entry per detected break in the sequence.
        duplicates: How many repeated sequence numbers were seen.
        valid: True while no gap or duplicate has been recorded.
    """

    total_packets: int = 0
    sequence_gaps: list[SequenceGap] = field(default_factory=list)
    duplicates: int = 0
    valid: bool = True

    @property
    def gap_count(self) -> int:
        """Number of recorded gaps.

        Returns:
            Length of ``sequence_gaps``.
        """
        recorded = self.sequence_gaps
        return len(recorded)

    @property
    def total_missing_packets(self) -> int:
        """Missing-packet total summed over every recorded gap.

        Returns:
            Sum of ``gap_size`` across ``sequence_gaps`` (0 when empty).
        """
        missing = 0
        for entry in self.sequence_gaps:
            missing += entry.gap_size
        return missing
@dataclass
class ValidationResult:
    """Outcome of validating one packet.

    Attributes:
        is_valid: True until an error is recorded via :meth:`add_error`.
        sync_valid: Outcome of the sync marker check.
        sequence_valid: Outcome of the sequence number check.
        checksum_valid: Outcome of the checksum check.
        errors: Error messages collected for this packet.
        warnings: Warning messages collected for this packet.
        packet_index: Index of the packet this result belongs to.
    """

    is_valid: bool = True
    sync_valid: bool = True
    sequence_valid: bool = True
    checksum_valid: bool = True
    errors: list[str] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)
    packet_index: int = 0

    def add_error(self, message: str) -> None:
        """Record an error and mark the packet as invalid.

        Args:
            message: Error message.
        """
        error_log = self.errors
        error_log.append(message)
        # Any recorded error invalidates the whole packet.
        self.is_valid = False

    def add_warning(self, message: str) -> None:
        """Record a warning; the validity flag is left untouched.

        Args:
            message: Warning message.
        """
        warning_log = self.warnings
        warning_log.append(message)
@dataclass
class ValidationStats:
    """Aggregate validation statistics.

    Attributes:
        total_packets: Total number of packets validated.
        valid_packets: Number of packets that passed all checks.
        sync_failures: Number of sync marker failures.
        sequence_gaps: Number of sequence gaps detected.
        sequence_duplicates: Number of duplicate sequences detected.
        checksum_failures: Number of checksum failures.
        error_types: Mapping of error type name to occurrence count.
    """

    total_packets: int = 0
    valid_packets: int = 0
    sync_failures: int = 0
    sequence_gaps: int = 0
    sequence_duplicates: int = 0
    checksum_failures: int = 0
    error_types: dict[str, int] = field(default_factory=dict)

    @property
    def pass_rate(self) -> float:
        """Calculate validation pass rate.

        Returns:
            Fraction of packets that passed validation (0.0 to 1.0), or 0.0
            when no packets have been validated yet.
        """
        if self.total_packets == 0:
            return 0.0
        return self.valid_packets / self.total_packets

    @property
    def fail_rate(self) -> float:
        """Calculate validation fail rate.

        Returns:
            Fraction of packets that failed validation (0.0 to 1.0), or 0.0
            when no packets have been validated yet.
        """
        # With nothing validated there is nothing that failed. Deriving this
        # as ``1.0 - pass_rate`` would report a 100% failure rate for an
        # empty run, because pass_rate is 0.0 in that case.
        if self.total_packets == 0:
            return 0.0
        return (self.total_packets - self.valid_packets) / self.total_packets

    def add_error_type(self, error_type: str) -> None:
        """Increment the counter for one error type.

        Args:
            error_type: Type of error (e.g., "sync_mismatch", "checksum_fail").
        """
        self.error_types[error_type] = self.error_types.get(error_type, 0) + 1
class PacketValidator:
    """Validate packet integrity and structure.

    Each call to :meth:`validate_packet` runs the configured checks against
    one parsed packet and folds the outcome into ``stats``:

    - Sync/magic marker verification (when ``sync_marker`` is set)
    - Sequence number gap/duplicate detection (when the header carries
      ``sequence_field``)
    - Checksum verification: CRC-8/16/32, sum, XOR (when ``checksum_type``
      is set and raw packet bytes are supplied)

    With ``strictness == "strict"`` a sync, gap or checksum failure is
    recorded as an error, which marks the packet invalid; with any other
    value it is recorded as a warning. Duplicate sequence numbers are always
    reported as warnings.

    Attributes:
        sync_marker: Expected sync marker value (optional).
        sync_field: Name of sync field in packet header.
        sequence_field: Name of sequence field in packet header.
        checksum_type: Checksum algorithm ("crc8", "crc16", "crc32", "sum",
            "xor"), or None to skip checksum validation.
        checksum_field: Name of checksum field in packet header.
        strictness: Validation strictness level ("strict", "normal", "lenient").
        stats: Validation statistics accumulated across calls.
    """

    # Sequence counters are treated as 32-bit values that wrap around to 0.
    _SEQUENCE_MASK = 0xFFFFFFFF

    def __init__(
        self,
        *,
        sync_marker: int | bytes | None = None,
        sync_field: str = "sync_marker",
        sequence_field: str = "sequence",
        checksum_type: str | None = None,
        checksum_field: str = "checksum",
        strictness: str = "normal",
    ) -> None:
        """Initialize packet validator.

        Args:
            sync_marker: Expected sync marker value (optional).
            sync_field: Name of sync field in packet header (default: "sync_marker").
            sequence_field: Name of sequence field in packet header (default: "sequence").
            checksum_type: Checksum algorithm (optional).
            checksum_field: Name of checksum field in packet header (default: "checksum").
            strictness: Validation strictness ("strict", "normal", "lenient").
        """
        self.sync_marker = sync_marker
        self.sync_field = sync_field
        self.sequence_field = sequence_field
        self.checksum_type = checksum_type
        self.checksum_field = checksum_field
        self.strictness = strictness

        self.stats = ValidationStats()
        # Sequence number of the previously validated packet, if any.
        self._last_sequence: int | None = None

    @staticmethod
    def _as_int(value: Any) -> Any:
        """Convert a non-empty bytes value to a big-endian int.

        Args:
            value: Header field or configured marker.

        Returns:
            The integer for non-empty ``bytes``/``bytearray`` input; any
            other value is returned unchanged.
        """
        if isinstance(value, (bytes, bytearray)) and len(value) > 0:
            return int.from_bytes(value, "big")
        return value

    @staticmethod
    def _format_value(value: Any) -> str:
        """Render a value for a message: hex for ints, ``repr`` otherwise.

        Args:
            value: Value to render.

        Returns:
            Text safe to embed in an error or warning message.
        """
        if isinstance(value, int):
            return f"{value:#x}"
        return repr(value)

    def validate_packet(
        self, packet: dict[str, Any], packet_data: bytes | None = None
    ) -> ValidationResult:
        """Validate a single packet.

        Args:
            packet: Parsed packet dictionary; its "header" entry (a dict) is
                what the individual checks read.
            packet_data: Raw packet bytes (required for checksum validation;
                the checksum check is skipped when this is None).

        Returns:
            ValidationResult with validation outcome.

        Example:
            >>> validator = PacketValidator(sync_marker=0xFA)
            >>> result = validator.validate_packet(packet)
            >>> if not result.is_valid:
            ...     print(f"Errors: {result.errors}")
        """
        result = ValidationResult(packet_index=packet.get("index", 0))

        header = packet.get("header", {})

        # Validate sync marker
        if self.sync_marker is not None:
            result.sync_valid = self._validate_sync(header, result)

        # Validate sequence number
        if self.sequence_field in header:
            result.sequence_valid = self._validate_sequence(header, result)

        # Validate checksum
        if self.checksum_type is not None and packet_data is not None:
            result.checksum_valid = self._validate_checksum(header, packet_data, result)

        # Update statistics
        self.stats.total_packets += 1
        if result.is_valid:
            self.stats.valid_packets += 1

        return result

    def _validate_sync(self, header: dict[str, Any], result: ValidationResult) -> bool:
        """Validate sync marker.

        Args:
            header: Packet header dictionary.
            result: Validation result to update.

        Returns:
            True if sync is valid.
        """
        if self.sync_field not in header:
            missing_msg = f"Missing sync field: {self.sync_field}"
            if self.strictness == "strict":
                result.add_error(missing_msg)
                self.stats.sync_failures += 1
                self.stats.add_error_type("sync_missing")
                return False
            result.add_warning(missing_msg)
            return True

        sync_value = header[self.sync_field]
        if sync_value == self.sync_marker:
            return True

        actual = self._as_int(sync_value)
        expected = self._as_int(self.sync_marker)

        # An int never compares equal to bytes, so a marker configured as
        # b"\xfa" would otherwise reject a header value of 0xFA (and vice
        # versa). Compare numerically when exactly one side is an int;
        # two bytes values keep their exact comparison above.
        mixed_types = isinstance(sync_value, int) != isinstance(self.sync_marker, int)
        if mixed_types and actual == expected:
            return True

        msg = (
            f"Sync marker mismatch: expected {self._format_value(expected)}, "
            f"got {self._format_value(actual)}"
        )
        if self.strictness == "strict":
            result.add_error(msg)
        else:
            result.add_warning(msg)

        self.stats.sync_failures += 1
        self.stats.add_error_type("sync_mismatch")
        return False

    def _validate_sequence(self, header: dict[str, Any], result: ValidationResult) -> bool:
        """Validate sequence number against the previously seen one.

        Args:
            header: Packet header dictionary.
            result: Validation result to update.

        Returns:
            True if sequence is valid. A gap returns False only in strict
            mode; a duplicate always returns False.
        """
        sequence = header.get(self.sequence_field)
        if sequence is None:
            return True  # No sequence to validate

        if self._last_sequence is not None:
            expected = (self._last_sequence + 1) & self._SEQUENCE_MASK  # Handle rollover

            if sequence == self._last_sequence:
                # Duplicate sequence
                msg = f"Duplicate sequence number: {sequence}"
                result.add_warning(msg)
                self.stats.sequence_duplicates += 1
                self.stats.add_error_type("sequence_duplicate")
                return False

            if sequence != expected:
                # Sequence gap
                gap = (sequence - expected) & self._SEQUENCE_MASK
                msg = f"Sequence gap detected: expected {expected}, got {sequence} (gap: {gap})"

                strict = self.strictness == "strict"
                if strict:
                    result.add_error(msg)
                else:
                    result.add_warning(msg)

                self.stats.sequence_gaps += 1
                self.stats.add_error_type("sequence_gap")

                if strict:
                    # Resynchronise on the new number before reporting failure
                    # so the next packet is judged against it.
                    self._last_sequence = sequence
                    return False

        self._last_sequence = sequence
        return True

    def _validate_checksum(
        self, header: dict[str, Any], packet_data: bytes, result: ValidationResult
    ) -> bool:
        """Validate packet checksum.

        Args:
            header: Packet header dictionary.
            packet_data: Raw packet bytes.
            result: Validation result to update.

        Returns:
            True if checksum is valid.
        """
        if self.checksum_field not in header:
            missing_msg = f"Missing checksum field: {self.checksum_field}"
            if self.strictness == "strict":
                result.add_error(missing_msg)
                self.stats.checksum_failures += 1
                self.stats.add_error_type("checksum_missing")
                return False
            # Surface the skipped check instead of passing silently, matching
            # how a missing sync field is reported.
            result.add_warning(missing_msg)
            return True

        # A checksum stored as raw bytes is read big-endian (the same byte
        # order used for sync markers) so it can be compared with the
        # integer the algorithms below produce.
        expected_checksum = self._as_int(header[self.checksum_field])
        computed_checksum = self._compute_checksum(packet_data)

        if computed_checksum == expected_checksum:
            return True

        msg = (
            f"Checksum mismatch: expected {self._format_value(expected_checksum)}, "
            f"got {computed_checksum:#x}"
        )
        if self.strictness == "strict":
            result.add_error(msg)
        else:
            result.add_warning(msg)

        self.stats.checksum_failures += 1
        self.stats.add_error_type("checksum_fail")
        return False

    def _compute_checksum(self, data: bytes) -> int:
        """Compute checksum using configured algorithm.

        Args:
            data: Data to checksum.

        Returns:
            Computed checksum value; 0 (with a logged warning) when
            ``checksum_type`` names no known algorithm.
        """
        kind = self.checksum_type
        if kind == "crc8":
            return self._crc8(data)
        if kind == "crc16":
            return self._crc16(data)
        if kind == "crc32":
            return self._crc32(data)
        if kind == "sum":
            return sum(data) & 0xFF
        if kind == "xor":
            folded = 0
            for byte in data:
                folded ^= byte
            return folded

        logger.warning("Unknown checksum type: %s", self.checksum_type)
        return 0

    @staticmethod
    def _crc8(data: bytes, poly: int = 0x07) -> int:
        """Compute CRC-8 checksum (MSB-first, initial value 0, no final XOR).

        Args:
            data: Data to checksum.
            poly: CRC polynomial (default: 0x07).

        Returns:
            CRC-8 value.
        """
        crc = 0
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x80:
                    crc = (crc << 1) ^ poly
                else:
                    crc <<= 1
                crc &= 0xFF
        return crc

    @staticmethod
    def _crc16(data: bytes, poly: int = 0x1021) -> int:
        """Compute CRC-16 checksum (MSB-first, initial value 0xFFFF, no final XOR).

        Args:
            data: Data to checksum.
            poly: CRC polynomial (default: 0x1021 for CRC-16-CCITT).

        Returns:
            CRC-16 value.
        """
        if poly == 0x1021:
            # binascii.crc_hqx implements this polynomial in C; seeding it
            # with 0xFFFF gives the same result as the bitwise loop below.
            return binascii.crc_hqx(bytes(data), 0xFFFF)

        crc = 0xFFFF
        for byte in data:
            crc ^= byte << 8
            for _ in range(8):
                if crc & 0x8000:
                    crc = (crc << 1) ^ poly
                else:
                    crc <<= 1
                crc &= 0xFFFF
        return crc

    @staticmethod
    def _crc32(data: bytes, poly: int = 0xEDB88320) -> int:
        """Compute CRC-32 checksum (LSB-first, initial value and final XOR 0xFFFFFFFF).

        Args:
            data: Data to checksum.
            poly: Reflected CRC polynomial (default: 0xEDB88320 for CRC-32).

        Returns:
            CRC-32 value.
        """
        if poly == 0xEDB88320:
            # Standard CRC-32: use the C implementation from the stdlib.
            return binascii.crc32(bytes(data)) & 0xFFFFFFFF

        crc = 0xFFFFFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 1:
                    crc = (crc >> 1) ^ poly
                else:
                    crc >>= 1
        return crc ^ 0xFFFFFFFF

    def get_statistics(self) -> ValidationStats:
        """Get aggregate validation statistics.

        Returns:
            The live ValidationStats object holding cumulative results (not
            a copy).

        Example:
            >>> validator = PacketValidator()
            >>> # ... validate packets ...
            >>> stats = validator.get_statistics()
            >>> print(f"Pass rate: {stats.pass_rate:.1%}")
            >>> print(f"Sync failures: {stats.sync_failures}")
        """
        return self.stats

    def validate_sequence(self, packets: list[dict[str, Any]]) -> SequenceValidation:
        """Validate sequence numbers across multiple packets.

        This batch check keeps its own running state; it neither reads nor
        updates ``stats`` or the per-packet sequence tracker.

        Args:
            packets: List of parsed packets with headers.

        Returns:
            SequenceValidation with gap and duplicate detection results.

        Example:
            >>> validator = PacketValidator(sequence_field="sequence")
            >>> seq_validation = validator.validate_sequence(packets)
            >>> if seq_validation.gap_count > 0:
            ...     print(f"Found {seq_validation.gap_count} sequence gaps")
        """
        result = SequenceValidation(total_packets=len(packets))

        last_seq: int | None = None

        for i, packet in enumerate(packets):
            seq = packet.get("header", {}).get(self.sequence_field)

            # Packets without a sequence number are ignored entirely.
            if seq is None:
                continue

            if last_seq is not None:
                expected = (last_seq + 1) & self._SEQUENCE_MASK

                if seq == last_seq:
                    # Duplicate
                    result.duplicates += 1
                    result.valid = False
                elif seq != expected:
                    # Gap detected
                    result.sequence_gaps.append(
                        SequenceGap(
                            position=i,
                            expected=expected,
                            got=seq,
                            gap_size=(seq - expected) & self._SEQUENCE_MASK,
                        )
                    )
                    result.valid = False

            last_seq = seq

        return result

    def reset_statistics(self) -> None:
        """Reset validation statistics.

        Replaces ``stats`` with a fresh object and forgets the last seen
        sequence number. Useful for validating multiple files or resetting
        state.
        """
        self.stats = ValidationStats()
        self._last_sequence = None
# Public API of this module: the validator plus the result/statistics types
# it hands back to callers.
__all__ = [
    "PacketValidator",
    "SequenceGap",
    "SequenceValidation",
    "ValidationResult",
    "ValidationStats",
]