Coverage for src/tracekit/inference/protocol_library.py: 100%
143 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Protocol Format Library for common protocol definitions.
3 - RE-DSL-003: Protocol Format Library
5This module provides a library of common protocol definitions for industrial,
6IoT, and communication protocols to enable immediate decoding without
7user-defined definitions.
8"""
10from __future__ import annotations
12from dataclasses import dataclass
13from typing import Literal, overload
15from tracekit.inference.protocol_dsl import (
16 FieldDefinition,
17 ProtocolDecoder,
18 ProtocolDefinition,
19)
@dataclass
class ProtocolInfo:
    """Catalogue entry describing a single protocol held by the library.

    Attributes:
        name: Name of the protocol.
        category: One of the fixed category labels listed in the annotation.
        version: Version label of the protocol.
        description: Short human-readable summary.
        reference: URL of reference documentation; empty string when omitted.
        definition: Definition used for decoding; ``None`` when omitted.
    """

    name: str
    category: Literal[
        "industrial", "iot", "network", "automotive", "building", "serial", "custom"
    ]
    version: str
    description: str
    reference: str = ""
    definition: ProtocolDefinition | None = None
class ProtocolLibrary:
    """Registry of ready-made protocol definitions.

    Implements RE-DSL-003: Protocol Format Library.

    On construction the library registers a fixed set of built-in
    definitions (industrial, IoT, automotive, network, serial and
    building-automation protocols plus two generic framing formats).
    Entries are keyed by protocol name; lookups lower-case the requested
    name first. Decoders are created on first request and cached per name.

    Example:
        >>> library = ProtocolLibrary()
        >>> modbus = library.get("modbus_rtu")
        >>> decoder = library.get_decoder("modbus_rtu")
        >>> message = decoder.decode(data)
    """

    # Function codes shared by the Modbus RTU and Modbus TCP definitions.
    # Each definition receives its own copy so the two never share a dict.
    _MODBUS_FUNCTION_CODES: dict[int, str] = {
        1: "read_coils",
        2: "read_discrete_inputs",
        3: "read_holding_registers",
        4: "read_input_registers",
        5: "write_single_coil",
        6: "write_single_register",
        15: "write_multiple_coils",
        16: "write_multiple_registers",
    }

    def __init__(self) -> None:
        """Create the library and register every built-in protocol."""
        self._protocols: dict[str, ProtocolInfo] = {}
        self._decoders: dict[str, ProtocolDecoder] = {}
        self._load_builtin_protocols()

    def _load_builtin_protocols(self) -> None:
        """Register all built-in definitions.

        The call order fixes the iteration order of ``list_protocols()``,
        so it is kept stable.
        """
        # Industrial protocols
        self._add_modbus_rtu()
        self._add_modbus_tcp()
        self._add_dnp3()
        self._add_bacnet()

        # IoT protocols
        self._add_mqtt()
        self._add_coap()
        self._add_cbor()
        self._add_messagepack()

        # Automotive protocols
        self._add_obd2()
        self._add_j1939()
        self._add_can()

        # Network protocols
        self._add_http()
        self._add_dns()
        self._add_ntp()
        self._add_syslog()

        # Serial protocols
        self._add_nmea()
        self._add_xmodem()

        # Building automation
        self._add_knx()
        self._add_lonworks()

        # Custom/generic
        self._add_tlv()
        self._add_length_prefixed()

    def list_protocols(self, category: str | None = None) -> list[ProtocolInfo]:
        """List registered protocols, optionally restricted to one category.

        Args:
            category: Category to keep. ``None`` or an empty string
                disables filtering.

        Returns:
            A new list of ProtocolInfo objects in registration order.

        Example:
            >>> protocols = library.list_protocols(category="industrial")
            >>> for p in protocols:
            ...     print(f"{p.name}: {p.description}")
        """
        protocols = list(self._protocols.values())
        if category:
            protocols = [p for p in protocols if p.category == category]
        return protocols

    def list_protocol_names(self, category: str | None = None) -> list[str]:
        """List the names of registered protocols.

        Args:
            category: Category to keep (optional).

        Returns:
            List of protocol name strings in registration order.
        """
        return [p.name for p in self.list_protocols(category)]

    def get(self, name: str) -> ProtocolInfo | None:
        """Look up protocol information by name.

        Args:
            name: Protocol name (case-insensitive).

        Returns:
            The matching ProtocolInfo, or None if nothing is registered
            under that name.
        """
        return self._protocols.get(name.lower())

    def get_decoder(self, name: str) -> ProtocolDecoder | None:
        """Return a decoder for a protocol, creating and caching it on first use.

        Args:
            name: Protocol name (case-insensitive).

        Returns:
            ProtocolDecoder instance, or None when the protocol is unknown
            or has no definition attached.

        Example:
            >>> decoder = library.get_decoder("modbus_rtu")
            >>> if decoder:
            ...     message = decoder.decode(data)
        """
        key = name.lower()
        cached = self._decoders.get(key)
        if cached is not None:
            return cached

        info = self._protocols.get(key)
        if not info or not info.definition:
            return None

        decoder = ProtocolDecoder(info.definition)
        self._decoders[key] = decoder
        return decoder

    def get_definition(self, name: str) -> ProtocolDefinition | None:
        """Look up a protocol definition by name.

        Args:
            name: Protocol name (case-insensitive).

        Returns:
            The stored ProtocolDefinition, or None if the protocol is
            unknown (or was registered without a definition).
        """
        info = self._protocols.get(name.lower())
        return info.definition if info else None

    def add_protocol(
        self,
        info: ProtocolInfo,
    ) -> None:
        """Add a custom protocol to the library, replacing any same-named entry.

        The entry is stored under the lower-cased ``info.name``. Any decoder
        previously cached for that name is discarded so the next
        ``get_decoder`` call is built from the new definition instead of
        returning a decoder for the replaced one.

        Args:
            info: Protocol information and definition.

        Example:
            >>> custom = ProtocolInfo(
            ...     name="my_protocol",
            ...     category="custom",
            ...     version="1.0",
            ...     description="My custom protocol",
            ...     definition=my_definition
            ... )
            >>> library.add_protocol(custom)
        """
        key = info.name.lower()
        self._protocols[key] = info
        # A decoder cached for an earlier registration would be stale now.
        self._decoders.pop(key, None)

    def categories(self) -> list[str]:
        """Return the categories currently in use.

        Returns:
            Sorted list of distinct category names.
        """
        return sorted({p.category for p in self._protocols.values()})

    # =========================================================================
    # Built-in Protocol Definitions
    # =========================================================================

    def _add_modbus_rtu(self) -> None:
        """Register the Modbus RTU definition."""
        definition = ProtocolDefinition(
            name="modbus_rtu",
            description="Modbus RTU serial protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="address", field_type="uint8", description="Slave address (1-247)"
                ),
                FieldDefinition(
                    name="function_code",
                    field_type="uint8",
                    description="Function code",
                    enum=dict(self._MODBUS_FUNCTION_CODES),
                ),
                # NOTE(review): "data" is sized as "remaining" yet "crc" follows it;
                # confirm the decoder reserves room for trailing fixed-size fields.
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Function-specific data",
                ),
                FieldDefinition(
                    name="crc", field_type="uint16", endian="little", description="CRC-16 checksum"
                ),
            ],
        )

        self._protocols["modbus_rtu"] = ProtocolInfo(
            name="modbus_rtu",
            category="industrial",
            version="1.0",
            description="Modbus RTU serial protocol for industrial automation",
            reference="https://modbus.org/specs.php",
            definition=definition,
        )

    def _add_modbus_tcp(self) -> None:
        """Register the Modbus TCP definition."""
        definition = ProtocolDefinition(
            name="modbus_tcp",
            description="Modbus TCP/IP protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="transaction_id",
                    field_type="uint16",
                    description="Transaction identifier",
                ),
                FieldDefinition(
                    name="protocol_id",
                    field_type="uint16",
                    description="Protocol identifier (0 for Modbus)",
                ),
                FieldDefinition(
                    name="length", field_type="uint16", description="Length of remaining message"
                ),
                FieldDefinition(
                    name="unit_id", field_type="uint8", description="Unit identifier"
                ),
                FieldDefinition(
                    name="function_code",
                    field_type="uint8",
                    description="Function code",
                    enum=dict(self._MODBUS_FUNCTION_CODES),
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Function-specific data",
                ),
            ],
        )

        self._protocols["modbus_tcp"] = ProtocolInfo(
            name="modbus_tcp",
            category="industrial",
            version="1.0",
            description="Modbus TCP/IP protocol for industrial automation",
            reference="https://modbus.org/specs.php",
            definition=definition,
        )

    def _add_dnp3(self) -> None:
        """Register the DNP3 definition (link header, transport byte, payload)."""
        definition = ProtocolDefinition(
            name="dnp3",
            description="Distributed Network Protocol 3",
            endian="little",
            fields=[
                # The sync octets are transmitted as 0x05 then 0x64. Read with the
                # protocol's little-endian default they would yield 0x6405 and never
                # equal 0x0564, so this one field is read big-endian.
                FieldDefinition(
                    name="start",
                    field_type="uint16",
                    endian="big",
                    value=0x0564,
                    description="Start bytes (0x0564)",
                ),
                FieldDefinition(
                    name="length", field_type="uint8", description="Data link layer length"
                ),
                FieldDefinition(name="control", field_type="uint8", description="Control byte"),
                FieldDefinition(
                    name="destination", field_type="uint16", description="Destination address"
                ),
                FieldDefinition(
                    name="source", field_type="uint16", description="Source address"
                ),
                FieldDefinition(name="crc", field_type="uint16", description="CRC-16 checksum"),
                FieldDefinition(
                    name="transport_header",
                    field_type="uint8",
                    description="Transport layer header",
                ),
                FieldDefinition(
                    name="application_data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Application layer data",
                ),
            ],
        )

        self._protocols["dnp3"] = ProtocolInfo(
            name="dnp3",
            category="industrial",
            version="3.0",
            description="Distributed Network Protocol for SCADA systems",
            reference="https://www.dnp.org/",
            definition=definition,
        )

    def _add_bacnet(self) -> None:
        """Register the BACnet definition (BVLC header followed by the NPDU)."""
        definition = ProtocolDefinition(
            name="bacnet",
            description="Building Automation and Control Networks",
            endian="big",
            fields=[
                FieldDefinition(
                    name="type", field_type="uint8", description="BVLC type (0x81 for BACnet/IP)"
                ),
                FieldDefinition(
                    name="function",
                    field_type="uint8",
                    description="BVLC function",
                    enum={
                        0x00: "bvlc_result",
                        0x01: "write_broadcast_table",
                        0x04: "forwarded_npdu",
                        0x0A: "original_unicast_npdu",
                        0x0B: "original_broadcast_npdu",
                    },
                ),
                FieldDefinition(
                    name="length", field_type="uint16", description="Total BVLC length"
                ),
                FieldDefinition(
                    name="npdu",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Network Protocol Data Unit",
                ),
            ],
        )

        self._protocols["bacnet"] = ProtocolInfo(
            name="bacnet",
            category="building",
            version="2020",
            description="Building Automation and Control Networks protocol",
            reference="https://www.bacnetinternational.org/",
            definition=definition,
        )

    def _add_http(self) -> None:
        """Register the HTTP definition (simplified for binary inspection)."""
        # HTTP is text-based, so only a coarse two-string layout is provided.
        definition = ProtocolDefinition(
            name="http",
            description="Hypertext Transfer Protocol (simplified)",
            endian="big",
            fields=[
                FieldDefinition(
                    name="request_line",
                    field_type="string",
                    description="HTTP request/status line",
                ),
                FieldDefinition(
                    name="headers",
                    field_type="string",
                    size_ref="remaining",
                    description="HTTP headers and body",
                ),
            ],
        )

        self._protocols["http"] = ProtocolInfo(
            name="http",
            category="network",
            version="1.1",
            description="Hypertext Transfer Protocol",
            reference="https://tools.ietf.org/html/rfc2616",
            definition=definition,
        )

    def _add_mqtt(self) -> None:
        """Register the MQTT definition."""
        definition = ProtocolDefinition(
            name="mqtt",
            description="Message Queuing Telemetry Transport",
            endian="big",
            fields=[
                FieldDefinition(
                    name="fixed_header",
                    field_type="uint8",
                    description="Fixed header (type + flags)",
                ),
                FieldDefinition(
                    name="remaining_length",
                    field_type="uint8",
                    description="Remaining length (variable encoding)",
                ),
                FieldDefinition(
                    name="payload",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Variable header + payload",
                ),
            ],
        )

        self._protocols["mqtt"] = ProtocolInfo(
            name="mqtt",
            category="iot",
            version="5.0",
            description="MQTT IoT messaging protocol",
            reference="https://mqtt.org/",
            definition=definition,
        )

    def _add_coap(self) -> None:
        """Register the CoAP definition."""
        definition = ProtocolDefinition(
            name="coap",
            description="Constrained Application Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="version_type_tkl",
                    field_type="uint8",
                    description="Version (2b) + Type (2b) + Token Length (4b)",
                ),
                FieldDefinition(
                    name="code",
                    field_type="uint8",
                    description="Method/Response code",
                    enum={
                        0x01: "GET",
                        0x02: "POST",
                        0x03: "PUT",
                        0x04: "DELETE",
                        0x44: "2.04 Changed",
                        0x45: "2.05 Content",
                        0x81: "4.01 Unauthorized",
                        0x84: "4.04 Not Found",
                    },
                ),
                FieldDefinition(name="message_id", field_type="uint16", description="Message ID"),
                FieldDefinition(
                    name="token_options_payload",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Token + Options + Payload",
                ),
            ],
        )

        self._protocols["coap"] = ProtocolInfo(
            name="coap",
            category="iot",
            version="RFC 7252",
            description="Constrained Application Protocol for IoT",
            reference="https://tools.ietf.org/html/rfc7252",
            definition=definition,
        )

    def _add_cbor(self) -> None:
        """Register the CBOR definition (simplified header)."""
        definition = ProtocolDefinition(
            name="cbor",
            description="Concise Binary Object Representation",
            endian="big",
            fields=[
                FieldDefinition(
                    name="initial_byte",
                    field_type="uint8",
                    description="Major type (3b) + Additional info (5b)",
                ),
                FieldDefinition(
                    name="data", field_type="bytes", size_ref="remaining", description="CBOR data"
                ),
            ],
        )

        self._protocols["cbor"] = ProtocolInfo(
            name="cbor",
            category="iot",
            version="RFC 8949",
            description="Concise Binary Object Representation",
            reference="https://tools.ietf.org/html/rfc8949",
            definition=definition,
        )

    def _add_messagepack(self) -> None:
        """Register the MessagePack definition (simplified header)."""
        definition = ProtocolDefinition(
            name="messagepack",
            description="MessagePack binary serialization",
            endian="big",
            fields=[
                FieldDefinition(name="format", field_type="uint8", description="Format byte"),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="MessagePack data",
                ),
            ],
        )

        self._protocols["messagepack"] = ProtocolInfo(
            name="messagepack",
            category="iot",
            version="1.0",
            description="MessagePack binary serialization format",
            reference="https://msgpack.org/",
            definition=definition,
        )

    def _add_obd2(self) -> None:
        """Register the OBD-II definition."""
        definition = ProtocolDefinition(
            name="obd2",
            description="On-Board Diagnostics II",
            endian="big",
            fields=[
                FieldDefinition(
                    name="mode",
                    field_type="uint8",
                    description="Service/mode",
                    enum={
                        0x01: "show_current_data",
                        0x02: "show_freeze_frame",
                        0x03: "show_dtc",
                        0x04: "clear_dtc",
                        0x09: "request_vehicle_info",
                    },
                ),
                FieldDefinition(name="pid", field_type="uint8", description="Parameter ID"),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Parameter data",
                ),
            ],
        )

        self._protocols["obd2"] = ProtocolInfo(
            name="obd2",
            category="automotive",
            version="ISO 15031",
            description="On-Board Diagnostics II automotive protocol",
            reference="https://www.iso.org/standard/66369.html",
            definition=definition,
        )

    def _add_j1939(self) -> None:
        """Register the SAE J1939 definition."""
        definition = ProtocolDefinition(
            name="j1939",
            description="SAE J1939 Heavy Duty Vehicle Protocol",
            endian="big",
            fields=[
                # NOTE(review): size=3 and size=18 below look like bit widths;
                # confirm how the DSL interprets `size` on integer fields.
                FieldDefinition(
                    name="priority",
                    field_type="uint8",
                    size=3,
                    description="Message priority (0-7)",
                ),
                FieldDefinition(
                    name="pgn", field_type="uint32", size=18, description="Parameter Group Number"
                ),
                FieldDefinition(
                    name="source_address", field_type="uint8", description="Source address"
                ),
                FieldDefinition(
                    name="data", field_type="bytes", size=8, description="Data bytes (up to 8)"
                ),
            ],
        )

        self._protocols["j1939"] = ProtocolInfo(
            name="j1939",
            category="automotive",
            version="J1939-21",
            description="SAE J1939 protocol for heavy-duty vehicles",
            reference="https://www.sae.org/standards/content/j1939_202210/",
            definition=definition,
        )

    def _add_can(self) -> None:
        """Register the CAN bus frame definition."""
        definition = ProtocolDefinition(
            name="can",
            description="Controller Area Network",
            endian="big",
            fields=[
                FieldDefinition(
                    name="identifier", field_type="uint32", description="CAN ID (11 or 29 bits)"
                ),
                FieldDefinition(
                    name="dlc", field_type="uint8", description="Data Length Code (0-8)"
                ),
                FieldDefinition(
                    name="data", field_type="bytes", size=8, description="Data bytes"
                ),
            ],
        )

        self._protocols["can"] = ProtocolInfo(
            name="can",
            category="automotive",
            version="2.0B",
            description="Controller Area Network bus protocol",
            reference="https://www.iso.org/standard/63648.html",
            definition=definition,
        )

    def _add_dns(self) -> None:
        """Register the DNS definition (fixed header plus opaque record data)."""
        definition = ProtocolDefinition(
            name="dns",
            description="Domain Name System",
            endian="big",
            fields=[
                FieldDefinition(
                    name="transaction_id", field_type="uint16", description="Transaction ID"
                ),
                FieldDefinition(name="flags", field_type="uint16", description="Flags"),
                FieldDefinition(
                    name="questions", field_type="uint16", description="Question count"
                ),
                FieldDefinition(
                    name="answer_rrs", field_type="uint16", description="Answer RR count"
                ),
                FieldDefinition(
                    name="authority_rrs", field_type="uint16", description="Authority RR count"
                ),
                FieldDefinition(
                    name="additional_rrs", field_type="uint16", description="Additional RR count"
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Questions, answers, authority, additional",
                ),
            ],
        )

        self._protocols["dns"] = ProtocolInfo(
            name="dns",
            category="network",
            version="RFC 1035",
            description="Domain Name System protocol",
            reference="https://tools.ietf.org/html/rfc1035",
            definition=definition,
        )

    def _add_ntp(self) -> None:
        """Register the NTP definition."""
        definition = ProtocolDefinition(
            name="ntp",
            description="Network Time Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="flags", field_type="uint8", description="LI (2b) + VN (3b) + Mode (3b)"
                ),
                FieldDefinition(name="stratum", field_type="uint8", description="Stratum level"),
                FieldDefinition(name="poll", field_type="uint8", description="Poll interval"),
                FieldDefinition(
                    name="precision", field_type="int8", description="Clock precision"
                ),
                FieldDefinition(name="root_delay", field_type="uint32", description="Root delay"),
                FieldDefinition(
                    name="root_dispersion", field_type="uint32", description="Root dispersion"
                ),
                FieldDefinition(
                    name="reference_id",
                    field_type="bytes",
                    size=4,
                    description="Reference identifier",
                ),
                FieldDefinition(
                    name="reference_timestamp",
                    field_type="uint64",
                    description="Reference timestamp",
                ),
                FieldDefinition(
                    name="origin_timestamp", field_type="uint64", description="Origin timestamp"
                ),
                FieldDefinition(
                    name="receive_timestamp", field_type="uint64", description="Receive timestamp"
                ),
                FieldDefinition(
                    name="transmit_timestamp",
                    field_type="uint64",
                    description="Transmit timestamp",
                ),
            ],
        )

        self._protocols["ntp"] = ProtocolInfo(
            name="ntp",
            category="network",
            version="4",
            description="Network Time Protocol for time synchronization",
            reference="https://tools.ietf.org/html/rfc5905",
            definition=definition,
        )

    def _add_syslog(self) -> None:
        """Register the Syslog definition."""
        definition = ProtocolDefinition(
            name="syslog",
            description="Syslog Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="priority",
                    field_type="string",
                    description="Priority value in angle brackets",
                ),
                FieldDefinition(
                    name="message",
                    field_type="string",
                    size_ref="remaining",
                    description="Syslog message",
                ),
            ],
        )

        self._protocols["syslog"] = ProtocolInfo(
            name="syslog",
            category="network",
            version="RFC 5424",
            description="Syslog protocol for system logging",
            reference="https://tools.ietf.org/html/rfc5424",
            definition=definition,
        )

    def _add_nmea(self) -> None:
        """Register the NMEA 0183 definition."""
        definition = ProtocolDefinition(
            name="nmea",
            description="NMEA 0183 GPS Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="start",
                    field_type="string",
                    size=1,
                    value="$",
                    description="Start delimiter",
                ),
                FieldDefinition(
                    name="talker_id",
                    field_type="string",
                    size=2,
                    description="Talker identifier (GP, GN, etc.)",
                ),
                FieldDefinition(
                    name="sentence_id",
                    field_type="string",
                    size=3,
                    description="Sentence identifier (GGA, RMC, etc.)",
                ),
                FieldDefinition(
                    name="data",
                    field_type="string",
                    size_ref="remaining",
                    description="Comma-separated data fields",
                ),
            ],
        )

        self._protocols["nmea"] = ProtocolInfo(
            name="nmea",
            category="serial",
            version="0183",
            description="NMEA 0183 GPS/navigation protocol",
            reference="https://www.nmea.org/",
            definition=definition,
        )

    def _add_xmodem(self) -> None:
        """Register the XMODEM definition."""
        definition = ProtocolDefinition(
            name="xmodem",
            description="XMODEM File Transfer Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="header",
                    field_type="uint8",
                    description="SOH (0x01), EOT (0x04), or CAN (0x18)",
                    enum={
                        0x01: "SOH",
                        0x04: "EOT",
                        0x06: "ACK",
                        0x15: "NAK",
                        0x18: "CAN",
                    },
                ),
                FieldDefinition(
                    name="block_number", field_type="uint8", description="Block number (1-255)"
                ),
                FieldDefinition(
                    name="block_complement",
                    field_type="uint8",
                    description="One's complement of block number",
                ),
                FieldDefinition(
                    name="data", field_type="bytes", size=128, description="Data block (128 bytes)"
                ),
                FieldDefinition(
                    name="checksum", field_type="uint8", description="Arithmetic sum checksum"
                ),
            ],
        )

        self._protocols["xmodem"] = ProtocolInfo(
            name="xmodem",
            category="serial",
            version="1.0",
            description="XMODEM file transfer protocol",
            reference="https://en.wikipedia.org/wiki/XMODEM",
            definition=definition,
        )

    def _add_knx(self) -> None:
        """Register the KNX definition."""
        definition = ProtocolDefinition(
            name="knx",
            description="KNX Building Automation Protocol",
            endian="big",
            fields=[
                FieldDefinition(
                    name="header_length",
                    field_type="uint8",
                    value=0x06,
                    description="Header length (always 6)",
                ),
                FieldDefinition(
                    name="protocol_version",
                    field_type="uint8",
                    value=0x10,
                    description="Protocol version (0x10)",
                ),
                FieldDefinition(
                    name="service_type",
                    field_type="uint16",
                    description="Service type identifier",
                    enum={
                        0x0201: "search_request",
                        0x0202: "search_response",
                        0x0203: "description_request",
                        0x0204: "description_response",
                        0x0205: "connect_request",
                        0x0206: "connect_response",
                        0x0420: "tunnelling_request",
                        0x0421: "tunnelling_ack",
                    },
                ),
                FieldDefinition(
                    name="total_length",
                    field_type="uint16",
                    description="Total length including header",
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="Service-specific data",
                ),
            ],
        )

        self._protocols["knx"] = ProtocolInfo(
            name="knx",
            category="building",
            version="2.0",
            description="KNX building automation protocol",
            reference="https://www.knx.org/",
            definition=definition,
        )

    def _add_lonworks(self) -> None:
        """Register the LonWorks definition (simplified)."""
        definition = ProtocolDefinition(
            name="lonworks",
            description="LonWorks Control Network Protocol",
            endian="big",
            fields=[
                FieldDefinition(name="npdu", field_type="uint8", description="Network PDU byte"),
                FieldDefinition(
                    name="domain_length",
                    field_type="uint8",
                    description="Domain length (0, 1, 3, 6)",
                ),
                FieldDefinition(
                    name="data",
                    field_type="bytes",
                    size_ref="remaining",
                    description="LonWorks data",
                ),
            ],
        )

        self._protocols["lonworks"] = ProtocolInfo(
            name="lonworks",
            category="building",
            version="1.0",
            description="LonWorks control network protocol",
            reference="https://www.echelon.com/",
            definition=definition,
        )

    def _add_tlv(self) -> None:
        """Register the generic TLV (Tag-Length-Value) format."""
        definition = ProtocolDefinition(
            name="tlv",
            description="Tag-Length-Value Generic Format",
            endian="big",
            fields=[
                FieldDefinition(name="tag", field_type="uint8", description="Type/tag identifier"),
                FieldDefinition(name="length", field_type="uint8", description="Value length"),
                FieldDefinition(
                    name="value", field_type="bytes", size_ref="length", description="Value data"
                ),
            ],
        )

        self._protocols["tlv"] = ProtocolInfo(
            name="tlv",
            category="custom",
            version="1.0",
            description="Generic Tag-Length-Value encoding",
            definition=definition,
        )

    def _add_length_prefixed(self) -> None:
        """Register the generic length-prefixed format."""
        definition = ProtocolDefinition(
            name="length_prefixed",
            description="Length-Prefixed Message Format",
            endian="big",
            fields=[
                FieldDefinition(name="length", field_type="uint16", description="Message length"),
                FieldDefinition(
                    name="payload",
                    field_type="bytes",
                    size_ref="length",
                    description="Message payload",
                ),
            ],
        )

        self._protocols["length_prefixed"] = ProtocolInfo(
            name="length_prefixed",
            category="custom",
            version="1.0",
            description="Generic length-prefixed message format",
            definition=definition,
        )
# Module-level library instance; stays None until get_library() is first called.
_library: ProtocolLibrary | None = None


def get_library() -> ProtocolLibrary:
    """Return the shared module-level protocol library.

    The library is constructed on the first call and the same object is
    handed back on every later call.

    Returns:
        The shared ProtocolLibrary instance.
    """
    global _library
    if _library is not None:
        return _library
    _library = ProtocolLibrary()
    return _library
@overload
def list_protocols(category: str | None = None, *, names_only: Literal[True]) -> list[str]: ...


@overload
def list_protocols(
    category: str | None = None, *, names_only: Literal[False] = ...
) -> list[ProtocolInfo]: ...


def list_protocols(
    category: str | None = None, *, names_only: bool = False
) -> list[str] | list[ProtocolInfo]:
    """List the protocols held by the shared library.

    Implements RE-DSL-003: Protocol listing.

    Args:
        category: Optional category filter.
        names_only: When True, return protocol names (strings); when False
            (the default), return ProtocolInfo objects.

    Returns:
        The matching protocols, as names or as ProtocolInfo objects.

    Example:
        >>> # Get protocol names
        >>> names = list_protocols(names_only=True)
        >>> "http" in names
        True

        >>> # Get full protocol info
        >>> protocols = list_protocols()
        >>> protocols[0].name
        'modbus_rtu'
    """
    library = get_library()
    if not names_only:
        return library.list_protocols(category)
    return library.list_protocol_names(category)
def get_protocol(name: str) -> ProtocolInfo | None:
    """Look up a protocol in the shared library.

    Args:
        name: Protocol name.

    Returns:
        Whatever the shared library's ``get`` returns for ``name``:
        a ProtocolInfo or None.
    """
    library = get_library()
    return library.get(name)
def get_decoder(name: str) -> ProtocolDecoder | None:
    """Fetch a decoder for a protocol from the shared library.

    Implements RE-DSL-003: Protocol decoding.

    Args:
        name: Protocol name.

    Returns:
        Whatever the shared library's ``get_decoder`` returns for ``name``:
        a ProtocolDecoder or None.
    """
    library = get_library()
    return library.get_decoder(name)
# Names exported by ``from <module> import *``: the two classes and the
# module-level convenience functions that operate on the shared library.
__all__ = [
    "ProtocolInfo",
    "ProtocolLibrary",
    "get_decoder",
    "get_library",
    "get_protocol",
    "list_protocols",
]