Coverage for src / tracekit / analyzers / patterns / periodic.py: 97%

222 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Periodic pattern detection using multiple algorithms. 

2 

3This module implements robust periodic pattern detection for digital signals 

4and binary data using autocorrelation, FFT spectral analysis, and suffix array 

5techniques. 

6 

7 

8Author: TraceKit Development Team 

9""" 

10 

11from __future__ import annotations 

12 

13from dataclasses import dataclass, field 

14from typing import TYPE_CHECKING, Literal 

15 

16import numpy as np 

17 

18if TYPE_CHECKING: 

19 from numpy.typing import NDArray 

20 

21 

22@dataclass 

23class PeriodResult: 

24 """Result of period detection. 

25 

26 Attributes: 

27 period_samples: Period in number of samples 

28 period_seconds: Period in seconds (if sample_rate provided) 

29 frequency_hz: Fundamental frequency in Hz 

30 confidence: Detection confidence (0-1) 

31 method: Detection method used 

32 harmonics: List of detected harmonic frequencies (optional) 

33 """ 

34 

35 period_samples: float 

36 period_seconds: float 

37 frequency_hz: float 

38 confidence: float 

39 method: str 

40 harmonics: list[float] | None = field(default=None) 

41 

42 # Alias for compatibility with tests 

43 @property 

44 def period(self) -> float: 

45 """Alias for period_samples for test compatibility.""" 

46 return self.period_samples 

47 

48 def __post_init__(self) -> None: 

49 """Validate period result values.""" 

50 if self.period_samples <= 0: 

51 raise ValueError("period_samples must be positive") 

52 if self.confidence < 0 or self.confidence > 1: 

53 raise ValueError("confidence must be in range [0, 1]") 

54 

55 

56def detect_period( 

57 trace: NDArray[np.float64], 

58 sample_rate: float = 1.0, 

59 method: Literal["auto", "autocorr", "fft", "suffix"] = "auto", 

60 min_period: int = 2, 

61 max_period: int | None = None, 

62) -> PeriodResult | None: 

63 """Detect dominant period in signal using best available method. 

64 

65 : Periodic Pattern Detection 

66 

67 This function automatically selects the most appropriate algorithm based 

68 on signal characteristics or uses the specified method. 

69 

70 Args: 

71 trace: Input signal array (1D) 

72 sample_rate: Sampling rate in Hz (default: 1.0) 

73 method: Detection method ('auto', 'autocorr', 'fft', 'suffix') 

74 min_period: Minimum period to detect in samples 

75 max_period: Maximum period to detect in samples (None = len(trace)//2) 

76 

77 Returns: 

78 PeriodResult with detected period information, or None if no period found 

79 

80 Raises: 

81 ValueError: If trace is empty, min_period invalid, or parameters inconsistent 

82 

83 Examples: 

84 >>> signal = np.sin(2 * np.pi * 5 * np.linspace(0, 1, 1000)) 

85 >>> result = detect_period(signal, sample_rate=1000.0) 

86 >>> print(f"Period: {result.period_seconds:.3f}s, Freq: {result.frequency_hz:.1f}Hz") 

87 """ 

88 # Input validation 

89 if trace.size == 0: 

90 raise ValueError("trace cannot be empty") 

91 if min_period < 2: 

92 raise ValueError("min_period must be at least 2") 

93 if max_period is not None and max_period < min_period: 

94 raise ValueError("max_period must be >= min_period") 

95 if sample_rate <= 0: 

96 raise ValueError("sample_rate must be positive") 

97 

98 # Ensure 1D array 

99 trace = np.asarray(trace).flatten() 

100 

101 # Set default max_period 

102 if max_period is None: 

103 max_period = len(trace) // 2 

104 max_period = min(max_period, len(trace) // 2) 

105 

106 # Auto-select method based on signal characteristics 

107 if method == "auto": 

108 # Use FFT for longer signals (more efficient) 

109 # Use autocorr for shorter signals or binary data 

110 if len(trace) > 10000: 

111 method = "fft" 

112 elif np.all(np.isin(trace, [0, 1])): 

113 method = "autocorr" # Better for binary signals 

114 else: 

115 method = "fft" 

116 

117 # Dispatch to appropriate method 

118 if method == "autocorr": 

119 results = detect_periods_autocorr(trace, sample_rate, max_period, min_correlation=0.5) 

120 return results[0] if results else None 

121 

122 elif method == "fft": 

123 min_freq = sample_rate / max_period if max_period else None 

124 max_freq = sample_rate / min_period if min_period > 0 else None 

125 results = detect_periods_fft(trace, sample_rate, min_freq, max_freq, num_peaks=5) 

126 return results[0] if results else None 

127 

128 elif method == "suffix": 

129 # Suffix array method for exact repeats 

130 period_samples = _detect_period_suffix(trace, min_period, max_period) 

131 if period_samples is None: 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true

132 return None 

133 return PeriodResult( 

134 period_samples=float(period_samples), 

135 period_seconds=period_samples / sample_rate, 

136 frequency_hz=sample_rate / period_samples, 

137 confidence=0.9, # High confidence for exact matches 

138 method="suffix", 

139 ) 

140 else: 

141 raise ValueError(f"Unknown method: {method}") 

142 

143 

144def detect_periods_fft( 

145 trace: NDArray[np.float64], 

146 sample_rate: float = 1.0, 

147 min_freq: float | None = None, 

148 max_freq: float | None = None, 

149 num_peaks: int = 5, 

150) -> list[PeriodResult]: 

151 """Detect periods using FFT spectral analysis. 

152 

153 : Periodic Pattern Detection via FFT 

154 

155 Uses power spectral density to identify dominant frequencies and their 

156 harmonics. More efficient than autocorrelation for long signals. 

157 

158 Args: 

159 trace: Input signal array (1D) 

160 sample_rate: Sampling rate in Hz 

161 min_freq: Minimum frequency to consider (Hz) 

162 max_freq: Maximum frequency to consider (Hz) 

163 num_peaks: Maximum number of peaks to return 

164 

165 Returns: 

166 List of PeriodResult sorted by confidence (strongest first) 

167 

168 Examples: 

169 >>> signal = np.sin(2*np.pi*10*np.linspace(0, 1, 1000)) 

170 >>> periods = detect_periods_fft(signal, sample_rate=1000.0, num_peaks=3) 

171 """ 

172 trace = np.asarray(trace).flatten() 

173 

174 if trace.size == 0: 

175 return [] 

176 

177 # Remove DC component 

178 trace_centered = trace - np.mean(trace) 

179 

180 # Compute FFT 

181 n = len(trace_centered) 

182 fft_result = np.fft.rfft(trace_centered) 

183 power = np.asarray(np.abs(fft_result) ** 2, dtype=np.float64) 

184 freqs = np.fft.rfftfreq(n, 1.0 / sample_rate) 

185 

186 # Apply frequency range filtering 

187 valid_mask = np.ones(len(freqs), dtype=bool) 

188 if min_freq is not None: 

189 valid_mask &= freqs >= min_freq 

190 if max_freq is not None: 

191 valid_mask &= freqs <= max_freq 

192 

193 # Exclude DC component 

194 valid_mask[0] = False 

195 

196 # Find peaks in power spectrum 

197 peak_indices = _find_spectral_peaks(power, min_distance=1) 

198 peak_indices = peak_indices[valid_mask[peak_indices]] 

199 

200 if len(peak_indices) == 0: 

201 return [] 

202 

203 # Sort by power 

204 peak_powers = power[peak_indices] 

205 sorted_indices = np.argsort(peak_powers)[::-1][:num_peaks] 

206 peak_indices = peak_indices[sorted_indices] 

207 

208 # Build results 

209 results = [] 

210 max_power = np.max(power[peak_indices]) if len(peak_indices) > 0 else 1.0 

211 

212 for idx in peak_indices: 

213 freq = freqs[idx] 

214 if freq == 0: 214 ↛ 215line 214 didn't jump to line 215 because the condition on line 214 was never true

215 continue 

216 

217 period_seconds = 1.0 / freq 

218 period_samples = sample_rate / freq 

219 

220 # Confidence based on relative power 

221 confidence = float(power[idx] / max_power) 

222 

223 # Detect harmonics (simple approach: look for integer multiples) 

224 harmonics = [] 

225 for mult in range(2, 6): 

226 harmonic_freq = freq * mult 

227 if harmonic_freq < freqs[-1]: 

228 # Find closest frequency bin 

229 harmonic_idx = np.argmin(np.abs(freqs - harmonic_freq)) 

230 if power[harmonic_idx] > 0.1 * power[idx]: 

231 harmonics.append(harmonic_freq) 

232 

233 results.append( 

234 PeriodResult( 

235 period_samples=period_samples, 

236 period_seconds=period_seconds, 

237 frequency_hz=freq, 

238 confidence=min(confidence, 1.0), 

239 method="fft", 

240 harmonics=harmonics if harmonics else None, 

241 ) 

242 ) 

243 

244 return results 

245 

246 

247def detect_periods_autocorr( 

248 trace: NDArray[np.float64], 

249 sample_rate: float = 1.0, 

250 max_period: int | None = None, 

251 min_correlation: float = 0.5, 

252) -> list[PeriodResult]: 

253 """Detect periods using autocorrelation. 

254 

255 : Periodic Pattern Detection via autocorrelation 

256 

257 Computes normalized autocorrelation and finds peaks corresponding to 

258 periodic patterns. More robust to noise than simple pattern matching. 

259 

260 Args: 

261 trace: Input signal array (1D) 

262 sample_rate: Sampling rate in Hz 

263 max_period: Maximum period to search (samples) 

264 min_correlation: Minimum correlation threshold (0-1) 

265 

266 Returns: 

267 List of PeriodResult sorted by confidence 

268 

269 Examples: 

270 >>> signal = np.tile([1, 0, 1, 0], 100) 

271 >>> periods = detect_periods_autocorr(signal, min_correlation=0.7) 

272 """ 

273 trace = np.asarray(trace).flatten() 

274 

275 if trace.size == 0: 

276 return [] 

277 

278 # Set default max_period 

279 if max_period is None: 

280 max_period = len(trace) // 2 

281 max_period = min(max_period, len(trace) // 2) 

282 

283 # Compute normalized autocorrelation using FFT (efficient) 

284 trace_centered = trace - np.mean(trace) 

285 

286 # Compute via FFT convolution 

287 n = len(trace_centered) 

288 fft_trace = np.fft.fft(trace_centered, n=2 * n) 

289 autocorr = np.fft.ifft(fft_trace * np.conj(fft_trace)).real[:n] 

290 

291 # Normalize 

292 if autocorr[0] > 0: 

293 autocorr = autocorr / autocorr[0] 

294 

295 # Limit search range 

296 autocorr = autocorr[: max_period + 1] 

297 

298 # Find peaks (skip lag 0) 

299 peaks = _find_spectral_peaks(autocorr[1:], min_distance=1) + 1 

300 

301 # Filter by minimum correlation 

302 peaks = peaks[autocorr[peaks] >= min_correlation] 

303 

304 if len(peaks) == 0: 

305 return [] 

306 

307 # Sort by correlation value 

308 peak_corrs = autocorr[peaks] 

309 sorted_indices = np.argsort(peak_corrs)[::-1] 

310 peaks = peaks[sorted_indices] 

311 

312 # Build results 

313 results = [] 

314 for lag in peaks[:5]: # Top 5 peaks 

315 period_samples = float(lag) 

316 period_seconds = period_samples / sample_rate 

317 frequency_hz = sample_rate / period_samples 

318 confidence = float(autocorr[lag]) 

319 

320 results.append( 

321 PeriodResult( 

322 period_samples=period_samples, 

323 period_seconds=period_seconds, 

324 frequency_hz=frequency_hz, 

325 confidence=confidence, 

326 method="autocorr", 

327 ) 

328 ) 

329 

330 return results 

331 

332 

333def validate_period( 

334 trace: NDArray[np.float64], period: float, tolerance: float = 0.01 

335) -> tuple[bool, float]: 

336 """Validate detected period against signal. 

337 

338 : Period validation 

339 

340 Verifies that the signal actually repeats at the given period by measuring 

341 correlation between shifted copies of the signal. 

342 

343 Args: 

344 trace: Input signal array 

345 period: Period to validate (in samples) 

346 tolerance: Allowed fractional deviation in period 

347 

348 Returns: 

349 Tuple of (is_valid, actual_confidence) 

350 - is_valid: True if period is confirmed 

351 - actual_confidence: Measured correlation strength (0-1) 

352 

353 Examples: 

354 >>> signal = np.tile([1, 2, 3, 4], 50) 

355 >>> is_valid, conf = validate_period(signal, period=4.0) 

356 >>> assert is_valid and conf > 0.95 

357 """ 

358 trace = np.asarray(trace).flatten() 

359 

360 if trace.size == 0: 

361 return False, 0.0 

362 

363 if period < 1 or period >= len(trace): 

364 return False, 0.0 

365 

366 # Convert to integer lag for nearest-neighbor validation 

367 lag = int(round(period)) 

368 

369 # Check if lag is within tolerance 

370 if abs(period - lag) > period * tolerance: 

371 # Use interpolation for sub-sample periods 

372 lag_low = int(np.floor(period)) 

373 lag_high = int(np.ceil(period)) 

374 alpha = period - lag_low 

375 

376 if lag_high >= len(trace): 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true

377 lag = lag_low 

378 else: 

379 # Weighted average of both lags 

380 corr_low = _compute_lag_correlation(trace, lag_low) 

381 corr_high = _compute_lag_correlation(trace, lag_high) 

382 confidence = (1 - alpha) * corr_low + alpha * corr_high 

383 

384 is_valid = confidence >= 0.5 

385 return is_valid, float(confidence) 

386 

387 # Simple case: integer lag 

388 confidence = _compute_lag_correlation(trace, lag) 

389 is_valid = confidence >= 0.5 

390 

391 return is_valid, float(confidence) 

392 

393 

394def _compute_lag_correlation(trace: NDArray[np.float64], lag: int) -> float: 

395 """Compute normalized correlation at specific lag. 

396 

397 Args: 

398 trace: Input signal 

399 lag: Lag in samples 

400 

401 Returns: 

402 Normalized correlation coefficient (0-1) 

403 """ 

404 if lag <= 0 or lag >= len(trace): 

405 return 0.0 

406 

407 # Center the signal 

408 trace_centered = trace - np.mean(trace) 

409 

410 # Compute correlation 

411 n_overlap = len(trace) - lag 

412 part1 = trace_centered[:n_overlap] 

413 part2 = trace_centered[lag : lag + n_overlap] 

414 

415 # Pearson correlation 

416 std1 = np.std(part1) 

417 std2 = np.std(part2) 

418 

419 if std1 == 0 or std2 == 0: 

420 return 0.0 

421 

422 correlation = np.mean(part1 * part2) / (std1 * std2) 

423 

424 # Clamp to [0, 1] range 

425 return float(np.clip(correlation, 0, 1)) 

426 

427 

428def _find_spectral_peaks(data: NDArray[np.float64], min_distance: int = 1) -> NDArray[np.intp]: 

429 """Find peaks in 1D array. 

430 

431 Simple peak detection: point is peak if higher than neighbors. 

432 

433 Args: 

434 data: 1D array 

435 min_distance: Minimum distance between peaks 

436 

437 Returns: 

438 Array of peak indices 

439 """ 

440 if len(data) < 3: 

441 return np.array([], dtype=np.intp) 

442 

443 # Find local maxima 

444 peaks_list: list[int] = [] 

445 for i in range(1, len(data) - 1): 

446 if data[i] > data[i - 1] and data[i] > data[i + 1]: 

447 peaks_list.append(i) 

448 

449 peaks: NDArray[np.intp] = np.array(peaks_list, dtype=np.intp) 

450 

451 # Apply minimum distance constraint 

452 if len(peaks) > 0 and min_distance > 1: 

453 # Keep highest peaks when too close 

454 filtered_peaks_list: list[int] = [] 

455 last_peak = -min_distance 

456 

457 # Sort by height 

458 sorted_indices = np.argsort(data[peaks])[::-1] 

459 

460 for idx in sorted_indices: 

461 peak_pos = int(peaks[idx]) 

462 if peak_pos - last_peak >= min_distance: 462 ↛ 460line 462 didn't jump to line 460 because the condition on line 462 was always true

463 filtered_peaks_list.append(peak_pos) 

464 last_peak = peak_pos 

465 

466 peaks = np.array(np.sort(np.array(filtered_peaks_list, dtype=np.intp)), dtype=np.intp) 

467 

468 return peaks 

469 

470 

471def _detect_period_suffix( 

472 trace: NDArray[np.float64], min_period: int, max_period: int 

473) -> int | None: 

474 """Detect period using suffix array (for exact repeats). 

475 

476 : Suffix array-based period detection 

477 

478 This method finds the longest exact repeating substring, which corresponds 

479 to the period for perfectly periodic signals. 

480 

481 Args: 

482 trace: Input signal (will be converted to bytes) 

483 min_period: Minimum period length 

484 max_period: Maximum period length 

485 

486 Returns: 

487 Period in samples, or None if not found 

488 """ 

489 # Convert to byte sequence for suffix array 

490 if trace.dtype == np.bool_ or np.all(np.isin(trace, [0, 1])): 

491 # Binary signal 

492 trace_bytes = np.packbits(trace.astype(np.uint8)) 

493 else: 

494 # Use raw bytes 

495 trace_bytes = trace.astype(np.uint8) 

496 

497 n = len(trace_bytes) 

498 

499 # Simple period detection: check for repeating patterns 

500 for period in range(min_period, min(max_period + 1, n // 2)): 

501 # Check if trace repeats with this period 

502 num_repeats = n // period 

503 if num_repeats < 2: 503 ↛ 504line 503 didn't jump to line 504 because the condition on line 503 was never true

504 continue 

505 

506 # Compare first period with subsequent periods 

507 pattern = trace_bytes[:period] 

508 matches = 0 

509 

510 for i in range(1, num_repeats): 

511 segment = trace_bytes[i * period : (i + 1) * period] 

512 if len(segment) == period and np.array_equal(pattern, segment): 

513 matches += 1 

514 

515 # If most repetitions match, consider it valid 

516 if matches >= num_repeats * 0.8 - 1: 

517 return period 

518 

519 return None 

520 

521 

522class PeriodicPatternDetector: 

523 """Object-oriented wrapper for periodic pattern detection. 

524 

525 Provides a class-based interface for period detection operations, 

526 wrapping the functional API for consistency with test expectations. 

527 

528 

529 

530 Example: 

531 >>> detector = PeriodicPatternDetector() 

532 >>> result = detector.detect_period(signal) 

533 >>> print(f"Period: {result.period} samples, confidence: {result.confidence}") 

534 """ 

535 

536 def __init__( 

537 self, 

538 method: Literal["auto", "autocorr", "fft", "autocorrelation"] = "auto", 

539 sample_rate: float = 1.0, 

540 min_period: int = 2, 

541 max_period: int | None = None, 

542 ): 

543 """Initialize periodic pattern detector. 

544 

545 Args: 

546 method: Detection method ('auto', 'autocorr', 'fft', 'autocorrelation'). 

547 sample_rate: Sample rate in Hz. 

548 min_period: Minimum period in samples. 

549 max_period: Maximum period in samples. 

550 """ 

551 # Map 'autocorrelation' to 'autocorr' for test compatibility 

552 if method == "autocorrelation": 

553 method = "autocorr" 

554 self.method = method 

555 self.sample_rate = sample_rate 

556 self.min_period = min_period 

557 self.max_period = max_period 

558 

559 def detect_period(self, trace: NDArray[np.float64]) -> PeriodResult: 

560 """Detect the dominant period in the signal. 

561 

562 Args: 

563 trace: Input signal array (1D, boolean or numeric). 

564 

565 Returns: 

566 PeriodResult with detected period information. 

567 

568 Raises: 

569 ValueError: If trace is empty or too short. 

570 

571 Example: 

572 >>> detector = PeriodicPatternDetector(method="autocorr") 

573 >>> result = detector.detect_period(np.tile([1, 0], 100)) 

574 >>> result.period == 2 

575 True 

576 """ 

577 trace = np.asarray(trace).flatten() 

578 

579 # Validate input 

580 if trace.size == 0: 

581 raise ValueError("trace cannot be empty") 

582 if trace.size < 3: 

583 raise ValueError("trace must have at least 3 elements for period detection") 

584 

585 result = detect_period( 

586 trace, 

587 sample_rate=self.sample_rate, 

588 method=self.method, 

589 min_period=self.min_period, 

590 max_period=self.max_period, 

591 ) 

592 

593 if result is None: 

594 # Return a low-confidence result if no period found 

595 return PeriodResult( 

596 period_samples=1.0, 

597 period_seconds=1.0 / self.sample_rate, 

598 frequency_hz=self.sample_rate, 

599 confidence=0.0, 

600 method=self.method, 

601 ) 

602 

603 return result 

604 

605 def detect_multiple_periods( 

606 self, trace: NDArray[np.float64], num_periods: int = 5 

607 ) -> list[PeriodResult]: 

608 """Detect multiple periods in the signal. 

609 

610 Args: 

611 trace: Input signal array. 

612 num_periods: Maximum number of periods to return. 

613 

614 Returns: 

615 List of PeriodResult sorted by confidence. 

616 """ 

617 trace = np.asarray(trace).flatten() 

618 

619 if self.method in ["fft", "auto"]: 

620 min_freq = self.sample_rate / self.max_period if self.max_period else None 

621 max_freq = self.sample_rate / self.min_period if self.min_period > 0 else None 

622 return detect_periods_fft(trace, self.sample_rate, min_freq, max_freq, num_periods) 

623 else: 

624 return detect_periods_autocorr( 

625 trace, self.sample_rate, self.max_period, min_correlation=0.3 

626 )[:num_periods] 

627 

628 def validate(self, trace: NDArray[np.float64], period: float, tolerance: float = 0.01) -> bool: 

629 """Validate a detected period. 

630 

631 Args: 

632 trace: Input signal array. 

633 period: Period to validate. 

634 tolerance: Tolerance for period matching. 

635 

636 Returns: 

637 True if period is valid. 

638 """ 

639 is_valid, _ = validate_period(trace, period, tolerance) 

640 return is_valid 

641 

642 

643__all__ = [ 

644 "PeriodResult", 

645 "PeriodicPatternDetector", 

646 "detect_period", 

647 "detect_periods_autocorr", 

648 "detect_periods_fft", 

649 "validate_period", 

650]