Coverage for src / tracekit / analyzers / digital / extraction.py: 100%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Digital signal extraction and edge detection. 

2 

3This module provides functions for extracting digital signals from 

4analog waveforms and detecting edge transitions. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.digital import to_digital, detect_edges 

9 >>> digital = to_digital(analog_trace, threshold=1.4) 

10 >>> edges = detect_edges(digital, edge_type="rising") 

11""" 

12 

13from __future__ import annotations 

14 

15from typing import TYPE_CHECKING, Any, Literal 

16 

17import numpy as np 

18 

19from tracekit.core.exceptions import InsufficientDataError 

20from tracekit.core.types import DigitalTrace, WaveformTrace 

21 

22if TYPE_CHECKING: 

23 from numpy.typing import NDArray 

24 

25# Standard logic family threshold constants 

26# Reference: Various IC manufacturer datasheets 

27LOGIC_FAMILIES: dict[str, dict[str, float]] = { 

28 "TTL": { 

29 "VIL_max": 0.8, # Maximum input low voltage 

30 "VIH_min": 2.0, # Minimum input high voltage 

31 "VOL_max": 0.4, # Maximum output low voltage 

32 "VOH_min": 2.4, # Minimum output high voltage 

33 "VCC": 5.0, 

34 }, 

35 "CMOS_5V": { 

36 "VIL_max": 1.5, 

37 "VIH_min": 3.5, 

38 "VOL_max": 0.1, 

39 "VOH_min": 4.9, 

40 "VCC": 5.0, 

41 }, 

42 "LVTTL": { 

43 "VIL_max": 0.8, 

44 "VIH_min": 1.5, 

45 "VOL_max": 0.4, 

46 "VOH_min": 2.4, 

47 "VCC": 3.3, 

48 }, 

49 "LVCMOS_3V3": { 

50 "VIL_max": 0.3 * 3.3, # 30% of VCC 

51 "VIH_min": 0.7 * 3.3, # 70% of VCC 

52 "VOL_max": 0.1, 

53 "VOH_min": 3.2, 

54 "VCC": 3.3, 

55 }, 

56 "LVCMOS_2V5": { 

57 "VIL_max": 0.3 * 2.5, 

58 "VIH_min": 0.7 * 2.5, 

59 "VOL_max": 0.1, 

60 "VOH_min": 2.4, 

61 "VCC": 2.5, 

62 }, 

63 "LVCMOS_1V8": { 

64 "VIL_max": 0.3 * 1.8, 

65 "VIH_min": 0.7 * 1.8, 

66 "VOL_max": 0.1, 

67 "VOH_min": 1.7, 

68 "VCC": 1.8, 

69 }, 

70 "LVCMOS_1V2": { 

71 "VIL_max": 0.3 * 1.2, 

72 "VIH_min": 0.7 * 1.2, 

73 "VOL_max": 0.1, 

74 "VOH_min": 1.1, 

75 "VCC": 1.2, 

76 }, 

77} 

78 

79 

80def to_digital( 

81 trace: WaveformTrace, 

82 *, 

83 threshold: float | Literal["auto"] = "auto", 

84 hysteresis: float | tuple[float, float] | None = None, 

85) -> DigitalTrace: 

86 """Extract digital signal from analog waveform. 

87 

88 Converts an analog waveform to a digital (boolean) signal using 

89 threshold comparison. 

90 

91 Args: 

92 trace: Input analog waveform trace. 

93 threshold: Voltage threshold for conversion. Can be: 

94 - A float value for fixed threshold 

95 - "auto" for adaptive threshold (midpoint of 10th-90th percentile) 

96 hysteresis: Hysteresis for noise immunity. Can be: 

97 - None: No hysteresis 

98 - A float: Symmetric hysteresis band around threshold 

99 - A tuple (low, high): Explicit low and high thresholds 

100 

101 Returns: 

102 DigitalTrace with boolean data and detected edges. 

103 

104 Raises: 

105 InsufficientDataError: If trace has insufficient data. 

106 

107 Example: 

108 >>> digital = to_digital(analog_trace, threshold=1.4) 

109 >>> print(f"High samples: {digital.data.sum()}") 

110 

111 >>> # With hysteresis for noisy signals 

112 >>> digital = to_digital(analog_trace, threshold=1.4, hysteresis=0.2) 

113 

114 References: 

115 TTL Logic thresholds: VIL_max=0.8V, VIH_min=2.0V 

116 """ 

117 if len(trace.data) < 2: 

118 raise InsufficientDataError( 

119 "Trace too short for digital extraction", 

120 required=2, 

121 available=len(trace.data), 

122 analysis_type="digital_extraction", 

123 ) 

124 

125 # Convert memoryview to ndarray if needed 

126 data = np.asarray(trace.data) 

127 

128 # Determine threshold 

129 if threshold == "auto": 

130 # Adaptive threshold: midpoint of 10th-90th percentile 

131 p10, p90 = np.percentile(data, [10, 90]) 

132 thresh_value = (p10 + p90) / 2.0 

133 else: 

134 thresh_value = float(threshold) 

135 

136 # Apply threshold with or without hysteresis 

137 if hysteresis is not None: 

138 if isinstance(hysteresis, tuple): 

139 thresh_low, thresh_high = hysteresis 

140 else: 

141 thresh_low = thresh_value - hysteresis / 2 

142 thresh_high = thresh_value + hysteresis / 2 

143 digital_data = _apply_hysteresis(data, thresh_low, thresh_high) 

144 else: 

145 digital_data = data >= thresh_value 

146 

147 # Detect edges 

148 edges = _detect_edges_internal(data, digital_data, trace.metadata.sample_rate, thresh_value) 

149 

150 return DigitalTrace( 

151 data=digital_data, 

152 metadata=trace.metadata, 

153 edges=edges, 

154 ) 

155 

156 

157def _apply_hysteresis( 

158 data: NDArray[np.floating[Any]], 

159 thresh_low: float, 

160 thresh_high: float, 

161) -> NDArray[np.bool_]: 

162 """Apply Schmitt trigger-style hysteresis thresholding. 

163 

164 Args: 

165 data: Input analog data. 

166 thresh_low: Lower threshold (switch to low when below). 

167 thresh_high: Upper threshold (switch to high when above). 

168 

169 Returns: 

170 Boolean array with hysteresis applied. 

171 """ 

172 result = np.zeros(len(data), dtype=np.bool_) 

173 

174 # Initial state based on first sample 

175 state = data[0] >= (thresh_low + thresh_high) / 2 

176 

177 for i, value in enumerate(data): 

178 if state: 

179 # Currently high, switch low if below thresh_low 

180 if value < thresh_low: 

181 state = False 

182 # Currently low, switch high if above thresh_high 

183 elif value >= thresh_high: 

184 state = True 

185 result[i] = state 

186 

187 return result 

188 

189 

190def detect_edges( 

191 trace: WaveformTrace | DigitalTrace, 

192 *, 

193 edge_type: Literal["rising", "falling", "both"] = "both", 

194 threshold: float | Literal["auto"] = "auto", 

195) -> NDArray[np.float64]: 

196 """Detect edge transitions in a signal. 

197 

198 Finds rising and/or falling edges with sub-sample timestamp 

199 interpolation for improved accuracy. 

200 

201 Args: 

202 trace: Input waveform (analog or digital). 

203 edge_type: Type of edges to detect: 

204 - "rising": Low-to-high transitions 

205 - "falling": High-to-low transitions 

206 - "both": All transitions 

207 threshold: Threshold for edge detection (only for analog traces). 

208 

209 Returns: 

210 Array of edge timestamps in seconds. 

211 

212 Raises: 

213 InsufficientDataError: If trace has insufficient data. 

214 

215 Example: 

216 >>> edges = detect_edges(trace, edge_type="rising") 

217 >>> print(f"Found {len(edges)} rising edges") 

218 """ 

219 if len(trace.data) < 2: 

220 raise InsufficientDataError( 

221 "Trace too short for edge detection", 

222 required=2, 

223 available=len(trace.data), 

224 analysis_type="edge_detection", 

225 ) 

226 

227 # Convert to digital if analog 

228 digital = to_digital(trace, threshold=threshold) if isinstance(trace, WaveformTrace) else trace 

229 

230 # Find transitions - ensure we have a numpy array 

231 data = np.asarray(digital.data) 

232 

233 transitions = np.diff(data.astype(np.int8)) 

234 

235 # Get edge indices 

236 if edge_type == "rising": 

237 edge_indices = np.where(transitions == 1)[0] 

238 elif edge_type == "falling": 

239 edge_indices = np.where(transitions == -1)[0] 

240 else: # both 

241 edge_indices = np.where(transitions != 0)[0] 

242 

243 # Convert indices to timestamps 

244 sample_period = digital.metadata.time_base 

245 timestamps = edge_indices.astype(np.float64) * sample_period 

246 

247 # Sub-sample interpolation for analog traces 

248 if isinstance(trace, WaveformTrace) and threshold != "auto": 

249 thresh_value = float(threshold) 

250 timestamps = _interpolate_edges(trace.data, edge_indices, sample_period, thresh_value) 

251 

252 return timestamps 

253 

254 

255def _detect_edges_internal( 

256 analog_data: NDArray[np.floating[Any]], 

257 digital_data: NDArray[np.bool_], 

258 sample_rate: float, 

259 threshold: float, 

260) -> list[tuple[float, bool]]: 

261 """Detect edges and return as (timestamp, is_rising) tuples. 

262 

263 Args: 

264 analog_data: Original analog data for interpolation. 

265 digital_data: Thresholded digital data. 

266 sample_rate: Sample rate in Hz. 

267 threshold: Threshold used for conversion. 

268 

269 Returns: 

270 List of (timestamp, is_rising) tuples. 

271 """ 

272 sample_period = 1.0 / sample_rate 

273 transitions = np.diff(digital_data.astype(np.int8)) 

274 

275 edges: list[tuple[float, bool]] = [] 

276 

277 # Rising edges 

278 rising_indices = np.where(transitions == 1)[0] 

279 for idx in rising_indices: 

280 # Sub-sample interpolation 

281 if 0 < idx < len(analog_data) - 1: 

282 t = _interpolate_crossing( 

283 analog_data[idx], analog_data[idx + 1], threshold, sample_period 

284 ) 

285 timestamp = idx * sample_period + t 

286 else: 

287 timestamp = idx * sample_period 

288 edges.append((timestamp, True)) 

289 

290 # Falling edges 

291 falling_indices = np.where(transitions == -1)[0] 

292 for idx in falling_indices: 

293 if 0 < idx < len(analog_data) - 1: 

294 t = _interpolate_crossing( 

295 analog_data[idx], analog_data[idx + 1], threshold, sample_period 

296 ) 

297 timestamp = idx * sample_period + t 

298 else: 

299 timestamp = idx * sample_period 

300 edges.append((timestamp, False)) 

301 

302 # Sort by timestamp 

303 edges.sort(key=lambda x: x[0]) 

304 

305 return edges 

306 

307 

308def _interpolate_edges( 

309 data: NDArray[np.floating[Any]], 

310 edge_indices: NDArray[np.intp], 

311 sample_period: float, 

312 threshold: float, 

313) -> NDArray[np.float64]: 

314 """Interpolate edge timestamps for sub-sample accuracy. 

315 

316 Uses linear interpolation between samples to estimate the 

317 exact crossing point. 

318 

319 Args: 

320 data: Analog data array. 

321 edge_indices: Indices of detected edges. 

322 sample_period: Time between samples. 

323 threshold: Threshold level. 

324 

325 Returns: 

326 Array of interpolated timestamps. 

327 """ 

328 timestamps = np.zeros(len(edge_indices), dtype=np.float64) 

329 

330 for i, idx in enumerate(edge_indices): 

331 base_time = idx * sample_period 

332 

333 if 0 < idx < len(data) - 1: 

334 # Linear interpolation between samples 

335 t = _interpolate_crossing(data[idx], data[idx + 1], threshold, sample_period) 

336 timestamps[i] = base_time + t 

337 else: 

338 timestamps[i] = base_time 

339 

340 return timestamps 

341 

342 

343def _interpolate_crossing( 

344 v1: float, 

345 v2: float, 

346 threshold: float, 

347 sample_period: float, 

348) -> float: 

349 """Linearly interpolate threshold crossing time. 

350 

351 Args: 

352 v1: Voltage at sample before crossing. 

353 v2: Voltage at sample after crossing. 

354 threshold: Threshold level. 

355 sample_period: Time between samples. 

356 

357 Returns: 

358 Time offset from v1 to crossing point. 

359 """ 

360 dv = v2 - v1 

361 if abs(dv) < 1e-12: 

362 return sample_period / 2 # Midpoint if no change 

363 

364 # Linear interpolation: t = (threshold - v1) / (v2 - v1) * period 

365 t = (threshold - v1) / dv * sample_period 

366 return max(0.0, min(sample_period, t)) 

367 

368 

369def get_logic_threshold( 

370 family: str, 

371 threshold_type: Literal["midpoint", "VIH", "VIL"] = "midpoint", 

372) -> float: 

373 """Get threshold voltage for a logic family. 

374 

375 Args: 

376 family: Logic family name (e.g., "TTL", "LVCMOS_3V3"). 

377 threshold_type: Type of threshold: 

378 - "midpoint": Midpoint between VIL_max and VIH_min 

379 - "VIH": Minimum input high voltage 

380 - "VIL": Maximum input low voltage 

381 

382 Returns: 

383 Threshold voltage. 

384 

385 Raises: 

386 ValueError: If family or threshold_type is unknown. 

387 

388 Example: 

389 >>> get_logic_threshold("TTL", "midpoint") 

390 1.4 

391 """ 

392 if family not in LOGIC_FAMILIES: 

393 available = ", ".join(LOGIC_FAMILIES.keys()) 

394 raise ValueError(f"Unknown logic family: {family}. Available: {available}") 

395 

396 levels = LOGIC_FAMILIES[family] 

397 

398 if threshold_type == "midpoint": 

399 return (levels["VIL_max"] + levels["VIH_min"]) / 2 

400 elif threshold_type == "VIH": 

401 return levels["VIH_min"] 

402 elif threshold_type == "VIL": 

403 return levels["VIL_max"] 

404 else: 

405 raise ValueError(f"Unknown threshold_type: {threshold_type}") 

406 

407 

408__all__ = [ 

409 "LOGIC_FAMILIES", 

410 "detect_edges", 

411 "get_logic_threshold", 

412 "to_digital", 

413]