Coverage for src / tracekit / inference / binary.py: 98%

297 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Binary format inference and parser generation. 

2 

3 - RE-BIN-001: Magic Byte Detection 

4 - RE-BIN-002: Structure Alignment Detection 

5 - RE-BIN-003: Binary Parser DSL 

6 

7This module provides tools for inferring binary file/message formats, 

8detecting magic bytes and file signatures, analyzing structure alignment, 

9and generating parser definitions. 

10""" 

11 

12from __future__ import annotations 

13 

14from collections import Counter 

15from collections.abc import Sequence 

16from dataclasses import dataclass 

17from typing import Any, Literal 

18 

19import numpy as np 

20 

21 

@dataclass
class MagicByteResult:
    """Outcome of a magic-byte lookup or of learning magic bytes from samples.

    Implements RE-BIN-001: Magic byte detection result.

    Attributes:
        magic: The byte sequence that was matched or learned.
        offset: Offset in the input at which ``magic`` starts.
        confidence: Detection confidence in the range 0-1.
        frequency: Number of occurrences backing this result.
        known_format: Name of the recognized format, or None if unknown.
        file_extension: Suggested file extension, or None if unknown.
    """

    magic: bytes
    offset: int
    confidence: float
    frequency: int
    known_format: str | None = None
    file_extension: str | None = None

43 

44 

@dataclass
class AlignmentResult:
    """Outcome of structure alignment detection.

    Implements RE-BIN-002: Alignment detection result.

    Attributes:
        alignment: Detected alignment in bytes (1, 2, 4, 8, etc.).
        padding_positions: Byte offsets identified as padding.
        field_boundaries: Byte offsets identified as field boundaries.
        confidence: Detection confidence in the range 0-1.
        structure_size: Estimated size of one structure, or None if no
            estimate could be made.
    """

    alignment: int
    padding_positions: list[int]
    field_boundaries: list[int]
    confidence: float
    structure_size: int | None = None

64 

65 

@dataclass
class ParserField:
    """One field of a binary parser definition.

    Implements RE-BIN-003: Parser field definition.

    Attributes:
        name: Field name.
        offset: Byte offset of the field within the structure.
        size: Field size in bytes.
        field_type: Data type name (uint8, uint16, etc.).
        endian: Byte order of the field, "big" or "little".
        array_count: Number of array elements (1 for a scalar).
        condition: Optional conditional expression, None when unconditional.
        description: Free-form field description.
    """

    name: str
    offset: int
    size: int
    field_type: str
    endian: Literal["big", "little"] = "big"
    array_count: int = 1
    condition: str | None = None
    description: str = ""

91 

92 

@dataclass
class ParserDefinition:
    """A complete binary parser definition.

    Implements RE-BIN-003: Parser definition.

    Attributes:
        name: Parser/structure name.
        fields: Field definitions, in declaration order.
        total_size: Total structure size in bytes.
        endian: Default byte order, "big" or "little".
        magic: Magic bytes identifying the format, or None if there are none.
        version: Parser version string.
    """

    name: str
    fields: list[ParserField]
    total_size: int
    endian: Literal["big", "little"] = "big"
    magic: bytes | None = None
    version: str = "1.0"

114 

115 

# Known magic bytes database.
# Maps a signature (the leading bytes of a file/stream) to a
# (format name, suggested file extension) pair.
KNOWN_MAGIC_BYTES: dict[bytes, tuple[str, str]] = {
    # Images
    b"\x89PNG\r\n\x1a\n": ("PNG", ".png"),
    b"\xff\xd8\xff": ("JPEG", ".jpg"),
    b"GIF87a": ("GIF", ".gif"),
    b"GIF89a": ("GIF", ".gif"),
    b"BM": ("BMP", ".bmp"),
    b"RIFF": ("RIFF", ".riff"),
    b"II*\x00": ("TIFF (LE)", ".tiff"),
    b"MM\x00*": ("TIFF (BE)", ".tiff"),
    # Archives
    b"PK\x03\x04": ("ZIP", ".zip"),
    b"\x1f\x8b\x08": ("GZIP", ".gz"),
    b"BZh": ("BZIP2", ".bz2"),
    b"\xfd7zXZ\x00": ("XZ", ".xz"),
    b"Rar!\x1a\x07": ("RAR", ".rar"),
    b"7z\xbc\xaf\x27\x1c": ("7Z", ".7z"),
    # Documents
    b"%PDF": ("PDF", ".pdf"),
    b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1": ("OLE2", ".doc"),
    # Executables
    b"MZ": ("DOS/PE Executable", ".exe"),
    b"\x7fELF": ("ELF Executable", ".elf"),
    b"\xfe\xed\xfa\xce": ("Mach-O (32-bit)", ".macho"),
    b"\xfe\xed\xfa\xcf": ("Mach-O (64-bit)", ".macho"),
    b"\xca\xfe\xba\xbe": ("Java Class", ".class"),
    # Audio/Video
    b"ID3": ("MP3", ".mp3"),
    b"\xff\xfb": ("MP3", ".mp3"),
    b"OggS": ("OGG", ".ogg"),
    b"fLaC": ("FLAC", ".flac"),
    # Database
    b"SQLite format 3": ("SQLite", ".sqlite"),
    # Network
    b"\xd4\xc3\xb2\xa1": ("PCAP (LE)", ".pcap"),
    b"\xa1\xb2\xc3\xd4": ("PCAP (BE)", ".pcap"),
    b"\x0a\x0d\x0d\x0a": ("PCAPNG", ".pcapng"),
}

155 

156 

class MagicByteDetector:
    """Detect magic bytes and file signatures.

    Implements RE-BIN-001: Magic Byte Detection.

    Identifies file format signatures and common protocol headers.

    Attributes:
        known_signatures: Mapping of magic bytes to a
            ``(format name, file extension)`` pair.
        min_magic_length: Shortest prefix length considered when learning.
        max_magic_length: Longest prefix length considered when learning.

    Example:
        >>> detector = MagicByteDetector()
        >>> result = detector.detect(data)
        >>> print(f"Detected: {result.known_format}")
    """

    def __init__(
        self,
        known_signatures: dict[bytes, tuple[str, str]] | None = None,
        min_magic_length: int = 2,
        max_magic_length: int = 16,
    ) -> None:
        """Initialize magic byte detector.

        Args:
            known_signatures: Dictionary of known magic bytes. When omitted
                (or empty) a private copy of KNOWN_MAGIC_BYTES is used. A
                non-empty dictionary is stored by reference.
            min_magic_length: Minimum magic byte length.
            max_magic_length: Maximum magic byte length to consider.
        """
        # Copy the defaults: add_signature() mutates self.known_signatures, and
        # aliasing the module-level table would leak one detector's custom
        # signatures into every other detector in the process.
        self.known_signatures = known_signatures or dict(KNOWN_MAGIC_BYTES)
        self.min_magic_length = min_magic_length
        self.max_magic_length = max_magic_length

    def detect(self, data: bytes, offset: int = 0) -> MagicByteResult | None:
        """Detect magic bytes at offset.

        Implements RE-BIN-001: Magic byte detection.

        Signatures are tried in the iteration order of ``known_signatures``;
        the first one that matches wins.

        Args:
            data: Binary data.
            offset: Offset to check.

        Returns:
            MagicByteResult if magic bytes found, None otherwise.

        Example:
            >>> result = detector.detect(file_data)
            >>> if result:
            ...     print(f"Format: {result.known_format}")
        """
        if offset >= len(data):
            return None

        for magic, (format_name, ext) in self.known_signatures.items():
            end = offset + len(magic)
            # The length check keeps a signature from matching a truncated tail.
            if len(data) >= end and data[offset:end] == magic:
                return MagicByteResult(
                    magic=magic,
                    offset=offset,
                    confidence=1.0,
                    frequency=1,
                    known_format=format_name,
                    file_extension=ext,
                )

        return None

    def detect_all(self, data: bytes) -> list[MagicByteResult]:
        """Detect all magic bytes in data.

        Implements RE-BIN-001: Scan for all magic bytes.

        Runs :meth:`detect` at every offset of ``data``.

        Args:
            data: Binary data.

        Returns:
            List of all detected magic bytes, ordered by offset.
        """
        hits = (self.detect(data, offset) for offset in range(len(data)))
        return [hit for hit in hits if hit]

    def learn_magic_from_samples(
        self,
        samples: Sequence[bytes],
        min_frequency: int = 2,
    ) -> list[MagicByteResult]:
        """Learn potential magic bytes from samples.

        Implements RE-BIN-001: Magic byte discovery.

        Counts every sample prefix whose length lies between
        ``min_magic_length`` and ``max_magic_length`` and reports those seen
        at least ``min_frequency`` times.

        Args:
            samples: List of binary samples.
            min_frequency: Minimum occurrences to consider.

        Returns:
            List of potential magic byte patterns, most frequent first and,
            for equal frequency, longest first. ``confidence`` is the
            fraction of samples that start with the prefix.
        """
        if not samples:
            return []

        # Collect common prefixes of every candidate length.
        prefix_counts: Counter[bytes] = Counter()
        for length in range(self.min_magic_length, self.max_magic_length + 1):
            for sample in samples:
                if len(sample) >= length:
                    prefix_counts[sample[:length]] += 1

        # Frequency (desc) then length (desc): longer magic bytes are
        # preferred when frequencies are equal.
        ranked = sorted(
            prefix_counts.items(),
            key=lambda item: (item[1], len(item[0])),
            reverse=True,
        )

        total = len(samples)
        results = []
        for prefix, count in ranked:
            if count < min_frequency:
                continue
            known_format, file_ext = self.known_signatures.get(prefix, (None, None))
            results.append(
                MagicByteResult(
                    magic=prefix,
                    offset=0,
                    confidence=count / total,
                    frequency=count,
                    known_format=known_format,
                    file_extension=file_ext,
                )
            )

        return results

    def add_signature(self, magic: bytes, format_name: str, extension: str) -> None:
        """Add a custom signature to this detector.

        Args:
            magic: Magic bytes.
            format_name: Format name.
            extension: File extension.
        """
        self.known_signatures[magic] = (format_name, extension)

307 

308 

class AlignmentDetector:
    """Detect structure alignment in binary data.

    Implements RE-BIN-002: Structure Alignment Detection.

    Analyzes binary data to detect natural alignment boundaries
    and padding patterns typical of compiled structures.

    Attributes:
        test_alignments: Candidate alignments that :meth:`detect` scores.
        padding_byte: Fixed padding byte value, or None to auto-detect.

    Example:
        >>> detector = AlignmentDetector()
        >>> result = detector.detect(structure_data)
        >>> print(f"Alignment: {result.alignment} bytes")
    """

    def __init__(
        self,
        test_alignments: list[int] | None = None,
        padding_byte: int | None = None,
    ) -> None:
        """Initialize alignment detector.

        Args:
            test_alignments: Alignments to test (default: [1, 2, 4, 8, 16]).
            padding_byte: Expected padding byte (auto-detect if None).
        """
        self.test_alignments = test_alignments or [1, 2, 4, 8, 16]
        self.padding_byte = padding_byte

    def detect(self, data: bytes) -> AlignmentResult:
        """Detect structure alignment.

        Implements RE-BIN-002: Alignment detection workflow.

        Args:
            data: Binary structure data.

        Returns:
            AlignmentResult with detected alignment. Empty input yields
            alignment 1 with confidence 0.0.

        Example:
            >>> result = detector.detect(struct_data)
            >>> print(f"Fields at: {result.field_boundaries}")
        """
        if not data:
            return AlignmentResult(
                alignment=1,
                padding_positions=[],
                field_boundaries=[],
                confidence=0.0,
            )

        padding_byte = self._detect_padding_byte(data)
        padding_positions = self._find_padding(data, padding_byte)
        field_boundaries = self._find_field_boundaries(data)

        # Keep the first candidate with the strictly highest score; if nothing
        # scores above zero the result stays at alignment 1.
        best_alignment = 1
        best_score = 0.0
        for alignment in self.test_alignments:
            score = self._score_alignment(data, alignment, padding_positions, field_boundaries)
            if score > best_score:
                best_score = score
                best_alignment = alignment

        structure_size = self._estimate_structure_size(data, best_alignment)

        return AlignmentResult(
            alignment=best_alignment,
            padding_positions=padding_positions,
            field_boundaries=field_boundaries,
            confidence=best_score,
            structure_size=structure_size,
        )

    def detect_field_types(
        self,
        data: bytes,
        alignment: AlignmentResult,
    ) -> list[tuple[int, int, str]]:
        """Detect field types based on alignment.

        Implements RE-BIN-002: Field type inference.

        The data is cut at offset 0, at every detected field boundary and at
        ``len(data)``; each resulting span becomes one field.

        Args:
            data: Binary data.
            alignment: Alignment detection result.

        Returns:
            List of (offset, size, type) tuples.
        """
        boundaries = sorted({0, len(data), *alignment.field_boundaries})

        fields = []
        for start, end in zip(boundaries, boundaries[1:]):
            size = end - start
            fields.append((start, size, self._infer_type(data[start:end], size)))

        return fields

    def _detect_padding_byte(self, data: bytes) -> int:
        """Detect most likely padding byte.

        Args:
            data: Binary data.

        Returns:
            The configured ``padding_byte`` if one was given; otherwise the
            most frequent of 0x00, 0xFF, 0xCC and 0xAA in ``data`` (ties
            resolve to the earliest candidate in that order).
        """
        if self.padding_byte is not None:
            return self.padding_byte

        # Common padding bytes: 0x00, 0xFF, 0xCC, 0xAA
        candidates = [0x00, 0xFF, 0xCC, 0xAA]
        return max(candidates, key=data.count)

    def _find_padding(self, data: bytes, padding_byte: int) -> list[int]:
        """Find positions of potential padding.

        Every byte equal to ``padding_byte`` is reported, including a run
        that extends to the end of ``data``.

        Args:
            data: Binary data.
            padding_byte: Padding byte value.

        Returns:
            List of padding positions in ascending order.
        """
        return [index for index, byte in enumerate(data) if byte == padding_byte]

    def _find_field_boundaries(self, data: bytes) -> list[int]:
        """Find field boundaries by comparing byte diversity around each offset.

        An offset is a boundary when the number of distinct byte values in
        the 4 bytes before it differs by at least 2 from the number in the
        4 bytes starting at it.

        Args:
            data: Binary data.

        Returns:
            List of boundary offsets (empty for inputs shorter than 8 bytes).
        """
        if len(data) < 8:
            return []

        window = 4
        boundaries = []
        for i in range(window, len(data) - window):
            before_unique = len(set(data[i - window : i]))
            after_unique = len(set(data[i : i + window]))
            if abs(before_unique - after_unique) >= 2:
                boundaries.append(i)

        return boundaries

    def _score_alignment(
        self,
        data: bytes,
        alignment: int,
        padding_positions: list[int],
        field_boundaries: list[int],
    ) -> float:
        """Score how well an alignment fits the data.

        Args:
            data: Binary data.
            alignment: Alignment value to test.
            padding_positions: Detected padding positions.
            field_boundaries: Detected field boundaries.

        Returns:
            Score (0-1) for this alignment. 0.0 when the alignment is not
            positive or exceeds the data length; 0.5 when there are no
            padding positions or boundaries to score against.
        """
        # A non-positive alignment is meaningless and would divide by zero below.
        if alignment <= 0 or alignment > len(data):
            return 0.0

        checks = len(padding_positions) + len(field_boundaries)
        if checks == 0:
            return 0.5  # No data to score

        # Padding is expected on the byte just before an aligned position.
        score = float(sum(1 for pos in padding_positions if pos % alignment == alignment - 1))

        # Field boundaries are expected to fall on aligned positions.
        score += sum(1 for pos in field_boundaries if pos % alignment == 0)

        # Bonus of 0.5 for each natural field size that divides the alignment.
        for size in (1, 2, 4, 8):
            if alignment >= size and alignment % size == 0:
                score += 0.5

        # The bonus is at most 2.0, so the "+ 2" keeps the score within 0-1.
        return score / (checks + 2)

    def _estimate_structure_size(self, data: bytes, alignment: int) -> int | None:
        """Estimate structure size based on alignment.

        Args:
            data: Binary data.
            alignment: Detected alignment.

        Returns:
            The smallest multiple of ``alignment`` that divides ``len(data)``
            into at least two equal parts, or None if there is none (or the
            alignment is not positive).
        """
        # range() rejects a zero step; treat non-positive alignments as "no estimate".
        if alignment <= 0:
            return None

        # Structure size is typically aligned.
        total = len(data)
        for size in range(alignment, total + 1, alignment):
            if total % size == 0 and total // size >= 2:
                return size

        return None

    def _infer_type(self, data: bytes, size: int) -> str:
        """Infer field type from the field size.

        Args:
            data: Field data (currently not inspected).
            size: Field size.

        Returns:
            Inferred type string: an unsigned integer type for sizes 1, 2, 4
            and 8, otherwise ``bytes[<size>]``.
        """
        # A 4-byte field could equally be a float; it is reported as uint32.
        sized_types = {1: "uint8", 2: "uint16", 4: "uint32", 8: "uint64"}
        return sized_types.get(size, f"bytes[{size}]")

580 

581 

class BinaryParserGenerator:
    """Generate binary parser definitions.

    Implements RE-BIN-003: Binary Parser DSL.

    Creates parser definitions from analyzed binary data that can
    be used for decoding similar structures.

    Attributes:
        default_endian: Byte order assigned to generated fields and parsers.

    Example:
        >>> generator = BinaryParserGenerator()
        >>> parser = generator.generate(samples, name="MyStruct")
        >>> print(generator.to_yaml(parser))
    """

    def __init__(
        self,
        default_endian: Literal["big", "little"] = "big",
    ) -> None:
        """Initialize parser generator.

        Args:
            default_endian: Default endianness for fields.
        """
        self.default_endian = default_endian

    def generate(
        self,
        samples: Sequence[bytes],
        name: str = "Structure",
    ) -> ParserDefinition:
        """Generate parser definition from samples.

        Implements RE-BIN-003: Parser generation workflow.

        The first sample is the reference for size, magic bytes, alignment
        and field layout; all samples are used to measure how much each
        field varies.

        Args:
            samples: Binary data samples.
            name: Structure name.

        Returns:
            ParserDefinition for the data format. With no samples, an empty
            definition of size 0 is returned.

        Example:
            >>> parser = generator.generate(packet_samples, name="Packet")
        """
        if not samples:
            return ParserDefinition(
                name=name,
                fields=[],
                total_size=0,
                endian=self.default_endian,
            )

        # Use first sample as reference
        reference = samples[0]

        # Detect magic bytes - try known signatures first
        magic_detector = MagicByteDetector()
        magic_result = magic_detector.detect(reference)

        # If no known signature found, learn from samples
        if magic_result is None and len(samples) > 1:
            learned_magic = magic_detector.learn_magic_from_samples(samples)
            if learned_magic:
                # Results are ranked; the first is the most frequent/longest.
                magic_result = learned_magic[0]

        magic = magic_result.magic if magic_result else None

        # Detect alignment, then split the reference into typed fields
        alignment_detector = AlignmentDetector()
        alignment_result = alignment_detector.detect(reference)
        field_infos = alignment_detector.detect_field_types(reference, alignment_result)

        # Analyze field variance across samples
        variance_info = self._analyze_variance(samples, field_infos)

        fields = []
        for offset, size, inferred_type in field_infos:
            variance = variance_info.get(offset, 0.0)

            # Name based on variability and type: near-constant fields are
            # "const_", integer fields "field_", everything else "data_".
            if variance < 0.01:
                field_name = f"const_{offset}"
            elif inferred_type.startswith("uint"):
                field_name = f"field_{offset}"
            else:
                field_name = f"data_{offset}"

            fields.append(
                ParserField(
                    name=field_name,
                    offset=offset,
                    size=size,
                    field_type=inferred_type,
                    endian=self.default_endian,
                    description=f"Variance: {variance:.2f}",
                )
            )

        return ParserDefinition(
            name=name,
            fields=fields,
            total_size=len(reference),
            endian=self.default_endian,
            magic=magic,
        )

    def generate_from_definition(
        self,
        definition: dict[str, Any],
    ) -> ParserDefinition:
        """Generate parser from dictionary definition.

        Implements RE-BIN-003: Parser from specification.

        Args:
            definition: Dictionary with parser specification. Each entry of
                ``"fields"`` must have a ``"name"``; all other keys fall back
                to defaults.

        Returns:
            ParserDefinition object. ``total_size`` defaults to the sum of
            the field sizes when ``"size"`` is absent.

        Raises:
            KeyError: If a field entry has no ``"name"`` key.
        """
        fields = [
            ParserField(
                name=field_def["name"],
                offset=field_def.get("offset", 0),
                size=field_def.get("size", 1),
                field_type=field_def.get("type", "uint8"),
                endian=field_def.get("endian", self.default_endian),
                array_count=field_def.get("count", 1),
                condition=field_def.get("condition"),
                description=field_def.get("description", ""),
            )
            for field_def in definition.get("fields", [])
        ]

        return ParserDefinition(
            name=definition.get("name", "Structure"),
            fields=fields,
            total_size=definition.get("size", sum(f.size for f in fields)),
            endian=definition.get("endian", self.default_endian),
            magic=definition.get("magic"),
            version=definition.get("version", "1.0"),
        )

    @staticmethod
    def _yaml_quote(text: str) -> str:
        """Render free-form text as a YAML double-quoted scalar."""
        escaped = text.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
        return f'"{escaped}"'

    def to_yaml(self, parser: ParserDefinition) -> str:
        """Convert parser definition to YAML.

        Implements RE-BIN-003: YAML export.

        Optional field keys are only written when they carry information:
        ``endian`` when it differs from the parser default, ``count`` when
        greater than 1, ``condition`` and ``description`` when non-empty.

        Args:
            parser: Parser definition.

        Returns:
            YAML string representation.
        """
        lines = [
            f"name: {parser.name}",
            f"version: {parser.version}",
            f"endian: {parser.endian}",
            f"size: {parser.total_size}",
        ]

        if parser.magic:
            lines.append(f"magic: {parser.magic.hex()}")

        lines.append("fields:")
        for field in parser.fields:
            lines.append(f"  - name: {field.name}")
            lines.append(f"    offset: {field.offset}")
            lines.append(f"    size: {field.size}")
            lines.append(f"    type: {field.field_type}")
            if field.endian != parser.endian:
                lines.append(f"    endian: {field.endian}")
            if field.array_count > 1:
                lines.append(f"    count: {field.array_count}")
            # Free-form text is quoted: a plain YAML scalar may not contain
            # ": ", which generated descriptions ("Variance: 0.00") do.
            if field.condition:
                lines.append(f"    condition: {self._yaml_quote(field.condition)}")
            if field.description:
                lines.append(f"    description: {self._yaml_quote(field.description)}")

        return "\n".join(lines)

    def to_python(self, parser: ParserDefinition) -> str:
        """Generate Python struct unpacking code.

        Implements RE-BIN-003: Python code generation.

        The generated module defines a dataclass named after the parser with
        one attribute per field and a ``parse`` classmethod that unpacks the
        first ``total_size`` bytes with a single ``struct`` format. Fields
        are packed in list order using the parser-level byte order; types
        without a struct code are read as raw bytes of the field's size.

        Args:
            parser: Parser definition.

        Returns:
            Python code string.
        """
        endian_char = ">" if parser.endian == "big" else "<"
        format_chars = {
            "uint8": "B",
            "int8": "b",
            "uint16": "H",
            "int16": "h",
            "uint32": "I",
            "int32": "i",
            "uint64": "Q",
            "int64": "q",
            "float32": "f",
            "float64": "d",
        }

        lines = [
            "import struct",
            "from dataclasses import dataclass",
            "",
            "@dataclass",
            f"class {parser.name}:",
        ]

        # One dataclass attribute and one struct format item per field.
        format_parts = []
        for field in parser.fields:
            struct_code = format_chars.get(field.field_type)
            if struct_code is None:
                # "bytes[N]" and unrecognized types are unpacked with "<size>s",
                # which yields bytes, so the annotation must say bytes too.
                py_type = "bytes"
                format_parts.append(f"{field.size}s")
            else:
                py_type = "int" if "int" in field.field_type else "float"
                format_parts.append(struct_code)
            lines.append(f"    {field.name}: {py_type}")

        # Add parse method
        lines.extend(
            [
                "",
                "    @classmethod",
                f"    def parse(cls, data: bytes) -> '{parser.name}':",
            ]
        )

        format_str = endian_char + "".join(format_parts)
        constructor_args = ", ".join(f"values[{i}]" for i in range(len(parser.fields)))
        lines.append(f'        fmt = "{format_str}"')
        lines.append(f"        values = struct.unpack(fmt, data[:{parser.total_size}])")
        lines.append(f"        return cls({constructor_args})")

        return "\n".join(lines)

    def _analyze_variance(
        self,
        samples: Sequence[bytes],
        field_infos: list[tuple[int, int, str]],
    ) -> dict[int, float]:
        """Analyze field variance across samples.

        Each field is read from every sample long enough to contain it and
        interpreted as a big-endian unsigned integer. The score is the
        standard deviation of those values divided by their maximum, or 0.0
        when every value is zero.

        Args:
            samples: Binary samples.
            field_infos: List of (offset, size, type) tuples.

        Returns:
            Dictionary mapping offsets to variance scores. Fields that no
            sample is long enough to contain are omitted.
        """
        variance_info = {}

        for offset, size, _ in field_infos:
            end = offset + size
            values = [
                int.from_bytes(sample[offset:end], "big")
                for sample in samples
                if end <= len(sample)
            ]
            if not values:
                continue

            peak = max(values)
            if peak > 0:
                # Normalize with exact integer division before handing the
                # numbers to numpy: std(v / peak) equals std(v) / peak, but
                # stays finite for wide fields whose integer value does not
                # fit in a float.
                variance = float(np.std([value / peak for value in values]))
            else:
                variance = 0.0
            variance_info[offset] = variance

        return variance_info

880 

881 

# =============================================================================
# Convenience functions
# =============================================================================

885 

886 

def detect_magic_bytes(data: bytes, offset: int = 0) -> MagicByteResult | None:
    """Detect magic bytes at offset.

    Implements RE-BIN-001: Magic Byte Detection.

    Convenience wrapper that runs ``detect`` on a default-constructed
    MagicByteDetector.

    Args:
        data: Binary data.
        offset: Offset to check.

    Returns:
        MagicByteResult if detected, None otherwise.

    Example:
        >>> result = detect_magic_bytes(file_data)
        >>> if result:
        ...     print(f"Format: {result.known_format}")
    """
    return MagicByteDetector().detect(data, offset)

906 

907 

def detect_alignment(data: bytes) -> AlignmentResult:
    """Detect structure alignment in data.

    Implements RE-BIN-002: Structure Alignment Detection.

    Convenience wrapper that runs ``detect`` on a default-constructed
    AlignmentDetector.

    Args:
        data: Binary structure data.

    Returns:
        AlignmentResult with detected alignment.

    Example:
        >>> result = detect_alignment(struct_data)
        >>> print(f"Alignment: {result.alignment} bytes")
    """
    return AlignmentDetector().detect(data)

925 

926 

def generate_parser(
    samples: Sequence[bytes],
    name: str = "Structure",
    endian: Literal["big", "little"] = "big",
) -> ParserDefinition:
    """Generate parser definition from samples.

    Implements RE-BIN-003: Binary Parser DSL.

    Convenience wrapper around ``BinaryParserGenerator.generate``.

    Args:
        samples: Binary data samples.
        name: Structure name.
        endian: Default endianness.

    Returns:
        ParserDefinition for the data format.

    Example:
        >>> parser = generate_parser(packet_samples, name="Packet")
        >>> print(parser_to_yaml(parser))
    """
    return BinaryParserGenerator(default_endian=endian).generate(samples, name)

950 

951 

def parser_to_yaml(parser: ParserDefinition) -> str:
    """Convert parser definition to YAML.

    Implements RE-BIN-003: YAML export.

    Convenience wrapper around ``BinaryParserGenerator.to_yaml``.

    Args:
        parser: Parser definition.

    Returns:
        YAML string.
    """
    return BinaryParserGenerator().to_yaml(parser)

965 

966 

def parser_to_python(parser: ParserDefinition) -> str:
    """Convert parser definition to Python code.

    Implements RE-BIN-003: Python code generation.

    Convenience wrapper around ``BinaryParserGenerator.to_python``.

    Args:
        parser: Parser definition.

    Returns:
        Python code string.
    """
    return BinaryParserGenerator().to_python(parser)

980 

981 

def find_all_magic_bytes(data: bytes) -> list[MagicByteResult]:
    """Find all magic bytes in data.

    Implements RE-BIN-001: Scan for all signatures.

    Convenience wrapper that runs ``detect_all`` on a default-constructed
    MagicByteDetector.

    Args:
        data: Binary data.

    Returns:
        List of all detected magic bytes.
    """
    return MagicByteDetector().detect_all(data)

995 

996 

# Public API. Entries are kept in sorted order (constants, then classes and
# data classes, then functions).
__all__ = [
    "KNOWN_MAGIC_BYTES",
    "AlignmentDetector",
    "AlignmentResult",
    "BinaryParserGenerator",
    "MagicByteDetector",
    "MagicByteResult",
    "ParserDefinition",
    "ParserField",
    "detect_alignment",
    "detect_magic_bytes",
    "find_all_magic_bytes",
    "generate_parser",
    "parser_to_python",
    "parser_to_yaml",
]

1014 "parser_to_python", 

1015 "parser_to_yaml", 

1016]