Coverage for src / tracekit / loaders / preprocessing.py: 70%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Idle and padding detection and removal. 

2 

3This module provides functions to detect and optionally remove idle regions, 

4padding, and non-data samples from loaded binary captures. 

5 

6 

7Example: 

8 >>> from tracekit.loaders.preprocessing import detect_idle_regions, trim_idle 

9 >>> regions = detect_idle_regions(trace, pattern='zeros', min_duration=100) 

10 >>> print(f"Found {len(regions)} idle regions") 

11 >>> trimmed_trace = trim_idle(trace, trim_start=True, trim_end=True) 

12 >>> print(f"Trimmed {len(trace.data) - len(trimmed_trace.data)} samples") 

13""" 

14 

15from __future__ import annotations 

16 

17import logging 

18from dataclasses import dataclass 

19from typing import TYPE_CHECKING 

20 

21import numpy as np 

22 

23from tracekit.core.types import DigitalTrace, TraceMetadata 

24 

25if TYPE_CHECKING: 

26 from numpy.typing import NDArray 

27 

28# Logger for debug output 

29logger = logging.getLogger(__name__) 

30 

31 

@dataclass
class IdleRegion:
    """A contiguous run of idle samples inside a trace.

    Attributes:
        start: Index of the first sample of the run.
        end: Index one past the last sample of the run (exclusive).
        pattern: Name of the idle pattern associated with the run.
        duration_samples: Run length in samples, as supplied by the creator.
    """

    start: int
    end: int
    pattern: str
    duration_samples: int

    @property
    def length(self) -> int:
        """Number of samples covered by the half-open range ``[start, end)``.

        Returns:
            ``end - start``.
        """
        span = self.end - self.start
        return span

    def get_duration_seconds(self, sample_rate: float) -> float:
        """Convert the region length from samples to seconds.

        Args:
            sample_rate: Sample rate in Hz.

        Returns:
            Region length divided by ``sample_rate``.
        """
        samples = self.length
        return samples / sample_rate

69 

70 

@dataclass
class IdleStatistics:
    """Summary of how much of a trace is idle versus active.

    Attributes:
        total_samples: Number of samples in the trace.
        idle_samples: Number of samples that fall inside idle regions.
        active_samples: Number of samples outside idle regions.
        idle_regions: The idle regions the counts were derived from.
        dominant_pattern: Idle pattern covering the most samples.
    """

    total_samples: int
    idle_samples: int
    active_samples: int
    idle_regions: list[IdleRegion]
    dominant_pattern: str

    @property
    def idle_fraction(self) -> float:
        """Share of the trace that is idle.

        Returns:
            ``idle_samples / total_samples``, or 0.0 when the trace has no
            samples (avoids a division by zero).
        """
        return self.idle_samples / self.total_samples if self.total_samples else 0.0

    @property
    def active_fraction(self) -> float:
        """Share of the trace that is not idle.

        Returns:
            The complement of :attr:`idle_fraction`.
        """
        idle_share = self.idle_fraction
        return 1.0 - idle_share

110 

111 

def detect_idle_regions(
    trace: DigitalTrace,
    pattern: str = "auto",
    min_duration: int = 100,
) -> list[IdleRegion]:
    """Detect idle regions in a digital trace.

    An idle region is a run of consecutive samples that all hold the idle
    level and that is at least ``min_duration`` samples long.

    Args:
        trace: Digital trace to analyze; only ``trace.data`` is read.
        pattern: Idle pattern to detect. ``"zeros"`` and ``"ones"`` are
            supported; ``"auto"`` picks one of the two from the trace data.
            Any other value (e.g. a byte value) is not supported yet: a
            warning is logged and zeros are searched for instead.
        min_duration: Minimum run length, in samples, to report as idle.

    Returns:
        Idle regions in ascending start order. Each region's ``pattern`` is
        the pattern that was actually searched for. The list is empty when
        the trace is shorter than ``min_duration``.

    Example:
        >>> regions = detect_idle_regions(trace, pattern='zeros', min_duration=100)
        >>> for region in regions:
        ...     print(f"Idle from {region.start} to {region.end}")
    """
    # Coerce to bool so that `~` below is a logical NOT. On an integer array
    # (e.g. uint8 samples holding 0/1) `~` is a bitwise NOT and would corrupt
    # the mask. For an array that is already boolean this is a no-op.
    data = np.asarray(trace.data, dtype=bool)

    if len(data) < min_duration:
        return []

    if pattern == "auto":
        # Auto-detect pattern from start/end of trace
        pattern = _auto_detect_pattern(data)
        logger.debug("Auto-detected idle pattern: %s", pattern)

    if pattern == "ones":
        idle_mask = data  # True where the sample is one
    else:
        if pattern != "zeros":
            # Specific byte values would need a multi-bit comparison, which
            # is not implemented; fall back to zeros and record the pattern
            # that is really being searched for.
            logger.warning("Pattern '%s' not fully supported, using zeros", pattern)
            pattern = "zeros"
        idle_mask = ~data  # True where the sample is zero

    # Pad with False on both sides so runs touching either end of the trace
    # still produce a rising and a falling edge.
    padded = np.concatenate(([False], idle_mask, [False]))
    transitions = np.diff(padded.astype(np.int8))

    # +1 marks the first index of a run, -1 marks one past its last index.
    # The padding guarantees both arrays have the same length.
    starts = np.where(transitions == 1)[0]
    ends = np.where(transitions == -1)[0]

    idle_regions = [
        IdleRegion(
            start=int(start),
            end=int(end),
            pattern=pattern,
            duration_samples=int(end - start),
        )
        for start, end in zip(starts, ends, strict=False)
        if end - start >= min_duration
    ]

    logger.info(
        "Detected %d idle regions (pattern: %s, min_duration: %d)",
        len(idle_regions),
        pattern,
        min_duration,
    )

    return idle_regions

191 

192 

def _auto_detect_pattern(data: NDArray[np.bool_]) -> str:
    """Auto-detect the idle pattern from the edges of a trace.

    Inspects a window at the start and at the end of the trace. The window
    is 100 samples or 10% of the trace, whichever is smaller.

    Args:
        data: Boolean trace data.

    Returns:
        ``"ones"`` when both windows contain (almost) no zeros, otherwise
        ``"zeros"``. Traces too short to give a window (fewer than 10
        samples) also yield ``"zeros"``.
    """
    # len(data) // 10 is 0 for empty and very short traces, which leaves
    # nothing to inspect.
    check_len = min(100, len(data) // 10)
    if check_len == 0:
        return "zeros"

    start_zeros = np.sum(~data[:check_len])
    end_zeros = np.sum(~data[-check_len:])

    # A zero majority at either edge means the line idles low.
    majority = check_len // 2
    if start_zeros > majority or end_zeros > majority:
        return "zeros"

    # Fewer than a quarter zeros at both edges means the line idles high.
    # Clamp the threshold to 1: for windows of 1-3 samples `check_len // 4`
    # is 0 and the comparison could never succeed, so an all-ones short
    # trace would be misreported as "zeros".
    ones_threshold = max(1, check_len // 4)
    if start_zeros < ones_threshold and end_zeros < ones_threshold:
        return "ones"

    # Ambiguous edges: default to zeros
    return "zeros"

231 

232 

def trim_idle(
    trace: DigitalTrace,
    trim_start: bool = True,
    trim_end: bool = True,
    pattern: str = "auto",
    min_duration: int = 100,
) -> DigitalTrace:
    """Strip leading and/or trailing idle samples from a trace.

    Only an idle region that touches the very first sample (for the start)
    or the very last sample (for the end) is removed; idle regions in the
    middle of the trace are left alone.

    Args:
        trace: Digital trace to trim.
        trim_start: Remove an idle region that begins at sample 0.
        trim_end: Remove an idle region that ends at the last sample.
        pattern: Idle pattern to detect ("auto", "zeros", "ones").
        min_duration: Minimum idle run length, in samples, to trim.

    Returns:
        A new DigitalTrace holding the remaining samples, or the input
        trace itself when nothing was trimmed.

    Example:
        >>> trimmed = trim_idle(trace, trim_start=True, trim_end=True)
        >>> print(f"Removed {len(trace.data) - len(trimmed.data)} idle samples")
    """
    total = len(trace.data)
    if total == 0:
        return trace

    regions = detect_idle_regions(trace, pattern=pattern, min_duration=min_duration)
    if not regions:
        return trace

    # Half-open window [lo, hi) of samples to keep.
    lo, hi = 0, total

    if trim_start:
        leading = regions[0]
        if leading.start == 0:
            lo = leading.end
            logger.info("Trimming %d idle samples from start", leading.length)

    if trim_end:
        trailing = regions[-1]
        if trailing.end == total:
            hi = trailing.start
            logger.info("Trimming %d idle samples from end", trailing.length)

    # Nothing touched either edge: hand back the original object.
    if lo <= 0 and hi >= total:
        return trace

    source_meta = trace.metadata
    kept_meta = TraceMetadata(
        sample_rate=source_meta.sample_rate,
        vertical_scale=source_meta.vertical_scale,
        vertical_offset=source_meta.vertical_offset,
        acquisition_time=source_meta.acquisition_time,
        trigger_info=source_meta.trigger_info,
        source_file=source_meta.source_file,
        channel_name=source_meta.channel_name,
    )

    # Edges are passed as None rather than carried over from the input.
    return DigitalTrace(data=trace.data[lo:hi], metadata=kept_meta, edges=None)

305 

306 

def get_idle_statistics(
    trace: DigitalTrace,
    pattern: str = "auto",
    min_duration: int = 100,
) -> IdleStatistics:
    """Summarize idle versus active samples in a trace.

    Args:
        trace: Digital trace to analyze.
        pattern: Idle pattern to detect ("auto", "zeros", "ones").
        min_duration: Minimum idle run length, in samples, to count.

    Returns:
        IdleStatistics holding the sample counts, the detected regions and
        the pattern covering the most idle samples ("none" when no idle
        region was found).

    Example:
        >>> stats = get_idle_statistics(trace)
        >>> print(f"Idle fraction: {stats.idle_fraction:.1%}")
        >>> print(f"Found {len(stats.idle_regions)} idle regions")
    """
    regions = detect_idle_regions(trace, pattern=pattern, min_duration=min_duration)
    n_total = len(trace.data)

    # Single pass: accumulate the idle total and the per-pattern sample counts.
    n_idle = 0
    samples_by_pattern: dict[str, int] = {}
    for region in regions:
        n_idle += region.length
        samples_by_pattern[region.pattern] = (
            samples_by_pattern.get(region.pattern, 0) + region.length
        )

    if samples_by_pattern:
        # Pattern with the largest sample count; ties go to the first seen.
        dominant = max(samples_by_pattern.items(), key=lambda item: item[1])[0]
    else:
        dominant = "none"

    return IdleStatistics(
        total_samples=n_total,
        idle_samples=n_idle,
        active_samples=n_total - n_idle,
        idle_regions=regions,
        dominant_pattern=dominant,
    )

355 

356 

# Backward-compatibility alias: IdleStats refers to the same class object as
# IdleStatistics, so either name can be used interchangeably.
IdleStats = IdleStatistics
"""Type alias for IdleStatistics."""

# Names exported by `from ... import *`; the private helper
# `_auto_detect_pattern` is intentionally not listed.
__all__ = [
    "IdleRegion",
    "IdleStatistics",
    "IdleStats",
    "detect_idle_regions",
    "get_idle_statistics",
    "trim_idle",
]