Coverage for src / tracekit / exploratory / fuzzy.py: 94%
167 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Fuzzy matching for timing and pattern analysis.
3This module provides fuzzy matching capabilities for tolerating
4timing variations and pattern deviations in real-world signals.
7Example:
8 >>> from tracekit.exploratory.fuzzy import fuzzy_timing_match
9 >>> result = fuzzy_timing_match(edges, expected_period=1e-6, tolerance=0.1)
10 >>> print(f"Match confidence: {result.confidence:.1%}")
11"""
13from __future__ import annotations
15from dataclasses import dataclass
16from typing import TYPE_CHECKING, Any
18import numpy as np
20from tracekit.core.types import WaveformTrace
22if TYPE_CHECKING:
23 from numpy.typing import NDArray
26@dataclass
27class FuzzyTimingResult:
28 """Result of fuzzy timing match.
30 Attributes:
31 match: True if timing matches within tolerance.
32 confidence: Match confidence (0.0 to 1.0).
33 period: Detected period.
34 deviation: Deviation from expected period.
35 jitter_rms: RMS timing jitter.
36 outlier_count: Number of timing outliers.
37 outlier_indices: Indices of outlier edges.
38 """
40 match: bool
41 confidence: float
42 period: float
43 deviation: float
44 jitter_rms: float
45 outlier_count: int
46 outlier_indices: list[int]
49def fuzzy_timing_match(
50 trace_or_edges: WaveformTrace | NDArray[np.float64],
51 *,
52 expected_period: float | None = None,
53 tolerance: float = 0.1,
54 sample_rate: float | None = None,
55) -> FuzzyTimingResult:
56 """Match timing with fuzzy tolerance.
58 Allows timing variations while still detecting protocol patterns.
59 Useful for signals with jitter or clock drift.
61 Args:
62 trace_or_edges: WaveformTrace or array of edge times.
63 expected_period: Expected period in seconds.
64 tolerance: Tolerance as fraction (0.1 = 10%).
65 sample_rate: Sample rate (required if trace provided).
67 Returns:
68 FuzzyTimingResult with match information.
70 Raises:
71 ValueError: If sample_rate is invalid when WaveformTrace provided.
73 Example:
74 >>> result = fuzzy_timing_match(trace, expected_period=1e-6, tolerance=0.1)
75 >>> print(f"Period match: {result.match}")
76 >>> print(f"Actual period: {result.period:.3e} s")
78 References:
79 FUZZY-001: Fuzzy Timing Tolerance
80 """
81 # Extract edges if WaveformTrace provided
82 if isinstance(trace_or_edges, WaveformTrace):
83 data = trace_or_edges.data
84 sample_rate = sample_rate or trace_or_edges.metadata.sample_rate
86 if sample_rate is None or sample_rate <= 0: 86 ↛ 87line 86 didn't jump to line 87 because the condition on line 86 was never true
87 raise ValueError("Valid sample_rate required for WaveformTrace")
89 # Threshold and find edges
90 v_min = np.percentile(data, 5)
91 v_max = np.percentile(data, 95)
92 threshold = (v_min + v_max) / 2
93 digital = data > threshold
95 # Find all transitions (rising and falling edges)
96 edge_samples = np.where(np.abs(np.diff(digital.astype(int))) > 0)[0]
97 edges = edge_samples / sample_rate
98 else:
99 edges = trace_or_edges
101 if len(edges) < 2:
102 return FuzzyTimingResult(
103 match=False,
104 confidence=0.0,
105 period=0.0,
106 deviation=1.0,
107 jitter_rms=0.0,
108 outlier_count=0,
109 outlier_indices=[],
110 )
112 # Calculate inter-edge intervals
113 intervals = np.diff(edges)
115 # Detect period (use median for robustness)
116 detected_period = np.median(intervals)
118 # Use expected period if provided, otherwise use detected
119 if expected_period is None:
120 expected_period = detected_period
122 # Calculate deviation
123 deviation = abs(detected_period - expected_period) / expected_period
125 # Determine match
126 match = bool(deviation <= tolerance)
128 # Calculate jitter
129 normalized_intervals = intervals / detected_period
130 jitter_rms = np.std(normalized_intervals - 1.0) * detected_period
132 # Find outliers
133 outlier_threshold = expected_period * tolerance * 3 # 3x tolerance
134 deviations = np.abs(intervals - expected_period)
135 outlier_mask = deviations > outlier_threshold
136 outlier_count = int(np.sum(outlier_mask))
137 outlier_indices = list(np.where(outlier_mask)[0])
139 # Calculate confidence
140 # Higher confidence for lower deviation and fewer outliers
141 confidence = max(0.0, 1.0 - deviation / tolerance)
142 confidence *= max(0.0, 1.0 - outlier_count / max(len(intervals), 1))
143 confidence = min(1.0, confidence)
145 return FuzzyTimingResult(
146 match=match,
147 confidence=confidence,
148 period=detected_period,
149 deviation=deviation,
150 jitter_rms=jitter_rms,
151 outlier_count=outlier_count,
152 outlier_indices=outlier_indices,
153 )
156@dataclass
157class FuzzyPatternResult:
158 """Result of fuzzy pattern match.
160 Attributes:
161 matches: List of match locations with scores.
162 best_match_score: Score of best match.
163 total_matches: Total number of matches found.
164 pattern_variations: Common pattern variations found.
165 """
167 matches: list[dict[str, Any]]
168 best_match_score: float
169 total_matches: int
170 pattern_variations: list[tuple[tuple[int, ...], int]]
173def fuzzy_pattern_match(
174 trace: WaveformTrace,
175 pattern: list[int] | tuple[int, ...],
176 *,
177 max_errors: int = 1,
178 error_weight: float = 0.5,
179) -> FuzzyPatternResult:
180 """Match pattern with allowed bit errors.
182 Finds pattern occurrences allowing for bit errors, useful for
183 noisy signals or partial matches.
185 Args:
186 trace: Signal trace to search.
187 pattern: Bit pattern to find (list of 0s and 1s).
188 max_errors: Maximum allowed bit errors.
189 error_weight: Weight reduction per error.
191 Returns:
192 FuzzyPatternResult with match locations.
194 Example:
195 >>> result = fuzzy_pattern_match(trace, [0, 1, 0, 1, 0, 1], max_errors=1)
196 >>> print(f"Found {result.total_matches} matches")
197 >>> for match in result.matches[:5]:
198 ... print(f" Position {match['position']}: score {match['score']:.2f}")
200 References:
201 FUZZY-002: Fuzzy Pattern Matching
202 """
203 pattern = tuple(pattern)
204 pattern_len = len(pattern)
206 # Handle empty pattern
207 if pattern_len == 0:
208 return FuzzyPatternResult(
209 matches=[],
210 best_match_score=0.0,
211 total_matches=0,
212 pattern_variations=[],
213 )
215 # Convert trace to digital
216 data = trace.data
217 v_min = np.percentile(data, 5)
218 v_max = np.percentile(data, 95)
219 threshold = (v_min + v_max) / 2
220 digital = (data > threshold).astype(int)
222 # Find edges and sample at bit centers
223 edges = np.where(np.diff(digital) != 0)[0]
225 if len(edges) < 2:
226 return FuzzyPatternResult(
227 matches=[],
228 best_match_score=0.0,
229 total_matches=0,
230 pattern_variations=[],
231 )
233 # Estimate bit period from edges
234 # Note: This algorithm works best with signals that have frequent transitions.
235 # For patterns with long runs of identical bits, edge density may be insufficient
236 # for accurate bit recovery. Best suited for alternating patterns or noisy signals.
237 gaps = np.diff(edges)
239 # Use minimum gap as bit period estimate
240 # The smallest gap between edges is typically one bit period
241 estimated_bit_period = float(np.min(gaps))
243 # Sample bits at regular intervals
244 bits_list = []
245 sample_pos = edges[0] + estimated_bit_period / 2
247 while sample_pos < len(digital):
248 idx = int(sample_pos)
249 if idx < len(digital): 249 ↛ 251line 249 didn't jump to line 251 because the condition on line 249 was always true
250 bits_list.append(digital[idx])
251 sample_pos += estimated_bit_period
253 bits = np.array(bits_list)
255 # Search for pattern with fuzzy matching
256 matches = []
257 variations: dict[tuple[int, ...], int] = {}
259 for i in range(len(bits) - pattern_len + 1):
260 window = tuple(bits[i : i + pattern_len])
262 # Count errors
263 errors = sum(1 for a, b in zip(window, pattern, strict=False) if a != b)
265 if errors <= max_errors:
266 score = 1.0 - errors * error_weight
267 matches.append(
268 {
269 "position": i,
270 "sample_position": int(edges[0] + i * estimated_bit_period),
271 "errors": errors,
272 "score": score,
273 "actual_pattern": window,
274 }
275 )
277 # Track variations
278 if window != pattern:
279 variations[window] = variations.get(window, 0) + 1
281 # Sort matches by score
282 matches.sort(key=lambda x: x["score"], reverse=True) # type: ignore[arg-type, return-value]
284 best_score = matches[0]["score"] if matches else 0.0
286 # Sort variations by frequency
287 variation_list = sorted(variations.items(), key=lambda x: x[1], reverse=True)
289 return FuzzyPatternResult(
290 matches=matches,
291 best_match_score=best_score, # type: ignore[arg-type]
292 total_matches=len(matches),
293 pattern_variations=variation_list[:10],
294 )
297@dataclass
298class FuzzyProtocolResult:
299 """Result of fuzzy protocol detection.
301 Attributes:
302 detected_protocol: Most likely protocol.
303 confidence: Detection confidence.
304 alternatives: Alternative protocol candidates.
305 timing_score: Score based on timing match.
306 pattern_score: Score based on pattern match.
307 recommendations: Suggestions for improving detection.
308 """
310 detected_protocol: str
311 confidence: float
312 alternatives: list[tuple[str, float]]
313 timing_score: float
314 pattern_score: float
315 recommendations: list[str]
318# Protocol signatures for fuzzy matching
319PROTOCOL_SIGNATURES = {
320 "UART": {
321 "start_bit": 0,
322 "stop_bits": 1,
323 "frame_size": [8, 9, 10, 11], # With start/stop
324 "typical_rates": [9600, 19200, 38400, 57600, 115200],
325 },
326 "I2C": {
327 "start_pattern": [1, 0], # SDA falls while SCL high
328 "stop_pattern": [0, 1], # SDA rises while SCL high
329 "ack_bit": 0,
330 "typical_rates": [100e3, 400e3, 1e6, 3.4e6],
331 },
332 "SPI": {
333 "idle_clock": [0, 1], # CPOL options
334 "clock_phase": [0, 1], # CPHA options
335 "frame_size": [8, 16],
336 "typical_rates": [1e6, 5e6, 10e6, 20e6, 40e6],
337 },
338 "CAN": {
339 "start_of_frame": 0,
340 "frame_patterns": ["standard", "extended"],
341 "typical_rates": [125e3, 250e3, 500e3, 1e6],
342 },
343}
346def fuzzy_protocol_detect(
347 trace: WaveformTrace,
348 *,
349 candidates: list[str] | None = None,
350 timing_tolerance: float = 0.15,
351 pattern_tolerance: int = 2,
352) -> FuzzyProtocolResult:
353 """Detect protocol with fuzzy matching.
355 Uses timing tolerance and pattern flexibility to identify
356 protocols even with non-ideal signals.
358 Args:
359 trace: Signal trace to analyze.
360 candidates: List of protocols to consider (None = all).
361 timing_tolerance: Timing tolerance as fraction.
362 pattern_tolerance: Maximum pattern bit errors.
364 Returns:
365 FuzzyProtocolResult with detection results.
367 Example:
368 >>> result = fuzzy_protocol_detect(trace)
369 >>> print(f"Detected: {result.detected_protocol}")
370 >>> print(f"Confidence: {result.confidence:.1%}")
372 References:
373 FUZZY-003: Fuzzy Protocol Detection
374 """
375 data = trace.data
376 sample_rate = trace.metadata.sample_rate
378 if candidates is None:
379 candidates = list(PROTOCOL_SIGNATURES.keys())
381 # Analyze signal characteristics
382 v_min = np.percentile(data, 5)
383 v_max = np.percentile(data, 95)
384 threshold = (v_min + v_max) / 2
385 digital = data > threshold
387 edges = np.where(np.diff(digital.astype(int)) != 0)[0]
389 if len(edges) < 4:
390 return FuzzyProtocolResult(
391 detected_protocol="Unknown",
392 confidence=0.0,
393 alternatives=[],
394 timing_score=0.0,
395 pattern_score=0.0,
396 recommendations=["Insufficient edges for protocol detection"],
397 )
399 # Estimate bit rate
400 intervals = np.diff(edges)
401 median_interval = np.median(intervals)
402 estimated_bitrate = sample_rate / median_interval
404 # Score each protocol
405 scores: dict[str, dict[str, float]] = {}
407 for protocol in candidates:
408 if protocol not in PROTOCOL_SIGNATURES: 408 ↛ 409line 408 didn't jump to line 409 because the condition on line 408 was never true
409 continue
411 sig = PROTOCOL_SIGNATURES[protocol]
412 timing_score = 0.0
413 pattern_score = 0.0
415 # Check timing against typical rates
416 if "typical_rates" in sig: 416 ↛ 426line 416 didn't jump to line 426 because the condition on line 416 was always true
417 rates = sig["typical_rates"]
418 if hasattr(rates, "__iter__"): 418 ↛ 426line 418 didn't jump to line 426 because the condition on line 418 was always true
419 for rate in rates:
420 if isinstance(rate, int | float): 420 ↛ 419line 420 didn't jump to line 419 because the condition on line 420 was always true
421 ratio = estimated_bitrate / rate
422 if (1 - timing_tolerance) <= ratio <= (1 + timing_tolerance):
423 timing_score = max(timing_score, 1 - abs(1 - ratio) / timing_tolerance)
425 # Check patterns
426 if "start_pattern" in sig:
427 # Sample first few bits
428 bits = []
429 pos = edges[0] + median_interval / 2
430 for _ in range(4):
431 if pos < len(digital): 431 ↛ 433line 431 didn't jump to line 433 because the condition on line 431 was always true
432 bits.append(int(digital[int(pos)]))
433 pos += median_interval
435 expected = sig["start_pattern"]
436 if len(bits) >= len(expected): # type: ignore[arg-type] 436 ↛ 446line 436 didn't jump to line 446 because the condition on line 436 was always true
437 errors = sum(
438 1 # type: ignore[misc]
439 for a, b in zip(bits[: len(expected)], expected, strict=False) # type: ignore[call-overload, arg-type]
440 if a != b # type: ignore[misc, call-overload, arg-type]
441 )
442 if errors <= pattern_tolerance: 442 ↛ 446line 442 didn't jump to line 446 because the condition on line 442 was always true
443 pattern_score = 1 - errors * 0.3
445 # Check frame size
446 if "frame_size" in sig:
447 # Estimate frame size from inter-frame gaps
448 gap_threshold = median_interval * 2
449 long_gaps = intervals[intervals > gap_threshold]
450 if len(long_gaps) > 0:
451 frame_samples = np.median(long_gaps)
452 frame_bits = frame_samples / median_interval
453 for valid_size in sig["frame_size"]: # type: ignore[attr-defined]
454 if abs(frame_bits - valid_size) < 1.5: 454 ↛ 455line 454 didn't jump to line 455 because the condition on line 454 was never true
455 pattern_score = max(pattern_score, 0.7)
457 scores[protocol] = {
458 "timing": timing_score,
459 "pattern": pattern_score,
460 "total": timing_score * 0.5 + pattern_score * 0.5,
461 }
463 # Find best match
464 if not scores: 464 ↛ 465line 464 didn't jump to line 465 because the condition on line 464 was never true
465 return FuzzyProtocolResult(
466 detected_protocol="Unknown",
467 confidence=0.0,
468 alternatives=[],
469 timing_score=0.0,
470 pattern_score=0.0,
471 recommendations=["No matching protocols found"],
472 )
474 sorted_protocols = sorted(scores.items(), key=lambda x: x[1]["total"], reverse=True)
475 best_protocol, best_scores = sorted_protocols[0]
477 confidence = best_scores["total"]
478 alternatives = [(p, s["total"]) for p, s in sorted_protocols[1:4] if s["total"] > 0.2]
480 # Generate recommendations
481 recommendations = []
483 if confidence < 0.5:
484 recommendations.append("Low confidence - verify with protocol-specific decoder")
486 if best_scores["timing"] > best_scores["pattern"]:
487 recommendations.append("Timing matched better than patterns - check signal quality")
489 if best_scores["pattern"] > best_scores["timing"]:
490 recommendations.append("Patterns matched but timing off - check clock accuracy")
492 if not alternatives:
493 recommendations.append("No alternative protocols detected")
495 return FuzzyProtocolResult(
496 detected_protocol=best_protocol,
497 confidence=confidence,
498 alternatives=alternatives,
499 timing_score=best_scores["timing"],
500 pattern_score=best_scores["pattern"],
501 recommendations=recommendations,
502 )
505__all__ = [
506 "PROTOCOL_SIGNATURES",
507 "FuzzyPatternResult",
508 "FuzzyProtocolResult",
509 "FuzzyTimingResult",
510 "fuzzy_pattern_match",
511 "fuzzy_protocol_detect",
512 "fuzzy_timing_match",
513]