yieldcurveml.stripcurve
26class CurveStripper(BaseEstimator, RegressorMixin): 27 """Yield curve stripping estimator. 28 29 Parameters 30 ---------- 31 estimator : sklearn estimator, default=None 32 Scikit-learn estimator to use for fitting. If None, uses bootstrap method. 33 lambda1 : float, default=2.5 34 First lambda parameter for NSS function 35 lambda2 : float, default=4.5 36 Second lambda parameter for NSS function 37 type_regressors : str, default=None 38 Type of basis functions, one of "laguerre", "cubic", "kernel", or None for bootstrap 39 kernel_type : str, default=None 40 Type of kernel to use if type_regressors is "kernel" 41 interpolation : str, default='linear' 42 Interpolation method for bootstrap 43 **kwargs : dict 44 Additional parameters for kernel generation 45 """ 46 def __init__( 47 self, 48 estimator=None, 49 lambda1: float = 2.5, 50 lambda2: float = 4.5, 51 type_regressors: Optional[Literal["laguerre", "cubic", "kernel"]] = None, 52 kernel_type: Optional[Literal['matern', 'rbf', 'rationalquadratic', 'smithwilson']] = None, 53 interpolation: Literal['linear', 'cubic'] = 'linear', 54 **kwargs 55 ): 56 self.estimator = estimator 57 self.lambda1 = lambda1 58 self.lambda2 = lambda2 59 self.type_regressors = type_regressors 60 self.kernel_type = kernel_type 61 self.interpolation = interpolation 62 self.cashflow_dates_ = None 63 64 # Validate configuration 65 if type_regressors in ["laguerre", "cubic"] and estimator is None: 66 warnings.warn("For basis regression methods, an estimator should be provided. Falling back to bootstrap method.") 67 self.type_regressors = None 68 69 # Clear kernel_type if not using kernel regression 70 if self.type_regressors != "kernel": 71 self.kernel_type = None 72 73 self.maturities = None 74 self.swap_rates = None 75 self.tenor_swaps = None 76 self.T_UFR = None 77 self.kernel_params_ = kwargs # Store kernel parameters 78 self.coef_ = None 79 self.cashflows_ = None 80 self.cashflow_dates_ = None 81 self.curve_rates_ = None 82 83 def _get_basis_functions(self, maturities: np.ndarray) -> np.ndarray: 84 """Generate basis functions for the regression.""" 85 if self.type_regressors == "laguerre": 86 temp1 = maturities / self.lambda1 87 temp = np.exp(-temp1) 88 temp2 = maturities / self.lambda2 89 return np.column_stack([ 90 np.ones_like(maturities), 91 temp, 92 temp1 * temp, 93 temp2 * np.exp(-temp2) 94 ]) 95 elif self.type_regressors == "cubic": # cubic 96 return np.column_stack([ 97 maturities, 98 maturities**2, 99 maturities**3 100 ]) 101 elif self.type_regressors == "kernel": 102 return generate_kernel(maturities, kernel_type=self.kernel_type, 103 **self.kernel_params_) 104 105 def fit( 106 self, 107 maturities: np.ndarray, 108 swap_rates: np.ndarray, 109 tenor_swaps: Literal["1m", "3m", "6m", "1y"] = "6m", 110 T_UFR: Optional[float] = None 111 ) -> "CurveStripper": 112 """Fit the curve stripper model. 113 114 Parameters 115 ---------- 116 maturities : np.ndarray 117 Maturities of the swap rates 118 swap_rates : np.ndarray 119 Swap rates 120 tenor_swaps : Literal["1m", "3m", "6m", "1y"], default="6m" 121 Tenor of the swaps to use for the bootstrap 122 T_UFR : float, default=None 123 UFR to use for the Smith-Wilson method 124 125 Returns 126 ------- 127 self : CurveStripper 128 Fitted curve stripper model 129 """ 130 self.maturities = maturities 131 self.swap_rates = swap_rates 132 self.tenor_swaps = tenor_swaps 133 self.T_UFR = T_UFR 134 # Store inputs 135 self.rates_ = RatesContainer( 136 maturities=np.asarray(maturities), 137 swap_rates=np.asarray(swap_rates) 138 ) 139 # Get cashflows and store them 140 self.cashflows_ = swap_cashflows_matrix( 141 swap_rates=self.swap_rates, 142 maturities=self.maturities, 143 tenor_swaps=self.tenor_swaps 144 ) 145 self.cashflow_dates_ = self.cashflows_.cashflow_dates[-1] 146 147 # Handle different fitting methods 148 if self.type_regressors is None and self.estimator is None: 149 # Bootstrap method 150 bootstrapper = RateCurveBootstrapper(interpolation=self.interpolation) 151 self.curve_rates_ = bootstrapper.fit( 152 maturities=self.rates_.maturities, 153 swap_rates=self.rates_.swap_rates, 154 tenor_swaps=self.tenor_swaps 155 ) 156 157 elif self.type_regressors == "kernel": 158 if self.estimator is None: 159 # Direct kernel solver (kernel inversion) 160 lambda_reg = self.kernel_params_.get('lambda_reg', 1e-6) 161 # Generate kernel matrix using maturities 162 K = generate_kernel( 163 self.cashflow_dates_, 164 kernel_type=self.kernel_type, 165 **self.kernel_params_ 166 ) 167 # Calculate coefficients (solving the system) 168 C = self.cashflows_.cashflow_matrix 169 V = np.ones_like(self.maturities) 170 if self.kernel_type == "smithwilson": 171 # For Smith-Wilson, include UFR adjustment 172 ufr = self.kernel_params_.get('ufr', 0.03) 173 mu = np.exp(-ufr * self.cashflow_dates_) 174 target = V - C @ mu 175 # Solve the system using C @ K @ C.T to get correct dimensions 176 A = C @ K @ C.T + lambda_reg * np.eye(len(C)) # Now A is (n_maturities × n_maturities) 177 self.coef_ = np.linalg.solve(A, target) # Now dimensions match 178 else: 179 A = C @ K @ C.T + lambda_reg * np.eye(len(C)) 180 A = (A + A.T) / 2 # Ensure symmetry 181 self.coef_ = np.linalg.solve(A, V) 182 else: 183 # Kernel regression (using kernel as features) 184 X = generate_kernel( 185 maturities.reshape(-1, 1), 186 kernel_type=self.kernel_type, 187 **self.kernel_params_ 188 ) 189 y = (self.cashflows_.cashflow_matrix.sum(axis=1) - 1) / maturities 190 self.estimator.fit(X, y) 191 else: 192 # Standard basis regression (laguerre / cubic) : fit estimator before prediction 193 if self.estimator is not None: 194 X_train = self._get_basis_functions(np.asarray(self.maturities).reshape(-1)) 195 y_train = (self.cashflows_.cashflow_matrix.sum(axis=1) - 1) / np.asarray(self.maturities) 196 # Ensure numpy arrays and correct shapes 197 X_train = np.asarray(X_train) 198 y_train = np.asarray(y_train).reshape(-1) 199 # Fit the provided estimator 200 self.estimator.fit(X_train, y_train) 201 202 # Calculate initial rates for all methods 203 if self.curve_rates_ is None: 204 self.curve_rates_ = self._calculate_rates(self.maturities) 205 return self 206 207 def _calculate_rates(self, maturities: np.ndarray, X: Optional[np.ndarray] = None) -> CurveRates: 208 """Calculate spot rates, forward rates, and discount factors.""" 209 # Handle bootstrap case first 210 if self.type_regressors is None and self.estimator is None: 211 return self.curve_rates_ 212 213 # Handle kernel methods 214 if self.type_regressors == "kernel": 215 if self.estimator is None: 216 # Direct kernel prediction 217 K_interp = generate_kernel( 218 maturities, 219 kernel_type=self.kernel_type, 220 nodal_points=self.cashflow_dates_, 221 **{k: v for k, v in self.kernel_params_.items() if k != 'nodal_points'} 222 ) 223 # Calculate discount factors 224 if self.kernel_type == "smithwilson": 225 ufr = self.kernel_params_.get('ufr', 0.03) 226 mu_interp = np.exp(-ufr * maturities) 227 # Use cashflow matrix for final calculation 228 C = self.cashflows_.cashflow_matrix 229 discount_factors = mu_interp + K_interp @ C.T @ self.coef_ 230 else: 231 discount_factors = K_interp @ self.coef_ 232 else: 233 # Kernel regression prediction 234 if X is None: 235 nodal_points_2d = self.maturities.reshape(-1, 1) 236 maturities_2d = maturities.reshape(-1, 1) 237 X = generate_kernel( 238 maturities_2d, 239 kernel_type=self.kernel_type, 240 nodal_points=nodal_points_2d, 241 **{k: v for k, v in self.kernel_params_.items() if k != 'nodal_points'} 242 ) 243 spot_rates = self.estimator.predict(X) 244 discount_factors = np.exp(-maturities * spot_rates) 245 246 # Calculate forward rates 247 forward_rates = self._calculate_forward_rates(maturities, discount_factors) 248 249 return CurveRates( 250 maturities=maturities, 251 spot_rates=spot_rates, 252 forward_rates=forward_rates, 253 discount_factors=discount_factors 254 ) 255 else: 256 # Basis regression methods (laguerre/cubic) 257 if X is None: 258 X = self._get_basis_functions(maturities) 259 260 if self.estimator is None: 261 # This should not happen due to validation in __init__, but handle gracefully 262 raise ValueError("Estimator is required for basis regression methods") 263 264 spot_rates = self.estimator.predict(X) 265 discount_factors = np.exp(-maturities * spot_rates) 266 267 # Calculate spot rates from discount factors (for direct kernel methods) 268 if self.type_regressors == "kernel" and self.estimator is None: 269 spot_rates = -np.log(discount_factors) / maturities 270 271 # Calculate forward rates 272 forward_rates = self._calculate_forward_rates(maturities, discount_factors) 273 274 return CurveRates( 275 maturities=maturities, 276 spot_rates=spot_rates, 277 forward_rates=forward_rates, 278 discount_factors=discount_factors 279 ) 280 281 def _calculate_forward_rates(self, maturities: np.ndarray, discount_factors: np.ndarray) -> np.ndarray: 282 """Calculate forward rates from discount factors.""" 283 forward_rates = np.zeros_like(maturities) 284 285 if len(maturities) > 1: 286 # Calculate forward rates between consecutive maturities 287 for i in range(len(maturities) - 1): 288 t1, t2 = maturities[i], maturities[i+1] 289 df1, df2 = discount_factors[i], discount_factors[i+1] 290 291 # Forward rate from t1 to t2: f(t1,t2) = -ln(df2/df1)/(t2-t1) 292 if t2 > t1 and df1 > 0 and df2 > 0: 293 forward_rates[i] = -np.log(df2 / df1) / (t2 - t1) 294 else: 295 forward_rates[i] = 0.0 296 297 # For the last maturity, use the spot rate 298 forward_rates[-1] = -np.log(discount_factors[-1]) / maturities[-1] if discount_factors[-1] > 0 else 0.0 299 else: 300 # Single maturity case 301 forward_rates[0] = -np.log(discount_factors[0]) / maturities[0] if discount_factors[0] > 0 else 0.0 302 303 return forward_rates 304 305 def _interpolate_bootstrap_rates(self, maturities: np.ndarray) -> CurveRates: 306 """Interpolate bootstrap rates for requested maturities.""" 307 if np.array_equal(maturities, self.curve_rates_.maturities): 308 return self.curve_rates_ 309 310 # Interpolate spot rates 311 spot_rate_interpolator = interp1d( 312 self.curve_rates_.maturities, 313 self.curve_rates_.spot_rates, 314 kind=self.interpolation, 315 fill_value='extrapolate', 316 bounds_error=False 317 ) 318 spot_rates = spot_rate_interpolator(maturities) 319 320 # Calculate discount factors and forward rates 321 discount_factors = np.exp(-maturities * spot_rates) 322 forward_rates = self._calculate_forward_rates(maturities, discount_factors) 323 324 return CurveRates( 325 maturities=maturities, 326 spot_rates=spot_rates, 327 forward_rates=forward_rates, 328 discount_factors=discount_factors 329 ) 330 331 def predict(self, maturities: np.ndarray) -> CurveRates: 332 """Predict rates for given maturities.""" 333 check_is_fitted(self) 334 # Ensure maturities is 2D for kernel methods 335 maturities = np.asarray(maturities).reshape(-1) # First ensure 1D 336 if self.type_regressors is None and self.estimator is None: 337 # Interpolate bootstrap results using scipy's interp1d 338 return self._interpolate_bootstrap_rates(maturities) 339 340 if self.type_regressors == "kernel": 341 if self.estimator is None: 342 # Direct kernel prediction 343 return self._calculate_rates(maturities) 344 else: 345 # Ensure both inputs are 2D for kernel generation 346 maturities_2d = maturities.reshape(-1, 1) 347 nodal_points_2d = self.maturities.reshape(-1, 1) 348 349 X = generate_kernel( 350 maturities_2d, 351 kernel_type=self.kernel_type, 352 nodal_points=nodal_points_2d, 353 **{k: v for k, v in self.kernel_params_.items() if k != 'nodal_points'} 354 ) 355 return self._calculate_rates(maturities, X) 356 else: 357 # Standard basis functions 358 X = self._get_basis_functions(maturities.ravel()) 359 return self._calculate_rates(maturities.ravel(), X) 360 361 def get_diagnostics( 362 self, 363 X_test: Optional[np.ndarray] = None, 364 y_test: Optional[np.ndarray] = None 365 ) -> Union[RegressionDiagnostics, tuple[RegressionDiagnostics, RegressionDiagnostics]]: 366 """Calculate detailed regression diagnostics.""" 367 def calculate_diagnostics(y_true: np.ndarray, y_pred: np.ndarray) -> RegressionDiagnostics: 368 residuals = y_true - y_pred 369 abs_residuals = np.abs(residuals) 370 371 residuals_summary = { 372 'mean': np.mean(residuals), 373 'std': np.std(residuals), 374 'median': np.median(residuals), 375 'mad': np.median(abs_residuals), 376 'skewness': float(np.mean(((residuals - np.mean(residuals)) / np.std(residuals)) ** 3)), 377 'kurtosis': float(np.mean(((residuals - np.mean(residuals)) / np.std(residuals)) ** 4) - 3), 378 'percentiles': { 379 '1%': np.percentile(residuals, 1), 380 '5%': np.percentile(residuals, 5), 381 '25%': np.percentile(residuals, 25), 382 '75%': np.percentile(residuals, 75), 383 '95%': np.percentile(residuals, 95), 384 '99%': np.percentile(residuals, 99) 385 } 386 } 387 388 return RegressionDiagnostics( 389 r2_score=r2_score(y_true, y_pred), 390 rmse=np.sqrt(mean_squared_error(y_true, y_pred)), 391 mae=mean_absolute_error(y_true, y_pred), 392 max_error=max_error(y_true, y_pred), 393 min_error=float(np.min(abs_residuals)), 394 residuals=residuals, 395 fitted_values=y_pred, 396 actual_values=y_true, 397 n_samples=len(y_true), 398 residuals_summary=residuals_summary 399 ) 400 401 # Training set diagnostics 402 if self.estimator is None and self.type_regressors is None: 403 # For bootstrap method, compare original swap rates with reconstructed rates 404 y_train = self.rates_.swap_rates 405 y_train_pred = self.predict(self.rates_.maturities).spot_rates 406 else: 407 # For regression methods 408 if self.type_regressors == "kernel" and self.estimator is not None: 409 X_train = generate_kernel( 410 self.maturities.reshape(-1, 1), 411 kernel_type=self.kernel_type, 412 **self.kernel_params_ 413 ) 414 else: 415 X_train = self._get_basis_functions(self.rates_.maturities) 416 y_train = (self.cashflows_.cashflow_matrix.sum(axis=1) - 1) / self.rates_.maturities 417 y_train_pred = self.estimator.predict(X_train) 418 419 train_diagnostics = calculate_diagnostics(y_train, y_train_pred) 420 421 # Test set diagnostics if provided 422 if X_test is not None and y_test is not None: 423 if self.estimator is None and self.type_regressors is None: 424 y_test_pred = self.predict(X_test).spot_rates 425 else: 426 if self.type_regressors == "kernel" and self.estimator is not None: 427 X_test_basis = generate_kernel( 428 X_test.reshape(-1, 1), 429 kernel_type=self.kernel_type, 430 **self.kernel_params_ 431 ) 432 else: 433 X_test_basis = self._get_basis_functions(X_test) 434 y_test_pred = self.estimator.predict(X_test_basis) 435 test_diagnostics = calculate_diagnostics(y_test, y_test_pred) 436 return train_diagnostics, test_diagnostics 437 438 return train_diagnostics
Yield curve stripping estimator.
Parameters
estimator : sklearn estimator, default=None Scikit-learn estimator to use for fitting. If None, uses bootstrap method. lambda1 : float, default=2.5 First lambda parameter for NSS function lambda2 : float, default=4.5 Second lambda parameter for NSS function type_regressors : str, default=None Type of basis functions, one of "laguerre", "cubic", "kernel", or None for bootstrap kernel_type : str, default=None Type of kernel to use if type_regressors is "kernel" interpolation : str, default='linear' Interpolation method for bootstrap **kwargs : dict Additional parameters for kernel generation
105 def fit( 106 self, 107 maturities: np.ndarray, 108 swap_rates: np.ndarray, 109 tenor_swaps: Literal["1m", "3m", "6m", "1y"] = "6m", 110 T_UFR: Optional[float] = None 111 ) -> "CurveStripper": 112 """Fit the curve stripper model. 113 114 Parameters 115 ---------- 116 maturities : np.ndarray 117 Maturities of the swap rates 118 swap_rates : np.ndarray 119 Swap rates 120 tenor_swaps : Literal["1m", "3m", "6m", "1y"], default="6m" 121 Tenor of the swaps to use for the bootstrap 122 T_UFR : float, default=None 123 UFR to use for the Smith-Wilson method 124 125 Returns 126 ------- 127 self : CurveStripper 128 Fitted curve stripper model 129 """ 130 self.maturities = maturities 131 self.swap_rates = swap_rates 132 self.tenor_swaps = tenor_swaps 133 self.T_UFR = T_UFR 134 # Store inputs 135 self.rates_ = RatesContainer( 136 maturities=np.asarray(maturities), 137 swap_rates=np.asarray(swap_rates) 138 ) 139 # Get cashflows and store them 140 self.cashflows_ = swap_cashflows_matrix( 141 swap_rates=self.swap_rates, 142 maturities=self.maturities, 143 tenor_swaps=self.tenor_swaps 144 ) 145 self.cashflow_dates_ = self.cashflows_.cashflow_dates[-1] 146 147 # Handle different fitting methods 148 if self.type_regressors is None and self.estimator is None: 149 # Bootstrap method 150 bootstrapper = RateCurveBootstrapper(interpolation=self.interpolation) 151 self.curve_rates_ = bootstrapper.fit( 152 maturities=self.rates_.maturities, 153 swap_rates=self.rates_.swap_rates, 154 tenor_swaps=self.tenor_swaps 155 ) 156 157 elif self.type_regressors == "kernel": 158 if self.estimator is None: 159 # Direct kernel solver (kernel inversion) 160 lambda_reg = self.kernel_params_.get('lambda_reg', 1e-6) 161 # Generate kernel matrix using maturities 162 K = generate_kernel( 163 self.cashflow_dates_, 164 kernel_type=self.kernel_type, 165 **self.kernel_params_ 166 ) 167 # Calculate coefficients (solving the system) 168 C = self.cashflows_.cashflow_matrix 169 V = np.ones_like(self.maturities) 170 if self.kernel_type == "smithwilson": 171 # For Smith-Wilson, include UFR adjustment 172 ufr = self.kernel_params_.get('ufr', 0.03) 173 mu = np.exp(-ufr * self.cashflow_dates_) 174 target = V - C @ mu 175 # Solve the system using C @ K @ C.T to get correct dimensions 176 A = C @ K @ C.T + lambda_reg * np.eye(len(C)) # Now A is (n_maturities × n_maturities) 177 self.coef_ = np.linalg.solve(A, target) # Now dimensions match 178 else: 179 A = C @ K @ C.T + lambda_reg * np.eye(len(C)) 180 A = (A + A.T) / 2 # Ensure symmetry 181 self.coef_ = np.linalg.solve(A, V) 182 else: 183 # Kernel regression (using kernel as features) 184 X = generate_kernel( 185 maturities.reshape(-1, 1), 186 kernel_type=self.kernel_type, 187 **self.kernel_params_ 188 ) 189 y = (self.cashflows_.cashflow_matrix.sum(axis=1) - 1) / maturities 190 self.estimator.fit(X, y) 191 else: 192 # Standard basis regression (laguerre / cubic) : fit estimator before prediction 193 if self.estimator is not None: 194 X_train = self._get_basis_functions(np.asarray(self.maturities).reshape(-1)) 195 y_train = (self.cashflows_.cashflow_matrix.sum(axis=1) - 1) / np.asarray(self.maturities) 196 # Ensure numpy arrays and correct shapes 197 X_train = np.asarray(X_train) 198 y_train = np.asarray(y_train).reshape(-1) 199 # Fit the provided estimator 200 self.estimator.fit(X_train, y_train) 201 202 # Calculate initial rates for all methods 203 if self.curve_rates_ is None: 204 self.curve_rates_ = self._calculate_rates(self.maturities) 205 return self
Fit the curve stripper model.
Parameters
maturities : np.ndarray Maturities of the swap rates swap_rates : np.ndarray Swap rates tenor_swaps : Literal["1m", "3m", "6m", "1y"], default="6m" Tenor of the swaps to use for the bootstrap T_UFR : float, default=None UFR to use for the Smith-Wilson method
Returns
self : CurveStripper Fitted curve stripper model
331 def predict(self, maturities: np.ndarray) -> CurveRates: 332 """Predict rates for given maturities.""" 333 check_is_fitted(self) 334 # Ensure maturities is 2D for kernel methods 335 maturities = np.asarray(maturities).reshape(-1) # First ensure 1D 336 if self.type_regressors is None and self.estimator is None: 337 # Interpolate bootstrap results using scipy's interp1d 338 return self._interpolate_bootstrap_rates(maturities) 339 340 if self.type_regressors == "kernel": 341 if self.estimator is None: 342 # Direct kernel prediction 343 return self._calculate_rates(maturities) 344 else: 345 # Ensure both inputs are 2D for kernel generation 346 maturities_2d = maturities.reshape(-1, 1) 347 nodal_points_2d = self.maturities.reshape(-1, 1) 348 349 X = generate_kernel( 350 maturities_2d, 351 kernel_type=self.kernel_type, 352 nodal_points=nodal_points_2d, 353 **{k: v for k, v in self.kernel_params_.items() if k != 'nodal_points'} 354 ) 355 return self._calculate_rates(maturities, X) 356 else: 357 # Standard basis functions 358 X = self._get_basis_functions(maturities.ravel()) 359 return self._calculate_rates(maturities.ravel(), X)
Predict rates for given maturities.
12class RateCurveBootstrapper(BaseEstimator, RegressorMixin): 13 """ 14 Bootstrap interest rate curve from swap rates. 15 16 Parameters 17 ---------- 18 19 interpolation: Literal['linear', 'cubic'] = 'linear' 20 """ 21 def __init__( 22 self, 23 interpolation: Literal['linear', 'cubic'] = 'linear' 24 ): 25 self.interpolation = interpolation 26 self.maturities = None 27 self.swap_rates = None 28 self.tenor_swaps = None 29 self.n_maturities_ = None 30 self.cashflows_ = None 31 self.cashflow_dates_ = None 32 self.spot_rates_ = None 33 self.discount_factors_ = None 34 self.forward_rates_ = None 35 36 def fit( 37 self, 38 maturities: np.ndarray, 39 swap_rates: np.ndarray, 40 tenor_swaps: Literal["1m", "3m", "6m", "1y"] = "6m", 41 ) -> CurveRates: 42 """ 43 Bootstrap interest rate curve from swap rates using an improved procedure. 44 45 Args: 46 maturities: Array of swap maturities 47 swap_rates: Array of corresponding swap rates 48 tenor_swaps: Tenor of the swaps to use for the bootstrap 49 50 Returns: 51 CurveRates object containing bootstrapped curves 52 """ 53 if self.interpolation not in ['linear', 'cubic']: 54 raise ValueError("interpolation must be either 'linear' or 'cubic'") 55 self.maturities = maturities 56 self.swap_rates = swap_rates 57 self.tenor_swaps = tenor_swaps 58 self.spot_rates_ = self.swap_rates 59 self._validate_inputs() 60 if self.tenor_swaps == "1m": 61 self.freq_payments_ = 12 62 elif self.tenor_swaps == "3m": 63 self.freq_payments_ = 4 64 elif self.tenor_swaps == "6m": 65 self.freq_payments_ = 2 66 elif self.tenor_swaps == "1y": 67 self.freq_payments_ = 1 68 self.n_maturities_ = len(self.maturities) 69 cashflow_data = swap_cashflows_matrix( 70 swap_rates=self.swap_rates, 71 maturities=self.maturities, 72 tenor_swaps=self.tenor_swaps) 73 self.cashflows_ = cashflow_data.cashflow_matrix 74 self.cashflow_dates_ = cashflow_data.cashflow_dates[-1,:] 75 self.discount_factors_ = np.zeros(self.n_maturities_) 76 self.spot_rates_ = np.zeros(self.n_maturities_) 77 self.forward_rates_ = np.zeros(self.n_maturities_) 78 79 def objective_function(x): 80 current_maturity = self.maturities[i] 81 mask_maturities = (self.spot_rates_ > 0) 82 mask_cashflow_dates = (self.cashflow_dates_ <= current_maturity) # mask for cashflow dates before maturity 83 cashflow_dates = self.cashflow_dates_[mask_cashflow_dates] # cashflow dates before maturity 84 cashflows = self.cashflows_[i, mask_cashflow_dates] # cashflows before maturity 85 if (not np.all(self.spot_rates_ <= 0)): # at least one spot rate is positive 86 spot_rates = np.interp(x=cashflow_dates, 87 xp=self.maturities[mask_maturities], 88 fp=self.spot_rates_[mask_maturities]) if self.interpolation == 'linear' else interp1d(self.maturities[mask_maturities], self.spot_rates_[mask_maturities], kind=self.interpolation) 89 else: # first guess for spot rate is the swap rate 90 spot_rates = self.swap_rates[0] 91 dfs = np.exp(-cashflow_dates*spot_rates) 92 dfs[-1:] = np.exp(-current_maturity*x) # last cashflow is the maturity, we are solving for x 93 drs_ = -np.log(dfs)/cashflow_dates # discount rates for cashflows, excluding the first date = 0 94 dfs_ = self._compute_discount_factor(cashflow_dates, drs_) # discount factors for cashflow dates after the first date = 0 95 return np.sum(cashflows*dfs_) - 1 96 97 # Bootstrap iteratively 98 for i in range(self.n_maturities_): 99 result = root_scalar(f=objective_function, x0=self.swap_rates[i]) 100 self.spot_rates_[i] = result.root 101 self.discount_factors_[i] = self._compute_discount_factor(self.maturities[i], 102 self.spot_rates_[i]) 103 # Calculate forward rates using improved method 104 self.forward_rates_ = self._calculate_forward_rates(self.maturities, 105 self.spot_rates_, 106 self.discount_factors_) 107 108 return CurveRates( 109 maturities=self.maturities, 110 spot_rates=self.spot_rates_, 111 discount_factors=self.discount_factors_, 112 forward_rates=self.forward_rates_ 113 ) 114 115 def _validate_inputs(self) -> None: 116 """Validate input arrays.""" 117 if len(self.maturities) != len(self.swap_rates): 118 raise ValueError("Maturities and swap rates must have same length") 119 if not np.all(np.diff(self.maturities) > 1e-10): 120 raise ValueError("Maturities must be strictly increasing with minimum spacing of 1e-10") 121 if np.any(self.maturities <= 0): 122 raise ValueError("Maturities must be positive") 123 if np.any(~np.isfinite(self.maturities)) or np.any(~np.isfinite(self.swap_rates)): 124 raise ValueError("Maturities and swap rates must be finite numbers") 125 126 def _compute_discount_factor(self, maturity: float, rate: float) -> float: 127 """Compute discount factor from spot rate. 128 129 Args: 130 maturity: Maturity of the discount factor 131 rate: Spot rate 132 133 Returns: 134 Discount factor 135 """ 136 return np.exp(-maturity * rate) 137 138 def _calculate_forward_rates( 139 self, 140 maturities: np.ndarray, 141 spot_rates: np.ndarray, 142 discount_factors: np.ndarray 143 ) -> np.ndarray: 144 """Calculate forward rates using improved method. 145 146 Args: 147 maturities: Array of maturities 148 spot_rates: Array of spot rates 149 discount_factors: Array of discount factors 150 151 Returns: 152 Array of forward rates 153 """ 154 n_maturities = len(maturities) 155 forward_rates = np.zeros(n_maturities) 156 157 # First point: use spot rate 158 forward_rates[0] = spot_rates[0] 159 160 # Remaining points: use discrete formula for better numerical stability 161 for i in range(1, n_maturities): 162 t1, t2 = maturities[i-1], maturities[i] 163 df1, df2 = discount_factors[i-1], discount_factors[i] 164 forward_rates[i] = -np.log(df2/df1) / (t2 - t1) 165 166 return forward_rates
Bootstrap interest rate curve from swap rates.
Parameters
interpolation: Literal['linear', 'cubic'] = 'linear'
36 def fit( 37 self, 38 maturities: np.ndarray, 39 swap_rates: np.ndarray, 40 tenor_swaps: Literal["1m", "3m", "6m", "1y"] = "6m", 41 ) -> CurveRates: 42 """ 43 Bootstrap interest rate curve from swap rates using an improved procedure. 44 45 Args: 46 maturities: Array of swap maturities 47 swap_rates: Array of corresponding swap rates 48 tenor_swaps: Tenor of the swaps to use for the bootstrap 49 50 Returns: 51 CurveRates object containing bootstrapped curves 52 """ 53 if self.interpolation not in ['linear', 'cubic']: 54 raise ValueError("interpolation must be either 'linear' or 'cubic'") 55 self.maturities = maturities 56 self.swap_rates = swap_rates 57 self.tenor_swaps = tenor_swaps 58 self.spot_rates_ = self.swap_rates 59 self._validate_inputs() 60 if self.tenor_swaps == "1m": 61 self.freq_payments_ = 12 62 elif self.tenor_swaps == "3m": 63 self.freq_payments_ = 4 64 elif self.tenor_swaps == "6m": 65 self.freq_payments_ = 2 66 elif self.tenor_swaps == "1y": 67 self.freq_payments_ = 1 68 self.n_maturities_ = len(self.maturities) 69 cashflow_data = swap_cashflows_matrix( 70 swap_rates=self.swap_rates, 71 maturities=self.maturities, 72 tenor_swaps=self.tenor_swaps) 73 self.cashflows_ = cashflow_data.cashflow_matrix 74 self.cashflow_dates_ = cashflow_data.cashflow_dates[-1,:] 75 self.discount_factors_ = np.zeros(self.n_maturities_) 76 self.spot_rates_ = np.zeros(self.n_maturities_) 77 self.forward_rates_ = np.zeros(self.n_maturities_) 78 79 def objective_function(x): 80 current_maturity = self.maturities[i] 81 mask_maturities = (self.spot_rates_ > 0) 82 mask_cashflow_dates = (self.cashflow_dates_ <= current_maturity) # mask for cashflow dates before maturity 83 cashflow_dates = self.cashflow_dates_[mask_cashflow_dates] # cashflow dates before maturity 84 cashflows = self.cashflows_[i, mask_cashflow_dates] # cashflows before maturity 85 if (not np.all(self.spot_rates_ <= 0)): # at least one spot rate is positive 86 spot_rates = np.interp(x=cashflow_dates, 87 xp=self.maturities[mask_maturities], 88 fp=self.spot_rates_[mask_maturities]) if self.interpolation == 'linear' else interp1d(self.maturities[mask_maturities], self.spot_rates_[mask_maturities], kind=self.interpolation) 89 else: # first guess for spot rate is the swap rate 90 spot_rates = self.swap_rates[0] 91 dfs = np.exp(-cashflow_dates*spot_rates) 92 dfs[-1:] = np.exp(-current_maturity*x) # last cashflow is the maturity, we are solving for x 93 drs_ = -np.log(dfs)/cashflow_dates # discount rates for cashflows, excluding the first date = 0 94 dfs_ = self._compute_discount_factor(cashflow_dates, drs_) # discount factors for cashflow dates after the first date = 0 95 return np.sum(cashflows*dfs_) - 1 96 97 # Bootstrap iteratively 98 for i in range(self.n_maturities_): 99 result = root_scalar(f=objective_function, x0=self.swap_rates[i]) 100 self.spot_rates_[i] = result.root 101 self.discount_factors_[i] = self._compute_discount_factor(self.maturities[i], 102 self.spot_rates_[i]) 103 # Calculate forward rates using improved method 104 self.forward_rates_ = self._calculate_forward_rates(self.maturities, 105 self.spot_rates_, 106 self.discount_factors_) 107 108 return CurveRates( 109 maturities=self.maturities, 110 spot_rates=self.spot_rates_, 111 discount_factors=self.discount_factors_, 112 forward_rates=self.forward_rates_ 113 )
Bootstrap interest rate curve from swap rates using an improved procedure.
Args: maturities: Array of swap maturities swap_rates: Array of corresponding swap rates tenor_swaps: Tenor of the swaps to use for the bootstrap
Returns: CurveRates object containing bootstrapped curves