Coverage for src / tracekit / analyzers / signal_integrity / equalization.py: 98%
100 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Equalization algorithms for signal integrity.
3This module provides FFE, DFE, and CTLE equalization to
4compensate for channel loss and ISI.
7Example:
8 >>> from tracekit.analyzers.signal_integrity.equalization import ffe_equalize
9 >>> equalized = ffe_equalize(trace, taps=[-0.1, 1.0, -0.1])
11References:
12 IEEE 802.3: Ethernet PHY Equalization Requirements
13"""
15from __future__ import annotations
17from dataclasses import dataclass
18from typing import TYPE_CHECKING
20import numpy as np
21from scipy import optimize
22from scipy import signal as scipy_signal
24if TYPE_CHECKING:
25 from numpy.typing import NDArray
27 from tracekit.core.types import WaveformTrace
30@dataclass
31class FFEResult:
32 """Result of FFE equalization.
34 Attributes:
35 equalized_data: Equalized waveform data.
36 taps: FFE tap coefficients used.
37 n_precursor: Number of precursor taps.
38 n_postcursor: Number of postcursor taps.
39 mse: Mean squared error (if optimized).
40 """
42 equalized_data: NDArray[np.float64]
43 taps: NDArray[np.float64]
44 n_precursor: int
45 n_postcursor: int
46 mse: float | None = None
49@dataclass
50class DFEResult:
51 """Result of DFE equalization.
53 Attributes:
54 equalized_data: Equalized waveform data.
55 taps: DFE tap coefficients.
56 decisions: Decoded bit decisions.
57 n_taps: Number of DFE taps.
58 error_count: Number of decision errors (if reference known).
59 """
61 equalized_data: NDArray[np.float64]
62 taps: NDArray[np.float64]
63 decisions: NDArray[np.int_] | None
64 n_taps: int
65 error_count: int | None = None
68@dataclass
69class CTLEResult:
70 """Result of CTLE equalization.
72 Attributes:
73 equalized_data: Equalized waveform data.
74 dc_gain: DC gain in dB.
75 ac_gain: AC (peaking) gain in dB.
76 pole_frequency: Pole frequency in Hz.
77 zero_frequency: Zero frequency in Hz.
78 boost: High-frequency boost in dB.
79 """
81 equalized_data: NDArray[np.float64]
82 dc_gain: float
83 ac_gain: float
84 pole_frequency: float
85 zero_frequency: float | None
86 boost: float
89def ffe_equalize(
90 trace: WaveformTrace,
91 taps: list[float] | NDArray[np.float64],
92 *,
93 samples_per_symbol: int | None = None,
94) -> FFEResult:
95 """Apply Feed-Forward Equalization to waveform.
97 FFE uses a linear FIR filter to compensate for channel ISI.
98 The main cursor (largest tap) should be 1.0 for unity gain.
100 Args:
101 trace: Input waveform trace.
102 taps: FFE tap coefficients (main cursor should be 1.0).
103 samples_per_symbol: Samples per UI (auto-detected if None).
105 Returns:
106 FFEResult with equalized data.
108 Example:
109 >>> result = ffe_equalize(trace, taps=[-0.1, 1.0, -0.1])
110 >>> # 3-tap equalizer: 1 precursor, 1 main, 1 postcursor
112 References:
113 IEEE 802.3 Clause 93
114 """
115 taps = np.array(taps, dtype=np.float64)
117 # Find main cursor position
118 main_idx = int(np.argmax(np.abs(taps)))
119 n_precursor = main_idx
120 n_postcursor = len(taps) - main_idx - 1
122 # Apply FIR filter
123 data = trace.data
124 equalized = np.convolve(data, taps, mode="same")
126 return FFEResult(
127 equalized_data=equalized,
128 taps=taps,
129 n_precursor=n_precursor,
130 n_postcursor=n_postcursor,
131 )
134def optimize_ffe(
135 trace: WaveformTrace,
136 n_taps: int = 5,
137 *,
138 n_precursor: int = 1,
139 samples_per_symbol: int | None = None,
140 target: NDArray[np.float64] | None = None,
141) -> FFEResult:
142 """Find optimal FFE tap coefficients.
144 Uses least-squares optimization to find taps that minimize
145 ISI and maximize eye opening.
147 Args:
148 trace: Input waveform trace.
149 n_taps: Total number of FFE taps.
150 n_precursor: Number of precursor taps.
151 samples_per_symbol: Samples per UI.
152 target: Target (ideal) waveform for optimization.
154 Returns:
155 FFEResult with optimized taps.
157 Example:
158 >>> result = optimize_ffe(trace, n_taps=5, n_precursor=1)
159 >>> print(f"Optimal taps: {result.taps}")
160 """
161 data = trace.data
162 len(data)
164 if target is None:
165 # Create target from sliced data (simplified)
166 # Use a decision slicer approach
167 threshold = np.median(data)
168 target = np.where(data > threshold, 1.0, -1.0)
170 def objective(taps): # type: ignore[no-untyped-def]
171 """Minimize MSE between equalized and target."""
172 equalized = np.convolve(data, taps, mode="same")
173 mse = np.mean((equalized - target) ** 2)
174 return mse
176 # Initial guess: main cursor at 1.0, others small
177 n_postcursor = n_taps - n_precursor - 1
178 x0 = np.zeros(n_taps)
179 x0[n_precursor] = 1.0
181 # Constraints: limit tap magnitude
182 bounds = [(-2.0, 2.0)] * n_taps
183 bounds[n_precursor] = (0.5, 1.5) # Main cursor near 1.0
185 result = optimize.minimize(
186 objective,
187 x0,
188 method="L-BFGS-B",
189 bounds=bounds,
190 options={"maxiter": 100},
191 )
193 optimal_taps = result.x
195 # Normalize so main cursor is 1.0
196 main_val = optimal_taps[n_precursor]
197 if abs(main_val) > 1e-6: 197 ↛ 200line 197 didn't jump to line 200 because the condition on line 197 was always true
198 optimal_taps = optimal_taps / main_val
200 equalized = np.convolve(data, optimal_taps, mode="same")
201 mse = float(np.mean((equalized - target) ** 2))
203 return FFEResult(
204 equalized_data=equalized,
205 taps=optimal_taps,
206 n_precursor=n_precursor,
207 n_postcursor=n_postcursor,
208 mse=mse,
209 )
212def dfe_equalize(
213 trace: WaveformTrace,
214 taps: list[float] | NDArray[np.float64],
215 *,
216 threshold: float | None = None,
217 samples_per_symbol: int = 1,
218) -> DFEResult:
219 """Apply Decision Feedback Equalization.
221 DFE cancels post-cursor ISI using feedback from previous
222 bit decisions. Unlike FFE, DFE does not amplify noise.
224 Args:
225 trace: Input waveform trace.
226 taps: DFE tap coefficients for post-cursor cancellation.
227 threshold: Decision threshold (auto-detected if None).
228 samples_per_symbol: Samples per UI (default 1 for symbol-rate).
230 Returns:
231 DFEResult with equalized data and decisions.
233 Example:
234 >>> result = dfe_equalize(trace, taps=[0.2, 0.1])
235 >>> # 2-tap DFE canceling h1 and h2
237 References:
238 IEEE 802.3 Clause 93
239 """
240 taps = np.array(taps, dtype=np.float64)
241 n_taps = len(taps)
242 data = trace.data
243 n = len(data)
245 # Auto-detect threshold
246 if threshold is None:
247 threshold = float(np.median(data))
249 # Output arrays
250 equalized = np.zeros(n, dtype=np.float64)
251 decisions = np.zeros(n // samples_per_symbol, dtype=np.int_)
253 # Previous decisions buffer (for feedback)
254 prev_decisions = np.zeros(n_taps, dtype=np.float64)
256 # Process symbol-by-symbol
257 decision_idx = 0
259 for i in range(0, n, samples_per_symbol):
260 # Get input sample
261 input_val = data[i]
263 # Subtract DFE feedback
264 dfe_correction = np.dot(taps, prev_decisions)
265 corrected = input_val - dfe_correction
267 # Make decision
268 decision = 1.0 if corrected > threshold else -1.0
270 # Store
271 equalized[i : i + samples_per_symbol] = corrected
272 if decision_idx < len(decisions): 272 ↛ 277line 272 didn't jump to line 277 because the condition on line 272 was always true
273 decisions[decision_idx] = int((decision + 1) / 2) # 0 or 1
274 decision_idx += 1
276 # Shift feedback register
277 prev_decisions = np.roll(prev_decisions, 1)
278 prev_decisions[0] = decision
280 return DFEResult(
281 equalized_data=equalized,
282 taps=taps,
283 decisions=decisions[:decision_idx],
284 n_taps=n_taps,
285 )
288def ctle_equalize(
289 trace: WaveformTrace,
290 dc_gain: float = 0.0,
291 ac_gain: float = 6.0,
292 pole_frequency: float = 5e9,
293 *,
294 zero_frequency: float | None = None,
295) -> CTLEResult:
296 """Apply Continuous Time Linear Equalization.
298 CTLE provides high-frequency boost to compensate for
299 channel loss. It uses an analog-style transfer function.
301 Args:
302 trace: Input waveform trace.
303 dc_gain: DC gain in dB.
304 ac_gain: AC (peaking) gain in dB.
305 pole_frequency: Pole frequency in Hz.
306 zero_frequency: Zero frequency in Hz (computed if None).
308 Returns:
309 CTLEResult with equalized data.
311 Example:
312 >>> result = ctle_equalize(trace, ac_gain=6, pole_frequency=5e9)
313 >>> # 6 dB of high-frequency boost
315 References:
316 IEEE 802.3 Clause 93
317 """
318 data = trace.data
319 sample_rate = trace.metadata.sample_rate
320 len(data)
322 # Convert gains from dB to linear
323 dc_linear = 10 ** (dc_gain / 20)
324 ac_linear = 10 ** (ac_gain / 20)
326 # Calculate zero frequency to achieve desired boost
327 if zero_frequency is None:
328 # Zero at lower frequency than pole to create peaking
329 zero_frequency = pole_frequency / (ac_linear / dc_linear)
331 # Compute boost
332 boost = ac_gain - dc_gain
334 # Create CTLE transfer function
335 # H(s) = (1 + s/wz) / (1 + s/wp) * gain
336 wz = 2 * np.pi * zero_frequency
337 wp = 2 * np.pi * pole_frequency
339 # Convert to digital filter using bilinear transform
340 b_analog = [1 / wz, 1] # numerator coefficients
341 a_analog = [1 / wp, 1] # denominator coefficients
343 # Bilinear transform
344 b_digital, a_digital = scipy_signal.bilinear(b_analog, a_analog, fs=sample_rate)
346 # Scale for DC gain
347 b_digital = b_digital * dc_linear
349 # Apply filter
350 equalized = scipy_signal.lfilter(b_digital, a_digital, data)
352 return CTLEResult(
353 equalized_data=equalized.astype(np.float64),
354 dc_gain=dc_gain,
355 ac_gain=ac_gain,
356 pole_frequency=pole_frequency,
357 zero_frequency=zero_frequency,
358 boost=boost,
359 )
362__all__ = [
363 "CTLEResult",
364 "DFEResult",
365 "FFEResult",
366 "ctle_equalize",
367 "dfe_equalize",
368 "ffe_equalize",
369 "optimize_ffe",
370]