Coverage for src / tracekit / loaders / validation.py: 55%

228 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Packet validation and integrity checking. 

2 

3This module provides comprehensive packet validation including sync markers, 

4sequence numbers, checksums, and structural integrity verification. 

5 

6 

7Example: 

8 >>> from tracekit.loaders.validation import PacketValidator 

9 >>> validator = PacketValidator(sync_marker=0xFA, checksum_type="crc16") 

10 >>> result = validator.validate_packet(packet, packet_data) 

11 >>> if result.is_valid: 

12 ... print("Packet valid") 

13 >>> stats = validator.get_statistics() 

14 >>> print(f"Pass rate: {stats.pass_rate:.1%}") 

15""" 

16 

17from __future__ import annotations 

18 

19import logging 

20from dataclasses import dataclass, field 

21from typing import Any 

22 

23# Logger for debug output 

24logger = logging.getLogger(__name__) 

25 

26 

@dataclass
class SequenceGap:
    """Record describing one discontinuity in a packet sequence.

    Attributes:
        position: Packet position at which the gap was observed.
        expected: Sequence number that should have appeared.
        got: Sequence number that actually appeared.
        gap_size: Count of packets missing inside the gap.
    """

    position: int
    expected: int
    got: int
    gap_size: int

44 

45 

@dataclass
class SequenceValidation:
    """Outcome of checking sequence numbers over a batch of packets.

    Attributes:
        total_packets: How many packets were examined.
        sequence_gaps: Every gap that was found, in detection order.
        duplicates: How many repeated sequence numbers were seen.
        valid: True while no gap or duplicate has been recorded.
    """

    total_packets: int = 0
    sequence_gaps: list[SequenceGap] = field(default_factory=list)
    duplicates: int = 0
    valid: bool = True

    @property
    def gap_count(self) -> int:
        """Return how many gaps were recorded.

        Returns:
            Length of ``sequence_gaps``.
        """
        recorded = self.sequence_gaps
        return len(recorded)

    @property
    def total_missing_packets(self) -> int:
        """Return the number of packets missing over all recorded gaps.

        Returns:
            The ``gap_size`` values of every recorded gap added together.
        """
        missing = 0
        for entry in self.sequence_gaps:
            missing += entry.gap_size
        return missing

81 

82 

@dataclass
class ValidationResult:
    """Outcome of validating one packet.

    Attributes:
        is_valid: True until an error is recorded via ``add_error``.
        sync_valid: Outcome of the sync marker check.
        sequence_valid: Outcome of the sequence number check.
        checksum_valid: Outcome of the checksum check.
        errors: Error messages collected for this packet.
        warnings: Warning messages collected for this packet.
        packet_index: Index of the packet this result belongs to.
    """

    is_valid: bool = True
    sync_valid: bool = True
    sequence_valid: bool = True
    checksum_valid: bool = True
    errors: list[str] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)
    packet_index: int = 0

    def add_error(self, message: str) -> None:
        """Record an error and mark the packet as invalid.

        Args:
            message: Error message.
        """
        self.errors.append(message)
        # Any recorded error invalidates the packet as a whole.
        self.is_valid = False

    def add_warning(self, message: str) -> None:
        """Record a warning; the validity flag is left untouched.

        Args:
            message: Warning message.
        """
        self.warnings.append(message)

123 

124 

@dataclass
class ValidationStats:
    """Running totals collected while validating packets.

    Attributes:
        total_packets: Number of packets validated so far.
        valid_packets: Number of packets that passed every check.
        sync_failures: Number of sync marker failures.
        sequence_gaps: Number of sequence gaps detected.
        sequence_duplicates: Number of duplicate sequence numbers detected.
        checksum_failures: Number of checksum failures.
        error_types: Mapping from error type name to occurrence count.
    """

    total_packets: int = 0
    valid_packets: int = 0
    sync_failures: int = 0
    sequence_gaps: int = 0
    sequence_duplicates: int = 0
    checksum_failures: int = 0
    error_types: dict[str, int] = field(default_factory=dict)

    @property
    def pass_rate(self) -> float:
        """Fraction of validated packets that passed.

        Returns:
            ``valid_packets / total_packets``, or 0.0 when no packet has been
            validated yet.
        """
        if self.total_packets != 0:
            return self.valid_packets / self.total_packets
        return 0.0

    @property
    def fail_rate(self) -> float:
        """Fraction of validated packets that failed.

        Returns:
            ``1.0 - pass_rate``. This is 1.0 when no packet has been validated
            yet, because ``pass_rate`` is 0.0 in that case.
        """
        passed_fraction = self.pass_rate
        return 1.0 - passed_fraction

    def add_error_type(self, error_type: str) -> None:
        """Bump the counter kept for one error type.

        Args:
            error_type: Type of error (e.g., "sync_mismatch", "checksum_fail").
        """
        seen_so_far = self.error_types.get(error_type, 0)
        self.error_types[error_type] = seen_so_far + 1

176 

177 

class PacketValidator:
    """Validate packet integrity and structure.

    Checks performed by :meth:`validate_packet`:
        - Sync/magic marker verification (only when ``sync_marker`` is set)
        - Sequence number gap/duplicate detection (only when the header
          contains the sequence field)
        - Checksum verification (CRC-8/16/32, sum, XOR), only when
          ``checksum_type`` is set and raw packet bytes are supplied

    Only ``strictness == "strict"`` is treated specially: problems that are
    otherwise recorded as warnings become errors. Every other value
    (including "normal" and "lenient") behaves identically.

    Attributes:
        sync_marker: Expected sync marker value (optional).
        sync_field: Name of sync field in packet header.
        sequence_field: Name of sequence field in packet header.
        checksum_type: Checksum algorithm ("crc8", "crc16", "crc32", "sum", "xor", optional).
        checksum_field: Name of checksum field in packet header.
        strictness: Validation strictness level ("strict", "normal", "lenient").
        stats: Validation statistics.
    """

    # Algorithm names understood by _compute_checksum.
    _SUPPORTED_CHECKSUMS = ("crc8", "crc16", "crc32", "sum", "xor")

    def __init__(
        self,
        *,
        sync_marker: int | bytes | None = None,
        sync_field: str = "sync_marker",
        sequence_field: str = "sequence",
        checksum_type: str | None = None,
        checksum_field: str = "checksum",
        strictness: str = "normal",
    ) -> None:
        """Initialize packet validator.

        Args:
            sync_marker: Expected sync marker value (optional).
            sync_field: Name of sync field in packet header (default: "sync_marker").
            sequence_field: Name of sequence field in packet header (default: "sequence").
            checksum_type: Checksum algorithm (optional).
            checksum_field: Name of checksum field in packet header (default: "checksum").
            strictness: Validation strictness ("strict", "normal", "lenient").
        """
        self.sync_marker = sync_marker
        self.sync_field = sync_field
        self.sequence_field = sequence_field
        self.checksum_type = checksum_type
        self.checksum_field = checksum_field
        self.strictness = strictness

        self.stats = ValidationStats()
        # Sequence number of the previously validated packet (None before the first).
        self._last_sequence: int | None = None

    def validate_packet(
        self, packet: dict[str, Any], packet_data: bytes | None = None
    ) -> ValidationResult:
        """Validate a single packet and update the running statistics.

        Args:
            packet: Parsed packet dictionary. Its "header" entry (default: empty
                dict) is inspected and its "index" entry (default: 0) is copied
                into the result.
            packet_data: Raw packet bytes. The checksum check is skipped when
                this is None, even if ``checksum_type`` is configured.

        Returns:
            ValidationResult with validation outcome.

        Example:
            >>> validator = PacketValidator(sync_marker=0xFA)
            >>> result = validator.validate_packet(packet)
            >>> if not result.is_valid:
            ...     print(f"Errors: {result.errors}")
        """
        result = ValidationResult(packet_index=packet.get("index", 0))

        header = packet.get("header", {})

        # Validate sync marker
        if self.sync_marker is not None:
            result.sync_valid = self._validate_sync(header, result)

        # Validate sequence number
        if self.sequence_field in header:
            result.sequence_valid = self._validate_sequence(header, result)

        # Validate checksum
        if self.checksum_type is not None and packet_data is not None:
            result.checksum_valid = self._validate_checksum(header, packet_data, result)

        # Update statistics
        self.stats.total_packets += 1
        if result.is_valid:
            self.stats.valid_packets += 1

        return result

    @staticmethod
    def _format_hex(value: Any) -> str:
        """Render a header value for use in a diagnostic message.

        Args:
            value: Value to render.

        Returns:
            Hexadecimal text for integers and for byte strings (read as a
            big-endian integer); ``repr(value)`` for anything else, so that
            building a message never raises.
        """
        if isinstance(value, (bytes, bytearray)):
            return hex(int.from_bytes(value, "big"))
        try:
            return hex(value)
        except TypeError:
            # Not integer-like (e.g. str, None): fall back to a readable repr.
            return repr(value)

    def _validate_sync(self, header: dict[str, Any], result: ValidationResult) -> bool:
        """Validate sync marker.

        The header value is compared to ``sync_marker`` with plain equality,
        so an ``int`` never matches a ``bytes`` marker and vice versa.

        Args:
            header: Packet header dictionary.
            result: Validation result to update.

        Returns:
            True if sync is valid. A missing sync field counts as valid unless
            strictness is "strict".
        """
        strict = self.strictness == "strict"

        if self.sync_field not in header:
            message = f"Missing sync field: {self.sync_field}"
            if strict:
                result.add_error(message)
                self.stats.sync_failures += 1
                self.stats.add_error_type("sync_missing")
                return False
            result.add_warning(message)
            return True

        sync_value = header[self.sync_field]

        if sync_value != self.sync_marker:
            # _format_hex tolerates any value type, so reporting cannot crash.
            msg = (
                f"Sync marker mismatch: expected {self._format_hex(self.sync_marker)}, "
                f"got {self._format_hex(sync_value)}"
            )
            if strict:
                result.add_error(msg)
            else:
                result.add_warning(msg)

            self.stats.sync_failures += 1
            self.stats.add_error_type("sync_mismatch")
            return False

        return True

    def _validate_sequence(self, header: dict[str, Any], result: ValidationResult) -> bool:
        """Validate sequence number against the previously seen one.

        Duplicates are always recorded as warnings and return False. Gaps are
        recorded as errors and return False only in "strict" mode; otherwise
        they are recorded as warnings and the method still returns True.

        Args:
            header: Packet header dictionary.
            result: Validation result to update.

        Returns:
            True if sequence is valid.
        """
        sequence = header.get(self.sequence_field)
        if sequence is None:
            return True  # No sequence to validate

        if self._last_sequence is not None:
            expected = (self._last_sequence + 1) & 0xFFFFFFFF  # Handle 32-bit rollover

            if sequence == self._last_sequence:
                # Duplicate sequence
                msg = f"Duplicate sequence number: {sequence}"
                result.add_warning(msg)
                self.stats.sequence_duplicates += 1
                self.stats.add_error_type("sequence_duplicate")
                return False

            elif sequence != expected:
                # Sequence gap
                gap = (sequence - expected) & 0xFFFFFFFF
                msg = f"Sequence gap detected: expected {expected}, got {sequence} (gap: {gap})"

                if self.strictness == "strict":
                    result.add_error(msg)
                else:
                    result.add_warning(msg)

                self.stats.sequence_gaps += 1
                self.stats.add_error_type("sequence_gap")

                if self.strictness == "strict":
                    # Resynchronise on the received number before reporting failure.
                    self._last_sequence = sequence
                    return False

        self._last_sequence = sequence
        return True

    def _validate_checksum(
        self, header: dict[str, Any], packet_data: bytes, result: ValidationResult
    ) -> bool:
        """Validate packet checksum.

        Args:
            header: Packet header dictionary.
            packet_data: Raw packet bytes.
            result: Validation result to update.

        Returns:
            True if checksum is valid. A missing checksum field counts as valid
            unless strictness is "strict". An unsupported ``checksum_type``
            always returns False because nothing can be verified.
        """
        strict = self.strictness == "strict"

        if self.checksum_field not in header:
            if strict:
                result.add_error(f"Missing checksum field: {self.checksum_field}")
                self.stats.checksum_failures += 1
                self.stats.add_error_type("checksum_missing")
                return False
            return True

        if self.checksum_type not in self._SUPPORTED_CHECKSUMS:
            # Without this guard the placeholder value 0 from _compute_checksum
            # would be compared against the header, silently accepting packets
            # whose checksum field happens to be 0.
            msg = f"Unsupported checksum type: {self.checksum_type!r}"
            if strict:
                result.add_error(msg)
            else:
                result.add_warning(msg)
            self.stats.checksum_failures += 1
            self.stats.add_error_type("checksum_unsupported")
            return False

        expected_checksum = header[self.checksum_field]
        computed_checksum = self._compute_checksum(packet_data)

        if computed_checksum != expected_checksum:
            # _format_hex keeps reporting safe when the header holds a non-int value.
            msg = (
                f"Checksum mismatch: expected {self._format_hex(expected_checksum)}, "
                f"got {self._format_hex(computed_checksum)}"
            )

            if strict:
                result.add_error(msg)
            else:
                result.add_warning(msg)

            self.stats.checksum_failures += 1
            self.stats.add_error_type("checksum_fail")
            return False

        return True

    def _compute_checksum(self, data: bytes) -> int:
        """Compute checksum using configured algorithm.

        Args:
            data: Data to checksum.

        Returns:
            Computed checksum value. For an unknown ``checksum_type`` a warning
            is logged and 0 is returned.
        """
        if self.checksum_type == "crc8":
            return self._crc8(data)
        elif self.checksum_type == "crc16":
            return self._crc16(data)
        elif self.checksum_type == "crc32":
            return self._crc32(data)
        elif self.checksum_type == "sum":
            # 8-bit additive checksum.
            return sum(data) & 0xFF
        elif self.checksum_type == "xor":
            result = 0
            for byte in data:
                result ^= byte
            return result
        else:
            logger.warning("Unknown checksum type: %s", self.checksum_type)
            return 0

    @staticmethod
    def _crc8(data: bytes, poly: int = 0x07) -> int:
        """Compute CRC-8 checksum (MSB-first, initial value 0, no final XOR).

        Args:
            data: Data to checksum.
            poly: CRC polynomial (default: 0x07).

        Returns:
            CRC-8 value.
        """
        crc = 0
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x80:
                    crc = (crc << 1) ^ poly
                else:
                    crc <<= 1
                crc &= 0xFF
        return crc

    @staticmethod
    def _crc16(data: bytes, poly: int = 0x1021) -> int:
        """Compute CRC-16 checksum (MSB-first, initial value 0xFFFF, no final XOR).

        Args:
            data: Data to checksum.
            poly: CRC polynomial (default: 0x1021 for CRC-16-CCITT).

        Returns:
            CRC-16 value.
        """
        crc = 0xFFFF
        for byte in data:
            crc ^= byte << 8
            for _ in range(8):
                if crc & 0x8000:
                    crc = (crc << 1) ^ poly
                else:
                    crc <<= 1
                crc &= 0xFFFF
        return crc

    @staticmethod
    def _crc32(data: bytes, poly: int = 0xEDB88320) -> int:
        """Compute CRC-32 checksum (LSB-first, initial value and final XOR 0xFFFFFFFF).

        Args:
            data: Data to checksum.
            poly: Reflected CRC polynomial (default: 0xEDB88320 for CRC-32).

        Returns:
            CRC-32 value.
        """
        crc = 0xFFFFFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 1:
                    crc = (crc >> 1) ^ poly
                else:
                    crc >>= 1
        return crc ^ 0xFFFFFFFF

    def get_statistics(self) -> ValidationStats:
        """Get aggregate validation statistics.

        Returns:
            The validator's own ValidationStats object (not a copy), holding
            the cumulative validation results.

        Example:
            >>> validator = PacketValidator()
            >>> # ... validate packets ...
            >>> stats = validator.get_statistics()
            >>> print(f"Pass rate: {stats.pass_rate:.1%}")
            >>> print(f"Sync failures: {stats.sync_failures}")
        """
        return self.stats

    def validate_sequence(self, packets: list[dict[str, Any]]) -> SequenceValidation:
        """Validate sequence numbers across multiple packets.

        This check is independent of ``validate_packet``: it neither reads nor
        updates the validator's statistics or its last-seen sequence number,
        and it ignores ``strictness``. Packets without a sequence number are
        skipped.

        Args:
            packets: List of parsed packets with headers.

        Returns:
            SequenceValidation with gap and duplicate detection results.

        Example:
            >>> validator = PacketValidator(sequence_field="sequence")
            >>> seq_validation = validator.validate_sequence(packets)
            >>> if seq_validation.gap_count > 0:
            ...     print(f"Found {seq_validation.gap_count} sequence gaps")
        """
        result = SequenceValidation(total_packets=len(packets))

        if not packets:
            return result

        last_seq: int | None = None

        for i, packet in enumerate(packets):
            header = packet.get("header", {})
            seq = header.get(self.sequence_field)

            if seq is None:
                continue

            if last_seq is not None:
                expected = (last_seq + 1) & 0xFFFFFFFF  # Handle 32-bit rollover

                if seq == last_seq:
                    # Duplicate
                    result.duplicates += 1
                    result.valid = False

                elif seq != expected:
                    # Gap detected
                    gap_size = (seq - expected) & 0xFFFFFFFF
                    gap = SequenceGap(
                        position=i,
                        expected=expected,
                        got=seq,
                        gap_size=gap_size,
                    )
                    result.sequence_gaps.append(gap)
                    result.valid = False

            last_seq = seq

        return result

    def reset_statistics(self) -> None:
        """Reset validation statistics and the last-seen sequence number.

        Useful for validating multiple files or resetting state.
        """
        self.stats = ValidationStats()
        self._last_sequence = None

576 

577 

578__all__ = [ 

579 "PacketValidator", 

580 "SequenceGap", 

581 "SequenceValidation", 

582 "ValidationResult", 

583 "ValidationStats", 

584]