Coverage for src / tracekit / analyzers / patterns / periodic.py: 97%
222 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Periodic pattern detection using multiple algorithms.
3This module implements robust periodic pattern detection for digital signals
4and binary data using autocorrelation, FFT spectral analysis, and suffix array
5techniques.
8Author: TraceKit Development Team
9"""
11from __future__ import annotations
13from dataclasses import dataclass, field
14from typing import TYPE_CHECKING, Literal
16import numpy as np
18if TYPE_CHECKING:
19 from numpy.typing import NDArray
22@dataclass
23class PeriodResult:
24 """Result of period detection.
26 Attributes:
27 period_samples: Period in number of samples
28 period_seconds: Period in seconds (if sample_rate provided)
29 frequency_hz: Fundamental frequency in Hz
30 confidence: Detection confidence (0-1)
31 method: Detection method used
32 harmonics: List of detected harmonic frequencies (optional)
33 """
35 period_samples: float
36 period_seconds: float
37 frequency_hz: float
38 confidence: float
39 method: str
40 harmonics: list[float] | None = field(default=None)
42 # Alias for compatibility with tests
43 @property
44 def period(self) -> float:
45 """Alias for period_samples for test compatibility."""
46 return self.period_samples
48 def __post_init__(self) -> None:
49 """Validate period result values."""
50 if self.period_samples <= 0:
51 raise ValueError("period_samples must be positive")
52 if self.confidence < 0 or self.confidence > 1:
53 raise ValueError("confidence must be in range [0, 1]")
56def detect_period(
57 trace: NDArray[np.float64],
58 sample_rate: float = 1.0,
59 method: Literal["auto", "autocorr", "fft", "suffix"] = "auto",
60 min_period: int = 2,
61 max_period: int | None = None,
62) -> PeriodResult | None:
63 """Detect dominant period in signal using best available method.
65 : Periodic Pattern Detection
67 This function automatically selects the most appropriate algorithm based
68 on signal characteristics or uses the specified method.
70 Args:
71 trace: Input signal array (1D)
72 sample_rate: Sampling rate in Hz (default: 1.0)
73 method: Detection method ('auto', 'autocorr', 'fft', 'suffix')
74 min_period: Minimum period to detect in samples
75 max_period: Maximum period to detect in samples (None = len(trace)//2)
77 Returns:
78 PeriodResult with detected period information, or None if no period found
80 Raises:
81 ValueError: If trace is empty, min_period invalid, or parameters inconsistent
83 Examples:
84 >>> signal = np.sin(2 * np.pi * 5 * np.linspace(0, 1, 1000))
85 >>> result = detect_period(signal, sample_rate=1000.0)
86 >>> print(f"Period: {result.period_seconds:.3f}s, Freq: {result.frequency_hz:.1f}Hz")
87 """
88 # Input validation
89 if trace.size == 0:
90 raise ValueError("trace cannot be empty")
91 if min_period < 2:
92 raise ValueError("min_period must be at least 2")
93 if max_period is not None and max_period < min_period:
94 raise ValueError("max_period must be >= min_period")
95 if sample_rate <= 0:
96 raise ValueError("sample_rate must be positive")
98 # Ensure 1D array
99 trace = np.asarray(trace).flatten()
101 # Set default max_period
102 if max_period is None:
103 max_period = len(trace) // 2
104 max_period = min(max_period, len(trace) // 2)
106 # Auto-select method based on signal characteristics
107 if method == "auto":
108 # Use FFT for longer signals (more efficient)
109 # Use autocorr for shorter signals or binary data
110 if len(trace) > 10000:
111 method = "fft"
112 elif np.all(np.isin(trace, [0, 1])):
113 method = "autocorr" # Better for binary signals
114 else:
115 method = "fft"
117 # Dispatch to appropriate method
118 if method == "autocorr":
119 results = detect_periods_autocorr(trace, sample_rate, max_period, min_correlation=0.5)
120 return results[0] if results else None
122 elif method == "fft":
123 min_freq = sample_rate / max_period if max_period else None
124 max_freq = sample_rate / min_period if min_period > 0 else None
125 results = detect_periods_fft(trace, sample_rate, min_freq, max_freq, num_peaks=5)
126 return results[0] if results else None
128 elif method == "suffix":
129 # Suffix array method for exact repeats
130 period_samples = _detect_period_suffix(trace, min_period, max_period)
131 if period_samples is None: 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true
132 return None
133 return PeriodResult(
134 period_samples=float(period_samples),
135 period_seconds=period_samples / sample_rate,
136 frequency_hz=sample_rate / period_samples,
137 confidence=0.9, # High confidence for exact matches
138 method="suffix",
139 )
140 else:
141 raise ValueError(f"Unknown method: {method}")
144def detect_periods_fft(
145 trace: NDArray[np.float64],
146 sample_rate: float = 1.0,
147 min_freq: float | None = None,
148 max_freq: float | None = None,
149 num_peaks: int = 5,
150) -> list[PeriodResult]:
151 """Detect periods using FFT spectral analysis.
153 : Periodic Pattern Detection via FFT
155 Uses power spectral density to identify dominant frequencies and their
156 harmonics. More efficient than autocorrelation for long signals.
158 Args:
159 trace: Input signal array (1D)
160 sample_rate: Sampling rate in Hz
161 min_freq: Minimum frequency to consider (Hz)
162 max_freq: Maximum frequency to consider (Hz)
163 num_peaks: Maximum number of peaks to return
165 Returns:
166 List of PeriodResult sorted by confidence (strongest first)
168 Examples:
169 >>> signal = np.sin(2*np.pi*10*np.linspace(0, 1, 1000))
170 >>> periods = detect_periods_fft(signal, sample_rate=1000.0, num_peaks=3)
171 """
172 trace = np.asarray(trace).flatten()
174 if trace.size == 0:
175 return []
177 # Remove DC component
178 trace_centered = trace - np.mean(trace)
180 # Compute FFT
181 n = len(trace_centered)
182 fft_result = np.fft.rfft(trace_centered)
183 power = np.asarray(np.abs(fft_result) ** 2, dtype=np.float64)
184 freqs = np.fft.rfftfreq(n, 1.0 / sample_rate)
186 # Apply frequency range filtering
187 valid_mask = np.ones(len(freqs), dtype=bool)
188 if min_freq is not None:
189 valid_mask &= freqs >= min_freq
190 if max_freq is not None:
191 valid_mask &= freqs <= max_freq
193 # Exclude DC component
194 valid_mask[0] = False
196 # Find peaks in power spectrum
197 peak_indices = _find_spectral_peaks(power, min_distance=1)
198 peak_indices = peak_indices[valid_mask[peak_indices]]
200 if len(peak_indices) == 0:
201 return []
203 # Sort by power
204 peak_powers = power[peak_indices]
205 sorted_indices = np.argsort(peak_powers)[::-1][:num_peaks]
206 peak_indices = peak_indices[sorted_indices]
208 # Build results
209 results = []
210 max_power = np.max(power[peak_indices]) if len(peak_indices) > 0 else 1.0
212 for idx in peak_indices:
213 freq = freqs[idx]
214 if freq == 0: 214 ↛ 215line 214 didn't jump to line 215 because the condition on line 214 was never true
215 continue
217 period_seconds = 1.0 / freq
218 period_samples = sample_rate / freq
220 # Confidence based on relative power
221 confidence = float(power[idx] / max_power)
223 # Detect harmonics (simple approach: look for integer multiples)
224 harmonics = []
225 for mult in range(2, 6):
226 harmonic_freq = freq * mult
227 if harmonic_freq < freqs[-1]:
228 # Find closest frequency bin
229 harmonic_idx = np.argmin(np.abs(freqs - harmonic_freq))
230 if power[harmonic_idx] > 0.1 * power[idx]:
231 harmonics.append(harmonic_freq)
233 results.append(
234 PeriodResult(
235 period_samples=period_samples,
236 period_seconds=period_seconds,
237 frequency_hz=freq,
238 confidence=min(confidence, 1.0),
239 method="fft",
240 harmonics=harmonics if harmonics else None,
241 )
242 )
244 return results
247def detect_periods_autocorr(
248 trace: NDArray[np.float64],
249 sample_rate: float = 1.0,
250 max_period: int | None = None,
251 min_correlation: float = 0.5,
252) -> list[PeriodResult]:
253 """Detect periods using autocorrelation.
255 : Periodic Pattern Detection via autocorrelation
257 Computes normalized autocorrelation and finds peaks corresponding to
258 periodic patterns. More robust to noise than simple pattern matching.
260 Args:
261 trace: Input signal array (1D)
262 sample_rate: Sampling rate in Hz
263 max_period: Maximum period to search (samples)
264 min_correlation: Minimum correlation threshold (0-1)
266 Returns:
267 List of PeriodResult sorted by confidence
269 Examples:
270 >>> signal = np.tile([1, 0, 1, 0], 100)
271 >>> periods = detect_periods_autocorr(signal, min_correlation=0.7)
272 """
273 trace = np.asarray(trace).flatten()
275 if trace.size == 0:
276 return []
278 # Set default max_period
279 if max_period is None:
280 max_period = len(trace) // 2
281 max_period = min(max_period, len(trace) // 2)
283 # Compute normalized autocorrelation using FFT (efficient)
284 trace_centered = trace - np.mean(trace)
286 # Compute via FFT convolution
287 n = len(trace_centered)
288 fft_trace = np.fft.fft(trace_centered, n=2 * n)
289 autocorr = np.fft.ifft(fft_trace * np.conj(fft_trace)).real[:n]
291 # Normalize
292 if autocorr[0] > 0:
293 autocorr = autocorr / autocorr[0]
295 # Limit search range
296 autocorr = autocorr[: max_period + 1]
298 # Find peaks (skip lag 0)
299 peaks = _find_spectral_peaks(autocorr[1:], min_distance=1) + 1
301 # Filter by minimum correlation
302 peaks = peaks[autocorr[peaks] >= min_correlation]
304 if len(peaks) == 0:
305 return []
307 # Sort by correlation value
308 peak_corrs = autocorr[peaks]
309 sorted_indices = np.argsort(peak_corrs)[::-1]
310 peaks = peaks[sorted_indices]
312 # Build results
313 results = []
314 for lag in peaks[:5]: # Top 5 peaks
315 period_samples = float(lag)
316 period_seconds = period_samples / sample_rate
317 frequency_hz = sample_rate / period_samples
318 confidence = float(autocorr[lag])
320 results.append(
321 PeriodResult(
322 period_samples=period_samples,
323 period_seconds=period_seconds,
324 frequency_hz=frequency_hz,
325 confidence=confidence,
326 method="autocorr",
327 )
328 )
330 return results
333def validate_period(
334 trace: NDArray[np.float64], period: float, tolerance: float = 0.01
335) -> tuple[bool, float]:
336 """Validate detected period against signal.
338 : Period validation
340 Verifies that the signal actually repeats at the given period by measuring
341 correlation between shifted copies of the signal.
343 Args:
344 trace: Input signal array
345 period: Period to validate (in samples)
346 tolerance: Allowed fractional deviation in period
348 Returns:
349 Tuple of (is_valid, actual_confidence)
350 - is_valid: True if period is confirmed
351 - actual_confidence: Measured correlation strength (0-1)
353 Examples:
354 >>> signal = np.tile([1, 2, 3, 4], 50)
355 >>> is_valid, conf = validate_period(signal, period=4.0)
356 >>> assert is_valid and conf > 0.95
357 """
358 trace = np.asarray(trace).flatten()
360 if trace.size == 0:
361 return False, 0.0
363 if period < 1 or period >= len(trace):
364 return False, 0.0
366 # Convert to integer lag for nearest-neighbor validation
367 lag = int(round(period))
369 # Check if lag is within tolerance
370 if abs(period - lag) > period * tolerance:
371 # Use interpolation for sub-sample periods
372 lag_low = int(np.floor(period))
373 lag_high = int(np.ceil(period))
374 alpha = period - lag_low
376 if lag_high >= len(trace): 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true
377 lag = lag_low
378 else:
379 # Weighted average of both lags
380 corr_low = _compute_lag_correlation(trace, lag_low)
381 corr_high = _compute_lag_correlation(trace, lag_high)
382 confidence = (1 - alpha) * corr_low + alpha * corr_high
384 is_valid = confidence >= 0.5
385 return is_valid, float(confidence)
387 # Simple case: integer lag
388 confidence = _compute_lag_correlation(trace, lag)
389 is_valid = confidence >= 0.5
391 return is_valid, float(confidence)
394def _compute_lag_correlation(trace: NDArray[np.float64], lag: int) -> float:
395 """Compute normalized correlation at specific lag.
397 Args:
398 trace: Input signal
399 lag: Lag in samples
401 Returns:
402 Normalized correlation coefficient (0-1)
403 """
404 if lag <= 0 or lag >= len(trace):
405 return 0.0
407 # Center the signal
408 trace_centered = trace - np.mean(trace)
410 # Compute correlation
411 n_overlap = len(trace) - lag
412 part1 = trace_centered[:n_overlap]
413 part2 = trace_centered[lag : lag + n_overlap]
415 # Pearson correlation
416 std1 = np.std(part1)
417 std2 = np.std(part2)
419 if std1 == 0 or std2 == 0:
420 return 0.0
422 correlation = np.mean(part1 * part2) / (std1 * std2)
424 # Clamp to [0, 1] range
425 return float(np.clip(correlation, 0, 1))
428def _find_spectral_peaks(data: NDArray[np.float64], min_distance: int = 1) -> NDArray[np.intp]:
429 """Find peaks in 1D array.
431 Simple peak detection: point is peak if higher than neighbors.
433 Args:
434 data: 1D array
435 min_distance: Minimum distance between peaks
437 Returns:
438 Array of peak indices
439 """
440 if len(data) < 3:
441 return np.array([], dtype=np.intp)
443 # Find local maxima
444 peaks_list: list[int] = []
445 for i in range(1, len(data) - 1):
446 if data[i] > data[i - 1] and data[i] > data[i + 1]:
447 peaks_list.append(i)
449 peaks: NDArray[np.intp] = np.array(peaks_list, dtype=np.intp)
451 # Apply minimum distance constraint
452 if len(peaks) > 0 and min_distance > 1:
453 # Keep highest peaks when too close
454 filtered_peaks_list: list[int] = []
455 last_peak = -min_distance
457 # Sort by height
458 sorted_indices = np.argsort(data[peaks])[::-1]
460 for idx in sorted_indices:
461 peak_pos = int(peaks[idx])
462 if peak_pos - last_peak >= min_distance: 462 ↛ 460line 462 didn't jump to line 460 because the condition on line 462 was always true
463 filtered_peaks_list.append(peak_pos)
464 last_peak = peak_pos
466 peaks = np.array(np.sort(np.array(filtered_peaks_list, dtype=np.intp)), dtype=np.intp)
468 return peaks
471def _detect_period_suffix(
472 trace: NDArray[np.float64], min_period: int, max_period: int
473) -> int | None:
474 """Detect period using suffix array (for exact repeats).
476 : Suffix array-based period detection
478 This method finds the longest exact repeating substring, which corresponds
479 to the period for perfectly periodic signals.
481 Args:
482 trace: Input signal (will be converted to bytes)
483 min_period: Minimum period length
484 max_period: Maximum period length
486 Returns:
487 Period in samples, or None if not found
488 """
489 # Convert to byte sequence for suffix array
490 if trace.dtype == np.bool_ or np.all(np.isin(trace, [0, 1])):
491 # Binary signal
492 trace_bytes = np.packbits(trace.astype(np.uint8))
493 else:
494 # Use raw bytes
495 trace_bytes = trace.astype(np.uint8)
497 n = len(trace_bytes)
499 # Simple period detection: check for repeating patterns
500 for period in range(min_period, min(max_period + 1, n // 2)):
501 # Check if trace repeats with this period
502 num_repeats = n // period
503 if num_repeats < 2: 503 ↛ 504line 503 didn't jump to line 504 because the condition on line 503 was never true
504 continue
506 # Compare first period with subsequent periods
507 pattern = trace_bytes[:period]
508 matches = 0
510 for i in range(1, num_repeats):
511 segment = trace_bytes[i * period : (i + 1) * period]
512 if len(segment) == period and np.array_equal(pattern, segment):
513 matches += 1
515 # If most repetitions match, consider it valid
516 if matches >= num_repeats * 0.8 - 1:
517 return period
519 return None
522class PeriodicPatternDetector:
523 """Object-oriented wrapper for periodic pattern detection.
525 Provides a class-based interface for period detection operations,
526 wrapping the functional API for consistency with test expectations.
530 Example:
531 >>> detector = PeriodicPatternDetector()
532 >>> result = detector.detect_period(signal)
533 >>> print(f"Period: {result.period} samples, confidence: {result.confidence}")
534 """
536 def __init__(
537 self,
538 method: Literal["auto", "autocorr", "fft", "autocorrelation"] = "auto",
539 sample_rate: float = 1.0,
540 min_period: int = 2,
541 max_period: int | None = None,
542 ):
543 """Initialize periodic pattern detector.
545 Args:
546 method: Detection method ('auto', 'autocorr', 'fft', 'autocorrelation').
547 sample_rate: Sample rate in Hz.
548 min_period: Minimum period in samples.
549 max_period: Maximum period in samples.
550 """
551 # Map 'autocorrelation' to 'autocorr' for test compatibility
552 if method == "autocorrelation":
553 method = "autocorr"
554 self.method = method
555 self.sample_rate = sample_rate
556 self.min_period = min_period
557 self.max_period = max_period
559 def detect_period(self, trace: NDArray[np.float64]) -> PeriodResult:
560 """Detect the dominant period in the signal.
562 Args:
563 trace: Input signal array (1D, boolean or numeric).
565 Returns:
566 PeriodResult with detected period information.
568 Raises:
569 ValueError: If trace is empty or too short.
571 Example:
572 >>> detector = PeriodicPatternDetector(method="autocorr")
573 >>> result = detector.detect_period(np.tile([1, 0], 100))
574 >>> result.period == 2
575 True
576 """
577 trace = np.asarray(trace).flatten()
579 # Validate input
580 if trace.size == 0:
581 raise ValueError("trace cannot be empty")
582 if trace.size < 3:
583 raise ValueError("trace must have at least 3 elements for period detection")
585 result = detect_period(
586 trace,
587 sample_rate=self.sample_rate,
588 method=self.method,
589 min_period=self.min_period,
590 max_period=self.max_period,
591 )
593 if result is None:
594 # Return a low-confidence result if no period found
595 return PeriodResult(
596 period_samples=1.0,
597 period_seconds=1.0 / self.sample_rate,
598 frequency_hz=self.sample_rate,
599 confidence=0.0,
600 method=self.method,
601 )
603 return result
605 def detect_multiple_periods(
606 self, trace: NDArray[np.float64], num_periods: int = 5
607 ) -> list[PeriodResult]:
608 """Detect multiple periods in the signal.
610 Args:
611 trace: Input signal array.
612 num_periods: Maximum number of periods to return.
614 Returns:
615 List of PeriodResult sorted by confidence.
616 """
617 trace = np.asarray(trace).flatten()
619 if self.method in ["fft", "auto"]:
620 min_freq = self.sample_rate / self.max_period if self.max_period else None
621 max_freq = self.sample_rate / self.min_period if self.min_period > 0 else None
622 return detect_periods_fft(trace, self.sample_rate, min_freq, max_freq, num_periods)
623 else:
624 return detect_periods_autocorr(
625 trace, self.sample_rate, self.max_period, min_correlation=0.3
626 )[:num_periods]
628 def validate(self, trace: NDArray[np.float64], period: float, tolerance: float = 0.01) -> bool:
629 """Validate a detected period.
631 Args:
632 trace: Input signal array.
633 period: Period to validate.
634 tolerance: Tolerance for period matching.
636 Returns:
637 True if period is valid.
638 """
639 is_valid, _ = validate_period(trace, period, tolerance)
640 return is_valid
643__all__ = [
644 "PeriodResult",
645 "PeriodicPatternDetector",
646 "detect_period",
647 "detect_periods_autocorr",
648 "detect_periods_fft",
649 "validate_period",
650]