Coverage for src / tracekit / analyzers / signal_integrity / equalization.py: 98%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Equalization algorithms for signal integrity. 

2 

3This module provides FFE, DFE, and CTLE equalization to 

4compensate for channel loss and ISI. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.signal_integrity.equalization import ffe_equalize 

9 >>> equalized = ffe_equalize(trace, taps=[-0.1, 1.0, -0.1]) 

10 

11References: 

12 IEEE 802.3: Ethernet PHY Equalization Requirements 

13""" 

14 

15from __future__ import annotations 

16 

17from dataclasses import dataclass 

18from typing import TYPE_CHECKING 

19 

20import numpy as np 

21from scipy import optimize 

22from scipy import signal as scipy_signal 

23 

24if TYPE_CHECKING: 

25 from numpy.typing import NDArray 

26 

27 from tracekit.core.types import WaveformTrace 

28 

29 

@dataclass
class FFEResult:
    """Container for the outcome of a feed-forward equalizer run.

    Attributes:
        equalized_data: Waveform samples after the FIR taps were applied.
        taps: Tap coefficients that produced ``equalized_data``.
        n_precursor: Count of taps located before the main cursor.
        n_postcursor: Count of taps located after the main cursor.
        mse: Mean squared error against the optimization target; stays
            ``None`` when the producer does not supply one.
    """

    equalized_data: NDArray[np.float64]
    taps: NDArray[np.float64]
    n_precursor: int
    n_postcursor: int
    mse: float | None = None

47 

48 

@dataclass
class DFEResult:
    """Container for the outcome of a decision-feedback equalizer run.

    Attributes:
        equalized_data: Waveform samples after feedback cancellation.
        taps: Feedback tap coefficients that were applied.
        decisions: Bit decisions made by the slicer, or ``None`` when the
            producer does not provide them.
        n_taps: How many feedback taps were used.
        error_count: Number of wrong decisions when a reference sequence is
            available; ``None`` otherwise.
    """

    equalized_data: NDArray[np.float64]
    taps: NDArray[np.float64]
    decisions: NDArray[np.int_] | None
    n_taps: int
    error_count: int | None = None

66 

67 

@dataclass
class CTLEResult:
    """Container for the outcome of a continuous-time linear equalizer run.

    Attributes:
        equalized_data: Waveform samples after the CTLE filter was applied.
        dc_gain: Low-frequency gain, in dB.
        ac_gain: Peaking (high-frequency) gain, in dB.
        pole_frequency: Location of the filter pole, in Hz.
        zero_frequency: Location of the filter zero, in Hz (may be ``None``).
        boost: High-frequency boost, in dB.
    """

    equalized_data: NDArray[np.float64]
    dc_gain: float
    ac_gain: float
    pole_frequency: float
    zero_frequency: float | None
    boost: float

87 

88 

def ffe_equalize(
    trace: WaveformTrace,
    taps: list[float] | NDArray[np.float64],
    *,
    samples_per_symbol: int | None = None,
) -> FFEResult:
    """Apply Feed-Forward Equalization to waveform.

    FFE uses a linear FIR filter to compensate for channel ISI.
    The main cursor (largest-magnitude tap) should be 1.0 for unity gain.

    The output is time-aligned to the main cursor: ``equalized[n]`` is
    ``sum(taps[j] * data[n + main_idx - j])`` with samples outside the trace
    treated as zero, so taps before the main cursor weight later samples
    (precursors) and taps after it weight earlier samples (postcursors).
    The result always has the same length as ``trace.data``.

    Args:
        trace: Input waveform trace; only ``trace.data`` is read.
        taps: FFE tap coefficients (main cursor should be 1.0).
        samples_per_symbol: Samples per UI. Accepted for API symmetry but
            not used by this function; taps are applied at sample spacing.

    Returns:
        FFEResult with equalized data, the taps as a float64 array, and the
        precursor/postcursor counts derived from the main cursor position.

    Raises:
        ValueError: If ``taps`` is empty or not one-dimensional, or if the
            trace holds no samples (raised by ``np.convolve``).

    Example:
        >>> result = ffe_equalize(trace, taps=[-0.1, 1.0, -0.1])
        >>> # 3-tap equalizer: 1 precursor, 1 main, 1 postcursor

    References:
        IEEE 802.3 Clause 93
    """
    tap_array = np.array(taps, dtype=np.float64)
    if tap_array.ndim != 1 or tap_array.size == 0:
        raise ValueError("taps must be a non-empty one-dimensional sequence")

    # The main cursor is the tap with the largest magnitude.
    main_idx = int(np.argmax(np.abs(tap_array)))
    n_precursor = main_idx
    n_postcursor = len(tap_array) - main_idx - 1

    data = trace.data
    n_samples = len(data)

    # mode="same" would centre the kernel at (len(taps) - 1) // 2, which only
    # coincides with the main cursor for centred tap sets.  Slice the full
    # convolution at the main cursor so the output is never time-shifted.
    full = np.convolve(data, tap_array, mode="full")
    equalized = full[main_idx : main_idx + n_samples]

    return FFEResult(
        equalized_data=equalized,
        taps=tap_array,
        n_precursor=n_precursor,
        n_postcursor=n_postcursor,
    )

132 

133 

def optimize_ffe(
    trace: WaveformTrace,
    n_taps: int = 5,
    *,
    n_precursor: int = 1,
    samples_per_symbol: int | None = None,
    target: NDArray[np.float64] | None = None,
) -> FFEResult:
    """Find optimal FFE tap coefficients.

    Minimizes the mean squared error between the equalized waveform and a
    target waveform using bounded L-BFGS-B, starting from a unit main cursor
    with all other taps at zero. The main cursor is held within [0.5, 1.5]
    and every other tap within [-2, 2]. After optimization the taps are
    rescaled so that the main cursor equals 1.0, and the reported MSE is
    computed with the rescaled taps.

    The filter is time-aligned to the tap at index ``n_precursor`` so that
    the main cursor multiplies the sample being equalized.

    Args:
        trace: Input waveform trace; only ``trace.data`` is read.
        n_taps: Total number of FFE taps.
        n_precursor: Number of precursor taps (index of the main cursor).
        samples_per_symbol: Samples per UI. Accepted but not used here.
        target: Target (ideal) waveform for optimization. When None, the
            data is sliced at its median into +1.0 / -1.0 levels.

    Returns:
        FFEResult with optimized taps, the equalized data and the MSE.

    Raises:
        ValueError: If ``n_taps`` is less than 1 or ``n_precursor`` is not
            in ``range(n_taps)``.

    Example:
        >>> result = optimize_ffe(trace, n_taps=5, n_precursor=1)
        >>> print(f"Optimal taps: {result.taps}")
    """
    if n_taps < 1:
        raise ValueError("n_taps must be at least 1")
    if not 0 <= n_precursor < n_taps:
        raise ValueError("n_precursor must satisfy 0 <= n_precursor < n_taps")

    data = trace.data
    n_samples = len(data)

    if target is None:
        # Simplified decision slicer: everything above the median is +1.
        threshold = np.median(data)
        target = np.where(data > threshold, 1.0, -1.0)

    def apply_taps(candidate: NDArray[np.float64]) -> NDArray[np.float64]:
        """Filter the data with the main cursor aligned to each sample."""
        # mode="same" centres at (n_taps - 1) // 2, which differs from
        # n_precursor in general; slice the full convolution instead.
        full = np.convolve(data, candidate, mode="full")
        return full[n_precursor : n_precursor + n_samples]

    def objective(candidate: NDArray[np.float64]) -> float:
        """Mean squared error between the equalized data and the target."""
        return float(np.mean((apply_taps(candidate) - target) ** 2))

    # Initial guess: main cursor at 1.0, all other taps zero.
    n_postcursor = n_taps - n_precursor - 1
    x0 = np.zeros(n_taps)
    x0[n_precursor] = 1.0

    # Box constraints: limit tap magnitude, keep the main cursor near 1.0.
    bounds = [(-2.0, 2.0)] * n_taps
    bounds[n_precursor] = (0.5, 1.5)

    result = optimize.minimize(
        objective,
        x0,
        method="L-BFGS-B",
        bounds=bounds,
        options={"maxiter": 100},
    )

    optimal_taps = result.x

    # Normalize so the main cursor is 1.0.  The bounds keep it >= 0.5, so
    # the guard only protects against a degenerate optimizer result.
    main_val = optimal_taps[n_precursor]
    if abs(main_val) > 1e-6:
        optimal_taps = optimal_taps / main_val

    equalized = apply_taps(optimal_taps)
    mse = float(np.mean((equalized - target) ** 2))

    return FFEResult(
        equalized_data=equalized,
        taps=optimal_taps,
        n_precursor=n_precursor,
        n_postcursor=n_postcursor,
        mse=mse,
    )

210 

211 

def dfe_equalize(
    trace: WaveformTrace,
    taps: list[float] | NDArray[np.float64],
    *,
    threshold: float | None = None,
    samples_per_symbol: int = 1,
) -> DFEResult:
    """Apply Decision Feedback Equalization.

    DFE cancels post-cursor ISI using feedback from previous
    bit decisions. Unlike FFE, DFE does not amplify noise.

    One decision is made per symbol from the first sample of that symbol.
    The feedback term is the dot product of ``taps`` with the most recent
    decisions (newest first), each represented as +1.0 or -1.0. The
    corrected value is written to every sample of the symbol in the output,
    so for ``samples_per_symbol > 1`` the equalized data is a held
    (staircase) version of the per-symbol corrected samples.

    Args:
        trace: Input waveform trace; only ``trace.data`` is read.
        taps: DFE tap coefficients for post-cursor cancellation. An empty
            sequence is allowed and reduces the equalizer to a plain slicer.
        threshold: Decision threshold (median of the data if None).
        samples_per_symbol: Samples per UI (default 1 for symbol-rate).

    Returns:
        DFEResult with equalized data and decisions (0 or 1). Only complete
        symbols (``len(data) // samples_per_symbol``) yield a decision.

    Raises:
        ValueError: If ``samples_per_symbol`` is less than 1.

    Example:
        >>> result = dfe_equalize(trace, taps=[0.2, 0.1])
        >>> # 2-tap DFE canceling h1 and h2

    References:
        IEEE 802.3 Clause 93
    """
    if samples_per_symbol < 1:
        raise ValueError("samples_per_symbol must be at least 1")

    taps = np.array(taps, dtype=np.float64)
    n_taps = len(taps)
    data = trace.data
    n = len(data)

    # Auto-detect threshold
    if threshold is None:
        threshold = float(np.median(data))

    # Output arrays
    equalized = np.zeros(n, dtype=np.float64)
    decisions = np.zeros(n // samples_per_symbol, dtype=np.int_)

    # Feedback register of previous decisions as +/-1.0, newest at index 0.
    prev_decisions = np.zeros(n_taps, dtype=np.float64)

    decision_idx = 0

    # Process symbol-by-symbol
    for i in range(0, n, samples_per_symbol):
        # Subtract the DFE feedback from the first sample of the symbol.
        dfe_correction = np.dot(taps, prev_decisions)
        corrected = data[i] - dfe_correction

        # Slice against the threshold.
        is_high = corrected > threshold

        # Hold the corrected value across the whole symbol.
        equalized[i : i + samples_per_symbol] = corrected

        # A trailing partial symbol has no slot in `decisions`.
        if decision_idx < len(decisions):
            decisions[decision_idx] = 1 if is_high else 0
            decision_idx += 1

        # Shift the feedback register; with zero taps there is nothing to
        # shift and indexing element 0 would raise IndexError.
        if n_taps:
            prev_decisions = np.roll(prev_decisions, 1)
            prev_decisions[0] = 1.0 if is_high else -1.0

    return DFEResult(
        equalized_data=equalized,
        taps=taps,
        decisions=decisions[:decision_idx],
        n_taps=n_taps,
    )

286 

287 

def ctle_equalize(
    trace: WaveformTrace,
    dc_gain: float = 0.0,
    ac_gain: float = 6.0,
    pole_frequency: float = 5e9,
    *,
    zero_frequency: float | None = None,
) -> CTLEResult:
    """Apply Continuous Time Linear Equalization.

    CTLE provides high-frequency boost to compensate for
    channel loss. It uses an analog-style transfer function
    ``H(s) = dc_linear * (1 + s/wz) / (1 + s/wp)`` that is converted to a
    digital IIR filter with the bilinear transform at the trace's sample
    rate (no frequency pre-warping is applied) and run over the data with
    ``scipy.signal.lfilter``.

    When ``zero_frequency`` is None it is placed at
    ``pole_frequency / (ac_linear / dc_linear)`` so the high-frequency gain
    equals ``ac_gain``. When it is given explicitly, the analog prototype's
    boost is ``20 * log10(pole_frequency / zero_frequency)``; the ``ac_gain``
    and ``boost`` fields of the result still echo the requested values.

    Args:
        trace: Input waveform trace; ``trace.data`` and
            ``trace.metadata.sample_rate`` are read.
        dc_gain: DC gain in dB.
        ac_gain: AC (peaking) gain in dB.
        pole_frequency: Pole frequency in Hz (must be positive).
        zero_frequency: Zero frequency in Hz (computed if None; must be
            positive when given).

    Returns:
        CTLEResult with equalized data and the filter parameters.

    Raises:
        ValueError: If the sample rate, ``pole_frequency`` or
            ``zero_frequency`` is not strictly positive.

    Example:
        >>> result = ctle_equalize(trace, ac_gain=6, pole_frequency=5e9)
        >>> # 6 dB of high-frequency boost

    References:
        IEEE 802.3 Clause 93
    """
    data = trace.data
    sample_rate = trace.metadata.sample_rate

    # Non-positive values would divide by zero below or place the pole/zero
    # in the right half-plane, silently producing an unstable filter.
    if sample_rate <= 0:
        raise ValueError("trace sample rate must be positive")
    if pole_frequency <= 0:
        raise ValueError("pole_frequency must be positive")
    if zero_frequency is not None and zero_frequency <= 0:
        raise ValueError("zero_frequency must be positive")

    # Convert gains from dB to linear
    dc_linear = 10 ** (dc_gain / 20)
    ac_linear = 10 ** (ac_gain / 20)

    # Place the zero below the pole by the linear boost ratio so the
    # high-frequency gain reaches ac_linear.
    if zero_frequency is None:
        zero_frequency = pole_frequency / (ac_linear / dc_linear)

    # Requested boost in dB
    boost = ac_gain - dc_gain

    # Analog prototype H(s) = (1 + s/wz) / (1 + s/wp), highest power first.
    wz = 2 * np.pi * zero_frequency
    wp = 2 * np.pi * pole_frequency
    b_analog = [1 / wz, 1]  # numerator coefficients
    a_analog = [1 / wp, 1]  # denominator coefficients

    # Bilinear transform to a digital filter at the trace's sample rate.
    b_digital, a_digital = scipy_signal.bilinear(b_analog, a_analog, fs=sample_rate)

    # Scale the numerator so the DC gain equals dc_linear.
    b_digital = b_digital * dc_linear

    # Apply filter
    equalized = scipy_signal.lfilter(b_digital, a_digital, data)

    return CTLEResult(
        equalized_data=equalized.astype(np.float64),
        dc_gain=dc_gain,
        ac_gain=ac_gain,
        pole_frequency=pole_frequency,
        zero_frequency=zero_frequency,
        boost=boost,
    )

360 

361 

# Public names of this module (what `from ... import *` exposes): the three
# result dataclasses followed by the equalizer entry points, in sorted order.
__all__ = [
    "CTLEResult",
    "DFEResult",
    "FFEResult",
    "ctle_equalize",
    "dfe_equalize",
    "ffe_equalize",
    "optimize_ffe",
]