Coverage for src / tracekit / exploratory / recovery.py: 95%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Bit error pattern analysis and capture diagnostics. 

2 

3 

4This module characterizes bit error patterns to diagnose capture quality 

5issues (EMI, USB problems, clock jitter) and suggests likely causes. 

6""" 

7 

8from dataclasses import dataclass 

9from enum import Enum 

10 

11import numpy as np 

12from numpy.typing import NDArray 

13 

14 

class ErrorPattern(Enum):
    """Classification of a bit-error distribution.

    Members:
        RANDOM: Errors spread uniformly with no clustering (typical of EMI).
        BURST: Errors packed into tight clusters (typical of USB
            transmission problems).
        PERIODIC: Errors recurring at regular intervals (typical of clock
            jitter or periodic interference).
        UNKNOWN: Distribution does not match any known pattern.
    """

    RANDOM = "random"
    BURST = "burst"
    PERIODIC = "periodic"
    UNKNOWN = "unknown"

29 

30 

@dataclass
class ErrorAnalysis:
    """Outcome of a bit-error pattern analysis.

    Attributes:
        bit_error_rate: Fraction of examined bits that were in error.
        error_count: Number of mismatched bits detected.
        total_bits: Number of bits examined.
        pattern_type: Classified error pattern (random, burst, periodic).
        mean_error_gap: Average spacing, in bits, between consecutive errors.
        error_positions: Bit indices at which errors occurred.
        diagnosis: Human-readable suggested cause for the observed pattern.
        severity: Error severity level ("low", "moderate", or "severe").
    """

    bit_error_rate: float
    error_count: int
    total_bits: int
    pattern_type: ErrorPattern
    mean_error_gap: float
    error_positions: NDArray[np.int64]
    diagnosis: str
    severity: str

54 

55 

def analyze_bit_errors(
    received: NDArray[np.uint8],
    expected: NDArray[np.uint8],
    *,
    burst_threshold: int = 100,
    periodicity_threshold: float = 0.1,
) -> ErrorAnalysis:
    """Characterize bit error patterns for capture diagnostics.

    Compares a received bit stream against a golden reference and
    classifies the error pattern to help distinguish EMI, USB
    transmission problems, and clock jitter.

    Error pattern classification (DAQ-005):
        - Burst: errors clustered, mean gap < ``burst_threshold`` bits
        - Periodic: a dominant FFT peak in the binary error signal
        - Random: errors roughly uniformly spread (gap CV < 1.0)

    Severity (appended to the diagnosis):
        - BER > 0.01: severe, check connections and hardware
        - BER 0.001-0.01: moderate, consider reducing sample rate
        - BER < 0.001: low / acceptable, likely EMI

    Args:
        received: Received bit array (actual capture).
        expected: Expected bit array (golden reference).
        burst_threshold: Mean-gap threshold (bits) below which clustered
            errors are classified as a burst.
        periodicity_threshold: Minimum FFT peak magnitude, normalized by
            the DC magnitude (total error count), required to classify
            the pattern as periodic.

    Returns:
        ErrorAnalysis with BER, pattern type, and diagnosis.

    Raises:
        ValueError: If received and expected have different lengths.
        ValueError: If arrays are empty.

    Examples:
        >>> import numpy as np
        >>> expected = np.random.randint(0, 2, 10000, dtype=np.uint8)
        >>> received = expected.copy()
        >>> received[1000:1050] = 1 - received[1000:1050]  # 50-bit burst
        >>> analysis = analyze_bit_errors(received, expected)
        >>> analysis.pattern_type.value
        'burst'

    References:
        DAQ-005: Bit Error Pattern Analysis and Capture Diagnostics
    """
    if len(received) != len(expected):
        raise ValueError("Received and expected arrays must have same length")

    if len(received) == 0:
        raise ValueError("Arrays cannot be empty")

    # Element-wise mismatch marks every bit error.
    errors = received != expected
    error_positions = np.where(errors)[0]
    error_count = len(error_positions)
    total_bits = len(received)

    # total_bits > 0 is guaranteed by the empty-array check above.
    bit_error_rate = error_count / total_bits

    if error_count == 0:
        # Perfect capture: report immediately, skipping pattern analysis.
        return ErrorAnalysis(
            bit_error_rate=0.0,
            error_count=0,
            total_bits=total_bits,
            pattern_type=ErrorPattern.RANDOM,
            mean_error_gap=float(total_bits),
            error_positions=error_positions,
            diagnosis="No errors detected - good capture quality",
            severity="low",
        )

    # Gaps between consecutive errors drive the burst/random tests.
    if error_count > 1:
        error_gaps = np.diff(error_positions)
        mean_gap = float(np.mean(error_gaps))
    else:
        error_gaps = np.array([], dtype=np.int64)
        mean_gap = float(total_bits)

    pattern_type = ErrorPattern.UNKNOWN
    diagnosis = ""

    if error_count > 1 and mean_gap < burst_threshold:
        # Tightly clustered errors: characteristic of dropped USB packets.
        pattern_type = ErrorPattern.BURST
        diagnosis = "Burst errors detected - likely USB transmission issue"
    elif error_count >= 10:
        # Need at least 10 errors for reliable periodicity detection.
        error_signal = errors.astype(float)
        fft = np.fft.rfft(error_signal)
        fft_mag = np.abs(fft[1:])  # Skip DC component

        if len(fft_mag) > 0:
            # Peak prominence relative to the mean spectral magnitude.
            mean_mag = float(np.mean(fft_mag))
            max_mag = float(np.max(fft_mag))
            peak_ratio = max_mag / (mean_mag + 1e-12)

            # BUG FIX: the original second condition was
            #   (max_mag / (np.max(fft_mag) + 1e-12)) > periodicity_threshold
            # i.e. the peak divided by itself -- always ~1.0 and therefore
            # always true, leaving periodicity_threshold with no effect.
            # Normalize the peak by the DC magnitude instead (for a 0/1
            # error signal, |fft[0]| == error_count): a perfectly periodic
            # error train yields a ratio near 1.0, while uncorrelated
            # errors yield roughly 1/sqrt(error_count).
            normalized_peak = max_mag / (float(np.abs(fft[0])) + 1e-12)

            # Require a strong peak (>10x mean) that also carries a
            # significant fraction of the total error energy.
            if peak_ratio > 10 and normalized_peak > periodicity_threshold:
                pattern_type = ErrorPattern.PERIODIC
                diagnosis = "Periodic errors detected - likely clock jitter or interference"

    # If not burst or periodic, fall back to random/mixed classification.
    if pattern_type == ErrorPattern.UNKNOWN:
        if error_count > 2:
            # Coefficient of variation of the gaps: relatively uniform
            # spacing (CV < 1.0) suggests independent random errors.
            # error_gaps is non-empty here because error_count > 1.
            # (The original's nested `if error_count > 1` was always true
            # inside this branch; its else-arm was unreachable dead code.)
            gap_std = float(np.std(error_gaps))
            gap_cv = gap_std / (mean_gap + 1e-12)

            if gap_cv < 1.0:
                pattern_type = ErrorPattern.RANDOM
                diagnosis = "Random errors detected - likely EMI or noise"
            else:
                diagnosis = "Mixed error pattern - multiple causes possible"
        else:
            # One or two errors: too few to classify beyond "random".
            pattern_type = ErrorPattern.RANDOM
            diagnosis = "Few errors - likely random EMI or noise"

    # Severity from BER thresholds (DAQ-005), appended to the diagnosis.
    if bit_error_rate > 0.01:
        severity = "severe"
        diagnosis += ". SEVERE: Check connections and hardware"
    elif bit_error_rate > 0.001:
        severity = "moderate"
        diagnosis += ". MODERATE: Consider reducing sample rate"
    else:
        severity = "low"
        diagnosis += ". Acceptable error rate"

    return ErrorAnalysis(
        bit_error_rate=bit_error_rate,
        error_count=error_count,
        total_bits=total_bits,
        pattern_type=pattern_type,
        mean_error_gap=mean_gap,
        error_positions=error_positions,
        diagnosis=diagnosis,
        severity=severity,
    )

221 

222 

def generate_error_visualization_data(
    analysis: ErrorAnalysis,
    *,
    histogram_bins: int = 50,
) -> dict[str, NDArray[np.float64]]:
    """Build plotting-ready arrays describing the error distribution.

    Produces a histogram of error positions over the full bit range and a
    per-bit binary timeline, suitable for direct use with matplotlib.

    Args:
        analysis: ErrorAnalysis result from analyze_bit_errors().
        histogram_bins: Number of bins for the error position histogram.

    Returns:
        Dictionary with 'histogram_counts', 'histogram_edges', and
        'timeline' arrays for visualization.

    Examples:
        >>> analysis = analyze_bit_errors(received, expected)
        >>> viz_data = generate_error_visualization_data(analysis)
        >>> import matplotlib.pyplot as plt
        >>> plt.hist(analysis.error_positions, bins=viz_data['histogram_edges'])
        >>> plt.xlabel('Bit Position')
        >>> plt.ylabel('Error Count')
        >>> plt.show()

    References:
        DAQ-005: Bit Error Pattern Analysis and Capture Diagnostics
    """
    positions = analysis.error_positions

    # With no errors there is nothing to bin or mark: return empty arrays.
    if len(positions) == 0:
        return {
            "histogram_counts": np.array([], dtype=np.float64),
            "histogram_edges": np.array([], dtype=np.float64),
            "timeline": np.array([], dtype=np.float64),
        }

    # Histogram of raw counts spanning the entire capture, bit 0 upward.
    bin_counts, bin_edges = np.histogram(
        positions,
        bins=histogram_bins,
        range=(0, analysis.total_bits),
        density=False,
    )

    # Per-bit timeline: 1.0 where an error occurred, 0.0 elsewhere.
    timeline = np.zeros(analysis.total_bits, dtype=np.float64)
    timeline[positions] = 1.0

    return {
        "histogram_counts": bin_counts.astype(np.float64),
        "histogram_edges": bin_edges.astype(np.float64),
        "timeline": timeline,
    }