Coverage for src / tracekit / inference / protocol_library.py: 100%

143 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Protocol Format Library for common protocol definitions. 

2 

3 - RE-DSL-003: Protocol Format Library 

4 

5This module provides a library of common protocol definitions for industrial, 

6IoT, and communication protocols to enable immediate decoding without 

7user-defined definitions. 

8""" 

9 

10from __future__ import annotations 

11 

12from dataclasses import dataclass 

13from typing import Literal, overload 

14 

15from tracekit.inference.protocol_dsl import ( 

16 FieldDefinition, 

17 ProtocolDecoder, 

18 ProtocolDefinition, 

19) 

20 

21 

@dataclass
class ProtocolInfo:
    """Catalogue entry describing one protocol known to the library.

    Attributes:
        name: Identifier of the protocol.
        category: Grouping the protocol belongs to; one of the literal
            values listed in the annotation.
        version: Version label of the protocol.
        description: Short human-readable summary.
        reference: URL of reference documentation; empty when not given.
        definition: Definition used to decode the protocol, or None when
            the entry carries no definition.
    """

    name: str
    category: Literal[
        "industrial",
        "iot",
        "network",
        "automotive",
        "building",
        "serial",
        "custom",
    ]
    version: str
    description: str
    reference: str = ""
    definition: ProtocolDefinition | None = None

41 

42 

class ProtocolLibrary:
    """Library of common protocol definitions.

    Implements RE-DSL-003: Protocol Format Library.

    Provides pre-defined protocol definitions for common industrial,
    IoT, and communication protocols. Entries are stored under their
    lower-cased name, and decoders are created lazily and cached per name.

    Example:
        >>> library = ProtocolLibrary()
        >>> modbus = library.get("modbus_rtu")
        >>> decoder = library.get_decoder("modbus_rtu")
        >>> message = decoder.decode(data)
    """

    # Function codes shared by the Modbus RTU and Modbus TCP definitions.
    # Each definition receives its own copy so the two never share a dict.
    _MODBUS_FUNCTION_CODES: dict[int, str] = {
        1: "read_coils",
        2: "read_discrete_inputs",
        3: "read_holding_registers",
        4: "read_input_registers",
        5: "write_single_coil",
        6: "write_single_register",
        15: "write_multiple_coils",
        16: "write_multiple_registers",
    }

    def __init__(self) -> None:
        """Initialize protocol library with built-in protocols."""
        self._protocols: dict[str, ProtocolInfo] = {}
        self._decoders: dict[str, ProtocolDecoder] = {}
        self._load_builtin_protocols()

    def _load_builtin_protocols(self) -> None:
        """Load all built-in protocol definitions.

        The call order fixes the insertion order of the registry and
        therefore the order in which ``list_protocols`` returns entries.
        """
        # Industrial protocols (BACnet is loaded here but is registered
        # under the "building" category)
        self._add_modbus_rtu()
        self._add_modbus_tcp()
        self._add_dnp3()
        self._add_bacnet()

        # IoT protocols
        self._add_mqtt()
        self._add_coap()
        self._add_cbor()
        self._add_messagepack()

        # Automotive protocols
        self._add_obd2()
        self._add_j1939()
        self._add_can()

        # Network protocols
        self._add_http()
        self._add_dns()
        self._add_ntp()
        self._add_syslog()

        # Serial protocols
        self._add_nmea()
        self._add_xmodem()

        # Building automation
        self._add_knx()
        self._add_lonworks()

        # Custom/generic
        self._add_tlv()
        self._add_length_prefixed()

    def list_protocols(self, category: str | None = None) -> list[ProtocolInfo]:
        """List available protocols.

        Args:
            category: Filter by category (optional). None or an empty
                string disables the filter.

        Returns:
            List of ProtocolInfo for available protocols, in registration
            order.

        Example:
            >>> protocols = library.list_protocols(category="industrial")
            >>> for p in protocols:
            ...     print(f"{p.name}: {p.description}")
        """
        protocols = list(self._protocols.values())
        if category:
            protocols = [p for p in protocols if p.category == category]
        return protocols

    def list_protocol_names(self, category: str | None = None) -> list[str]:
        """List available protocol names.

        Args:
            category: Filter by category (optional).

        Returns:
            List of protocol name strings.
        """
        protocols = self.list_protocols(category)
        return [p.name for p in protocols]

    def get(self, name: str) -> ProtocolInfo | None:
        """Get protocol information by name.

        Args:
            name: Protocol name (case-insensitive).

        Returns:
            ProtocolInfo or None if not found.
        """
        return self._protocols.get(name.lower())

    def get_decoder(self, name: str) -> ProtocolDecoder | None:
        """Get a decoder for a protocol.

        The decoder is built on first request and cached, so repeated calls
        for the same name return the same instance.

        Args:
            name: Protocol name (case-insensitive).

        Returns:
            ProtocolDecoder instance, or None if the protocol is unknown or
            has no definition.

        Example:
            >>> decoder = library.get_decoder("modbus_rtu")
            >>> if decoder:
            ...     message = decoder.decode(data)
        """
        name = name.lower()
        if name in self._decoders:
            return self._decoders[name]

        info = self._protocols.get(name)
        if info and info.definition:
            decoder = ProtocolDecoder(info.definition)
            self._decoders[name] = decoder
            return decoder

        return None

    def get_definition(self, name: str) -> ProtocolDefinition | None:
        """Get protocol definition by name.

        Args:
            name: Protocol name (case-insensitive).

        Returns:
            ProtocolDefinition or None if not found.
        """
        info = self._protocols.get(name.lower())
        return info.definition if info else None

    def add_protocol(
        self,
        info: ProtocolInfo,
    ) -> None:
        """Add a custom protocol to the library.

        The entry is stored under the lower-cased ``info.name``. Registering
        a name that already exists replaces the previous entry and discards
        any decoder cached for it, so the next ``get_decoder`` call builds a
        decoder from the new definition.

        Args:
            info: Protocol information and definition.

        Example:
            >>> custom = ProtocolInfo(
            ...     name="my_protocol",
            ...     category="custom",
            ...     version="1.0",
            ...     description="My custom protocol",
            ...     definition=my_definition
            ... )
            >>> library.add_protocol(custom)
        """
        key = info.name.lower()
        self._protocols[key] = info
        # A decoder cached for this name was built from the definition that
        # has just been replaced; drop it so it cannot be served again.
        self._decoders.pop(key, None)

    def categories(self) -> list[str]:
        """Get list of protocol categories.

        Returns:
            Sorted list of the distinct category names in use.
        """
        return sorted({p.category for p in self._protocols.values()})

    # =========================================================================
    # Built-in Protocol Definitions
    # =========================================================================

    def _add_modbus_rtu(self) -> None:
        """Add Modbus RTU protocol definition."""
        definition = ProtocolDefinition(
            name="modbus_rtu",
            description="Modbus RTU serial protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="address",
                    field_type="uint8",
                    description="Slave address (1-247)",
                ),
                FieldDefinition(
                    name="function_code",
                    field_type="uint8",
                    description="Function code",
                    enum=dict(self._MODBUS_FUNCTION_CODES),
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Function-specific data",
                ),
                # The CRC overrides the protocol-wide big-endian default.
                FieldDefinition(
                    name="crc",
                    field_type="uint16",
                    endian="little",
                    description="CRC-16 checksum",
                ),
            ],
        )

        self._protocols["modbus_rtu"] = ProtocolInfo(
            name="modbus_rtu",
            category="industrial",
            version="1.0",
            description="Modbus RTU serial protocol for industrial automation",
            reference="https://modbus.org/specs.php",
            definition=definition,
        )

    def _add_modbus_tcp(self) -> None:
        """Add Modbus TCP protocol definition."""
        definition = ProtocolDefinition(
            name="modbus_tcp",
            description="Modbus TCP/IP protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="transaction_id",
                    field_type="uint16",
                    description="Transaction identifier",
                ),
                FieldDefinition(
                    name="protocol_id",
                    field_type="uint16",
                    description="Protocol identifier (0 for Modbus)",
                ),
                FieldDefinition(
                    name="length",
                    field_type="uint16",
                    description="Length of remaining message",
                ),
                FieldDefinition(
                    name="unit_id",
                    field_type="uint8",
                    description="Unit identifier",
                ),
                FieldDefinition(
                    name="function_code",
                    field_type="uint8",
                    description="Function code",
                    enum=dict(self._MODBUS_FUNCTION_CODES),
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Function-specific data",
                ),
            ],
        )

        self._protocols["modbus_tcp"] = ProtocolInfo(
            name="modbus_tcp",
            category="industrial",
            version="1.0",
            description="Modbus TCP/IP protocol for industrial automation",
            reference="https://modbus.org/specs.php",
            definition=definition,
        )

    def _add_dnp3(self) -> None:
        """Add DNP3 protocol definition."""
        definition = ProtocolDefinition(
            name="dnp3",
            description="Distributed Network Protocol 3",
            endian="little",
            fields=[
                # NOTE(review): with the little-endian default, 0x0564
                # corresponds to wire bytes 64 05, while DNP3 frames are
                # understood to start with 05 64 -- confirm how
                # ProtocolDecoder applies `value` before relying on it.
                FieldDefinition(
                    name="start",
                    field_type="uint16",
                    value=0x0564,
                    description="Start bytes (0x0564)",
                ),
                FieldDefinition(
                    name="length",
                    field_type="uint8",
                    description="Data link layer length",
                ),
                FieldDefinition(
                    name="control",
                    field_type="uint8",
                    description="Control byte",
                ),
                FieldDefinition(
                    name="destination",
                    field_type="uint16",
                    description="Destination address",
                ),
                FieldDefinition(
                    name="source",
                    field_type="uint16",
                    description="Source address",
                ),
                FieldDefinition(
                    name="crc",
                    field_type="uint16",
                    description="CRC-16 checksum",
                ),
                FieldDefinition(
                    name="transport_header",
                    field_type="uint8",
                    description="Transport layer header",
                ),
                FieldDefinition(
                    name="application_data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Application layer data",
                ),
            ],
        )

        self._protocols["dnp3"] = ProtocolInfo(
            name="dnp3",
            category="industrial",
            version="3.0",
            description="Distributed Network Protocol for SCADA systems",
            reference="https://www.dnp.org/",
            definition=definition,
        )

    def _add_bacnet(self) -> None:
        """Add BACnet protocol definition (registered as "building")."""
        definition = ProtocolDefinition(
            name="bacnet",
            description="Building Automation and Control Networks",
            endian="big",
            fields=[
                FieldDefinition(
                    name="type",
                    field_type="uint8",
                    description="BVLC type (0x81 for BACnet/IP)",
                ),
                FieldDefinition(
                    name="function",
                    field_type="uint8",
                    description="BVLC function",
                    enum={
                        0x00: "bvlc_result",
                        0x01: "write_broadcast_table",
                        0x04: "forwarded_npdu",
                        0x0A: "original_unicast_npdu",
                        0x0B: "original_broadcast_npdu",
                    },
                ),
                FieldDefinition(
                    name="length",
                    field_type="uint16",
                    description="Total BVLC length",
                ),
                FieldDefinition(
                    name="npdu",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Network Protocol Data Unit",
                ),
            ],
        )

        self._protocols["bacnet"] = ProtocolInfo(
            name="bacnet",
            category="building",
            version="2020",
            description="Building Automation and Control Networks protocol",
            reference="https://www.bacnetinternational.org/",
            definition=definition,
        )

    def _add_http(self) -> None:
        """Add HTTP protocol definition (simplified for binary inspection)."""
        # HTTP is text-based, so we provide a simplified binary representation
        definition = ProtocolDefinition(
            name="http",
            description="Hypertext Transfer Protocol (simplified)",
            endian="big",
            fields=[
                FieldDefinition(
                    name="request_line",
                    field_type="string",
                    description="HTTP request/status line",
                ),
                FieldDefinition(
                    name="headers",
                    field_type="string",
                    size_ref="remaining",
                    description="HTTP headers and body",
                ),
            ],
        )

        self._protocols["http"] = ProtocolInfo(
            name="http",
            category="network",
            version="1.1",
            description="Hypertext Transfer Protocol",
            reference="https://tools.ietf.org/html/rfc2616",
            definition=definition,
        )

    def _add_mqtt(self) -> None:
        """Add MQTT protocol definition."""
        definition = ProtocolDefinition(
            name="mqtt",
            description="Message Queuing Telemetry Transport",
            endian="big",
            fields=[
                FieldDefinition(
                    name="fixed_header",
                    field_type="uint8",
                    description="Fixed header (type + flags)",
                ),
                # NOTE(review): declared as a single byte although the
                # description says the encoding is variable-length; longer
                # encodings presumably end up in `payload` -- confirm.
                FieldDefinition(
                    name="remaining_length",
                    field_type="uint8",
                    description="Remaining length (variable encoding)",
                ),
                FieldDefinition(
                    name="payload",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Variable header + payload",
                ),
            ],
        )

        self._protocols["mqtt"] = ProtocolInfo(
            name="mqtt",
            category="iot",
            version="5.0",
            description="MQTT IoT messaging protocol",
            reference="https://mqtt.org/",
            definition=definition,
        )

    def _add_coap(self) -> None:
        """Add CoAP protocol definition."""
        definition = ProtocolDefinition(
            name="coap",
            description="Constrained Application Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="version_type_tkl",
                    field_type="uint8",
                    description="Version (2b) + Type (2b) + Token Length (4b)",
                ),
                FieldDefinition(
                    name="code",
                    field_type="uint8",
                    description="Method/Response code",
                    enum={
                        0x01: "GET",
                        0x02: "POST",
                        0x03: "PUT",
                        0x04: "DELETE",
                        0x44: "2.04 Changed",
                        0x45: "2.05 Content",
                        0x81: "4.01 Unauthorized",
                        0x84: "4.04 Not Found",
                    },
                ),
                FieldDefinition(
                    name="message_id",
                    field_type="uint16",
                    description="Message ID",
                ),
                FieldDefinition(
                    name="token_options_payload",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Token + Options + Payload",
                ),
            ],
        )

        self._protocols["coap"] = ProtocolInfo(
            name="coap",
            category="iot",
            version="RFC 7252",
            description="Constrained Application Protocol for IoT",
            reference="https://tools.ietf.org/html/rfc7252",
            definition=definition,
        )

    def _add_cbor(self) -> None:
        """Add CBOR protocol definition (simplified header)."""
        definition = ProtocolDefinition(
            name="cbor",
            description="Concise Binary Object Representation",
            endian="big",
            fields=[
                FieldDefinition(
                    name="initial_byte",
                    field_type="uint8",
                    description="Major type (3b) + Additional info (5b)",
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="CBOR data",
                ),
            ],
        )

        self._protocols["cbor"] = ProtocolInfo(
            name="cbor",
            category="iot",
            version="RFC 8949",
            description="Concise Binary Object Representation",
            reference="https://tools.ietf.org/html/rfc8949",
            definition=definition,
        )

    def _add_messagepack(self) -> None:
        """Add MessagePack protocol definition (simplified header)."""
        definition = ProtocolDefinition(
            name="messagepack",
            description="MessagePack binary serialization",
            endian="big",
            fields=[
                FieldDefinition(
                    name="format",
                    field_type="uint8",
                    description="Format byte",
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="MessagePack data",
                ),
            ],
        )

        self._protocols["messagepack"] = ProtocolInfo(
            name="messagepack",
            category="iot",
            version="1.0",
            description="MessagePack binary serialization format",
            reference="https://msgpack.org/",
            definition=definition,
        )

    def _add_obd2(self) -> None:
        """Add OBD-II protocol definition."""
        definition = ProtocolDefinition(
            name="obd2",
            description="On-Board Diagnostics II",
            endian="big",
            fields=[
                FieldDefinition(
                    name="mode",
                    field_type="uint8",
                    description="Service/mode",
                    enum={
                        0x01: "show_current_data",
                        0x02: "show_freeze_frame",
                        0x03: "show_dtc",
                        0x04: "clear_dtc",
                        0x09: "request_vehicle_info",
                    },
                ),
                FieldDefinition(
                    name="pid",
                    field_type="uint8",
                    description="Parameter ID",
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Parameter data",
                ),
            ],
        )

        self._protocols["obd2"] = ProtocolInfo(
            name="obd2",
            category="automotive",
            version="ISO 15031",
            description="On-Board Diagnostics II automotive protocol",
            reference="https://www.iso.org/standard/66369.html",
            definition=definition,
        )

    def _add_j1939(self) -> None:
        """Add SAE J1939 protocol definition."""
        definition = ProtocolDefinition(
            name="j1939",
            description="SAE J1939 Heavy Duty Vehicle Protocol",
            endian="big",
            fields=[
                # NOTE(review): size=3 and size=18 look like bit widths
                # (priority 0-7, 18-bit PGN) while size=8 on `data` reads as
                # a byte count -- confirm the unit FieldDefinition.size uses.
                FieldDefinition(
                    name="priority",
                    field_type="uint8",
                    size=3,
                    description="Message priority (0-7)",
                ),
                FieldDefinition(
                    name="pgn",
                    field_type="uint32",
                    size=18,
                    description="Parameter Group Number",
                ),
                FieldDefinition(
                    name="source_address",
                    field_type="uint8",
                    description="Source address",
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size=8,
                    description="Data bytes (up to 8)",
                ),
            ],
        )

        self._protocols["j1939"] = ProtocolInfo(
            name="j1939",
            category="automotive",
            version="J1939-21",
            description="SAE J1939 protocol for heavy-duty vehicles",
            reference="https://www.sae.org/standards/content/j1939_202210/",
            definition=definition,
        )

    def _add_can(self) -> None:
        """Add CAN bus frame definition."""
        definition = ProtocolDefinition(
            name="can",
            description="Controller Area Network",
            endian="big",
            fields=[
                FieldDefinition(
                    name="identifier",
                    field_type="uint32",
                    description="CAN ID (11 or 29 bits)",
                ),
                FieldDefinition(
                    name="dlc",
                    field_type="uint8",
                    description="Data Length Code (0-8)",
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size=8,
                    description="Data bytes",
                ),
            ],
        )

        self._protocols["can"] = ProtocolInfo(
            name="can",
            category="automotive",
            version="2.0B",
            description="Controller Area Network bus protocol",
            reference="https://www.iso.org/standard/63648.html",
            definition=definition,
        )

    def _add_dns(self) -> None:
        """Add DNS protocol definition."""
        definition = ProtocolDefinition(
            name="dns",
            description="Domain Name System",
            endian="big",
            fields=[
                FieldDefinition(
                    name="transaction_id",
                    field_type="uint16",
                    description="Transaction ID",
                ),
                FieldDefinition(
                    name="flags",
                    field_type="uint16",
                    description="Flags",
                ),
                FieldDefinition(
                    name="questions",
                    field_type="uint16",
                    description="Question count",
                ),
                FieldDefinition(
                    name="answer_rrs",
                    field_type="uint16",
                    description="Answer RR count",
                ),
                FieldDefinition(
                    name="authority_rrs",
                    field_type="uint16",
                    description="Authority RR count",
                ),
                FieldDefinition(
                    name="additional_rrs",
                    field_type="uint16",
                    description="Additional RR count",
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Questions, answers, authority, additional",
                ),
            ],
        )

        self._protocols["dns"] = ProtocolInfo(
            name="dns",
            category="network",
            version="RFC 1035",
            description="Domain Name System protocol",
            reference="https://tools.ietf.org/html/rfc1035",
            definition=definition,
        )

    def _add_ntp(self) -> None:
        """Add NTP protocol definition."""
        definition = ProtocolDefinition(
            name="ntp",
            description="Network Time Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="flags",
                    field_type="uint8",
                    description="LI (2b) + VN (3b) + Mode (3b)",
                ),
                FieldDefinition(
                    name="stratum",
                    field_type="uint8",
                    description="Stratum level",
                ),
                FieldDefinition(
                    name="poll",
                    field_type="uint8",
                    description="Poll interval",
                ),
                FieldDefinition(
                    name="precision",
                    field_type="int8",
                    description="Clock precision",
                ),
                FieldDefinition(
                    name="root_delay",
                    field_type="uint32",
                    description="Root delay",
                ),
                FieldDefinition(
                    name="root_dispersion",
                    field_type="uint32",
                    description="Root dispersion",
                ),
                FieldDefinition(
                    name="reference_id",
                    field_type="bytes",
                    size=4,
                    description="Reference identifier",
                ),
                FieldDefinition(
                    name="reference_timestamp",
                    field_type="uint64",
                    description="Reference timestamp",
                ),
                FieldDefinition(
                    name="origin_timestamp",
                    field_type="uint64",
                    description="Origin timestamp",
                ),
                FieldDefinition(
                    name="receive_timestamp",
                    field_type="uint64",
                    description="Receive timestamp",
                ),
                FieldDefinition(
                    name="transmit_timestamp",
                    field_type="uint64",
                    description="Transmit timestamp",
                ),
            ],
        )

        self._protocols["ntp"] = ProtocolInfo(
            name="ntp",
            category="network",
            version="4",
            description="Network Time Protocol for time synchronization",
            reference="https://tools.ietf.org/html/rfc5905",
            definition=definition,
        )

    def _add_syslog(self) -> None:
        """Add Syslog protocol definition."""
        definition = ProtocolDefinition(
            name="syslog",
            description="Syslog Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="priority",
                    field_type="string",
                    description="Priority value in angle brackets",
                ),
                FieldDefinition(
                    name="message",
                    field_type="string",
                    size_ref="remaining",
                    description="Syslog message",
                ),
            ],
        )

        self._protocols["syslog"] = ProtocolInfo(
            name="syslog",
            category="network",
            version="RFC 5424",
            description="Syslog protocol for system logging",
            reference="https://tools.ietf.org/html/rfc5424",
            definition=definition,
        )

    def _add_nmea(self) -> None:
        """Add NMEA 0183 protocol definition."""
        definition = ProtocolDefinition(
            name="nmea",
            description="NMEA 0183 GPS Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="start",
                    field_type="string",
                    size=1,
                    value="$",
                    description="Start delimiter",
                ),
                FieldDefinition(
                    name="talker_id",
                    field_type="string",
                    size=2,
                    description="Talker identifier (GP, GN, etc.)",
                ),
                FieldDefinition(
                    name="sentence_id",
                    field_type="string",
                    size=3,
                    description="Sentence identifier (GGA, RMC, etc.)",
                ),
                FieldDefinition(
                    name="data",
                    field_type="string",
                    size_ref="remaining",
                    description="Comma-separated data fields",
                ),
            ],
        )

        self._protocols["nmea"] = ProtocolInfo(
            name="nmea",
            category="serial",
            version="0183",
            description="NMEA 0183 GPS/navigation protocol",
            reference="https://www.nmea.org/",
            definition=definition,
        )

    def _add_xmodem(self) -> None:
        """Add XMODEM protocol definition."""
        definition = ProtocolDefinition(
            name="xmodem",
            description="XMODEM File Transfer Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="header",
                    field_type="uint8",
                    description="SOH (0x01), EOT (0x04), or CAN (0x18)",
                    enum={
                        0x01: "SOH",
                        0x04: "EOT",
                        0x06: "ACK",
                        0x15: "NAK",
                        0x18: "CAN",
                    },
                ),
                FieldDefinition(
                    name="block_number",
                    field_type="uint8",
                    description="Block number (1-255)",
                ),
                FieldDefinition(
                    name="block_complement",
                    field_type="uint8",
                    description="One's complement of block number",
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size=128,
                    description="Data block (128 bytes)",
                ),
                FieldDefinition(
                    name="checksum",
                    field_type="uint8",
                    description="Arithmetic sum checksum",
                ),
            ],
        )

        self._protocols["xmodem"] = ProtocolInfo(
            name="xmodem",
            category="serial",
            version="1.0",
            description="XMODEM file transfer protocol",
            reference="https://en.wikipedia.org/wiki/XMODEM",
            definition=definition,
        )

    def _add_knx(self) -> None:
        """Add KNX protocol definition."""
        definition = ProtocolDefinition(
            name="knx",
            description="KNX Building Automation Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="header_length",
                    field_type="uint8",
                    value=0x06,
                    description="Header length (always 6)",
                ),
                FieldDefinition(
                    name="protocol_version",
                    field_type="uint8",
                    value=0x10,
                    description="Protocol version (0x10)",
                ),
                FieldDefinition(
                    name="service_type",
                    field_type="uint16",
                    description="Service type identifier",
                    enum={
                        0x0201: "search_request",
                        0x0202: "search_response",
                        0x0203: "description_request",
                        0x0204: "description_response",
                        0x0205: "connect_request",
                        0x0206: "connect_response",
                        0x0420: "tunnelling_request",
                        0x0421: "tunnelling_ack",
                    },
                ),
                FieldDefinition(
                    name="total_length",
                    field_type="uint16",
                    description="Total length including header",
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Service-specific data",
                ),
            ],
        )

        self._protocols["knx"] = ProtocolInfo(
            name="knx",
            category="building",
            version="2.0",
            description="KNX building automation protocol",
            reference="https://www.knx.org/",
            definition=definition,
        )

    def _add_lonworks(self) -> None:
        """Add LonWorks protocol definition (simplified)."""
        definition = ProtocolDefinition(
            name="lonworks",
            description="LonWorks Control Network Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="npdu",
                    field_type="uint8",
                    description="Network PDU byte",
                ),
                FieldDefinition(
                    name="domain_length",
                    field_type="uint8",
                    description="Domain length (0, 1, 3, 6)",
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="LonWorks data",
                ),
            ],
        )

        self._protocols["lonworks"] = ProtocolInfo(
            name="lonworks",
            category="building",
            version="1.0",
            description="LonWorks control network protocol",
            reference="https://www.echelon.com/",
            definition=definition,
        )

    def _add_tlv(self) -> None:
        """Add generic TLV (Tag-Length-Value) format."""
        definition = ProtocolDefinition(
            name="tlv",
            description="Tag-Length-Value Generic Format",
            endian="big",
            fields=[
                FieldDefinition(
                    name="tag",
                    field_type="uint8",
                    description="Type/tag identifier",
                ),
                FieldDefinition(
                    name="length",
                    field_type="uint8",
                    description="Value length",
                ),
                # Sized by the preceding "length" field.
                FieldDefinition(
                    name="value",
                    field_type="bytes",
                    size_ref="length",
                    description="Value data",
                ),
            ],
        )

        self._protocols["tlv"] = ProtocolInfo(
            name="tlv",
            category="custom",
            version="1.0",
            description="Generic Tag-Length-Value encoding",
            definition=definition,
        )

    def _add_length_prefixed(self) -> None:
        """Add generic length-prefixed format."""
        definition = ProtocolDefinition(
            name="length_prefixed",
            description="Length-Prefixed Message Format",
            endian="big",
            fields=[
                FieldDefinition(
                    name="length",
                    field_type="uint16",
                    description="Message length",
                ),
                # Sized by the preceding "length" field.
                FieldDefinition(
                    name="payload",
                    field_type="bytes",
                    size_ref="length",
                    description="Message payload",
                ),
            ],
        )

        self._protocols["length_prefixed"] = ProtocolInfo(
            name="length_prefixed",
            category="custom",
            version="1.0",
            description="Generic length-prefixed message format",
            definition=definition,
        )

1138 

1139 

# Process-wide library instance, created on first use by get_library().
_library: ProtocolLibrary | None = None


def get_library() -> ProtocolLibrary:
    """Return the shared ProtocolLibrary, creating it on the first call.

    Returns:
        The module-level ProtocolLibrary instance.
    """
    global _library
    shared = _library
    if shared is None:
        # First access: build the library and remember it for later calls.
        shared = _library = ProtocolLibrary()
    return shared

1154 

1155 

@overload
def list_protocols(category: str | None = None, *, names_only: Literal[True]) -> list[str]: ...


@overload
def list_protocols(
    category: str | None = None, *, names_only: Literal[False] = ...
) -> list[ProtocolInfo]: ...


def list_protocols(
    category: str | None = None, *, names_only: bool = False
) -> list[str] | list[ProtocolInfo]:
    """Enumerate the protocols held by the global library.

    Implements RE-DSL-003: Protocol listing.

    Args:
        category: Restrict the result to one category; omit for all.
        names_only: When True the result holds protocol name strings;
            when False (default) it holds ProtocolInfo objects.

    Returns:
        Protocol names or ProtocolInfo objects, depending on names_only.

    Example:
        >>> # Get protocol names
        >>> names = list_protocols(names_only=True)
        >>> "http" in names
        True

        >>> # Get full protocol info
        >>> protocols = list_protocols()
        >>> protocols[0].name
        'modbus_rtu'
    """
    library = get_library()
    if not names_only:
        return library.list_protocols(category)
    return library.list_protocol_names(category)

1195 

1196 

def get_protocol(name: str) -> ProtocolInfo | None:
    """Look up a protocol entry in the global library.

    Args:
        name: Protocol name.

    Returns:
        The matching ProtocolInfo, or None when the library has no entry.
    """
    library = get_library()
    return library.get(name)

1207 

1208 

def get_decoder(name: str) -> ProtocolDecoder | None:
    """Fetch a decoder for a protocol from the global library.

    Implements RE-DSL-003: Protocol decoding.

    Args:
        name: Protocol name.

    Returns:
        A ProtocolDecoder, or None when the library cannot provide one.
    """
    library = get_library()
    return library.get_decoder(name)

1221 

1222 

# Names exported by `from ... import *`: the two classes plus the
# module-level convenience functions that operate on the global library.
__all__ = [
    "ProtocolInfo",
    "ProtocolLibrary",
    "get_decoder",
    "get_library",
    "get_protocol",
    "list_protocols",
]