yieldcurveml.stripcurve

1from .stripcurve import CurveStripper
2from .bootstrapcurve import RateCurveBootstrapper
3
4__all__ = ['CurveStripper', 
5           'RateCurveBootstrapper']
class CurveStripper(sklearn.base.BaseEstimator, sklearn.base.RegressorMixin):
 26class CurveStripper(BaseEstimator, RegressorMixin):
 27    """Yield curve stripping estimator.
 28    
 29    Parameters
 30    ----------
 31    estimator : sklearn estimator, default=None
 32        Scikit-learn estimator to use for fitting. If None, uses bootstrap method.
 33    lambda1 : float, default=2.5
 34        First lambda parameter for NSS function
 35    lambda2 : float, default=4.5
 36        Second lambda parameter for NSS function
 37    type_regressors : str, default=None
 38        Type of basis functions, one of "laguerre", "cubic", "kernel", or None for bootstrap
 39    kernel_type : str, default=None
 40        Type of kernel to use if type_regressors is "kernel"
 41    interpolation : str, default='linear'
 42        Interpolation method for bootstrap
 43    **kwargs : dict
 44        Additional parameters for kernel generation
 45    """
 46    def __init__(
 47        self,
 48        estimator=None,
 49        lambda1: float = 2.5,
 50        lambda2: float = 4.5,
 51        type_regressors: Optional[Literal["laguerre", "cubic", "kernel"]] = None,
 52        kernel_type: Optional[Literal['matern', 'rbf', 'rationalquadratic', 'smithwilson']] = None,
 53        interpolation: Literal['linear', 'cubic'] = 'linear',
 54        **kwargs
 55    ):
 56        self.estimator = estimator
 57        self.lambda1 = lambda1
 58        self.lambda2 = lambda2
 59        self.type_regressors = type_regressors
 60        self.kernel_type = kernel_type
 61        self.interpolation = interpolation
 62        self.cashflow_dates_ = None
 63        
 64        # Validate configuration
 65        if type_regressors in ["laguerre", "cubic"] and estimator is None:
 66            warnings.warn("For basis regression methods, an estimator should be provided. Falling back to bootstrap method.")
 67            self.type_regressors = None
 68        
 69        # Clear kernel_type if not using kernel regression
 70        if self.type_regressors != "kernel":
 71            self.kernel_type = None
 72            
 73        self.maturities = None
 74        self.swap_rates = None
 75        self.tenor_swaps = None
 76        self.T_UFR = None
 77        self.kernel_params_ = kwargs  # Store kernel parameters
 78        self.coef_ = None
 79        self.cashflows_ = None
 80        self.cashflow_dates_ = None
 81        self.curve_rates_ = None
 82
 83    def _get_basis_functions(self, maturities: np.ndarray) -> np.ndarray:
 84        """Generate basis functions for the regression."""
 85        if self.type_regressors == "laguerre":
 86            temp1 = maturities / self.lambda1
 87            temp = np.exp(-temp1)
 88            temp2 = maturities / self.lambda2
 89            return np.column_stack([
 90                np.ones_like(maturities),
 91                temp,
 92                temp1 * temp,
 93                temp2 * np.exp(-temp2)
 94            ])
 95        elif self.type_regressors == "cubic":  # cubic
 96            return  np.column_stack([
 97                maturities,
 98                maturities**2,
 99                maturities**3
100            ])
101        elif self.type_regressors == "kernel":
102            return generate_kernel(maturities, kernel_type=self.kernel_type, 
103                              **self.kernel_params_)        
104    
105    def fit(
106        self, 
107        maturities: np.ndarray, 
108        swap_rates: np.ndarray,
109        tenor_swaps: Literal["1m", "3m", "6m", "1y"] = "6m",
110        T_UFR: Optional[float] = None
111    ) -> "CurveStripper":
112        """Fit the curve stripper model.
113        
114        Parameters
115        ----------
116        maturities : np.ndarray
117            Maturities of the swap rates
118        swap_rates : np.ndarray
119            Swap rates
120        tenor_swaps : Literal["1m", "3m", "6m", "1y"], default="6m"
121            Tenor of the swaps to use for the bootstrap
122        T_UFR : float, default=None
123            UFR to use for the Smith-Wilson method
124
125        Returns
126        -------
127        self : CurveStripper
128            Fitted curve stripper model
129        """
130        self.maturities = maturities
131        self.swap_rates = swap_rates
132        self.tenor_swaps = tenor_swaps
133        self.T_UFR = T_UFR
134        # Store inputs
135        self.rates_ = RatesContainer(
136            maturities=np.asarray(maturities),
137            swap_rates=np.asarray(swap_rates)
138        )
139        # Get cashflows and store them
140        self.cashflows_ = swap_cashflows_matrix(
141            swap_rates=self.swap_rates,
142            maturities=self.maturities,
143            tenor_swaps=self.tenor_swaps
144        )
145        self.cashflow_dates_ = self.cashflows_.cashflow_dates[-1]        
146        
147        # Handle different fitting methods
148        if self.type_regressors is None and self.estimator is None:
149            # Bootstrap method
150            bootstrapper = RateCurveBootstrapper(interpolation=self.interpolation)
151            self.curve_rates_ = bootstrapper.fit(
152                maturities=self.rates_.maturities,
153                swap_rates=self.rates_.swap_rates,
154                tenor_swaps=self.tenor_swaps
155            )
156            
157        elif self.type_regressors == "kernel":
158            if self.estimator is None:
159                # Direct kernel solver (kernel inversion)
160                lambda_reg = self.kernel_params_.get('lambda_reg', 1e-6)                
161                # Generate kernel matrix using maturities
162                K = generate_kernel(
163                    self.cashflow_dates_,
164                    kernel_type=self.kernel_type,
165                    **self.kernel_params_
166                )                
167                # Calculate coefficients (solving the system)
168                C = self.cashflows_.cashflow_matrix
169                V = np.ones_like(self.maturities)
170                if self.kernel_type == "smithwilson":
171                    # For Smith-Wilson, include UFR adjustment
172                    ufr = self.kernel_params_.get('ufr', 0.03)
173                    mu = np.exp(-ufr * self.cashflow_dates_)
174                    target = V - C @ mu              
175                    # Solve the system using C @ K @ C.T to get correct dimensions
176                    A = C @ K @ C.T + lambda_reg * np.eye(len(C))  # Now A is (n_maturities × n_maturities)
177                    self.coef_ = np.linalg.solve(A, target)  # Now dimensions match                    
178                else:          
179                    A = C @ K @ C.T + lambda_reg * np.eye(len(C))
180                    A = (A + A.T) / 2  # Ensure symmetry
181                    self.coef_ = np.linalg.solve(A, V)
182            else:
183                # Kernel regression (using kernel as features)
184                X = generate_kernel(
185                    maturities.reshape(-1, 1),
186                    kernel_type=self.kernel_type,
187                    **self.kernel_params_
188                )
189                y = (self.cashflows_.cashflow_matrix.sum(axis=1) - 1) / maturities
190                self.estimator.fit(X, y)
191        else:
192            # Standard basis regression (laguerre / cubic) : fit estimator before prediction
193            if self.estimator is not None:
194                X_train = self._get_basis_functions(np.asarray(self.maturities).reshape(-1))
195                y_train = (self.cashflows_.cashflow_matrix.sum(axis=1) - 1) / np.asarray(self.maturities)
196                # Ensure numpy arrays and correct shapes
197                X_train = np.asarray(X_train)
198                y_train = np.asarray(y_train).reshape(-1)
199                # Fit the provided estimator
200                self.estimator.fit(X_train, y_train)
201        
202        # Calculate initial rates for all methods
203        if self.curve_rates_ is None:
204            self.curve_rates_ = self._calculate_rates(self.maturities)
205        return self
206    
207    def _calculate_rates(self, maturities: np.ndarray, X: Optional[np.ndarray] = None) -> CurveRates:
208        """Calculate spot rates, forward rates, and discount factors."""
209        # Handle bootstrap case first
210        if self.type_regressors is None and self.estimator is None:
211            return self.curve_rates_
212
213        # Handle kernel methods
214        if self.type_regressors == "kernel":
215            if self.estimator is None:
216                # Direct kernel prediction
217                K_interp = generate_kernel(
218                    maturities,
219                    kernel_type=self.kernel_type,
220                    nodal_points=self.cashflow_dates_,
221                    **{k: v for k, v in self.kernel_params_.items() if k != 'nodal_points'}
222                )            
223                # Calculate discount factors
224                if self.kernel_type == "smithwilson":
225                    ufr = self.kernel_params_.get('ufr', 0.03)
226                    mu_interp = np.exp(-ufr * maturities)
227                    # Use cashflow matrix for final calculation
228                    C = self.cashflows_.cashflow_matrix
229                    discount_factors = mu_interp + K_interp @ C.T @ self.coef_
230                else:
231                    discount_factors = K_interp @ self.coef_                    
232            else:
233                # Kernel regression prediction
234                if X is None:
235                    nodal_points_2d = self.maturities.reshape(-1, 1)
236                    maturities_2d = maturities.reshape(-1, 1)
237                    X = generate_kernel(
238                        maturities_2d,
239                        kernel_type=self.kernel_type,
240                        nodal_points=nodal_points_2d,
241                        **{k: v for k, v in self.kernel_params_.items() if k != 'nodal_points'}
242                    )
243                spot_rates = self.estimator.predict(X)
244                discount_factors = np.exp(-maturities * spot_rates)
245                
246                # Calculate forward rates
247                forward_rates = self._calculate_forward_rates(maturities, discount_factors)
248                
249                return CurveRates(
250                    maturities=maturities,
251                    spot_rates=spot_rates,
252                    forward_rates=forward_rates,
253                    discount_factors=discount_factors
254                )
255        else:
256            # Basis regression methods (laguerre/cubic)
257            if X is None:
258                X = self._get_basis_functions(maturities)
259            
260            if self.estimator is None:
261                # This should not happen due to validation in __init__, but handle gracefully
262                raise ValueError("Estimator is required for basis regression methods")
263                
264            spot_rates = self.estimator.predict(X)
265            discount_factors = np.exp(-maturities * spot_rates)
266    
267        # Calculate spot rates from discount factors (for direct kernel methods)
268        if self.type_regressors == "kernel" and self.estimator is None:
269            spot_rates = -np.log(discount_factors) / maturities
270    
271        # Calculate forward rates
272        forward_rates = self._calculate_forward_rates(maturities, discount_factors)
273    
274        return CurveRates(
275            maturities=maturities,
276            spot_rates=spot_rates,
277            forward_rates=forward_rates,
278            discount_factors=discount_factors
279        )
280    
281    def _calculate_forward_rates(self, maturities: np.ndarray, discount_factors: np.ndarray) -> np.ndarray:
282        """Calculate forward rates from discount factors."""
283        forward_rates = np.zeros_like(maturities)
284        
285        if len(maturities) > 1:
286            # Calculate forward rates between consecutive maturities
287            for i in range(len(maturities) - 1):
288                t1, t2 = maturities[i], maturities[i+1]
289                df1, df2 = discount_factors[i], discount_factors[i+1]
290                
291                # Forward rate from t1 to t2: f(t1,t2) = -ln(df2/df1)/(t2-t1)
292                if t2 > t1 and df1 > 0 and df2 > 0:
293                    forward_rates[i] = -np.log(df2 / df1) / (t2 - t1)
294                else:
295                    forward_rates[i] = 0.0
296            
297            # For the last maturity, use the spot rate
298            forward_rates[-1] = -np.log(discount_factors[-1]) / maturities[-1] if discount_factors[-1] > 0 else 0.0
299        else:
300            # Single maturity case
301            forward_rates[0] = -np.log(discount_factors[0]) / maturities[0] if discount_factors[0] > 0 else 0.0
302            
303        return forward_rates
304    
305    def _interpolate_bootstrap_rates(self, maturities: np.ndarray) -> CurveRates:
306        """Interpolate bootstrap rates for requested maturities."""
307        if np.array_equal(maturities, self.curve_rates_.maturities):
308            return self.curve_rates_
309            
310        # Interpolate spot rates
311        spot_rate_interpolator = interp1d(
312            self.curve_rates_.maturities,
313            self.curve_rates_.spot_rates,
314            kind=self.interpolation,
315            fill_value='extrapolate',
316            bounds_error=False
317        )
318        spot_rates = spot_rate_interpolator(maturities)
319        
320        # Calculate discount factors and forward rates
321        discount_factors = np.exp(-maturities * spot_rates)
322        forward_rates = self._calculate_forward_rates(maturities, discount_factors)
323        
324        return CurveRates(
325            maturities=maturities,
326            spot_rates=spot_rates,
327            forward_rates=forward_rates,
328            discount_factors=discount_factors
329        )
330    
331    def predict(self, maturities: np.ndarray) -> CurveRates:
332        """Predict rates for given maturities."""
333        check_is_fitted(self)
334        # Ensure maturities is 2D for kernel methods
335        maturities = np.asarray(maturities).reshape(-1)  # First ensure 1D        
336        if self.type_regressors is None and self.estimator is None:
337            # Interpolate bootstrap results using scipy's interp1d
338            return self._interpolate_bootstrap_rates(maturities)
339        
340        if self.type_regressors == "kernel":
341            if self.estimator is None:
342                # Direct kernel prediction
343                return self._calculate_rates(maturities)
344            else:
345                # Ensure both inputs are 2D for kernel generation
346                maturities_2d = maturities.reshape(-1, 1)
347                nodal_points_2d = self.maturities.reshape(-1, 1)
348                
349                X = generate_kernel(
350                    maturities_2d,
351                    kernel_type=self.kernel_type,
352                    nodal_points=nodal_points_2d,
353                    **{k: v for k, v in self.kernel_params_.items() if k != 'nodal_points'}
354                )
355                return self._calculate_rates(maturities, X)
356        else:
357            # Standard basis functions
358            X = self._get_basis_functions(maturities.ravel())
359            return self._calculate_rates(maturities.ravel(), X)
360    
361    def get_diagnostics(
362        self,
363        X_test: Optional[np.ndarray] = None,
364        y_test: Optional[np.ndarray] = None
365    ) -> Union[RegressionDiagnostics, tuple[RegressionDiagnostics, RegressionDiagnostics]]:
366        """Calculate detailed regression diagnostics."""
367        def calculate_diagnostics(y_true: np.ndarray, y_pred: np.ndarray) -> RegressionDiagnostics:
368            residuals = y_true - y_pred
369            abs_residuals = np.abs(residuals)
370            
371            residuals_summary = {
372                'mean': np.mean(residuals),
373                'std': np.std(residuals),
374                'median': np.median(residuals),
375                'mad': np.median(abs_residuals),
376                'skewness': float(np.mean(((residuals - np.mean(residuals)) / np.std(residuals)) ** 3)),
377                'kurtosis': float(np.mean(((residuals - np.mean(residuals)) / np.std(residuals)) ** 4) - 3),
378                'percentiles': {
379                    '1%': np.percentile(residuals, 1),
380                    '5%': np.percentile(residuals, 5),
381                    '25%': np.percentile(residuals, 25),
382                    '75%': np.percentile(residuals, 75),
383                    '95%': np.percentile(residuals, 95),
384                    '99%': np.percentile(residuals, 99)
385                }
386            }
387            
388            return RegressionDiagnostics(
389                r2_score=r2_score(y_true, y_pred),
390                rmse=np.sqrt(mean_squared_error(y_true, y_pred)),
391                mae=mean_absolute_error(y_true, y_pred),
392                max_error=max_error(y_true, y_pred),
393                min_error=float(np.min(abs_residuals)),
394                residuals=residuals,
395                fitted_values=y_pred,
396                actual_values=y_true,
397                n_samples=len(y_true),
398                residuals_summary=residuals_summary
399            )
400        
401        # Training set diagnostics
402        if self.estimator is None and self.type_regressors is None:
403            # For bootstrap method, compare original swap rates with reconstructed rates
404            y_train = self.rates_.swap_rates
405            y_train_pred = self.predict(self.rates_.maturities).spot_rates
406        else:
407            # For regression methods
408            if self.type_regressors == "kernel" and self.estimator is not None:
409                X_train = generate_kernel(
410                    self.maturities.reshape(-1, 1),
411                    kernel_type=self.kernel_type,
412                    **self.kernel_params_
413                )
414            else:
415                X_train = self._get_basis_functions(self.rates_.maturities)
416            y_train = (self.cashflows_.cashflow_matrix.sum(axis=1) - 1) / self.rates_.maturities
417            y_train_pred = self.estimator.predict(X_train)
418        
419        train_diagnostics = calculate_diagnostics(y_train, y_train_pred)
420        
421        # Test set diagnostics if provided
422        if X_test is not None and y_test is not None:
423            if self.estimator is None and self.type_regressors is None:
424                y_test_pred = self.predict(X_test).spot_rates
425            else:
426                if self.type_regressors == "kernel" and self.estimator is not None:
427                    X_test_basis = generate_kernel(
428                        X_test.reshape(-1, 1),
429                        kernel_type=self.kernel_type,
430                        **self.kernel_params_
431                    )
432                else:
433                    X_test_basis = self._get_basis_functions(X_test)
434                y_test_pred = self.estimator.predict(X_test_basis)
435            test_diagnostics = calculate_diagnostics(y_test, y_test_pred)
436            return train_diagnostics, test_diagnostics
437        
438        return train_diagnostics

Yield curve stripping estimator.

Parameters

estimator : sklearn estimator, default=None Scikit-learn estimator to use for fitting. If None, uses bootstrap method. lambda1 : float, default=2.5 First lambda parameter for NSS function lambda2 : float, default=4.5 Second lambda parameter for NSS function type_regressors : str, default=None Type of basis functions, one of "laguerre", "cubic", "kernel", or None for bootstrap kernel_type : str, default=None Type of kernel to use if type_regressors is "kernel" interpolation : str, default='linear' Interpolation method for bootstrap **kwargs : dict Additional parameters for kernel generation

def fit( self, maturities: numpy.ndarray, swap_rates: numpy.ndarray, tenor_swaps: Literal['1m', '3m', '6m', '1y'] = '6m', T_UFR: Optional[float] = None) -> CurveStripper:
105    def fit(
106        self, 
107        maturities: np.ndarray, 
108        swap_rates: np.ndarray,
109        tenor_swaps: Literal["1m", "3m", "6m", "1y"] = "6m",
110        T_UFR: Optional[float] = None
111    ) -> "CurveStripper":
112        """Fit the curve stripper model.
113        
114        Parameters
115        ----------
116        maturities : np.ndarray
117            Maturities of the swap rates
118        swap_rates : np.ndarray
119            Swap rates
120        tenor_swaps : Literal["1m", "3m", "6m", "1y"], default="6m"
121            Tenor of the swaps to use for the bootstrap
122        T_UFR : float, default=None
123            UFR to use for the Smith-Wilson method
124
125        Returns
126        -------
127        self : CurveStripper
128            Fitted curve stripper model
129        """
130        self.maturities = maturities
131        self.swap_rates = swap_rates
132        self.tenor_swaps = tenor_swaps
133        self.T_UFR = T_UFR
134        # Store inputs
135        self.rates_ = RatesContainer(
136            maturities=np.asarray(maturities),
137            swap_rates=np.asarray(swap_rates)
138        )
139        # Get cashflows and store them
140        self.cashflows_ = swap_cashflows_matrix(
141            swap_rates=self.swap_rates,
142            maturities=self.maturities,
143            tenor_swaps=self.tenor_swaps
144        )
145        self.cashflow_dates_ = self.cashflows_.cashflow_dates[-1]        
146        
147        # Handle different fitting methods
148        if self.type_regressors is None and self.estimator is None:
149            # Bootstrap method
150            bootstrapper = RateCurveBootstrapper(interpolation=self.interpolation)
151            self.curve_rates_ = bootstrapper.fit(
152                maturities=self.rates_.maturities,
153                swap_rates=self.rates_.swap_rates,
154                tenor_swaps=self.tenor_swaps
155            )
156            
157        elif self.type_regressors == "kernel":
158            if self.estimator is None:
159                # Direct kernel solver (kernel inversion)
160                lambda_reg = self.kernel_params_.get('lambda_reg', 1e-6)                
161                # Generate kernel matrix using maturities
162                K = generate_kernel(
163                    self.cashflow_dates_,
164                    kernel_type=self.kernel_type,
165                    **self.kernel_params_
166                )                
167                # Calculate coefficients (solving the system)
168                C = self.cashflows_.cashflow_matrix
169                V = np.ones_like(self.maturities)
170                if self.kernel_type == "smithwilson":
171                    # For Smith-Wilson, include UFR adjustment
172                    ufr = self.kernel_params_.get('ufr', 0.03)
173                    mu = np.exp(-ufr * self.cashflow_dates_)
174                    target = V - C @ mu              
175                    # Solve the system using C @ K @ C.T to get correct dimensions
176                    A = C @ K @ C.T + lambda_reg * np.eye(len(C))  # Now A is (n_maturities × n_maturities)
177                    self.coef_ = np.linalg.solve(A, target)  # Now dimensions match                    
178                else:          
179                    A = C @ K @ C.T + lambda_reg * np.eye(len(C))
180                    A = (A + A.T) / 2  # Ensure symmetry
181                    self.coef_ = np.linalg.solve(A, V)
182            else:
183                # Kernel regression (using kernel as features)
184                X = generate_kernel(
185                    maturities.reshape(-1, 1),
186                    kernel_type=self.kernel_type,
187                    **self.kernel_params_
188                )
189                y = (self.cashflows_.cashflow_matrix.sum(axis=1) - 1) / maturities
190                self.estimator.fit(X, y)
191        else:
192            # Standard basis regression (laguerre / cubic) : fit estimator before prediction
193            if self.estimator is not None:
194                X_train = self._get_basis_functions(np.asarray(self.maturities).reshape(-1))
195                y_train = (self.cashflows_.cashflow_matrix.sum(axis=1) - 1) / np.asarray(self.maturities)
196                # Ensure numpy arrays and correct shapes
197                X_train = np.asarray(X_train)
198                y_train = np.asarray(y_train).reshape(-1)
199                # Fit the provided estimator
200                self.estimator.fit(X_train, y_train)
201        
202        # Calculate initial rates for all methods
203        if self.curve_rates_ is None:
204            self.curve_rates_ = self._calculate_rates(self.maturities)
205        return self

Fit the curve stripper model.

Parameters

maturities : np.ndarray Maturities of the swap rates swap_rates : np.ndarray Swap rates tenor_swaps : Literal["1m", "3m", "6m", "1y"], default="6m" Tenor of the swaps to use for the bootstrap T_UFR : float, default=None UFR to use for the Smith-Wilson method

Returns

self : CurveStripper Fitted curve stripper model

def predict( self, maturities: numpy.ndarray) -> yieldcurveml.utils.datastructures.CurveRates:
331    def predict(self, maturities: np.ndarray) -> CurveRates:
332        """Predict rates for given maturities."""
333        check_is_fitted(self)
334        # Ensure maturities is 2D for kernel methods
335        maturities = np.asarray(maturities).reshape(-1)  # First ensure 1D        
336        if self.type_regressors is None and self.estimator is None:
337            # Interpolate bootstrap results using scipy's interp1d
338            return self._interpolate_bootstrap_rates(maturities)
339        
340        if self.type_regressors == "kernel":
341            if self.estimator is None:
342                # Direct kernel prediction
343                return self._calculate_rates(maturities)
344            else:
345                # Ensure both inputs are 2D for kernel generation
346                maturities_2d = maturities.reshape(-1, 1)
347                nodal_points_2d = self.maturities.reshape(-1, 1)
348                
349                X = generate_kernel(
350                    maturities_2d,
351                    kernel_type=self.kernel_type,
352                    nodal_points=nodal_points_2d,
353                    **{k: v for k, v in self.kernel_params_.items() if k != 'nodal_points'}
354                )
355                return self._calculate_rates(maturities, X)
356        else:
357            # Standard basis functions
358            X = self._get_basis_functions(maturities.ravel())
359            return self._calculate_rates(maturities.ravel(), X)

Predict rates for given maturities.

class RateCurveBootstrapper(sklearn.base.BaseEstimator, sklearn.base.RegressorMixin):
 12class RateCurveBootstrapper(BaseEstimator, RegressorMixin):
 13    """
 14    Bootstrap interest rate curve from swap rates.
 15
 16    Parameters
 17    ----------
 18
 19    interpolation: Literal['linear', 'cubic'] = 'linear'
 20    """
 21    def __init__(
 22        self,
 23        interpolation: Literal['linear', 'cubic'] = 'linear'
 24    ):
 25        self.interpolation = interpolation
 26        self.maturities = None
 27        self.swap_rates = None
 28        self.tenor_swaps = None
 29        self.n_maturities_ = None
 30        self.cashflows_ = None
 31        self.cashflow_dates_ = None
 32        self.spot_rates_ = None
 33        self.discount_factors_ = None 
 34        self.forward_rates_ = None
 35
 36    def fit(
 37        self, 
 38        maturities: np.ndarray, 
 39        swap_rates: np.ndarray,
 40        tenor_swaps: Literal["1m", "3m", "6m", "1y"] = "6m",
 41    ) -> CurveRates:
 42        """
 43        Bootstrap interest rate curve from swap rates using an improved procedure.
 44        
 45        Args:
 46            maturities: Array of swap maturities
 47            swap_rates: Array of corresponding swap rates
 48            tenor_swaps: Tenor of the swaps to use for the bootstrap
 49            
 50        Returns:
 51            CurveRates object containing bootstrapped curves
 52        """
 53        if self.interpolation not in ['linear', 'cubic']:
 54            raise ValueError("interpolation must be either 'linear' or 'cubic'")
 55        self.maturities = maturities        
 56        self.swap_rates = swap_rates
 57        self.tenor_swaps = tenor_swaps
 58        self.spot_rates_ = self.swap_rates
 59        self._validate_inputs()
 60        if self.tenor_swaps == "1m":
 61            self.freq_payments_ = 12
 62        elif self.tenor_swaps == "3m":
 63            self.freq_payments_ = 4
 64        elif self.tenor_swaps == "6m":
 65            self.freq_payments_ = 2
 66        elif self.tenor_swaps == "1y":
 67            self.freq_payments_ = 1
 68        self.n_maturities_ = len(self.maturities)                
 69        cashflow_data = swap_cashflows_matrix(
 70            swap_rates=self.swap_rates,
 71            maturities=self.maturities,
 72            tenor_swaps=self.tenor_swaps)
 73        self.cashflows_ = cashflow_data.cashflow_matrix
 74        self.cashflow_dates_ = cashflow_data.cashflow_dates[-1,:]  
 75        self.discount_factors_ = np.zeros(self.n_maturities_)
 76        self.spot_rates_ = np.zeros(self.n_maturities_)
 77        self.forward_rates_ = np.zeros(self.n_maturities_)
 78
 79        def objective_function(x):
 80            current_maturity = self.maturities[i]
 81            mask_maturities = (self.spot_rates_ > 0)
 82            mask_cashflow_dates = (self.cashflow_dates_ <= current_maturity) # mask for cashflow dates before maturity
 83            cashflow_dates = self.cashflow_dates_[mask_cashflow_dates] # cashflow dates before maturity
 84            cashflows = self.cashflows_[i, mask_cashflow_dates] # cashflows before maturity 
 85            if (not np.all(self.spot_rates_ <= 0)): # at least one spot rate is positive
 86                spot_rates = np.interp(x=cashflow_dates, 
 87                                    xp=self.maturities[mask_maturities], 
 88                                    fp=self.spot_rates_[mask_maturities]) if self.interpolation == 'linear' else interp1d(self.maturities[mask_maturities], self.spot_rates_[mask_maturities], kind=self.interpolation)
 89            else: # first guess for spot rate is the swap rate
 90                spot_rates = self.swap_rates[0]
 91            dfs = np.exp(-cashflow_dates*spot_rates)
 92            dfs[-1:] = np.exp(-current_maturity*x) # last cashflow is the maturity, we are solving for x 
 93            drs_ = -np.log(dfs)/cashflow_dates # discount rates for cashflows, excluding the first date = 0
 94            dfs_ = self._compute_discount_factor(cashflow_dates, drs_) # discount factors for cashflow dates after the first date = 0
 95            return np.sum(cashflows*dfs_) - 1
 96        
 97        # Bootstrap iteratively
 98        for i in range(self.n_maturities_):
 99            result = root_scalar(f=objective_function, x0=self.swap_rates[i])
100            self.spot_rates_[i] = result.root
101            self.discount_factors_[i] = self._compute_discount_factor(self.maturities[i], 
102                                                                      self.spot_rates_[i])
103        # Calculate forward rates using improved method
104        self.forward_rates_ = self._calculate_forward_rates(self.maturities, 
105                                                           self.spot_rates_, 
106                                                           self.discount_factors_)
107        
108        return CurveRates(
109            maturities=self.maturities,
110            spot_rates=self.spot_rates_,
111            discount_factors=self.discount_factors_,
112            forward_rates=self.forward_rates_
113        )
114    
115    def _validate_inputs(self) -> None:
116        """Validate input arrays."""
117        if len(self.maturities) != len(self.swap_rates):
118            raise ValueError("Maturities and swap rates must have same length")
119        if not np.all(np.diff(self.maturities) > 1e-10):
120            raise ValueError("Maturities must be strictly increasing with minimum spacing of 1e-10")
121        if np.any(self.maturities <= 0):
122            raise ValueError("Maturities must be positive")
123        if np.any(~np.isfinite(self.maturities)) or np.any(~np.isfinite(self.swap_rates)):
124            raise ValueError("Maturities and swap rates must be finite numbers")
125    
126    def _compute_discount_factor(self, maturity: float, rate: float) -> float:
127        """Compute discount factor from spot rate.
128
129        Args:
130            maturity: Maturity of the discount factor
131            rate: Spot rate
132            
133        Returns:
134            Discount factor
135        """
136        return np.exp(-maturity * rate)
137    
138    def _calculate_forward_rates(
139        self,
140        maturities: np.ndarray,
141        spot_rates: np.ndarray,
142        discount_factors: np.ndarray
143    ) -> np.ndarray:
144        """Calculate forward rates using improved method.
145        
146        Args:
147            maturities: Array of maturities
148            spot_rates: Array of spot rates
149            discount_factors: Array of discount factors
150            
151        Returns:
152            Array of forward rates
153        """
154        n_maturities = len(maturities)
155        forward_rates = np.zeros(n_maturities)
156        
157        # First point: use spot rate
158        forward_rates[0] = spot_rates[0]
159        
160        # Remaining points: use discrete formula for better numerical stability
161        for i in range(1, n_maturities):
162            t1, t2 = maturities[i-1], maturities[i]
163            df1, df2 = discount_factors[i-1], discount_factors[i]
164            forward_rates[i] = -np.log(df2/df1) / (t2 - t1)
165        
166        return forward_rates

Bootstrap interest rate curve from swap rates.

Parameters

interpolation: Literal['linear', 'cubic'] = 'linear'

def fit( self, maturities: numpy.ndarray, swap_rates: numpy.ndarray, tenor_swaps: Literal['1m', '3m', '6m', '1y'] = '6m') -> yieldcurveml.utils.datastructures.CurveRates:
 36    def fit(
 37        self, 
 38        maturities: np.ndarray, 
 39        swap_rates: np.ndarray,
 40        tenor_swaps: Literal["1m", "3m", "6m", "1y"] = "6m",
 41    ) -> CurveRates:
 42        """
 43        Bootstrap interest rate curve from swap rates using an improved procedure.
 44        
 45        Args:
 46            maturities: Array of swap maturities
 47            swap_rates: Array of corresponding swap rates
 48            tenor_swaps: Tenor of the swaps to use for the bootstrap
 49            
 50        Returns:
 51            CurveRates object containing bootstrapped curves
 52        """
 53        if self.interpolation not in ['linear', 'cubic']:
 54            raise ValueError("interpolation must be either 'linear' or 'cubic'")
 55        self.maturities = maturities        
 56        self.swap_rates = swap_rates
 57        self.tenor_swaps = tenor_swaps
 58        self.spot_rates_ = self.swap_rates
 59        self._validate_inputs()
 60        if self.tenor_swaps == "1m":
 61            self.freq_payments_ = 12
 62        elif self.tenor_swaps == "3m":
 63            self.freq_payments_ = 4
 64        elif self.tenor_swaps == "6m":
 65            self.freq_payments_ = 2
 66        elif self.tenor_swaps == "1y":
 67            self.freq_payments_ = 1
 68        self.n_maturities_ = len(self.maturities)                
 69        cashflow_data = swap_cashflows_matrix(
 70            swap_rates=self.swap_rates,
 71            maturities=self.maturities,
 72            tenor_swaps=self.tenor_swaps)
 73        self.cashflows_ = cashflow_data.cashflow_matrix
 74        self.cashflow_dates_ = cashflow_data.cashflow_dates[-1,:]  
 75        self.discount_factors_ = np.zeros(self.n_maturities_)
 76        self.spot_rates_ = np.zeros(self.n_maturities_)
 77        self.forward_rates_ = np.zeros(self.n_maturities_)
 78
 79        def objective_function(x):
 80            current_maturity = self.maturities[i]
 81            mask_maturities = (self.spot_rates_ > 0)
 82            mask_cashflow_dates = (self.cashflow_dates_ <= current_maturity) # mask for cashflow dates before maturity
 83            cashflow_dates = self.cashflow_dates_[mask_cashflow_dates] # cashflow dates before maturity
 84            cashflows = self.cashflows_[i, mask_cashflow_dates] # cashflows before maturity 
 85            if (not np.all(self.spot_rates_ <= 0)): # at least one spot rate is positive
 86                spot_rates = np.interp(x=cashflow_dates, 
 87                                    xp=self.maturities[mask_maturities], 
 88                                    fp=self.spot_rates_[mask_maturities]) if self.interpolation == 'linear' else interp1d(self.maturities[mask_maturities], self.spot_rates_[mask_maturities], kind=self.interpolation)
 89            else: # first guess for spot rate is the swap rate
 90                spot_rates = self.swap_rates[0]
 91            dfs = np.exp(-cashflow_dates*spot_rates)
 92            dfs[-1:] = np.exp(-current_maturity*x) # last cashflow is the maturity, we are solving for x 
 93            drs_ = -np.log(dfs)/cashflow_dates # discount rates for cashflows, excluding the first date = 0
 94            dfs_ = self._compute_discount_factor(cashflow_dates, drs_) # discount factors for cashflow dates after the first date = 0
 95            return np.sum(cashflows*dfs_) - 1
 96        
 97        # Bootstrap iteratively
 98        for i in range(self.n_maturities_):
 99            result = root_scalar(f=objective_function, x0=self.swap_rates[i])
100            self.spot_rates_[i] = result.root
101            self.discount_factors_[i] = self._compute_discount_factor(self.maturities[i], 
102                                                                      self.spot_rates_[i])
103        # Calculate forward rates using improved method
104        self.forward_rates_ = self._calculate_forward_rates(self.maturities, 
105                                                           self.spot_rates_, 
106                                                           self.discount_factors_)
107        
108        return CurveRates(
109            maturities=self.maturities,
110            spot_rates=self.spot_rates_,
111            discount_factors=self.discount_factors_,
112            forward_rates=self.forward_rates_
113        )

Bootstrap interest rate curve from swap rates using an improved procedure.

Args: maturities: Array of swap maturities swap_rates: Array of corresponding swap rates tenor_swaps: Tenor of the swaps to use for the bootstrap

Returns: CurveRates object containing bootstrapped curves