Coverage for src / tracekit / analyzers / protocols / onewire.py: 68%

177 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Dallas/Maxim 1-Wire protocol decoder. 

2 

3This module provides a 1-Wire protocol decoder for temperature sensors, 

4EEPROMs, and other 1-Wire devices with ROM command decoding. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.protocols.onewire import OneWireDecoder 

9 >>> decoder = OneWireDecoder() 

10 >>> for packet in decoder.decode(trace): 

11 ... print(f"ROM: {packet.annotations['rom_id']}") 

12 

13References: 

14 Dallas/Maxim 1-Wire Protocol 

15 DS18B20 Datasheet 

16""" 

17 

18from __future__ import annotations 

19 

20from dataclasses import dataclass 

21from enum import Enum 

22from typing import TYPE_CHECKING 

23 

24import numpy as np 

25 

26from tracekit.analyzers.protocols.base import ( 

27 AnnotationLevel, 

28 AsyncDecoder, 

29 ChannelDef, 

30 OptionDef, 

31) 

32from tracekit.core.types import ( 

33 DigitalTrace, 

34 ProtocolPacket, 

35 TraceMetadata, 

36 WaveformTrace, 

37) 

38 

39if TYPE_CHECKING: 

40 from collections.abc import Iterator 

41 

42 from numpy.typing import NDArray 

43 

44 

class OneWireMode(Enum):
    """Bus speed grades selectable for the decoder.

    Each member's value is the string form of the speed mode.
    """

    # Standard speed (15.4 kbps)
    STANDARD = "standard"

    # Overdrive speed (~142 kbps)
    OVERDRIVE = "overdrive"

50 

51 

class OneWireROMCommand(Enum):
    """Command bytes of the 1-Wire ROM layer, one member per command."""

    # Standard-speed ROM commands
    SEARCH_ROM = 0xF0
    READ_ROM = 0x33
    MATCH_ROM = 0x55
    SKIP_ROM = 0xCC
    ALARM_SEARCH = 0xEC

    # Overdrive variants
    OVERDRIVE_SKIP = 0x3C
    OVERDRIVE_MATCH = 0x69

62 

63 

# Display names for ROM command bytes, keyed by the raw command value.
ROM_COMMAND_NAMES: dict[int, str] = {
    0xF0: "Search ROM",
    0x33: "Read ROM",
    0x55: "Match ROM",
    0xCC: "Skip ROM",
    0xEC: "Alarm Search",
    0x3C: "Overdrive Skip ROM",
    0x69: "Overdrive Match ROM",
}

73 

# 1-Wire family codes: family-code byte of a ROM ID -> device description.
FAMILY_CODES: dict[int, str] = {
    0x01: "DS1990A/DS2401 Silicon Serial Number",
    0x10: "DS18S20/DS1820 Temperature Sensor",
    0x14: "DS2430A 1kb EEPROM",
    0x22: "DS1822 Econo Temperature Sensor",
    0x23: "DS2433 4kb EEPROM",
    0x28: "DS18B20 Temperature Sensor",
    0x29: "DS2408 8-Channel Addressable Switch",
    0x2D: "DS2431 1kb EEPROM",
    0x37: "DS1977 Password-Protected 32kb EEPROM",
    0x3B: "DS1825 Temperature Sensor",
    0x42: "DS28EA00 Temperature Sensor",
}

88 

89 

@dataclass
class OneWireTimings:
    """Pulse-width limits for 1-Wire signalling, in microseconds.

    A default-constructed instance holds the standard-speed values;
    :meth:`overdrive` returns the overdrive-speed set.
    """

    # Reset pulse
    reset_min: float = 480.0
    reset_max: float = 960.0

    # Presence pulse
    presence_min: float = 60.0
    presence_max: float = 240.0

    # Time slot
    slot_min: float = 60.0
    slot_max: float = 120.0

    # Low time of a write-0 slot
    write_0_low_min: float = 60.0
    write_0_low_max: float = 120.0

    # Low time of a write-1 slot
    write_1_low_min: float = 1.0
    write_1_low_max: float = 15.0

    # Read sampling point
    read_sample_time: float = 15.0

    @classmethod
    def overdrive(cls) -> OneWireTimings:
        """Build the overdrive-speed timing set (approximately 10x faster)."""
        overdrive_values = {
            "reset_min": 48.0,
            "reset_max": 80.0,
            "presence_min": 8.0,
            "presence_max": 24.0,
            "slot_min": 6.0,
            "slot_max": 16.0,
            "write_0_low_min": 6.0,
            "write_0_low_max": 16.0,
            "write_1_low_min": 1.0,
            "write_1_low_max": 2.0,
            "read_sample_time": 1.0,
        }
        return cls(**overdrive_values)

123 

124 

@dataclass
class OneWireROMID:
    """Fields of a 64-bit 1-Wire ROM ID."""

    family_code: int  # 8 bits
    serial_number: bytes  # 48 bits (6 bytes)
    crc: int  # 8 bits

    @classmethod
    def from_bytes(cls, data: bytes) -> OneWireROMID:
        """Split raw ROM ID bytes into family code, serial number and CRC.

        Args:
            data: ROM ID bytes, family code first. Only the first eight
                bytes are read; anything beyond them is ignored.

        Returns:
            The parsed ROM ID.

        Raises:
            ValueError: If fewer than 8 bytes are supplied.
        """
        if len(data) < 8:
            raise ValueError("ROM ID requires 8 bytes")

        return cls(family_code=data[0], serial_number=data[1:7], crc=data[7])

    @property
    def family_name(self) -> str:
        """Device description for the family code, or an "Unknown" placeholder."""
        fallback = f"Unknown (0x{self.family_code:02X})"
        return FAMILY_CODES.get(self.family_code, fallback)

    def to_hex(self) -> str:
        """Format the ROM ID as ``FAMILY-SERIAL-CRC`` in upper-case hex."""
        fields = (
            f"{self.family_code:02X}",
            self.serial_number.hex().upper(),
            f"{self.crc:02X}",
        )
        return "-".join(fields)

    def verify_crc(self) -> bool:
        """Check the stored CRC against one computed over family code + serial."""
        covered = bytes((self.family_code,)) + self.serial_number
        return self.crc == _crc8_maxim(covered)


def _crc8_maxim(data: bytes) -> int:
    """Calculate CRC-8/MAXIM (Dallas 1-Wire CRC).

    Polynomial: x^8 + x^5 + x^4 + 1 (0x31 reflected = 0x8C)

    Args:
        data: Input bytes to calculate CRC over

    Returns:
        8-bit CRC value
    """
    remainder = 0
    for value in data:
        remainder ^= value
        for _ in range(8):
            # Shift one bit out; fold the polynomial back in when it was set.
            carry = remainder & 0x01
            remainder >>= 1
            if carry:
                remainder ^= 0x8C
    return remainder

180 

181 

class OneWireDecoder(AsyncDecoder):
    """Dallas/Maxim 1-Wire protocol decoder.

    Decodes 1-Wire bus communication including reset/presence,
    ROM commands, and data transfers. Supports both standard
    and overdrive speeds.

    A transaction starts at a reset pulse and ends at the next reset (or the
    end of the capture). Only complete bytes are reported; trailing bits that
    do not fill a byte are dropped.

    Attributes:
        id: "onewire"
        name: "1-Wire"
        channels: [data] (required)

    Example:
        >>> decoder = OneWireDecoder(mode="standard")
        >>> for packet in decoder.decode(trace):
        ...     if packet.annotations.get('rom_id'):
        ...         print(f"Device: {packet.annotations['rom_id']}")
    """

    id = "onewire"
    name = "1-Wire"
    longname = "Dallas/Maxim 1-Wire Protocol"
    desc = "1-Wire bus decoder with ROM ID extraction"

    channels = [  # noqa: RUF012
        ChannelDef("data", "DQ", "1-Wire data line", required=True),
    ]

    optional_channels = []  # noqa: RUF012

    options = [  # noqa: RUF012
        OptionDef(
            "mode",
            "Speed mode",
            "Standard or overdrive",
            default="standard",
            values=["standard", "overdrive"],
        ),
        OptionDef(
            "threshold",
            "Voltage threshold",
            "Logic threshold in volts (auto for midpoint)",
            default="auto",
            values=None,
        ),
    ]

    annotations = [  # noqa: RUF012
        ("reset", "Reset pulse"),
        ("presence", "Presence pulse"),
        ("bit", "Data bit"),
        ("byte", "Decoded byte"),
        ("rom_cmd", "ROM command"),
        ("rom_id", "ROM ID"),
        ("error", "Protocol error"),
    ]

    def __init__(
        self,
        mode: str = "standard",
        threshold: str | float = "auto",
    ) -> None:
        """Initialize 1-Wire decoder.

        Args:
            mode: Speed mode ("standard" or "overdrive").
            threshold: Logic threshold voltage or "auto".

        Raises:
            ValueError: If ``mode`` is not a valid ``OneWireMode`` value.
        """
        super().__init__(mode=mode, threshold=threshold)
        self._mode = OneWireMode(mode)
        self._threshold = threshold
        self._timings = (
            OneWireTimings() if self._mode == OneWireMode.STANDARD else OneWireTimings.overdrive()
        )

    def decode(
        self,
        trace: DigitalTrace | WaveformTrace,
        **channels: NDArray[np.bool_],
    ) -> Iterator[ProtocolPacket]:
        """Decode 1-Wire protocol data.

        Every low pulse on the line is classified by its width: a pulse of at
        least 80% of the minimum reset time starts a new transaction, anything
        shorter is a data bit. A presence pulse that follows a reset is
        annotated and skipped so it is not counted as a data bit.

        Args:
            trace: Input trace (digital or analog).
            **channels: Additional channel data.

        Yields:
            Decoded data as ProtocolPacket objects, one per transaction that
            contains at least one complete byte.

        Example:
            >>> decoder = OneWireDecoder()
            >>> for packet in decoder.decode(trace):
            ...     print(f"Command: {packet.annotations.get('rom_command')}")
        """
        # Convert to digital if needed
        if isinstance(trace, WaveformTrace):
            from tracekit.analyzers.digital.extraction import to_digital

            digital_trace = to_digital(trace, threshold=self._threshold)  # type: ignore[arg-type]
        else:
            digital_trace = trace

        data = digital_trace.data.astype(bool)
        sample_rate = digital_trace.metadata.sample_rate

        # Samples per microsecond, used to turn sample counts into durations
        us_to_samples = sample_rate / 1_000_000

        # Sample indices just before each high->low / low->high transition
        falling = np.where((data[:-1]) & (~data[1:]))[0]
        rising = np.where((~data[:-1]) & (data[1:]))[0]

        if len(falling) == 0:
            return

        # Per-transaction decoding state
        decoded_bytes: list[int] = []
        current_bits: list[int] = []
        rom_id: OneWireROMID | None = None
        rom_command: int | None = None
        errors: list[str] = []
        transaction_start: float = falling[0] / sample_rate

        i = 0
        while i < len(falling):
            fall_idx = falling[i]
            fall_time = fall_idx / sample_rate

            # First rising edge after this falling edge (both arrays are sorted)
            rise_pos = int(np.searchsorted(rising, fall_idx, side="right"))
            if rise_pos >= len(rising):
                # Line is still low when the capture ends
                break

            rise_idx = rising[rise_pos]
            rise_time = rise_idx / sample_rate
            low_duration_us = (rise_idx - fall_idx) / us_to_samples

            # Check for reset pulse
            if low_duration_us >= self._timings.reset_min * 0.8:
                if decoded_bytes:
                    # Yield previous transaction
                    yield self._make_packet(
                        transaction_start, decoded_bytes, rom_command, rom_id, errors
                    )

                # Start new transaction
                transaction_start = fall_time
                decoded_bytes = []
                current_bits = []
                rom_id = None
                rom_command = None
                errors = []

                self.put_annotation(fall_time, rise_time, AnnotationLevel.BITS, "Reset")

                # Look for presence pulse (pulled low by slave)
                presence = self._find_presence(falling, rising, i, rise_idx, us_to_samples)
                if presence is None:
                    i += 1
                else:
                    presence_start, presence_end = presence
                    self.put_annotation(
                        presence_start / sample_rate,
                        presence_end / sample_rate,
                        AnnotationLevel.BITS,
                        "Presence",
                    )
                    # Skip the presence pulse's own falling edge; otherwise it
                    # would be decoded as a 0 bit and shift every later byte.
                    i += 2
                continue

            current_bits.append(self._classify_bit(low_duration_us))

            # Assemble bytes (LSB first)
            if len(current_bits) == 8:
                byte_val = sum(bit << shift for shift, bit in enumerate(current_bits))
                decoded_bytes.append(byte_val)
                current_bits = []

                # Check for ROM command (first byte after reset)
                if len(decoded_bytes) == 1:
                    rom_command = byte_val
                    cmd_name = ROM_COMMAND_NAMES.get(byte_val, f"Unknown (0x{byte_val:02X})")
                    self.put_annotation(
                        fall_time,
                        rise_time,
                        AnnotationLevel.BYTES,
                        f"ROM Cmd: {cmd_name}",
                    )

                # Check for ROM ID (8 bytes after ROM command)
                if len(decoded_bytes) == 9 and rom_command in (
                    OneWireROMCommand.READ_ROM.value,
                    OneWireROMCommand.MATCH_ROM.value,
                ):
                    try:
                        rom_id = OneWireROMID.from_bytes(bytes(decoded_bytes[1:9]))
                        if not rom_id.verify_crc():
                            errors.append("ROM ID CRC error")
                        self.put_annotation(
                            transaction_start,
                            rise_time,
                            AnnotationLevel.BYTES,
                            f"ROM: {rom_id.to_hex()}",
                        )
                    except ValueError as e:
                        errors.append(f"ROM parse error: {e}")

            i += 1

        # Yield final transaction if any
        if decoded_bytes:
            yield self._make_packet(transaction_start, decoded_bytes, rom_command, rom_id, errors)

    def _find_presence(
        self,
        falling: NDArray[np.intp],
        rising: NDArray[np.intp],
        reset_pos: int,
        reset_rise_idx: int,
        us_to_samples: float,
    ) -> tuple[int, int] | None:
        """Return (start, end) sample indices of the presence pulse after a reset.

        Args:
            falling: Sorted falling-edge sample indices.
            rising: Sorted rising-edge sample indices.
            reset_pos: Position in ``falling`` of the reset pulse's falling edge.
            reset_rise_idx: Sample index where the reset pulse ends.
            us_to_samples: Samples per microsecond.

        Returns:
            The falling and rising edge indices of the presence pulse, or
            ``None`` when the next low pulse starts too late, never ends
            within the capture, or has a width outside the accepted window.
        """
        # Edges alternate, so the first falling edge after the reset's rising
        # edge is simply the next entry in ``falling``.
        next_pos = reset_pos + 1
        if next_pos >= len(falling):
            return None

        next_fall = falling[next_pos]
        wait_time_us = (next_fall - reset_rise_idx) / us_to_samples
        if wait_time_us >= self._timings.presence_max * 2:
            return None

        end_pos = int(np.searchsorted(rising, next_fall, side="right"))
        if end_pos >= len(rising):
            return None

        presence_end = rising[end_pos]
        presence_us = (presence_end - next_fall) / us_to_samples
        if (
            self._timings.presence_min * 0.5
            <= presence_us
            <= self._timings.presence_max * 1.5
        ):
            return next_fall, presence_end
        return None

    def _classify_bit(self, low_duration_us: float) -> int:
        """Map the width of a low pulse to a bit value.

        Short low pulse = write 1 or read 1; long low pulse = write 0 or read 0.
        """
        timings = self._timings
        if low_duration_us < timings.write_1_low_max * 2:
            return 1
        if low_duration_us >= timings.write_0_low_min * 0.5:
            return 0
        # Ambiguous timing. Neither built-in timing set leaves a gap between
        # the two thresholds above, so this only applies to customised timings.
        return 1 if low_duration_us < timings.slot_min * 0.5 else 0

    def _make_packet(
        self,
        timestamp: float,
        decoded_bytes: list[int],
        rom_command: int | None,
        rom_id: OneWireROMID | None,
        errors: list[str],
    ) -> ProtocolPacket:
        """Build the ProtocolPacket for one finished transaction."""
        return ProtocolPacket(
            timestamp=timestamp,
            protocol="1-wire",
            data=bytes(decoded_bytes),
            annotations=self._build_annotations(decoded_bytes, rom_command, rom_id),
            # Copy so later transactions cannot mutate an already-yielded list
            errors=list(errors) if errors else None,  # type: ignore[arg-type]
        )

    def _build_annotations(
        self,
        decoded_bytes: list[int],
        rom_command: int | None,
        rom_id: OneWireROMID | None,
    ) -> dict:  # type: ignore[type-arg]
        """Build annotation dictionary for packet.

        Args:
            decoded_bytes: Bytes decoded in the transaction.
            rom_command: First byte of the transaction, if one was decoded.
            rom_id: Parsed ROM ID, if the transaction carried one.

        Returns:
            Dictionary with ``mode`` and ``byte_count``, plus ROM command and
            ROM ID details when available.
        """
        annotations: dict = {  # type: ignore[type-arg]
            "mode": self._mode.value,
            "byte_count": len(decoded_bytes),
        }

        if rom_command is not None:
            annotations["rom_command"] = ROM_COMMAND_NAMES.get(rom_command, f"0x{rom_command:02X}")
            annotations["rom_command_code"] = rom_command

        if rom_id is not None:
            annotations["rom_id"] = rom_id.to_hex()
            annotations["family_code"] = rom_id.family_code
            annotations["family_name"] = rom_id.family_name
            annotations["serial_number"] = rom_id.serial_number.hex().upper()
            annotations["crc_valid"] = rom_id.verify_crc()

        return annotations

458 

459 

def decode_onewire(
    data: NDArray[np.bool_] | WaveformTrace | DigitalTrace,
    sample_rate: float = 1.0,
    mode: str = "standard",
) -> list[ProtocolPacket]:
    """Decode a 1-Wire signal in one call and collect the packets.

    Args:
        data: 1-Wire signal, either a trace object or a raw array. A raw
            array is converted to bool and wrapped in a ``DigitalTrace``.
        sample_rate: Sample rate in Hz. Only used when ``data`` is a raw
            array; traces carry their own metadata.
        mode: Speed mode ("standard" or "overdrive").

    Returns:
        List of decoded packets.

    Example:
        >>> packets = decode_onewire(signal, sample_rate=1e6)
        >>> for pkt in packets:
        ...     if 'rom_id' in pkt.annotations:
        ...         print(f"Device: {pkt.annotations['rom_id']}")
    """
    decoder = OneWireDecoder(mode=mode)

    if isinstance(data, WaveformTrace | DigitalTrace):
        source = data
    else:
        source = DigitalTrace(
            data=data.astype(bool),
            metadata=TraceMetadata(sample_rate=sample_rate),
        )

    return list(decoder.decode(source))

490 

491 

# Public API of this module.
__all__ = [
    "FAMILY_CODES",
    "ROM_COMMAND_NAMES",
    "OneWireDecoder",
    "OneWireMode",
    "OneWireROMCommand",
    "OneWireROMID",
    "OneWireTimings",
    "decode_onewire",
]