Coverage for src / tracekit / loaders / preprocessing.py: 70%
110 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Idle and padding detection and removal.
3This module provides functions to detect and optionally remove idle regions,
4padding, and non-data samples from loaded binary captures.
7Example:
8 >>> from tracekit.loaders.preprocessing import detect_idle_regions, trim_idle
9 >>> regions = detect_idle_regions(trace, pattern='zeros', min_duration=100)
10 >>> print(f"Found {len(regions)} idle regions")
11 >>> trimmed_trace = trim_idle(trace, trim_start=True, trim_end=True)
12 >>> print(f"Trimmed {len(trace.data) - len(trimmed_trace.data)} samples")
13"""
15from __future__ import annotations
17import logging
18from dataclasses import dataclass
19from typing import TYPE_CHECKING
21import numpy as np
23from tracekit.core.types import DigitalTrace, TraceMetadata
25if TYPE_CHECKING:
26 from numpy.typing import NDArray
28# Logger for debug output
29logger = logging.getLogger(__name__)
@dataclass
class IdleRegion:
    """A contiguous run of idle samples found in a trace.

    Attributes:
        start: Index of the first sample in the region.
        end: Index one past the last sample in the region (exclusive).
        pattern: Name of the idle pattern the region was matched against.
        duration_samples: Region duration in samples, as supplied by the
            code that constructed the region.
    """

    start: int
    end: int
    pattern: str
    duration_samples: int

    @property
    def length(self) -> int:
        """Number of samples spanned by the region.

        Returns:
            ``end - start``.
        """
        first, stop = self.start, self.end
        return stop - first

    def get_duration_seconds(self, sample_rate: float) -> float:
        """Convert the region length to a duration in seconds.

        Args:
            sample_rate: Sample rate in Hz.

        Returns:
            Region length divided by ``sample_rate``.
        """
        n_samples = self.length
        return n_samples / sample_rate
@dataclass
class IdleStatistics:
    """Summary of idle versus active samples in a trace.

    Attributes:
        total_samples: Number of samples in the trace.
        idle_samples: Number of samples that fall inside idle regions.
        active_samples: Number of samples outside idle regions.
        idle_regions: The idle regions the counts were derived from.
        dominant_pattern: Idle pattern accounting for the most samples.
    """

    total_samples: int
    idle_samples: int
    active_samples: int
    idle_regions: list[IdleRegion]
    dominant_pattern: str

    @property
    def idle_fraction(self) -> float:
        """Share of the trace that is idle.

        Returns:
            ``idle_samples / total_samples``, or 0.0 for an empty trace
            so that no division by zero occurs.
        """
        if self.total_samples != 0:
            return self.idle_samples / self.total_samples
        return 0.0

    @property
    def active_fraction(self) -> float:
        """Share of the trace that is active.

        Returns:
            The complement of :attr:`idle_fraction`.
        """
        idle_share = self.idle_fraction
        return 1.0 - idle_share
def detect_idle_regions(
    trace: DigitalTrace,
    pattern: str = "auto",
    min_duration: int = 100,
) -> list[IdleRegion]:
    """Detect idle regions in a digital trace.

    A region is idle when the signal holds the idle level for at least
    ``min_duration`` consecutive samples.

    Args:
        trace: Digital trace to analyze. Its ``data`` is interpreted as
            logic levels: any non-zero sample counts as high.
        pattern: Idle pattern to detect: "auto", "zeros" or "ones". Any
            other value (e.g. a byte value) is not supported yet; a warning
            is logged and zeros are detected instead, while the returned
            regions still carry the requested pattern string.
        min_duration: Minimum run length in samples to report as idle.

    Returns:
        Idle regions in ascending order of start index. Empty when the
        trace is shorter than ``min_duration``.

    Example:
        >>> regions = detect_idle_regions(trace, pattern='zeros', min_duration=100)
        >>> for region in regions:
        ...     print(f"Idle from {region.start} to {region.end}")
    """
    # Coerce to bool once. On an integer 0/1 array ``~`` is a bitwise
    # inversion (0 -> 255 for uint8), which would corrupt the edge detection
    # below; on a boolean array this is a no-op that does not copy.
    data = np.asarray(trace.data, dtype=bool)

    if len(data) < min_duration:
        return []

    if pattern == "auto":
        # Auto-detect pattern from start/end of trace
        pattern = _auto_detect_pattern(data)
        logger.debug("Auto-detected idle pattern: %s", pattern)

    # Build a mask that is True wherever the sample is at the idle level
    if pattern == "zeros":
        idle_mask = ~data
    elif pattern == "ones":
        idle_mask = data
    else:
        # Multi-bit patterns would need a multi-bit comparison; fall back
        # to zeros for now.
        logger.warning("Pattern '%s' not fully supported, using zeros", pattern)
        idle_mask = ~data

    # Pad with False on both sides so runs touching either end of the trace
    # still produce a rising and a falling edge.
    padded = np.concatenate(([False], idle_mask, [False]))
    transitions = np.diff(padded.astype(np.int8))

    # Rising edge = first idle sample, falling edge = one past the last.
    # Every run has exactly one of each, so the arrays pair up one-to-one.
    starts = np.where(transitions == 1)[0]
    ends = np.where(transitions == -1)[0]

    idle_regions = [
        IdleRegion(
            start=int(start),
            end=int(end),
            pattern=pattern,
            duration_samples=int(end - start),
        )
        for start, end in zip(starts, ends, strict=False)
        if end - start >= min_duration
    ]

    logger.info(
        "Detected %d idle regions (pattern: %s, min_duration: %d)",
        len(idle_regions),
        pattern,
        min_duration,
    )

    return idle_regions
193def _auto_detect_pattern(data: NDArray[np.bool_]) -> str:
194 """Auto-detect idle pattern from trace data.
196 Looks at the start and end of the trace to determine the
197 most likely idle pattern.
199 Args:
200 data: Boolean trace data.
202 Returns:
203 Detected pattern ("zeros", "ones", or "unknown").
204 """
205 if len(data) == 0:
206 return "zeros"
208 # Check first and last 100 samples (or 10% of trace, whichever is smaller)
209 check_len = min(100, len(data) // 10, len(data))
211 if check_len == 0:
212 return "zeros"
214 start_samples = data[:check_len]
215 end_samples = data[-check_len:]
217 # Count zeros in start/end regions
218 start_zeros = np.sum(~start_samples)
219 end_zeros = np.sum(~end_samples)
221 # If majority are zeros, pattern is zeros
222 if start_zeros > check_len // 2 or end_zeros > check_len // 2:
223 return "zeros"
225 # If majority are ones, pattern is ones
226 if start_zeros < check_len // 4 and end_zeros < check_len // 4:
227 return "ones"
229 # Default to zeros
230 return "zeros"
def trim_idle(
    trace: DigitalTrace,
    trim_start: bool = True,
    trim_end: bool = True,
    pattern: str = "auto",
    min_duration: int = 100,
) -> DigitalTrace:
    """Strip leading and/or trailing idle samples from a trace.

    Only idle regions that touch the very first or very last sample are
    removed; idle regions in the interior are left in place.

    Args:
        trace: Digital trace to trim.
        trim_start: Remove an idle region that begins at sample 0.
        trim_end: Remove an idle region that ends at the last sample.
        pattern: Idle pattern to detect ("auto", "zeros", "ones").
        min_duration: Minimum idle run length, in samples, to trim.

    Returns:
        The original ``trace`` object when nothing is trimmed; otherwise a
        new DigitalTrace holding the remaining samples, a fresh
        TraceMetadata copied field by field, and ``edges`` reset to None.

    Example:
        >>> trimmed = trim_idle(trace, trim_start=True, trim_end=True)
        >>> print(f"Removed {len(trace.data) - len(trimmed.data)} idle samples")
    """
    total = len(trace.data)
    if total == 0:
        return trace

    regions = detect_idle_regions(trace, pattern=pattern, min_duration=min_duration)
    if not regions:
        return trace

    # Bounds of the slice to keep; narrowed below if an edge region exists.
    keep_from, keep_to = 0, total

    if trim_start:
        leading = regions[0]
        if leading.start == 0:
            keep_from = leading.end
            logger.info("Trimming %d idle samples from start", leading.length)

    if trim_end:
        trailing = regions[-1]
        if trailing.end == total:
            keep_to = trailing.start
            logger.info("Trimming %d idle samples from end", trailing.length)

    # Neither bound moved: hand back the input untouched.
    if keep_from <= 0 and keep_to >= total:
        return trace

    source_meta = trace.metadata
    copied_meta = TraceMetadata(
        sample_rate=source_meta.sample_rate,
        vertical_scale=source_meta.vertical_scale,
        vertical_offset=source_meta.vertical_offset,
        acquisition_time=source_meta.acquisition_time,
        trigger_info=source_meta.trigger_info,
        source_file=source_meta.source_file,
        channel_name=source_meta.channel_name,
    )

    return DigitalTrace(
        data=trace.data[keep_from:keep_to],
        metadata=copied_meta,
        edges=None,
    )
def get_idle_statistics(
    trace: DigitalTrace,
    pattern: str = "auto",
    min_duration: int = 100,
) -> IdleStatistics:
    """Summarise how much of a trace is idle.

    Runs idle detection and tallies idle versus active samples, along with
    the idle pattern that covers the most samples.

    Args:
        trace: Digital trace to analyze.
        pattern: Idle pattern to detect ("auto", "zeros", "ones").
        min_duration: Minimum idle run length, in samples, to count.

    Returns:
        IdleStatistics for the trace; ``dominant_pattern`` is "none" when
        no idle region was found.

    Example:
        >>> stats = get_idle_statistics(trace)
        >>> print(f"Idle fraction: {stats.idle_fraction:.1%}")
        >>> print(f"Found {len(stats.idle_regions)} idle regions")
    """
    regions = detect_idle_regions(trace, pattern=pattern, min_duration=min_duration)
    n_total = len(trace.data)

    # Single pass: accumulate the idle total and the per-pattern sample count.
    n_idle = 0
    samples_by_pattern: dict[str, int] = {}
    for region in regions:
        span = region.length
        n_idle += span
        samples_by_pattern[region.pattern] = samples_by_pattern.get(region.pattern, 0) + span

    if samples_by_pattern:
        dominant = max(samples_by_pattern, key=lambda name: samples_by_pattern[name])
    else:
        dominant = "none"

    return IdleStatistics(
        total_samples=n_total,
        idle_samples=n_idle,
        active_samples=n_total - n_idle,
        idle_regions=regions,
        dominant_pattern=dominant,
    )
# Backward-compatibility alias: ``IdleStats`` is bound to the very same class
# object as ``IdleStatistics``, so both names can be used interchangeably.
IdleStats = IdleStatistics
"""Type alias for IdleStatistics."""

# Names exported by ``from <module> import *``. The private helper
# ``_auto_detect_pattern`` is not listed here.
__all__ = [
    "IdleRegion",
    "IdleStatistics",
    "IdleStats",
    "detect_idle_regions",
    "get_idle_statistics",
    "trim_idle",
]