Coverage for src / tracekit / analyzers / protocols / swd.py: 89%

116 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""SWD protocol decoder. 

2 

3This module provides ARM Serial Wire Debug (SWD) protocol decoding 

4with DP/AP access detection and ACK/WAIT/FAULT response handling. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.protocols.swd import SWDDecoder 

9 >>> decoder = SWDDecoder() 

10 >>> for packet in decoder.decode(swclk=swclk, swdio=swdio): 

11 ...     print(f"ACK: {packet.annotations['ack']}")

12 

13References: 

14 ARM Debug Interface Architecture Specification ADIv5.0 to ADIv5.2 

15""" 

16 

17from __future__ import annotations 

18 

19from enum import Enum 

20from typing import TYPE_CHECKING 

21 

22import numpy as np 

23 

24from tracekit.analyzers.protocols.base import ( 

25 AnnotationLevel, 

26 ChannelDef, 

27 SyncDecoder, 

28) 

29from tracekit.core.types import DigitalTrace, ProtocolPacket 

30 

31if TYPE_CHECKING: 

32 from collections.abc import Iterator 

33 

34 from numpy.typing import NDArray 

35 

36 

class SWDResponse(Enum):
    """Three-bit ACK codes a target can return after an SWD request.

    Each valid code has exactly one bit set; any other 3-bit pattern is not
    a member of this enum.
    """

    # Request accepted.
    OK = 1 << 0
    # Target asks the host to retry later.
    WAIT = 1 << 1
    # Target reports a fault.
    FAULT = 1 << 2

43 

44 

class SWDDecoder(SyncDecoder):
    """SWD protocol decoder.

    Decodes ARM Serial Wire Debug transactions including read/write
    operations to Debug Port (DP) and Access Port (AP) registers.

    SWDIO is sampled on every rising edge of SWCLK. A transaction consists of
    an 8-bit request, one turnaround clock, a 3-bit ACK and, when the ACK is
    OK, a 32-bit data word followed by one parity bit.

    Attributes:
        id: "swd"
        name: "SWD"
        channels: [swclk, swdio] (required)

    Example:
        >>> decoder = SWDDecoder()
        >>> for packet in decoder.decode(swclk=swclk, swdio=swdio, sample_rate=10e6):
        ...     print(f"ACK: {packet.annotations['ack']}")
    """

    id = "swd"
    name = "SWD"
    longname = "Serial Wire Debug"
    desc = "ARM Serial Wire Debug protocol decoder"

    channels = [  # noqa: RUF012
        ChannelDef("swclk", "SWCLK", "Serial Wire Clock", required=True),
        ChannelDef("swdio", "SWDIO", "Serial Wire Data I/O", required=True),
    ]

    optional_channels = []  # noqa: RUF012

    options = []  # noqa: RUF012

    annotations = [  # noqa: RUF012
        ("request", "Request packet"),
        ("ack", "ACK response"),
        ("data", "Data phase"),
        ("parity", "Parity bit"),
        ("error", "Error"),
    ]

    def __init__(self) -> None:
        """Initialize SWD decoder."""
        super().__init__()

    def decode(  # type: ignore[override]
        self,
        trace: DigitalTrace | None = None,
        *,
        swclk: NDArray[np.bool_] | None = None,
        swdio: NDArray[np.bool_] | None = None,
        sample_rate: float = 1.0,
    ) -> Iterator[ProtocolPacket]:
        """Decode SWD transactions.

        Scans the SWDIO level at each SWCLK rising edge for a start bit (1)
        and decodes one transaction per start bit found. Wire framing, in
        clock order:

        * request: Start, APnDP, RnW, A[2], A[3], Parity, Stop, Park
        * turnaround (1 clock)
        * ACK[0:2], first received bit is the least significant
        * OK on a read:  32 data bits (LSB first), parity, turnaround
        * OK on a write: turnaround, 32 data bits (LSB first), parity
        * WAIT / FAULT / invalid ACK: turnaround, no data phase

        Framing problems (bad stop or park bit, parity mismatch, non-OK ACK)
        do not abort decoding; they are collected in the packet's ``errors``
        list. A transaction that is cut short by the end of the capture is
        dropped and decoding stops.

        Args:
            trace: Optional primary trace. Not read by this decoder; the
                signals are taken from ``swclk`` and ``swdio``.
            swclk: Serial Wire Clock signal. Nothing is decoded if None.
            swdio: Serial Wire Data I/O signal. Nothing is decoded if None.
                If the two signals differ in length, both are truncated to
                the shorter one.
            sample_rate: Sample rate in Hz, used to convert sample indices
                into timestamps.

        Yields:
            Decoded SWD transactions as ProtocolPacket objects. ``timestamp``
            is the time of the start bit, ``data`` holds the 32-bit word as
            four little-endian bytes for OK transactions (empty otherwise),
            and ``annotations`` carries ``transaction_num``, ``apndp``,
            ``read``, ``register_addr``, ``ack``, ``ack_value`` and, for OK
            transactions, ``data``.

        Example:
            >>> decoder = SWDDecoder()
            >>> for pkt in decoder.decode(swclk=swclk, swdio=swdio, sample_rate=1e6):
            ...     print(f"R/W: {'Read' if pkt.annotations['read'] else 'Write'}")
        """
        if swclk is None or swdio is None:
            return

        # Coerce to boolean arrays so that 0/1 integer captures and plain
        # sequences behave exactly like bool arrays in the edge detection.
        swclk = np.asarray(swclk, dtype=bool)
        swdio = np.asarray(swdio, dtype=bool)

        n_samples = min(len(swclk), len(swdio))
        swclk = swclk[:n_samples]
        swdio = swdio[:n_samples]

        # Sample indices of SWCLK rising edges (data is sampled on rising edge).
        rising_edges = np.where(~swclk[:-1] & swclk[1:])[0] + 1
        n_edges = len(rising_edges)
        if n_edges == 0:
            return

        # SWDIO level at every rising edge, as plain Python ints (0 or 1), so
        # each protocol field below is a simple slice of this list.
        bits = swdio[rising_edges].astype(np.uint8).tolist()

        ok_value = SWDResponse.OK.value
        trans_num = 0
        edge_idx = 0

        while edge_idx < n_edges:
            # Look for start bit (must be 1); anything else is idle.
            if not bits[edge_idx]:
                edge_idx += 1
                continue

            # A full 8-bit request must fit in the remaining capture.
            if edge_idx + 8 > n_edges:
                break

            start_idx = rising_edges[edge_idx]

            # Request layout:
            #   bit 0: Start (1, guaranteed by the scan above)
            #   bit 1: APnDP (0=DP, 1=AP)
            #   bit 2: RnW (0=Write, 1=Read)
            #   bits 3-4: A[2:3] (register address bits)
            #   bit 5: Parity over bits 1-4
            #   bit 6: Stop (0)
            #   bit 7: Park (1)
            _, apndp, rnw, addr_2, addr_3, parity, stop_bit, park_bit = bits[
                edge_idx : edge_idx + 8
            ]

            # Validate request format
            errors: list[str] = []
            if stop_bit != 0:
                errors.append("Invalid stop bit")
            if park_bit != 1:
                errors.append("Invalid park bit")

            # The parity bit is 1 when an odd number of APnDP, RnW, A[2:3]
            # are set, i.e. the five bits together carry even parity.
            expected_parity = (apndp + rnw + addr_2 + addr_3) % 2
            if parity != expected_parity:
                errors.append("Request parity error")

            # Construct register address (A[1:0] are always zero).
            register_addr = (addr_3 << 3) | (addr_2 << 2)

            # Skip the request plus the turnaround clock in which the host
            # releases SWDIO to the target.
            edge_idx += 8 + 1

            # ACK response (3 bits from target)
            if edge_idx + 3 > n_edges:
                break

            ack_0, ack_1, ack_2 = bits[edge_idx : edge_idx + 3]
            ack_value = (ack_2 << 2) | (ack_1 << 1) | ack_0

            # Decode ACK
            if ack_value == ok_value:
                ack_str = "OK"
            elif ack_value == SWDResponse.WAIT.value:
                ack_str = "WAIT"
                errors.append("Target responded with WAIT")
            elif ack_value == SWDResponse.FAULT.value:
                ack_str = "FAULT"
                errors.append("Target responded with FAULT")
            else:
                ack_str = "INVALID"
                errors.append(f"Invalid ACK: 0b{ack_value:03b}")

            edge_idx += 3

            # If ACK is OK, there's a data phase
            data_value = 0
            if ack_value == ok_value:
                if not rnw:
                    # Write: the target hands SWDIO back to the host before
                    # WDATA. On a read the target keeps driving, so RDATA
                    # follows the ACK with no turnaround in between.
                    edge_idx += 1

                # Data phase (32 bits + 1 parity)
                if edge_idx + 33 > n_edges:
                    break

                data_bits = bits[edge_idx : edge_idx + 32]

                # Convert to value (LSB first)
                data_value = sum(bit << pos for pos, bit in enumerate(data_bits))

                # The data parity bit is 1 when an odd number of data bits
                # are set (same rule as the request parity).
                data_parity = bits[edge_idx + 32]
                if data_parity != sum(data_bits) % 2:
                    errors.append("Data parity error")

                edge_idx += 33

            # Calculate timing: from the start bit to the last decoded bit.
            start_time = start_idx / sample_rate
            end_time = rising_edges[min(edge_idx - 1, n_edges - 1)] / sample_rate

            # Add annotations
            port_type = "AP" if apndp else "DP"
            access_type = "Read" if rnw else "Write"

            self.put_annotation(
                start_time,
                end_time,
                AnnotationLevel.PACKETS,
                f"{port_type} {access_type} @ 0x{register_addr:02X}: {ack_str}",
            )

            # Create packet
            annotations = {
                "transaction_num": trans_num,
                "apndp": port_type,
                "read": bool(rnw),
                "register_addr": register_addr,
                "ack": ack_str,
                "ack_value": ack_value,
            }

            if ack_value == ok_value:
                annotations["data"] = data_value
                # Encode data as bytes (little-endian)
                data_bytes = data_value.to_bytes(4, "little")
            else:
                data_bytes = b""

            yield ProtocolPacket(
                timestamp=start_time,
                protocol="swd",
                data=data_bytes,
                annotations=annotations,
                errors=errors,
            )

            trans_num += 1

            # Trailing turnaround: whenever the target drove SWDIO last (read
            # data, or a non-OK ACK with no data phase) it must release the
            # line before the next request. After a successful write the
            # host is still driving, so the next start bit may follow
            # immediately and no clock is skipped.
            if rnw or ack_value != ok_value:
                edge_idx += 1

287 

288 

def decode_swd(
    swclk: NDArray[np.bool_],
    swdio: NDArray[np.bool_],
    sample_rate: float = 1.0,
) -> list[ProtocolPacket]:
    """Decode a pair of SWD signals in one call.

    Builds a fresh ``SWDDecoder``, runs it over the given clock and data
    signals and collects every yielded packet into a list.

    Args:
        swclk: Serial Wire Clock signal.
        swdio: Serial Wire Data I/O signal.
        sample_rate: Sample rate in Hz.

    Returns:
        List of decoded SWD transactions.

    Example:
        >>> packets = decode_swd(swclk, swdio, sample_rate=10e6)
        >>> for pkt in packets:
        ...     print(f"ACK: {pkt.annotations['ack']}")
    """
    packet_stream = SWDDecoder().decode(
        swclk=swclk,
        swdio=swdio,
        sample_rate=sample_rate,
    )
    # The decoder is a generator; materialize it so callers get a list.
    return [*packet_stream]

311 

312 

# Names exported by ``from tracekit.analyzers.protocols.swd import *``.
__all__ = [
    "SWDDecoder",
    "SWDResponse",
    "decode_swd",
]