Coverage for src / tracekit / analyzers / protocols / onewire.py: 68%
177 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Dallas/Maxim 1-Wire protocol decoder.
3This module provides a 1-Wire protocol decoder for temperature sensors,
4EEPROMs, and other 1-Wire devices with ROM command decoding.
7Example:
8 >>> from tracekit.analyzers.protocols.onewire import OneWireDecoder
9 >>> decoder = OneWireDecoder()
10 >>> for packet in decoder.decode(trace):
11 ... print(f"ROM: {packet.annotations['rom_id']}")
13References:
14 Dallas/Maxim 1-Wire Protocol
15 DS18B20 Datasheet
16"""
18from __future__ import annotations
20from dataclasses import dataclass
21from enum import Enum
22from typing import TYPE_CHECKING
24import numpy as np
26from tracekit.analyzers.protocols.base import (
27 AnnotationLevel,
28 AsyncDecoder,
29 ChannelDef,
30 OptionDef,
31)
32from tracekit.core.types import (
33 DigitalTrace,
34 ProtocolPacket,
35 TraceMetadata,
36 WaveformTrace,
37)
39if TYPE_CHECKING:
40 from collections.abc import Iterator
42 from numpy.typing import NDArray
45class OneWireMode(Enum):
46 """1-Wire speed modes."""
48 STANDARD = "standard" # Standard speed (15.4 kbps)
49 OVERDRIVE = "overdrive" # Overdrive speed (~142 kbps)
52class OneWireROMCommand(Enum):
53 """1-Wire ROM commands."""
55 SEARCH_ROM = 0xF0
56 READ_ROM = 0x33
57 MATCH_ROM = 0x55
58 SKIP_ROM = 0xCC
59 ALARM_SEARCH = 0xEC
60 OVERDRIVE_SKIP = 0x3C
61 OVERDRIVE_MATCH = 0x69
64ROM_COMMAND_NAMES = {
65 0xF0: "Search ROM",
66 0x33: "Read ROM",
67 0x55: "Match ROM",
68 0xCC: "Skip ROM",
69 0xEC: "Alarm Search",
70 0x3C: "Overdrive Skip ROM",
71 0x69: "Overdrive Match ROM",
72}
74# 1-Wire Family Codes
75FAMILY_CODES = {
76 0x01: "DS1990A/DS2401 Silicon Serial Number",
77 0x10: "DS18S20/DS1820 Temperature Sensor",
78 0x14: "DS2430A 1kb EEPROM",
79 0x22: "DS1822 Econo Temperature Sensor",
80 0x23: "DS2433 4kb EEPROM",
81 0x28: "DS18B20 Temperature Sensor",
82 0x29: "DS2408 8-Channel Addressable Switch",
83 0x2D: "DS2431 1kb EEPROM",
84 0x37: "DS1977 Password-Protected 32kb EEPROM",
85 0x3B: "DS1825 Temperature Sensor",
86 0x42: "DS28EA00 Temperature Sensor",
87}
90@dataclass
91class OneWireTimings:
92 """1-Wire protocol timing specifications in microseconds."""
94 # Standard speed timings
95 reset_min: float = 480.0
96 reset_max: float = 960.0
97 presence_min: float = 60.0
98 presence_max: float = 240.0
99 slot_min: float = 60.0
100 slot_max: float = 120.0
101 write_0_low_min: float = 60.0
102 write_0_low_max: float = 120.0
103 write_1_low_min: float = 1.0
104 write_1_low_max: float = 15.0
105 read_sample_time: float = 15.0
107 @classmethod
108 def overdrive(cls) -> OneWireTimings:
109 """Return overdrive mode timings (approximately 10x faster)."""
110 return cls(
111 reset_min=48.0,
112 reset_max=80.0,
113 presence_min=8.0,
114 presence_max=24.0,
115 slot_min=6.0,
116 slot_max=16.0,
117 write_0_low_min=6.0,
118 write_0_low_max=16.0,
119 write_1_low_min=1.0,
120 write_1_low_max=2.0,
121 read_sample_time=1.0,
122 )
125@dataclass
126class OneWireROMID:
127 """1-Wire 64-bit ROM ID structure."""
129 family_code: int # 8 bits
130 serial_number: bytes # 48 bits (6 bytes)
131 crc: int # 8 bits
133 @classmethod
134 def from_bytes(cls, data: bytes) -> OneWireROMID:
135 """Parse ROM ID from 8 bytes (LSB first)."""
136 if len(data) < 8:
137 raise ValueError("ROM ID requires 8 bytes")
139 family_code = data[0]
140 serial_number = data[1:7]
141 crc = data[7]
143 return cls(family_code=family_code, serial_number=serial_number, crc=crc)
145 @property
146 def family_name(self) -> str:
147 """Get human-readable family name."""
148 return FAMILY_CODES.get(self.family_code, f"Unknown (0x{self.family_code:02X})")
150 def to_hex(self) -> str:
151 """Return ROM ID as hex string."""
152 return f"{self.family_code:02X}-{self.serial_number.hex().upper()}-{self.crc:02X}"
154 def verify_crc(self) -> bool:
155 """Verify CRC-8 of ROM ID."""
156 data = bytes([self.family_code]) + self.serial_number
157 return _crc8_maxim(data) == self.crc
160def _crc8_maxim(data: bytes) -> int:
161 """Calculate CRC-8/MAXIM (Dallas 1-Wire CRC).
163 Polynomial: x^8 + x^5 + x^4 + 1 (0x31 reflected = 0x8C)
165 Args:
166 data: Input bytes to calculate CRC over
168 Returns:
169 8-bit CRC value
170 """
171 crc = 0
172 for byte in data:
173 crc ^= byte
174 for _ in range(8):
175 if crc & 0x01:
176 crc = (crc >> 1) ^ 0x8C
177 else:
178 crc >>= 1
179 return crc
182class OneWireDecoder(AsyncDecoder):
183 """Dallas/Maxim 1-Wire protocol decoder.
185 Decodes 1-Wire bus communication including reset/presence,
186 ROM commands, and data transfers. Supports both standard
187 and overdrive speeds.
189 Attributes:
190 id: "onewire"
191 name: "1-Wire"
192 channels: [data] (required)
194 Example:
195 >>> decoder = OneWireDecoder(mode="standard")
196 >>> for packet in decoder.decode(trace):
197 ... if packet.annotations.get('rom_id'):
198 ... print(f"Device: {packet.annotations['rom_id']}")
199 """
201 id = "onewire"
202 name = "1-Wire"
203 longname = "Dallas/Maxim 1-Wire Protocol"
204 desc = "1-Wire bus decoder with ROM ID extraction"
206 channels = [ # noqa: RUF012
207 ChannelDef("data", "DQ", "1-Wire data line", required=True),
208 ]
210 optional_channels = [] # noqa: RUF012
212 options = [ # noqa: RUF012
213 OptionDef(
214 "mode",
215 "Speed mode",
216 "Standard or overdrive",
217 default="standard",
218 values=["standard", "overdrive"],
219 ),
220 OptionDef(
221 "threshold",
222 "Voltage threshold",
223 "Logic threshold in volts (auto for midpoint)",
224 default="auto",
225 values=None,
226 ),
227 ]
229 annotations = [ # noqa: RUF012
230 ("reset", "Reset pulse"),
231 ("presence", "Presence pulse"),
232 ("bit", "Data bit"),
233 ("byte", "Decoded byte"),
234 ("rom_cmd", "ROM command"),
235 ("rom_id", "ROM ID"),
236 ("error", "Protocol error"),
237 ]
239 def __init__(
240 self,
241 mode: str = "standard",
242 threshold: str | float = "auto",
243 ) -> None:
244 """Initialize 1-Wire decoder.
246 Args:
247 mode: Speed mode ("standard" or "overdrive").
248 threshold: Logic threshold voltage or "auto".
249 """
250 super().__init__(mode=mode, threshold=threshold)
251 self._mode = OneWireMode(mode)
252 self._threshold = threshold
253 self._timings = (
254 OneWireTimings() if self._mode == OneWireMode.STANDARD else OneWireTimings.overdrive()
255 )
257 def decode(
258 self,
259 trace: DigitalTrace | WaveformTrace,
260 **channels: NDArray[np.bool_],
261 ) -> Iterator[ProtocolPacket]:
262 """Decode 1-Wire protocol data.
264 Args:
265 trace: Input trace (digital or analog).
266 **channels: Additional channel data.
268 Yields:
269 Decoded data as ProtocolPacket objects.
271 Example:
272 >>> decoder = OneWireDecoder()
273 >>> for packet in decoder.decode(trace):
274 ... print(f"Command: {packet.annotations.get('rom_command')}")
275 """
276 # Convert to digital if needed
277 if isinstance(trace, WaveformTrace): 277 ↛ 278line 277 didn't jump to line 278 because the condition on line 277 was never true
278 from tracekit.analyzers.digital.extraction import to_digital
280 threshold = self._threshold if self._threshold != "auto" else "auto"
281 digital_trace = to_digital(trace, threshold=threshold) # type: ignore[arg-type]
282 else:
283 digital_trace = trace
285 data = digital_trace.data.astype(bool)
286 sample_rate = digital_trace.metadata.sample_rate
288 # Convert timing specs to samples
289 us_to_samples = sample_rate / 1_000_000
291 # Find all falling and rising edges
292 falling = np.where((data[:-1]) & (~data[1:]))[0]
293 rising = np.where((~data[:-1]) & (data[1:]))[0]
295 if len(falling) == 0:
296 return
298 # State machine for decoding
299 decoded_bytes: list[int] = []
300 current_bits: list[int] = []
301 rom_id: OneWireROMID | None = None
302 rom_command: int | None = None
303 errors: list[str] = []
304 transaction_start: float = falling[0] / sample_rate
306 i = 0
307 while i < len(falling):
308 fall_idx = falling[i]
309 fall_time = fall_idx / sample_rate
311 # Find corresponding rising edge
312 rise_candidates = rising[rising > fall_idx]
313 if len(rise_candidates) == 0: 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true
314 break
316 rise_idx = rise_candidates[0]
317 low_duration_us = (rise_idx - fall_idx) / us_to_samples
319 # Check for reset pulse
320 if low_duration_us >= self._timings.reset_min * 0.8:
321 # This is a reset pulse
322 if decoded_bytes: 322 ↛ 324line 322 didn't jump to line 324 because the condition on line 322 was never true
323 # Yield previous transaction
324 annotations = self._build_annotations(decoded_bytes, rom_command, rom_id)
325 yield ProtocolPacket(
326 timestamp=transaction_start,
327 protocol="1-wire",
328 data=bytes(decoded_bytes),
329 annotations=annotations,
330 errors=errors.copy() if errors else None, # type: ignore[arg-type]
331 )
333 # Start new transaction
334 transaction_start = fall_time
335 decoded_bytes = []
336 current_bits = []
337 rom_id = None
338 rom_command = None
339 errors = []
341 self.put_annotation(
342 fall_time,
343 rise_idx / sample_rate,
344 AnnotationLevel.BITS,
345 "Reset",
346 )
348 # Look for presence pulse (pulled low by slave)
349 next_falls = falling[falling > rise_idx]
350 if len(next_falls) > 0: 350 ↛ 370line 350 didn't jump to line 370 because the condition on line 350 was always true
351 next_fall = next_falls[0]
352 wait_time_us = (next_fall - rise_idx) / us_to_samples
353 if wait_time_us < self._timings.presence_max * 2: 353 ↛ 370line 353 didn't jump to line 370 because the condition on line 353 was always true
354 # Found presence response
355 next_rises = rising[rising > next_fall]
356 if len(next_rises) > 0: 356 ↛ 370line 356 didn't jump to line 370 because the condition on line 356 was always true
357 presence_end = next_rises[0]
358 presence_us = (presence_end - next_fall) / us_to_samples
359 if ( 359 ↛ 370line 359 didn't jump to line 370 because the condition on line 359 was always true
360 self._timings.presence_min * 0.5
361 <= presence_us
362 <= self._timings.presence_max * 1.5
363 ):
364 self.put_annotation(
365 next_fall / sample_rate,
366 presence_end / sample_rate,
367 AnnotationLevel.BITS,
368 "Presence",
369 )
370 i += 1
371 continue
373 # Data bit - determine if 0 or 1
374 # Short low pulse = write 1 or read 1
375 # Long low pulse = write 0 or read 0
376 if low_duration_us < self._timings.write_1_low_max * 2: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true
377 bit = 1
378 elif low_duration_us >= self._timings.write_0_low_min * 0.5: 378 ↛ 382line 378 didn't jump to line 382 because the condition on line 378 was always true
379 bit = 0
380 else:
381 # Ambiguous timing
382 bit = 1 if low_duration_us < self._timings.slot_min * 0.5 else 0
384 current_bits.append(bit)
386 # Assemble bytes (LSB first)
387 if len(current_bits) == 8: 387 ↛ 388line 387 didn't jump to line 388 because the condition on line 387 was never true
388 byte_val = sum(b << i for i, b in enumerate(current_bits))
389 decoded_bytes.append(byte_val)
390 current_bits = []
392 # Check for ROM command (first byte after reset)
393 if len(decoded_bytes) == 1:
394 rom_command = byte_val
395 cmd_name = ROM_COMMAND_NAMES.get(byte_val, f"Unknown (0x{byte_val:02X})")
396 self.put_annotation(
397 fall_time,
398 rise_idx / sample_rate,
399 AnnotationLevel.BYTES,
400 f"ROM Cmd: {cmd_name}",
401 )
403 # Check for ROM ID (8 bytes after ROM command)
404 if len(decoded_bytes) == 9 and rom_command in (
405 OneWireROMCommand.READ_ROM.value,
406 OneWireROMCommand.MATCH_ROM.value,
407 ):
408 try:
409 rom_id = OneWireROMID.from_bytes(bytes(decoded_bytes[1:9]))
410 if not rom_id.verify_crc():
411 errors.append("ROM ID CRC error")
412 self.put_annotation(
413 transaction_start,
414 rise_idx / sample_rate,
415 AnnotationLevel.BYTES,
416 f"ROM: {rom_id.to_hex()}",
417 )
418 except ValueError as e:
419 errors.append(f"ROM parse error: {e}")
421 i += 1
423 # Yield final transaction if any
424 if decoded_bytes: 424 ↛ 425line 424 didn't jump to line 425 because the condition on line 424 was never true
425 annotations = self._build_annotations(decoded_bytes, rom_command, rom_id)
426 yield ProtocolPacket(
427 timestamp=transaction_start,
428 protocol="1-wire",
429 data=bytes(decoded_bytes),
430 annotations=annotations,
431 errors=errors if errors else None, # type: ignore[arg-type]
432 )
434 def _build_annotations(
435 self,
436 decoded_bytes: list[int],
437 rom_command: int | None,
438 rom_id: OneWireROMID | None,
439 ) -> dict: # type: ignore[type-arg]
440 """Build annotation dictionary for packet."""
441 annotations: dict = { # type: ignore[type-arg]
442 "mode": self._mode.value,
443 "byte_count": len(decoded_bytes),
444 }
446 if rom_command is not None:
447 annotations["rom_command"] = ROM_COMMAND_NAMES.get(rom_command, f"0x{rom_command:02X}")
448 annotations["rom_command_code"] = rom_command
450 if rom_id is not None:
451 annotations["rom_id"] = rom_id.to_hex()
452 annotations["family_code"] = rom_id.family_code
453 annotations["family_name"] = rom_id.family_name
454 annotations["serial_number"] = rom_id.serial_number.hex().upper()
455 annotations["crc_valid"] = rom_id.verify_crc()
457 return annotations
460def decode_onewire(
461 data: NDArray[np.bool_] | WaveformTrace | DigitalTrace,
462 sample_rate: float = 1.0,
463 mode: str = "standard",
464) -> list[ProtocolPacket]:
465 """Convenience function to decode 1-Wire protocol data.
467 Args:
468 data: 1-Wire signal (digital array or trace).
469 sample_rate: Sample rate in Hz.
470 mode: Speed mode ("standard" or "overdrive").
472 Returns:
473 List of decoded packets.
475 Example:
476 >>> packets = decode_onewire(signal, sample_rate=1e6)
477 >>> for pkt in packets:
478 ... if 'rom_id' in pkt.annotations:
479 ... print(f"Device: {pkt.annotations['rom_id']}")
480 """
481 decoder = OneWireDecoder(mode=mode)
482 if isinstance(data, WaveformTrace | DigitalTrace):
483 return list(decoder.decode(data))
484 else:
485 trace = DigitalTrace(
486 data=data.astype(bool),
487 metadata=TraceMetadata(sample_rate=sample_rate),
488 )
489 return list(decoder.decode(trace))
492__all__ = [
493 "FAMILY_CODES",
494 "ROM_COMMAND_NAMES",
495 "OneWireDecoder",
496 "OneWireMode",
497 "OneWireROMCommand",
498 "OneWireROMID",
499 "OneWireTimings",
500 "decode_onewire",
501]