insurance_credibility
1""" 2insurance-credibility: Credibility models for UK non-life insurance pricing. 3 4Two subpackages covering the full credibility toolkit: 5 6classical 7 Bühlmann-Straub (1970) group credibility and its hierarchical extension 8 (Jewell 1975) for nested structures (scheme → book, sector → district → area). 9 10experience 11 Individual policy-level Bayesian experience rating. Four model tiers: 12 static Bühlmann-Straub, dynamic Poisson-gamma state-space, IS-surrogate, 13 and deep attention (Wüthrich 2024). 14 15Quick start:: 16 17 # Group-level credibility (scheme pricing) 18 from insurance_credibility import BuhlmannStraub 19 bs = BuhlmannStraub() 20 bs.fit(df, group_col="scheme", period_col="year", 21 loss_col="loss_rate", weight_col="exposure") 22 23 # Individual policy experience rating 24 from insurance_credibility import ClaimsHistory, StaticCredibilityModel 25 model = StaticCredibilityModel() 26 model.fit(histories) 27 cf = model.predict(history) 28""" 29 30# Classical credibility 31from .classical import BuhlmannStraub, HierarchicalBuhlmannStraub, LevelResult 32 33# Experience rating data types 34from .experience import CalibrationResult, ClaimsHistory 35 36# Experience rating models 37from .experience import ( 38 DynamicPoissonGammaModel, 39 StaticCredibilityModel, 40 SurrogateModel, 41) 42 43# Experience rating calibration 44from .experience import ( 45 apply_calibration, 46 balance_calibrate, 47 balance_report, 48 calibrated_predict_fn, 49) 50 51# Experience rating utilities 52from .experience import ( 53 credibility_factor, 54 exposure_weighted_mean, 55 history_sufficient_stat, 56 posterior_premium, 57 seniority_weights, 58) 59 60from importlib.metadata import version, PackageNotFoundError 61 62try: 63 __version__ = version("insurance-credibility") 64except PackageNotFoundError: 65 __version__ = "0.0.0" # not installed 66 67__all__ = [ 68 # Classical 69 "BuhlmannStraub", 70 "HierarchicalBuhlmannStraub", 71 "LevelResult", 72 # Experience — data types 73 "ClaimsHistory", 74 "CalibrationResult", 75 # Experience — models 76 "StaticCredibilityModel", 77 "DynamicPoissonGammaModel", 78 "SurrogateModel", 79 "DeepAttentionModel", 80 # Experience — calibration 81 "balance_calibrate", 82 "apply_calibration", 83 "calibrated_predict_fn", 84 "balance_report", 85 # Experience — utilities 86 "credibility_factor", 87 "posterior_premium", 88 "seniority_weights", 89 "exposure_weighted_mean", 90 "history_sufficient_stat", 91 # Meta 92 "__version__", 93] 94 95 96def __getattr__(name: str): 97 """Lazy import for optional torch-dependent classes.""" 98 if name == "DeepAttentionModel": 99 from .experience.attention import DeepAttentionModel 100 101 return DeepAttentionModel 102 raise AttributeError(f"module 'insurance_credibility' has no attribute {name!r}")
class BuhlmannStraub:
    """
    Bühlmann-Straub credibility model with non-parametric structural parameter
    estimation.

    Fit the model to a panel dataset - one row per (group, period) - where
    each row has a loss rate and an exposure weight. The model estimates the
    structural parameters v (within-group variance) and a (between-group
    variance) from the data itself, then computes a credibility factor Z_i
    for each group.

    The result is a credibility premium for each group that blends its own
    weighted-average loss rate with the portfolio mean, with the blend governed
    entirely by the ratio k = v/a and the group's total exposure.

    Parameters
    ----------
    truncate_a : bool, default True
        If True, truncate a_hat at zero when the between-group variance
        estimate is negative. This sets k → ∞ and Z_i → 0 for all groups,
        so every group gets the collective mean. If False, a negative
        a_hat raises an error instead.

    Examples
    --------
    >>> import polars as pl
    >>> from insurance_credibility import BuhlmannStraub
    >>>
    >>> df = pl.DataFrame({
    ...     "scheme": ["A", "A", "A", "B", "B", "B", "C", "C", "C"],
    ...     "year": [2021, 2022, 2023] * 3,
    ...     "loss_rate": [0.55, 0.60, 0.58, 0.80, 0.75, 0.82, 0.40, 0.42, 0.38],
    ...     "exposure": [1000, 1200, 1100, 300, 350, 320, 5000, 4800, 5200],
    ... })
    >>> bs = BuhlmannStraub()
    >>> bs.fit(df, group_col="scheme", period_col="year",
    ...        loss_col="loss_rate", weight_col="exposure")
    >>> bs.summary()
    """

    def __init__(self, truncate_a: bool = True) -> None:
        self.truncate_a = truncate_a

        # Fitted attributes - set by .fit()
        self._mu_hat: Optional[float] = None
        self._v_hat: Optional[float] = None
        self._a_hat: Optional[float] = None
        self._k: Optional[float] = None
        self._z: Optional[pl.DataFrame] = None
        self._premiums: Optional[pl.DataFrame] = None
        self._fitted = False

    # ------------------------------------------------------------------
    # Scikit-learn style fit
    # ------------------------------------------------------------------

    def fit(
        self,
        data: Union[pl.DataFrame, "pd.DataFrame"],  # type: ignore[name-defined]
        group_col: str = "group",
        period_col: str = "period",
        loss_col: str = "loss",
        weight_col: str = "weight",
    ) -> "BuhlmannStraub":
        """
        Estimate structural parameters and credibility factors from panel data.

        Parameters
        ----------
        data:
            A Polars DataFrame (preferred) or pandas DataFrame (converted
            internally) with one row per (group, period). Columns need not be
            in any particular order.
        group_col:
            Column identifying the group (e.g. scheme ID, territory, NCD class).
        period_col:
            Column identifying the time period (e.g. accident year). Used only
            to count periods per group; the ordering does not matter.
        loss_col:
            Column containing the loss rate or loss ratio. This should be losses
            per unit of exposure - not total losses. If you have total losses,
            divide by the exposure column before fitting.
        weight_col:
            Column containing the exposure weight (e.g. earned car years,
            earned premium, policy count). Must be strictly positive.

        Returns
        -------
        self
            Returns the fitted estimator so calls can be chained.
        """
        data = _to_polars(data)

        validate_panel_data(data, group_col, period_col, loss_col, weight_col)
        check_duplicate_periods(data, group_col, period_col)

        self._group_col = group_col
        self._period_col = period_col
        self._loss_col = loss_col
        self._weight_col = weight_col

        # Build group-level summary: w_i, X_bar_i, T_i
        groups = self._build_group_summary(data, group_col, period_col, loss_col, weight_col)
        self._groups = groups

        # Estimate structural parameters
        mu_hat, v_hat, a_hat_raw = self._estimate_structural_params(
            data, groups, group_col, period_col, loss_col, weight_col
        )

        # Handle negative a_hat
        if a_hat_raw <= 0:
            if self.truncate_a:
                warnings.warn(
                    f"Between-group variance estimate a_hat = {a_hat_raw:.6g} ≤ 0. "
                    "Truncating to zero. This means the model finds no detectable "
                    "heterogeneity between groups - all groups will receive the "
                    "collective mean as their credibility premium (Z_i = 0). "
                    "Consider whether your data genuinely lacks between-group "
                    "variation, or whether you have too few groups to estimate a reliably.",
                    stacklevel=2,
                )
                a_hat = 0.0
                k = np.inf
            else:
                raise ValueError(
                    f"Between-group variance estimate a_hat = {a_hat_raw:.6g} ≤ 0. "
                    "Set truncate_a=True to handle this automatically."
                )
        else:
            a_hat = a_hat_raw
            k = v_hat / a_hat

        # Credibility factors
        w = groups["w_i"].to_numpy()
        if np.isinf(k):
            z_values = np.zeros(len(groups))
        else:
            z_values = w / (w + k)

        group_ids = groups["group"].to_list()
        x_bar = groups["x_bar_i"].to_numpy()

        # z_ is a Polars DataFrame with columns ["group", "Z"]
        z_df = pl.DataFrame({
            "group": group_ids,
            "Z": z_values,
        })

        # premiums_ is a Polars DataFrame with a "group" column (no index)
        premiums = pl.DataFrame({
            "group": group_ids,
            "exposure": w,
            "observed_mean": x_bar,
            "Z": z_values,
            "credibility_premium": z_values * x_bar + (1 - z_values) * mu_hat,
            "complement": np.full(len(groups), mu_hat),
        })

        self._mu_hat = float(mu_hat)
        self._v_hat = float(v_hat)
        self._a_hat = float(a_hat)
        self._a_hat_raw = float(a_hat_raw)
        self._k = float(k) if not np.isinf(k) else np.inf
        self._z = z_df
        self._premiums = premiums
        self._fitted = True

        return self

    # ------------------------------------------------------------------
    # Structural parameter properties
    # ------------------------------------------------------------------

    @property
    def mu_hat_(self) -> float:
        """
        Collective mean - the grand weighted average loss rate across all groups.

        This is the complement: what every group's credibility premium regresses
        toward as exposure shrinks.
        """
        self._check_fitted()
        return self._mu_hat  # type: ignore[return-value]

    @property
    def v_hat_(self) -> float:
        """
        EPV - expected value of process variance.

        Measures within-group year-to-year volatility, averaged across groups.
        High v_hat means individual group experience is noisy - lean on the
        collective mean.
        """
        self._check_fitted()
        return self._v_hat  # type: ignore[return-value]

    @property
    def a_hat_(self) -> float:
        """
        VHM - variance of hypothetical means.

        Measures how much the true underlying loss rates differ between groups.
        High a_hat means groups are genuinely heterogeneous - trust their own
        experience. Truncated at zero if truncate_a=True.
        """
        self._check_fitted()
        return self._a_hat  # type: ignore[return-value]

    @property
    def k_(self) -> float:
        """
        Bühlmann's k - the credibility parameter.

        k = v / a. This is the exposure a group needs to achieve Z = 0.5 (equal
        weight on own experience and collective mean). A group with exposure w_i
        has Z_i = w_i / (w_i + k).

        Returns np.inf when a_hat is zero (all groups get the collective mean).
        """
        self._check_fitted()
        return self._k  # type: ignore[return-value]

    @property
    def z_(self) -> pl.DataFrame:
        """
        Credibility factors - a Polars DataFrame with columns ["group", "Z"].

        Z_i = w_i / (w_i + k), ranging from 0 (no credibility, use collective
        mean) to 1 (full credibility, use the group's own experience).

        To look up a specific group::

            bs.z_.filter(pl.col("group") == "Motor Guild")["Z"][0]

        To find the group with the highest Z::

            bs.z_.sort("Z", descending=True)["group"][0]
        """
        self._check_fitted()
        return self._z  # type: ignore[return-value]

    @property
    def premiums_(self) -> pl.DataFrame:
        """
        Credibility premiums - one row per group, as a Polars DataFrame.

        Columns:

        - ``group``: group identifier
        - ``exposure``: total exposure weight for the group
        - ``observed_mean``: exposure-weighted average loss rate
        - ``Z``: credibility factor
        - ``credibility_premium``: Z * observed_mean + (1 - Z) * mu_hat
        - ``complement``: mu_hat (the collective mean complement)
        """
        self._check_fitted()
        return self._premiums  # type: ignore[return-value]

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------

    def summary(self) -> pl.DataFrame:
        """
        Return a formatted summary of the fit.

        Structural parameters are printed to stdout; the per-group credibility
        results are returned as a table, formatted for an actuarial audience.

        Returns
        -------
        pl.DataFrame
            Per-group results.
        """
        self._check_fitted()

        print("Bühlmann-Straub Credibility Model")
        print("=" * 42)
        print(f"  Collective mean    mu = {self._mu_hat:.6g}")
        print(f"  Process variance   v  = {self._v_hat:.6g} (EPV, within-group)")
        print(f"  Between-group var  a  = {self._a_hat:.6g} (VHM, between-group)")
        if np.isinf(self._k):
            print("  Credibility param  k  = inf (a ≤ 0, all Z = 0)")
        else:
            print(f"  Credibility param  k  = {self._k:.6g} (v / a)")
        print()
        print("  Interpretation: a group needs exposure = k to achieve Z = 0.50")
        if self._a_hat_raw <= 0:
            print(f"  (raw a_hat before truncation: {self._a_hat_raw:.6g})")
        print()

        tbl = self._premiums.rename({
            "exposure": "Exposure",
            "observed_mean": "Obs. Mean",
            "Z": "Z",
            "credibility_premium": "Cred. Premium",
            "complement": "Complement",
        })
        return tbl

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _build_group_summary(
        data: pl.DataFrame,
        group_col: str,
        period_col: str,
        loss_col: str,
        weight_col: str,
    ) -> pl.DataFrame:
        """
        Compute per-group summary statistics needed for parameter estimation.

        Returns a Polars DataFrame with columns:
            group   : group identifier (same type as group_col)
            w_i     : total exposure (sum of weights)
            x_bar_i : exposure-weighted mean loss rate
            T_i     : number of periods
        """
        groups = (
            data.group_by(group_col)
            .agg([
                pl.col(weight_col).sum().alias("w_i"),
                (
                    (pl.col(weight_col) * pl.col(loss_col)).sum()
                    / pl.col(weight_col).sum()
                ).alias("x_bar_i"),
                pl.len().alias("T_i"),
            ])
            .rename({group_col: "group"})
            .sort("group")
        )
        return groups

    @staticmethod
    def _estimate_structural_params(
        data: pl.DataFrame,
        groups: pl.DataFrame,
        group_col: str,
        period_col: str,
        loss_col: str,
        weight_col: str,
    ) -> tuple[float, float, float]:
        """
        Compute unbiased estimates of mu, v, and a.

        These are the standard Bühlmann-Straub (1970) non-parametric estimators,
        as presented in Bühlmann & Gisler (2005), Chapter 4.

        Parameters
        ----------
        data:
            Full panel data (Polars DataFrame).
        groups:
            Per-group summary from _build_group_summary.

        Returns
        -------
        mu_hat, v_hat, a_hat_raw
            a_hat_raw may be negative - the caller decides how to handle it.
        """
        w = groups["w_i"].to_numpy()
        x_bar = groups["x_bar_i"].to_numpy()
        T = groups["T_i"].to_numpy()
        group_ids = groups["group"].to_list()
        r = len(groups)

        # Collective mean: grand weighted average
        w_total = w.sum()
        mu_hat = (w * x_bar).sum() / w_total

        # v_hat: within-group weighted mean squared deviation
        # v_hat = (1 / sum_i(T_i - 1)) * sum_i sum_j [ w_{ij} * (X_{ij} - X_bar_i)^2 ]
        denom_v = (T - 1).sum()  # degrees of freedom for v

        # Join x_bar_i into the full data, then compute the weighted squared
        # deviation. We use a join rather than a Python loop for efficiency.
        x_bar_lf = pl.DataFrame({
            group_col: group_ids,
            "_x_bar_i": x_bar,
        }).lazy()

        numerator_v = (
            data.lazy()
            .join(x_bar_lf, on=group_col, how="left")
            .with_columns(
                (
                    pl.col(weight_col) * (pl.col(loss_col) - pl.col("_x_bar_i")) ** 2
                ).alias("_sq_dev")
            )
            .select(pl.col("_sq_dev").sum())
            .collect()["_sq_dev"][0]
        )

        v_hat = float(numerator_v) / denom_v if denom_v > 0 else 0.0

        # a_hat: between-group variance
        #   c  = w_total - sum_i(w_i^2) / w_total
        #   s2 = sum_i [ w_i * (x_bar_i - mu_hat)^2 ]
        #   a_hat = (s2 - (r - 1) * v_hat) / c
        c = w_total - (w ** 2).sum() / w_total
        s2 = (w * (x_bar - mu_hat) ** 2).sum()
        a_hat_raw = (s2 - (r - 1) * v_hat) / c

        return float(mu_hat), float(v_hat), float(a_hat_raw)

    def _check_fitted(self) -> None:
        if not self._fitted:
            raise RuntimeError("Model has not been fitted. Call .fit() first.")

    def __repr__(self) -> str:
        if not self._fitted:
            return "BuhlmannStraub(not fitted)"
        return (
            f"BuhlmannStraub("
            f"mu={self._mu_hat:.4g}, "
            f"v={self._v_hat:.4g}, "
            f"a={self._a_hat:.4g}, "
            f"k={self._k:.4g})"
        )
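As a cross-check on ``_estimate_structural_params``, the same estimators can be reproduced with plain NumPy on the toy panel from the class docstring. This sketch assumes the balanced 3×3 layout, so sum_i(T_i - 1) reduces to r * (T - 1):

    import numpy as np

    # Toy panel from the docstring: rows are schemes A, B, C; columns are years.
    w = np.array([[1000.0, 1200.0, 1100.0],
                  [300.0, 350.0, 320.0],
                  [5000.0, 4800.0, 5200.0]])
    x = np.array([[0.55, 0.60, 0.58],
                  [0.80, 0.75, 0.82],
                  [0.40, 0.42, 0.38]])

    r, T = w.shape
    w_i = w.sum(axis=1)                    # total exposure per group
    x_bar = (w * x).sum(axis=1) / w_i      # exposure-weighted group means
    mu = (w_i * x_bar).sum() / w_i.sum()   # collective mean

    v = (w * (x - x_bar[:, None]) ** 2).sum() / (r * (T - 1))    # EPV
    c = w_i.sum() - (w_i ** 2).sum() / w_i.sum()
    a = ((w_i * (x_bar - mu) ** 2).sum() - (r - 1) * v) / c      # VHM (can be < 0)

    k = v / a                     # the class truncates instead when a <= 0
    Z = w_i / (w_i + k)           # credibility factors
    premium = Z * x_bar + (1 - Z) * mu
    print(mu, v, a, k)
    print(Z, premium)             # should match bs.premiums_

For this data a is clearly positive (the schemes differ strongly), so no truncation occurs.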
class HierarchicalBuhlmannStraub:
    """
    Multi-level Bühlmann-Straub credibility model for nested group structures.

    This model is appropriate when groups are arranged in a strict hierarchy -
    every observation belongs to exactly one parent at each level. Typical uses:

    - Geographic: postcode sector → district → area
    - Organisational: risk → scheme → portfolio
    - Time: accident quarter → accident year → underwriting year

    Parameters
    ----------
    level_cols:
        List of column names defining the hierarchy from outermost (top) to
        innermost (bottom). For a three-level model, provide three names.
        Example: ``["area", "district", "sector"]``.
    truncate_a:
        Whether to truncate negative between-group variance estimates at zero.
        Passed through to the BuhlmannStraub model at each level.

    Examples
    --------
    Geographic three-level model::

        >>> model = HierarchicalBuhlmannStraub(
        ...     level_cols=["area", "district", "sector"]
        ... )
        >>> model.fit(
        ...     data=df,
        ...     period_col="year",
        ...     loss_col="loss_rate",
        ...     weight_col="exposure",
        ... )
        >>> model.premiums_at("sector")          # blended sector-level premiums
        >>> model.level_results_["district"].k   # k at district level

    Two-level scheme model (equivalent to BuhlmannStraub with a book-level prior)::

        >>> model = HierarchicalBuhlmannStraub(level_cols=["book", "scheme"])
        >>> model.fit(df, period_col="year", loss_col="loss_rate", weight_col="exposure")
    """

    def __init__(
        self,
        level_cols: List[str],
        truncate_a: bool = True,
    ) -> None:
        if len(level_cols) < 2:
            raise ValueError(
                "At least two levels are required for a hierarchical model. "
                "For a single-level model, use BuhlmannStraub."
            )
        self.level_cols = level_cols
        self.truncate_a = truncate_a
        self._fitted = False
        self._level_results: Optional[Dict[str, LevelResult]] = None
        self._bottom_premiums: Optional[pl.DataFrame] = None

    def fit(
        self,
        data: Union[pl.DataFrame, "pd.DataFrame"],  # type: ignore[name-defined]
        period_col: str = "period",
        loss_col: str = "loss",
        weight_col: str = "weight",
    ) -> "HierarchicalBuhlmannStraub":
        """
        Fit the hierarchical model to panel data.

        Parameters
        ----------
        data:
            Panel DataFrame (Polars preferred, pandas accepted) with one row
            per (leaf node, period). Must contain all columns in ``level_cols``
            plus ``period_col``, ``loss_col``, and ``weight_col``.
        period_col:
            Column identifying the time period.
        loss_col:
            Column containing the per-unit loss rate.
        weight_col:
            Column containing exposure weights (must be strictly positive).

        Returns
        -------
        self
        """
        data = _to_polars(data)

        required_cols = set(self.level_cols) | {period_col, loss_col, weight_col}
        missing = required_cols - set(data.columns)
        if missing:
            raise ValueError(f"Columns not found in data: {missing}")

        self._period_col = period_col
        self._loss_col = loss_col
        self._weight_col = weight_col

        # Validate that the hierarchy is strict (no cross-level ambiguity)
        self._validate_hierarchy(data)

        # Bottom-up: estimate variance components at each level
        level_results: Dict[str, LevelResult] = {}
        bottom_level = self.level_cols[-1]

        # At the innermost level, fit BuhlmannStraub with the leaf node as group
        # and each period as an observation
        innermost_fit = self._fit_innermost_level(
            data, bottom_level, period_col, loss_col, weight_col
        )
        level_results[bottom_level] = innermost_fit

        # For each higher level, aggregate child-level data and fit B-S treating
        # each node at that level as a "group" and each child as one weighted
        # observation
        for depth in range(len(self.level_cols) - 2, -1, -1):
            parent_col = self.level_cols[depth]
            child_col = self.level_cols[depth + 1]
            level_result = self._fit_upper_level(
                data,
                parent_col,
                child_col,
                loss_col,
                weight_col,
            )
            level_results[parent_col] = level_result

        self._level_results = level_results

        # Top-down: compute final blended premiums at each level
        self._bottom_premiums = self._compute_top_down_premiums(data, level_results)
        self._fitted = True
        return self

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------

    @property
    def level_results_(self) -> Dict[str, LevelResult]:
        """Dictionary mapping level name to LevelResult with fitted parameters."""
        self._check_fitted()
        return self._level_results  # type: ignore[return-value]

    @property
    def premiums_(self) -> pl.DataFrame:
        """
        Credibility premiums at the bottom (leaf) level of the hierarchy.

        Equivalent to ``premiums_at(self.level_cols[-1])``.
        """
        self._check_fitted()
        return self._bottom_premiums  # type: ignore[return-value]

    def premiums_at(self, level: str) -> pl.DataFrame:
        """
        Return the credibility premiums at a specified level of the hierarchy.

        Parameters
        ----------
        level:
            One of the column names from ``level_cols``.

        Returns
        -------
        pl.DataFrame
            With a ``group`` column plus ``exposure``, ``observed_mean``,
            ``Z``, ``credibility_premium``, and ``complement`` columns.
        """
        self._check_fitted()
        if level not in self.level_cols:
            raise ValueError(
                f"'{level}' is not one of the fitted levels: {self.level_cols}"
            )
        return self._level_results[level].premiums

    def summary(self) -> None:
        """Print structural parameters at each level."""
        self._check_fitted()
        print("Hierarchical Bühlmann-Straub Credibility Model")
        print("=" * 50)
        print(f"Levels (outer → inner): {' → '.join(self.level_cols)}")
        print()
        for level in self.level_cols:
            lr = self._level_results[level]
            print(f"  Level: {level}")
            print(f"    mu = {lr.mu_hat:.6g}")
            print(f"    v  = {lr.v_hat:.6g} (EPV within-node)")
            print(f"    a  = {lr.a_hat:.6g} (VHM between-node)")
            k_str = "inf" if np.isinf(lr.k) else f"{lr.k:.6g}"
            print(f"    k  = {k_str}")
            print()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _validate_hierarchy(self, data: pl.DataFrame) -> None:
        """Check that the hierarchy is strict - each child belongs to exactly one parent."""
        for depth in range(len(self.level_cols) - 1):
            parent = self.level_cols[depth]
            child = self.level_cols[depth + 1]
            parents_per_child = (
                data.select([parent, child])
                .unique()
                .group_by(child)
                .agg(pl.col(parent).n_unique().alias("n_parents"))
            )
            ambiguous = parents_per_child.filter(pl.col("n_parents") > 1)
            if ambiguous.height > 0:
                examples = ambiguous[child].to_list()[:3]
                raise ValueError(
                    f"Hierarchy is not strict at level '{child}' → '{parent}': "
                    f"{ambiguous.height} child node(s) appear under multiple parents. "
                    f"Examples: {examples}"
                )

    def _fit_innermost_level(
        self,
        data: pl.DataFrame,
        leaf_col: str,
        period_col: str,
        loss_col: str,
        weight_col: str,
    ) -> LevelResult:
        """Fit B-S at the innermost level using raw period data."""
        bs = BuhlmannStraub(truncate_a=self.truncate_a)
        bs.fit(
            data,
            group_col=leaf_col,
            period_col=period_col,
            loss_col=loss_col,
            weight_col=weight_col,
        )
        return LevelResult(
            level_name=leaf_col,
            mu_hat=bs.mu_hat_,
            v_hat=bs.v_hat_,
            a_hat=bs.a_hat_,
            k=bs.k_,
            z=bs.z_,
            premiums=bs.premiums_,
        )

    def _fit_upper_level(
        self,
        data: pl.DataFrame,
        parent_col: str,
        child_col: str,
        loss_col: str,
        weight_col: str,
    ) -> LevelResult:
        """
        Fit B-S at a higher level by aggregating child-level data.

        Each child node is treated as one "observation" within its parent,
        using the child's total exposure as the weight and its exposure-weighted
        mean loss rate as the value. Structural parameters at this level are
        re-estimated from the aggregated data, building variance components up
        the hierarchy level by level (in the spirit of Jewell 1975).
        """
        # Aggregate data to child level (summing over periods)
        child_summary = (
            data.select([parent_col, child_col, weight_col, loss_col])
            .group_by([parent_col, child_col])
            .agg([
                pl.col(weight_col).sum().alias("exposure"),
                (
                    (pl.col(weight_col) * pl.col(loss_col)).sum()
                    / pl.col(weight_col).sum()
                ).alias("loss_rate"),
            ])
        )

        # Fit B-S at the parent level, treating each child as one "observation":
        # the period col here is child_col, the loss col is loss_rate, and the
        # weight is exposure
        bs = BuhlmannStraub(truncate_a=self.truncate_a)
        bs.fit(
            child_summary,
            group_col=parent_col,
            period_col=child_col,
            loss_col="loss_rate",
            weight_col="exposure",
        )

        return LevelResult(
            level_name=parent_col,
            mu_hat=bs.mu_hat_,
            v_hat=bs.v_hat_,
            a_hat=bs.a_hat_,
            k=bs.k_,
            z=bs.z_,
            premiums=bs.premiums_,
        )

    def _compute_top_down_premiums(
        self,
        data: pl.DataFrame,
        level_results: Dict[str, LevelResult],
    ) -> pl.DataFrame:
        """
        Compute the final blended premiums by propagating the hierarchy top-down.

        At the topmost level, the complement is the grand mean. At each level
        below, the complement for a node is the credibility premium of its parent.
        The final output is a premium for each leaf node.

        Blending formula at each level:
            P_node = Z_node * X̄_node + (1 - Z_node) * P_parent
        """
        top_level = self.level_cols[0]
        top_lr = level_results[top_level]

        # Start: parent premiums map from group id → credibility_premium at the
        # top level, represented as a pl.DataFrame with [group, parent_premium]
        # for joining
        parent_premiums_df = top_lr.premiums.select([
            pl.col("group"),
            pl.col("credibility_premium").alias("parent_premium"),
        ])

        # Work downward, level by level
        for depth in range(1, len(self.level_cols)):
            current_level = self.level_cols[depth]
            parent_level = self.level_cols[depth - 1]
            current_lr = level_results[current_level]

            # Map: child node id → parent node id (unique pairs)
            child_to_parent = (
                data.select([current_level, parent_level])
                .unique()
                .rename({current_level: "group", parent_level: "parent_id"})
            )

            # Join the child→parent map with parent premiums to get P_parent
            # for each child
            child_with_parent_premium = child_to_parent.join(
                parent_premiums_df.rename({"group": "parent_id"}),
                on="parent_id",
                how="left",
            )

            # Join current-level premiums (Z, observed_mean) with the parent premium
            blended = (
                current_lr.premiums.select(["group", "Z", "observed_mean"])
                .join(
                    child_with_parent_premium.select(["group", "parent_premium"]),
                    on="group",
                    how="left",
                )
                .with_columns(
                    (
                        pl.col("Z") * pl.col("observed_mean")
                        + (1 - pl.col("Z")) * pl.col("parent_premium")
                    ).alias("blended_premium")
                )
                .select(["group", "blended_premium"])
                .rename({"blended_premium": "parent_premium"})
            )

            parent_premiums_df = blended

        # Build the final DataFrame: start from bottom-level premiums and replace
        # credibility_premium with the blended value
        bottom_level = self.level_cols[-1]
        bottom_lr = level_results[bottom_level]
        result = (
            bottom_lr.premiums
            .drop("credibility_premium")
            .join(
                parent_premiums_df.rename({"parent_premium": "credibility_premium"}),
                on="group",
                how="left",
            )
        )
        return result

    def _check_fitted(self) -> None:
        if not self._fitted:
            raise RuntimeError("Model has not been fitted. Call .fit() first.")

    def __repr__(self) -> str:
        levels_str = " → ".join(self.level_cols)
        if not self._fitted:
            return f"HierarchicalBuhlmannStraub(levels=[{levels_str}], not fitted)"
        return f"HierarchicalBuhlmannStraub(levels=[{levels_str}])"
class LevelResult:
    """
    Fitted parameters and premiums for one level of the hierarchy.

    Attributes
    ----------
    level_name:
        The column name that identifies this level (e.g. "district").
    mu_hat:
        Collective mean at this level.
    v_hat:
        Within-node variance at this level (EPV).
    a_hat:
        Between-node variance at this level (VHM). Zero if truncated.
    k:
        Bühlmann's k = v / a at this level.
    z:
        Credibility factors as a pl.DataFrame with columns ["group", "Z"].
    premiums:
        Credibility premiums as a pl.DataFrame with a "group" column.
    """

    def __init__(
        self,
        level_name: str,
        mu_hat: float,
        v_hat: float,
        a_hat: float,
        k: float,
        z: pl.DataFrame,
        premiums: pl.DataFrame,
    ) -> None:
        self.level_name = level_name
        self.mu_hat = mu_hat
        self.v_hat = v_hat
        self.a_hat = a_hat
        self.k = k
        self.z = z
        self.premiums = premiums

    def __repr__(self) -> str:
        return (
            f"LevelResult(level='{self.level_name}', "
            f"mu={self.mu_hat:.4g}, v={self.v_hat:.4g}, "
            f"a={self.a_hat:.4g}, k={self.k:.4g})"
        )
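Normally a LevelResult is built inside the hierarchical fit, but it can be constructed directly; a sketch with invented numbers, using the class above:

    import polars as pl

    lr = LevelResult(
        level_name="district",
        mu_hat=0.55,
        v_hat=0.012,
        a_hat=0.004,
        k=3.0,
        z=pl.DataFrame({"group": ["D1", "D2"], "Z": [0.7, 0.4]}),
        premiums=pl.DataFrame({"group": ["D1", "D2"],
                               "credibility_premium": [0.58, 0.53]}),
    )
    print(lr)  # LevelResult(level='district', mu=0.55, v=0.012, a=0.004, k=3)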
@dataclass
class ClaimsHistory:
    """Claims history for a single policy.

    This is the fundamental input type for all experience rating models.
    Each instance represents one policy's observed claims sequence across
    one or more periods (typically policy years).

    Parameters
    ----------
    policy_id : str
        Unique identifier for the policy.
    periods : list[int]
        Year indices for each observation period. Typically [1, 2, 3, ...].
        Must be non-empty. Periods need not be contiguous but must be unique
        and in ascending order.
    claim_counts : list[int]
        Observed claim count in each period. Must have the same length as
        ``periods``. Non-negative integers.
    claim_amounts : list[float] or None
        Aggregate claim amount (incurred) in each period. Set to None for
        frequency-only models. When provided, must have the same length as
        ``periods``.
    exposures : list[float] or None
        Risk exposure in each period (e.g., years on risk, vehicle-years).
        Must have the same length as ``periods``. Defaults to 1.0 for each
        period if not supplied.
    prior_premium : float
        The a priori (GLM-based) expected annual claim cost for the next
        period. This is the base rate that experience rating will adjust.
        Must be strictly positive.

    Examples
    --------
    >>> history = ClaimsHistory(
    ...     policy_id="POL001",
    ...     periods=[1, 2, 3],
    ...     claim_counts=[0, 1, 0],
    ...     exposures=[1.0, 1.0, 0.8],
    ...     prior_premium=450.0,
    ... )
    """

    policy_id: str
    periods: list[int]
    claim_counts: list[int]
    claim_amounts: Optional[list[float]] = None
    exposures: Optional[list[float]] = None
    prior_premium: float = 1.0

    def __post_init__(self) -> None:
        self._validate()
        # Normalise exposures to a list if not provided
        if self.exposures is None:
            self.exposures = [1.0] * len(self.periods)

    def _validate(self) -> None:
        if len(self.periods) == 0:
            raise ValueError("periods must be non-empty")
        if len(self.claim_counts) != len(self.periods):
            raise ValueError(
                f"claim_counts length ({len(self.claim_counts)}) must match "
                f"periods length ({len(self.periods)})"
            )
        if self.claim_amounts is not None and len(self.claim_amounts) != len(
            self.periods
        ):
            raise ValueError(
                f"claim_amounts length ({len(self.claim_amounts)}) must match "
                f"periods length ({len(self.periods)})"
            )
        exposures = self.exposures
        if exposures is not None:
            if len(exposures) != len(self.periods):
                raise ValueError(
                    f"exposures length ({len(exposures)}) must match "
                    f"periods length ({len(self.periods)})"
                )
            if any(e <= 0.0 for e in exposures):
                raise ValueError("All exposures must be strictly positive")
        if any(c < 0 for c in self.claim_counts):
            raise ValueError("claim_counts must be non-negative")
        if self.prior_premium <= 0.0:
            raise ValueError("prior_premium must be strictly positive")
        if len(self.periods) != len(set(self.periods)):
            raise ValueError("periods must be unique")
        # Enforce the documented ascending-order requirement
        if list(self.periods) != sorted(self.periods):
            raise ValueError("periods must be in ascending order")

    @property
    def n_periods(self) -> int:
        """Number of observation periods."""
        return len(self.periods)

    @property
    def total_claims(self) -> int:
        """Total claim count across all periods."""
        return sum(self.claim_counts)

    @property
    def total_exposure(self) -> float:
        """Total exposure across all periods."""
        assert self.exposures is not None
        return sum(self.exposures)

    @property
    def claim_frequency(self) -> float:
        """Observed claim frequency (claims per unit exposure).

        Returns 0.0 if total exposure is zero (guard only — exposures
        are validated to be positive).
        """
        total_exp = self.total_exposure
        if total_exp == 0.0:
            return 0.0
        return self.total_claims / total_exp

    @property
    def exposure_weighted_counts(self) -> list[float]:
        """Claim counts adjusted by exposure: Y_t / e_t per period."""
        assert self.exposures is not None
        return [c / e for c, e in zip(self.claim_counts, self.exposures)]
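Continuing the docstring example, the derived properties give the exposure-adjusted view of the history:

    # Derived statistics for the docstring example.
    h = ClaimsHistory(
        policy_id="POL001",
        periods=[1, 2, 3],
        claim_counts=[0, 1, 0],
        exposures=[1.0, 1.0, 0.8],
        prior_premium=450.0,
    )
    print(h.n_periods)        # 3
    print(h.total_claims)     # 1
    print(h.total_exposure)   # 2.8
    print(h.claim_frequency)  # 1 / 2.8 ≈ 0.357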
@dataclass
class CalibrationResult:
    """Output of the balance calibration step.

    Parameters
    ----------
    calibration_factor : float
        Multiplicative factor applied to all posterior premiums to restore
        portfolio balance. Values close to 1.0 indicate the model was
        already approximately balanced before calibration.
    sum_actual : float
        Sum of observed claim counts (weighted by exposure) over the
        calibration portfolio.
    sum_predicted : float
        Sum of posterior premiums (weighted by exposure) before calibration.
    n_policies : int
        Number of policies in the calibration portfolio.
    """

    calibration_factor: float
    sum_actual: float
    sum_predicted: float
    n_policies: int

    @property
    def relative_bias(self) -> float:
        """Relative bias before calibration: (predicted - actual) / actual."""
        if self.sum_actual == 0.0:
            return float("nan")
        return (self.sum_predicted - self.sum_actual) / self.sum_actual
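With made-up portfolio totals, ``relative_bias`` reports the pre-calibration miss. The calibration factor shown here is hypothetical, but consistent with a multiplicative factor that restores actual = predicted:

    # Made-up totals: the model over-predicts by 4% before calibration.
    res = CalibrationResult(
        calibration_factor=1000.0 / 1040.0,  # hypothetical balance-restoring factor
        sum_actual=1000.0,
        sum_predicted=1040.0,
        n_policies=5000,
    )
    print(res.relative_bias)  # (1040 - 1000) / 1000 = 0.04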
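As a quick illustration with hypothetical portfolio totals (posterior premiums overshooting observed claims by 20 / 480):

>>> res = CalibrationResult(
...     calibration_factor=480.0 / 500.0,
...     sum_actual=480.0,
...     sum_predicted=500.0,
...     n_policies=1200,
... )
>>> round(res.relative_bias, 4)  # (500 - 480) / 480
0.0417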
class StaticCredibilityModel:
    """Bühlmann-Straub credibility at individual policy level.

    Fits the structural parameter kappa from a portfolio of policy histories
    using the method of moments estimator. Predicts a multiplicative
    credibility factor for each policy.

    The model assumes:
        Y_{t} | Theta ~ Poisson(mu * Theta * e_t)
        Theta ~ distribution with E[Theta]=1, Var[Theta]=tau^2

    This gives the credibility approximation:
        E[Theta | Y_{1:t}] ≈ omega_t * Y_bar / mu + (1 - omega_t)

    Parameters
    ----------
    kappa : float or None
        Known credibility coefficient kappa = sigma^2 / tau^2. If None,
        kappa is estimated from the portfolio during fit(). Set explicitly
        to share a kappa estimated from a larger portfolio.
    min_kappa : float
        Lower bound on the estimated kappa. Prevents omega from reaching 1
        when the portfolio is very homogeneous. Default 0.1.
    max_kappa : float
        Upper bound on the estimated kappa. Prevents degenerate credibility
        for very heterogeneous portfolios. Default 1000.0.

    Attributes
    ----------
    kappa_ : float
        Fitted credibility coefficient (after calling fit()).
    within_variance_ : float
        Estimated within-policy variance component sigma^2.
    between_variance_ : float
        Estimated between-policy variance component tau^2.
    portfolio_mean_ : float
        Exposure-weighted mean claim frequency across the portfolio.
    is_fitted_ : bool
        True after fit() has been called.

    Examples
    --------
    >>> histories = [ClaimsHistory("P1", [1,2,3], [0,1,0], prior_premium=400.0),
    ...              ClaimsHistory("P2", [1,2,3], [2,1,2], prior_premium=400.0)]
    >>> model = StaticCredibilityModel()
    >>> model.fit(histories)
    StaticCredibilityModel(kappa=...)
    >>> cf = model.predict(histories[0])
    >>> posterior = histories[0].prior_premium * cf
    """

    def __init__(
        self,
        kappa: Optional[float] = None,
        min_kappa: float = 0.1,
        max_kappa: float = 1000.0,
    ) -> None:
        self.kappa = kappa
        self.min_kappa = min_kappa
        self.max_kappa = max_kappa

        # Set after fit()
        self.kappa_: Optional[float] = None
        self.within_variance_: Optional[float] = None
        self.between_variance_: Optional[float] = None
        self.portfolio_mean_: Optional[float] = None
        self.is_fitted_: bool = False

    def fit(self, histories: list[ClaimsHistory]) -> "StaticCredibilityModel":
        """Estimate structural parameters from a portfolio of policy histories.

        Uses the method of moments estimator for Bühlmann-Straub:
        - within-policy variance estimated from period-to-period fluctuations
        - between-policy variance estimated from cross-sectional spread

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Portfolio of policy histories. Must contain at least 2 policies
            with at least 1 period each for parameter estimation.

        Returns
        -------
        StaticCredibilityModel
            self (fitted model).

        Raises
        ------
        ValueError
            If fewer than 2 histories are provided, or if all histories have
            only 1 period (insufficient for within-variance estimation).
        """
        if len(histories) < 2:
            raise ValueError(
                "At least 2 policy histories are required to estimate kappa"
            )

        if self.kappa is not None:
            # Use provided kappa, just estimate portfolio mean
            self.kappa_ = self.kappa
            self.within_variance_ = float("nan")
            self.between_variance_ = float("nan")
            self.portfolio_mean_ = self._portfolio_mean(histories)
            self.is_fitted_ = True
            return self

        self.kappa_, self.within_variance_, self.between_variance_ = (
            self._estimate_kappa(histories)
        )
        self.portfolio_mean_ = self._portfolio_mean(histories)
        self.is_fitted_ = True
        return self

    def predict(self, history: ClaimsHistory) -> float:
        """Compute the credibility factor for a single policy.

        Parameters
        ----------
        history : ClaimsHistory
            The policy's claims history. The prior_premium field is used as
            the a priori rate mu.

        Returns
        -------
        float
            Credibility factor CF in (0, inf). Multiply by prior_premium to
            obtain the posterior premium.
                CF = omega_t * (Y_bar / mu) + (1 - omega_t)

        Raises
        ------
        RuntimeError
            If fit() has not been called.
        """
        self._check_fitted()
        assert self.kappa_ is not None
        assert self.exposures_ok(history)

        assert history.exposures is not None
        t = history.total_exposure  # effective credibility weight
        omega_t = t / (t + self.kappa_)

        mu = history.prior_premium  # a priori rate (expected frequency)
        y_bar = history.claim_frequency  # empirical frequency

        # Credibility factor: omega * (empirical / prior) + (1 - omega)
        if mu <= 0.0:
            return 1.0

        cf = omega_t * (y_bar / mu) + (1.0 - omega_t)
        return max(cf, 0.0)  # guard against negative values from rounding

    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
        """Score a batch of policies and return a Polars DataFrame.

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Policies to score.

        Returns
        -------
        pl.DataFrame
            Columns: policy_id, prior_premium, credibility_factor,
            posterior_premium.
        """
        self._check_fitted()
        rows = []
        for h in histories:
            cf = self.predict(h)
            rows.append(
                {
                    "policy_id": h.policy_id,
                    "prior_premium": h.prior_premium,
                    "credibility_factor": cf,
                    "posterior_premium": h.prior_premium * cf,
                }
            )
        return pl.DataFrame(rows)

    def credibility_weight(self, history: ClaimsHistory) -> float:
        """Return the Bühlmann credibility weight omega_t for a policy.

        Parameters
        ----------
        history : ClaimsHistory
            The policy's claims history.

        Returns
        -------
        float
            omega_t = t / (t + kappa) in [0, 1], where t is total exposure.
        """
        self._check_fitted()
        assert self.kappa_ is not None
        assert history.exposures is not None
        t = history.total_exposure
        return t / (t + self.kappa_)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _estimate_kappa(
        self, histories: list[ClaimsHistory]
    ) -> tuple[float, float, float]:
        """Bühlmann-Straub method of moments estimation.

        Returns (kappa, within_variance, between_variance).
        """
        # Compute per-policy exposure-weighted mean frequencies
        assert all(h.exposures is not None for h in histories)
        n = len(histories)

        exposures = [np.array(h.exposures, dtype=float) for h in histories]
        counts = [np.array(h.claim_counts, dtype=float) for h in histories]
        rates = [c / e for c, e in zip(counts, exposures)]  # Y_t / e_t

        total_exp = [float(e.sum()) for e in exposures]
        y_bars = [float((c.sum()) / t) for c, t in zip(counts, total_exp)]

        grand_total_exp = sum(total_exp)
        grand_mean = sum(
            c.sum() for c in counts
        ) / grand_total_exp  # E_i[Y_bar_i]

        # --- Within-policy variance estimate (sigma^2) ---
        # sigma^2 = E[Var(Y_t | Theta)] = mu (for Poisson)
        # MoM estimator: average exposure-weighted within-period variance
        within_num = 0.0
        within_denom = 0.0
        for i, (rate_i, exp_i, y_bar_i) in enumerate(
            zip(rates, exposures, y_bars)
        ):
            t_i = len(rate_i)
            if t_i < 2:
                continue
            for t in range(t_i):
                within_num += exp_i[t] * (rate_i[t] - y_bar_i) ** 2
            within_denom += 1
        if within_denom == 0 or within_num == 0:
            # Fallback: use portfolio mean as within-variance estimate
            sigma2 = grand_mean
        else:
            # Unbiased estimator: divide by (T_i - 1) per policy
            total_periods = sum(len(r) for r in rates)
            denom2 = total_periods - n
            if denom2 <= 0:
                sigma2 = grand_mean
            else:
                sigma2 = within_num / denom2

        # --- Between-policy variance estimate (tau^2) ---
        # tau^2 = Var(Theta)
        # Between-policy sum of squares
        c_const = grand_total_exp - sum(t**2 for t in total_exp) / grand_total_exp
        if c_const <= 0:
            tau2 = max(sigma2 / 10.0, 1e-6)
        else:
            between_ss = sum(
                t_i * (y_bar_i - grand_mean) ** 2
                for t_i, y_bar_i in zip(total_exp, y_bars)
            )
            tau2 = max((between_ss - (n - 1) * sigma2) / c_const, 1e-8)

        kappa = sigma2 / tau2
        kappa = float(np.clip(kappa, self.min_kappa, self.max_kappa))
        return kappa, float(sigma2), float(tau2)

    def _portfolio_mean(self, histories: list[ClaimsHistory]) -> float:
        total_claims = sum(h.total_claims for h in histories)
        total_exp = sum(h.total_exposure for h in histories)
        if total_exp == 0.0:
            return 0.0
        return total_claims / total_exp

    def _check_fitted(self) -> None:
        if not self.is_fitted_:
            raise RuntimeError(
                "Model has not been fitted. Call fit() before predict()."
            )

    @staticmethod
    def exposures_ok(history: ClaimsHistory) -> bool:
        return history.exposures is not None and len(history.exposures) > 0

    def __repr__(self) -> str:
        if self.is_fitted_:
            return f"StaticCredibilityModel(kappa={self.kappa_:.4f})"
        return "StaticCredibilityModel(unfitted)"
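To make the update concrete, a small worked example with illustrative numbers (kappa, exposure, and rates are hypothetical): a policy with t = 4 vehicle-years of exposure under kappa = 2 receives weight omega = 4 / (4 + 2) ≈ 0.667. With an observed frequency Y_bar = 0.50 against a prior rate mu = 0.25, the factor is CF = 0.667 * (0.50 / 0.25) + 0.333 ≈ 1.67, so the posterior premium is about 1.67 times the prior; the remaining third of the weight stays with the GLM rate.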
class DynamicPoissonGammaModel:
    """Poisson-gamma conjugate state-space model with seniority weighting.

    The model fits the state-reversion parameter p and the decay parameter q
    from the portfolio using empirical Bayes maximum likelihood. At prediction
    time, it runs the forward recursion to compute the posterior state
    (alpha_t, beta_t) and returns the corresponding posterior mean as a
    credibility factor.

    Parameters
    ----------
    p0 : float
        Initial guess for the state-reversion parameter p. Must be in (0, 1].
    q0 : float
        Initial guess for the decay parameter q. Must be in (0, 1].
    alpha0 : float
        Initial shape parameter of the prior Gamma distribution for Theta.
        Must be > 0. Default 1.0 (prior mode at 1 with matching scale).
    beta0_multiplier : float
        The initial rate parameter beta_0 is set to alpha0 * beta0_multiplier
        per unit of a priori rate. This keeps the prior mean at 1.0.
        Default 1.0.
    bounds : tuple
        (lower, upper) bounds for p and q during optimisation.
        Default ((0.01, 0.99), (0.01, 0.99)).

    Attributes
    ----------
    p_ : float
        Fitted state-reversion parameter (after fit()).
    q_ : float
        Fitted decay parameter (after fit()).
    loglik_ : float
        Log-likelihood at the fitted parameters.
    is_fitted_ : bool
        True after fit() has been called.

    Examples
    --------
    >>> import numpy as np
    >>> rng = np.random.default_rng(42)
    >>> histories = [
    ...     ClaimsHistory(f"P{i}", [1,2,3],
    ...                   rng.poisson(1.2, size=3).tolist(),
    ...                   prior_premium=400.0)
    ...     for i in range(100)
    ... ]
    >>> model = DynamicPoissonGammaModel()
    >>> model.fit(histories)
    DynamicPoissonGammaModel(p=..., q=...)
    """

    def __init__(
        self,
        p0: float = 0.5,
        q0: float = 0.8,
        alpha0: float = 1.0,
        beta0_multiplier: float = 1.0,
        bounds: tuple = ((0.01, 0.99), (0.01, 0.99)),
    ) -> None:
        self.p0 = p0
        self.q0 = q0
        self.alpha0 = alpha0
        self.beta0_multiplier = beta0_multiplier
        self.bounds = bounds

        self.p_: Optional[float] = None
        self.q_: Optional[float] = None
        self.loglik_: Optional[float] = None
        self.is_fitted_: bool = False

    def fit(
        self, histories: list[ClaimsHistory], verbose: bool = False
    ) -> "DynamicPoissonGammaModel":
        """Fit p and q by empirical Bayes MLE on the portfolio log-likelihood.

        The log-likelihood sums over all policies and all periods. Within each
        policy, the marginal Y_t is Negative Binomial with state-dependent
        shape and mean parameters derived from the forward recursion.

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Training portfolio. At least 10 policies recommended for stable
            parameter estimation.
        verbose : bool
            If True, print optimisation progress.

        Returns
        -------
        DynamicPoissonGammaModel
            self (fitted model).
        """
        if len(histories) < 2:
            raise ValueError(
                "At least 2 policy histories are required for fitting"
            )

        def neg_loglik(params: np.ndarray) -> float:
            p, q = float(params[0]), float(params[1])
            total_ll = 0.0
            for history in histories:
                total_ll += self._policy_loglik(history, p, q)
            return -total_ll

        x0 = np.array([self.p0, self.q0])
        bounds_list = [self.bounds[0], self.bounds[1]]

        result = optimize.minimize(
            neg_loglik,
            x0,
            method="L-BFGS-B",
            bounds=bounds_list,
            options={"maxiter": 500, "ftol": 1e-9, "gtol": 1e-6},
        )

        if verbose and not result.success:
            print(f"Optimisation warning: {result.message}")

        self.p_ = float(result.x[0])
        self.q_ = float(result.x[1])
        self.loglik_ = float(-result.fun)
        self.is_fitted_ = True
        return self

    def predict(self, history: ClaimsHistory) -> float:
        """Compute the credibility factor for a single policy.

        Runs the forward recursion using the fitted (p, q) parameters to
        propagate the Gamma posterior through the observed claim sequence.
        The credibility factor is the ratio of the posterior mean to the
        a priori rate.

        Parameters
        ----------
        history : ClaimsHistory
            The policy's claims history.

        Returns
        -------
        float
            Credibility factor CF = mu_post / mu_prior. Multiply by
            prior_premium to obtain the posterior premium.

        Raises
        ------
        RuntimeError
            If fit() has not been called.
        """
        self._check_fitted()
        assert self.p_ is not None
        assert self.q_ is not None

        alpha_t, beta_t = self._forward_recursion(
            history, self.p_, self.q_
        )
        mu = history.prior_premium
        mu_post = (alpha_t / beta_t) * mu
        cf = mu_post / mu
        return max(cf, 0.0)

    def predict_posterior_params(
        self, history: ClaimsHistory
    ) -> tuple[float, float]:
        """Return the posterior Gamma parameters (alpha, beta) after recursion.

        The posterior distribution of Theta_{t+1} | Y_{1:t} is:
            Gamma(alpha_{t+1}, beta_{t+1})
        with mean alpha_{t+1} / beta_{t+1}.

        This is useful for uncertainty quantification: the posterior variance
        is alpha / beta^2.

        Parameters
        ----------
        history : ClaimsHistory
            The policy's claims history.

        Returns
        -------
        tuple[float, float]
            (alpha_{t+1}, beta_{t+1}) — shape and rate of posterior Gamma.
        """
        self._check_fitted()
        assert self.p_ is not None
        assert self.q_ is not None
        return self._forward_recursion(history, self.p_, self.q_)

    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
        """Score a batch of policies and return a Polars DataFrame.

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Policies to score.

        Returns
        -------
        pl.DataFrame
            Columns: policy_id, prior_premium, credibility_factor,
            posterior_premium, posterior_alpha, posterior_beta,
            posterior_variance.
        """
        self._check_fitted()
        rows = []
        for h in histories:
            cf = self.predict(h)
            alpha_t, beta_t = self.predict_posterior_params(h)
            rows.append(
                {
                    "policy_id": h.policy_id,
                    "prior_premium": h.prior_premium,
                    "credibility_factor": cf,
                    "posterior_premium": h.prior_premium * cf,
                    "posterior_alpha": alpha_t,
                    "posterior_beta": beta_t,
                    "posterior_variance": alpha_t / (beta_t**2),
                }
            )
        return pl.DataFrame(rows)

    # ------------------------------------------------------------------
    # Core recursion (also used in log-likelihood)
    # ------------------------------------------------------------------

    def _forward_recursion(
        self,
        history: ClaimsHistory,
        p: float,
        q: float,
    ) -> tuple[float, float]:
        """Run the Gamma state recursion through the claims history.

        Returns the posterior parameters (alpha, beta) after observing
        all periods in ``history``. The initial prior is set so that the
        prior mean of Theta is 1 (i.e., alpha_0 / beta_0 = 1).

        Parameters
        ----------
        history : ClaimsHistory
            Policy's claims history.
        p : float
            State-reversion parameter.
        q : float
            Decay parameter.

        Returns
        -------
        tuple[float, float]
            (alpha_{T+1}, beta_{T+1}) — the one-step-ahead posterior state.
        """
        assert history.exposures is not None
        mu = history.prior_premium

        # Initialise prior: Theta ~ Gamma(alpha0, beta0) with E[Theta]=1
        alpha = self.alpha0
        beta = self.alpha0 * self.beta0_multiplier  # alpha0 / beta0 = 1

        for y_t, e_t in zip(history.claim_counts, history.exposures):
            # Bayesian update: observe Y_t ~ Poi(mu * e_t * Theta)
            # Posterior: Theta | Y_t ~ Gamma(alpha + Y_t, beta + mu*e_t)
            alpha_post = alpha + float(y_t)
            beta_post = beta + mu * e_t

            # State transition to next period (Ahn et al. 2023, eq. 2.4)
            # beta_next = q * beta_{t|t} — posterior rate, not double-counted
            beta_next = q * beta_post
            alpha_next = p * q * alpha_post + (1.0 - p) * beta_next

            # Guard against numerical underflow
            alpha = max(alpha_next, 1e-10)
            beta = max(beta_next, 1e-10)

        return float(alpha), float(beta)

    def _policy_loglik(
        self,
        history: ClaimsHistory,
        p: float,
        q: float,
    ) -> float:
        """Compute the marginal log-likelihood for a single policy.

        The marginal distribution of Y_t given Y_{1:t-1} is Negative Binomial.
        The shape parameter r_t = alpha_t and the mean is mu * e_t * alpha_t / beta_t.

        Summing these marginal log-likelihoods over all periods gives the
        policy's marginal log-likelihood, which the forward recursion makes
        tractable.
        """
        assert history.exposures is not None
        mu = history.prior_premium

        alpha = self.alpha0
        beta = self.alpha0 * self.beta0_multiplier

        total_ll = 0.0
        for y_t, e_t in zip(history.claim_counts, history.exposures):
            # Marginal: Y_t | Y_{1:t-1} ~ NegBin(r=alpha, mu=alpha/beta * mu*e_t)
            r_t = alpha
            mu_t = (alpha / beta) * mu * e_t
            total_ll += _negbin_logpmf(int(y_t), r_t, mu_t)

            # Update state (Ahn et al. 2023, eq. 2.4)
            alpha_post = alpha + float(y_t)
            beta_post = beta + mu * e_t
            # beta_next = q * beta_{t|t} — posterior rate only, no double-count
            beta_next = q * beta_post
            alpha_next = p * q * alpha_post + (1.0 - p) * beta_next

            alpha = max(alpha_next, 1e-10)
            beta = max(beta_next, 1e-10)

        return total_ll

    def _check_fitted(self) -> None:
        if not self.is_fitted_:
            raise RuntimeError(
                "Model has not been fitted. Call fit() before predict()."
            )

    def __repr__(self) -> str:
        if self.is_fitted_:
            return (
                f"DynamicPoissonGammaModel(p={self.p_:.4f}, q={self.q_:.4f})"
            )
        return "DynamicPoissonGammaModel(unfitted)"
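One step of the recursion, with hypothetical numbers: starting from the prior alpha = beta = 1, observing y_1 = 2 claims against mu * e_1 = 0.3 gives the conjugate update alpha_post = 3, beta_post = 1.3 (within-period posterior mean 3 / 1.3 ≈ 2.31). With p = 0.5 and q = 0.8, the transition yields beta_next = 0.8 * 1.3 = 1.04 and alpha_next = 0.5 * 0.8 * 3 + 0.5 * 1.04 = 1.72, so the one-step-ahead mean is 1.72 / 1.04 ≈ 1.65: the bad year pushes the state above 1, while the decay shrinks it back towards the prior.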
class SurrogateModel:
    """Surrogate model for large-portfolio Bayesian experience rating.

    Fits an approximate g(.) function on a sub-portfolio using importance
    sampling, then applies it to the full portfolio analytically.

    The posterior premium for policy i is:
        mu_post_i = mu_prior_i * exp(g(L_i, n_i))
    where L_i is the log-likelihood sufficient statistic and n_i is the
    number of periods observed.

    The form of g(.) is a polynomial in L and n:
        g(L, n) = theta_0 + theta_1 * L + theta_2 * n        (poly_degree=1)
    with quadratic and interaction terms (L^2, n^2, L * n) added for
    poly_degree=2.

    This is flexible enough to capture Bühlmann-Straub as a special case
    (linear in L = Y_bar) while allowing non-linear responses.

    Parameters
    ----------
    prior_model : callable or None
        Function mapping ClaimsHistory -> float, returning the a priori
        premium for any policy. If None, uses history.prior_premium directly.
        This should be a GLM or similar model already fitted on the portfolio.
    n_is_samples : int
        Number of importance sampling draws from the prior. More samples
        give a better IS estimate but are slower. Default 2000.
    subsample_frac : float
        Fraction of the portfolio to use in the sub-portfolio for fitting g.
        Default 0.10 (10%). Minimum 20 policies regardless of fraction.
    poly_degree : int
        Polynomial degree for g(.). Degree 1 gives linear g; degree 2
        allows quadratic response. Default 1.
    random_state : int or None
        Random seed for IS sampling and sub-portfolio selection.
    sufficient_stat_fn : callable or None
        Custom function mapping ClaimsHistory -> float for the sufficient
        statistic. If None, uses the log-likelihood under Poisson(theta_ref).

    Attributes
    ----------
    theta_ : np.ndarray
        Fitted WLS coefficients for g(.).
    theta_ref_ : float
        Reference Poisson intensity used in the sufficient statistic.
    is_fitted_ : bool
        True after fit() has been called.
    """

    def __init__(
        self,
        prior_model: Optional[Callable[[ClaimsHistory], float]] = None,
        n_is_samples: int = 2000,
        subsample_frac: float = 0.10,
        poly_degree: int = 1,
        random_state: Optional[int] = None,
        sufficient_stat_fn: Optional[Callable[[ClaimsHistory], float]] = None,
    ) -> None:
        self.prior_model = prior_model
        self.n_is_samples = n_is_samples
        self.subsample_frac = subsample_frac
        self.poly_degree = poly_degree
        self.random_state = random_state
        self.sufficient_stat_fn = sufficient_stat_fn

        self.theta_: Optional[np.ndarray] = None
        self.theta_ref_: Optional[float] = None
        self.is_fitted_: bool = False
        self._rng: Optional[np.random.Generator] = None

    def fit(self, histories: list[ClaimsHistory]) -> "SurrogateModel":
        """Fit the surrogate g(.) function on a representative sub-portfolio.

        Steps:
        1. Estimate reference intensity theta_ref as the portfolio mean frequency.
        2. Draw a random sub-portfolio (uniform selection, without replacement).
        3. Draw IS samples Theta_k ~ prior once (shared across all policies).
        4. For each sub-portfolio policy, compute the IS-based Bayesian premium.
        5. Fit g(.) by WLS minimising weighted squared log-credibility error.

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Full training portfolio.

        Returns
        -------
        SurrogateModel
            self (fitted model).
        """
        self._rng = np.random.default_rng(self.random_state)

        # Estimate reference intensity
        total_claims = sum(h.total_claims for h in histories)
        total_exp = sum(h.total_exposure for h in histories)
        self.theta_ref_ = total_claims / total_exp if total_exp > 0 else 1.0

        # Select sub-portfolio
        n_sub = max(20, int(len(histories) * self.subsample_frac))
        n_sub = min(n_sub, len(histories))
        idx = self._rng.choice(len(histories), size=n_sub, replace=False)
        sub_histories = [histories[i] for i in idx]

        # Draw IS samples from prior: Theta_k ~ Gamma(alpha_prior, beta_prior)
        # We use a weakly informative prior: Theta ~ Gamma(1, 1) -> E[Theta] = 1
        prior_alpha = 1.0
        prior_beta = 1.0
        theta_samples = self._rng.gamma(
            prior_alpha, 1.0 / prior_beta, size=self.n_is_samples
        )

        # Compute IS-based posterior premiums for sub-portfolio
        sub_L = []
        sub_n = []
        sub_log_cf = []
        sub_weights = []

        for hist in sub_histories:
            mu_prior = self._get_prior(hist)
            L_i = self._sufficient_stat(hist)
            n_i = float(hist.n_periods)

            mu_post_is, se_i = self._is_posterior(hist, theta_samples, mu_prior)

            if mu_prior <= 0.0 or mu_post_is <= 0.0:
                continue
            log_cf_i = np.log(mu_post_is / mu_prior)
            w_i = 1.0 / max(se_i**2, 1e-8)

            sub_L.append(L_i)
            sub_n.append(n_i)
            sub_log_cf.append(log_cf_i)
            sub_weights.append(w_i)

        if len(sub_L) < 4:
            # Degenerate case: not enough data, return identity (CF = 1)
            self.theta_ = np.zeros(self._n_features())
            self.is_fitted_ = True
            return self

        # Design matrix for g(L, n)
        X = self._design_matrix(np.array(sub_L), np.array(sub_n))
        y = np.array(sub_log_cf)
        w = np.array(sub_weights)
        w = w / w.sum()

        # WLS: solve (X^T W X) theta = X^T W y
        W = np.diag(w)
        XtWX = X.T @ W @ X
        XtWy = X.T @ (w[:, None] * y[:, None]).ravel()

        try:
            self.theta_ = np.linalg.solve(XtWX, XtWy)
        except np.linalg.LinAlgError:
            self.theta_ = np.linalg.lstsq(XtWX, XtWy, rcond=None)[0]

        self.is_fitted_ = True
        return self

    def predict(self, history: ClaimsHistory) -> float:
        """Compute the credibility factor for a single policy.

        Parameters
        ----------
        history : ClaimsHistory
            The policy's claims history.

        Returns
        -------
        float
            Credibility factor CF = exp(g(L, n)). Multiply by prior_premium
            to obtain the posterior premium.

        Raises
        ------
        RuntimeError
            If fit() has not been called.
        """
        self._check_fitted()
        assert self.theta_ is not None

        L = self._sufficient_stat(history)
        n = float(history.n_periods)
        x = self._design_matrix(np.array([L]), np.array([n]))[0]
        log_cf = float(x @ self.theta_)
        return float(np.exp(log_cf))

    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
        """Score a batch of policies and return a Polars DataFrame.

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Policies to score.

        Returns
        -------
        pl.DataFrame
            Columns: policy_id, prior_premium, credibility_factor,
            posterior_premium, sufficient_stat.
        """
        self._check_fitted()
        rows = []
        for h in histories:
            cf = self.predict(h)
            rows.append(
                {
                    "policy_id": h.policy_id,
                    "prior_premium": h.prior_premium,
                    "credibility_factor": cf,
                    "posterior_premium": h.prior_premium * cf,
                    "sufficient_stat": self._sufficient_stat(h),
                }
            )
        return pl.DataFrame(rows)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _is_posterior(
        self,
        history: ClaimsHistory,
        theta_samples: np.ndarray,
        mu_prior: float,
    ) -> tuple[float, float]:
        """Estimate the Bayesian posterior premium via importance sampling.

        Computes E[mu(Theta) | Y_{1:n}] using self-normalised IS:
            E_hat = sum_k w_k * mu(Theta_k) / sum_k w_k
        where w_k = exp(log-likelihood of Y_{1:n} | Theta_k) and
        Theta_k ~ prior (Gamma(1,1) in this implementation).

        Returns (posterior_mean, standard_error).
        """
        assert history.exposures is not None
        log_weights = np.zeros(len(theta_samples))
        for y_t, e_t in zip(history.claim_counts, history.exposures):
            # Y_t | Theta ~ Poi(mu_prior * e_t * Theta)
            rate = mu_prior * e_t * theta_samples
            # Poisson log PMF: y*log(rate) - rate - log(y!); the log(y!)
            # term is constant in Theta, so it drops out of the weights
            log_weights += y_t * np.log(rate + 1e-300) - rate

        # Stabilise: subtract max log weight
        log_weights -= log_weights.max()
        weights = np.exp(log_weights)
        weights /= weights.sum()

        # Posterior premium: E[mu_prior * Theta | data] = mu_prior * E[Theta | data]
        post_mean = float(mu_prior * np.sum(weights * theta_samples))

        # Effective sample size for SE calculation
        ess = 1.0 / np.sum(weights**2)
        if ess < 2:
            se = abs(post_mean) * 0.5  # pessimistic SE
        else:
            # Monte Carlo SE
            mean_theta = np.sum(weights * theta_samples)
            var_theta = np.sum(weights * (theta_samples - mean_theta) ** 2)
            se = float(mu_prior * np.sqrt(var_theta / max(ess, 1)))

        return post_mean, se

    def _sufficient_stat(self, history: ClaimsHistory) -> float:
        if self.sufficient_stat_fn is not None:
            return self.sufficient_stat_fn(history)
        return history_sufficient_stat(history, theta_ref=self.theta_ref_)

    def _get_prior(self, history: ClaimsHistory) -> float:
        if self.prior_model is not None:
            return self.prior_model(history)
        return history.prior_premium

    def _design_matrix(self, L: np.ndarray, n: np.ndarray) -> np.ndarray:
        """Build polynomial design matrix for g(L, n)."""
        cols = [np.ones(len(L))]
        if self.poly_degree >= 1:
            cols.extend([L, n])
        if self.poly_degree >= 2:
            cols.extend([L**2, n**2, L * n])
        return np.column_stack(cols)

    def _n_features(self) -> int:
        if self.poly_degree >= 2:
            return 6
        if self.poly_degree >= 1:
            return 3
        return 1

    def _check_fitted(self) -> None:
        if not self.is_fitted_:
            raise RuntimeError(
                "Model has not been fitted. Call fit() before predict()."
            )

    def __repr__(self) -> str:
        if self.is_fitted_:
            return (
                f"SurrogateModel(poly_degree={self.poly_degree}, "
                f"n_is_samples={self.n_is_samples})"
            )
        return f"SurrogateModel(poly_degree={self.poly_degree}, unfitted)"
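A minimal usage sketch, reusing the illustrative histories portfolio from the DynamicPoissonGammaModel example (the subsample fraction and seed are arbitrary):

>>> surrogate = SurrogateModel(subsample_frac=0.05, random_state=0)
>>> surrogate.fit(histories)  # IS posteriors computed on the sub-portfolio only
SurrogateModel(poly_degree=1, n_is_samples=2000)
>>> scores = surrogate.predict_batch(histories)  # analytic g(.) for every policy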
def balance_calibrate(
    predict_fn: Callable[[ClaimsHistory], float],
    histories: list[ClaimsHistory],
    exposure_weighted: bool = True,
) -> CalibrationResult:
    """Compute the portfolio-level balance calibration factor.

    The calibration factor delta satisfies:
        sum_i [delta * CF_i * mu_prior_i * e_i] = sum_i [Y_i * e_i]

    For multiplicative models:
        delta = sum(actual * exposure) / sum(posterior * exposure)

    This function computes delta and returns it as a CalibrationResult.
    Apply delta to all posterior premiums to restore portfolio balance.

    Parameters
    ----------
    predict_fn : callable
        Function mapping ClaimsHistory -> float, returning the credibility
        factor (not the posterior premium). Typically model.predict.
    histories : list[ClaimsHistory]
        Calibration portfolio. Typically the same data used for fitting,
        but can be a hold-out period for forward-looking calibration.
    exposure_weighted : bool
        If True, weight each policy by its total exposure when computing
        the balance check. If False, use simple sums. Default True.

    Returns
    -------
    CalibrationResult
        Contains calibration_factor, sum_actual, sum_predicted, n_policies.

    Examples
    --------
    >>> model = StaticCredibilityModel().fit(histories)
    >>> result = balance_calibrate(model.predict, histories)
    >>> print(f"Calibration factor: {result.calibration_factor:.4f}")
    >>> # Apply to predictions:
    >>> posterior = prior * model.predict(h) * result.calibration_factor
    """
    sum_actual = 0.0
    sum_predicted = 0.0

    for h in histories:
        assert h.exposures is not None
        cf = predict_fn(h)
        posterior = h.prior_premium * cf

        if exposure_weighted:
            weight = h.total_exposure
        else:
            weight = 1.0

        # Actual side: claim_frequency * total_exposure equals total claims,
        # so with exposure weighting this sums the observed claim counts.
        actual_rate = h.claim_frequency  # total_claims / total_exposure
        sum_actual += actual_rate * weight
        sum_predicted += posterior * weight

    if sum_predicted <= 0.0:
        calibration_factor = 1.0
    else:
        calibration_factor = sum_actual / sum_predicted

    return CalibrationResult(
        calibration_factor=float(calibration_factor),
        sum_actual=float(sum_actual),
        sum_predicted=float(sum_predicted),
        n_policies=len(histories),
    )
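Putting the calibration utilities together, a minimal end-to-end sketch (histories is the fitting portfolio; holdout is a hypothetical later period; balance_report is defined below):

>>> model = StaticCredibilityModel().fit(histories)
>>> cal = balance_calibrate(model.predict, histories)
>>> scoring_fn = calibrated_predict_fn(model.predict, cal)
>>> report = balance_report(scoring_fn, holdout, by_n_periods=True)
>>> report["mean_residual"]  # close to 1.0 per tenure bucket if balanced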
def apply_calibration(
    credibility_factor: float,
    calibration_result: CalibrationResult,
) -> float:
    """Apply a calibration factor to a single credibility factor.

    Parameters
    ----------
    credibility_factor : float
        The raw credibility factor from a model's predict().
    calibration_result : CalibrationResult
        Output of balance_calibrate().

    Returns
    -------
    float
        Calibrated credibility factor = CF * delta.
    """
    return credibility_factor * calibration_result.calibration_factor
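Continuing the hypothetical CalibrationResult example above (delta = 0.96):

>>> round(apply_calibration(1.25, res), 6)  # 1.25 * 0.96
1.2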
def calibrated_predict_fn(
    predict_fn: Callable[[ClaimsHistory], float],
    calibration_result: CalibrationResult,
) -> Callable[[ClaimsHistory], float]:
    """Wrap a predict function to apply calibration automatically.

    Parameters
    ----------
    predict_fn : callable
        Original predict function (ClaimsHistory -> credibility_factor).
    calibration_result : CalibrationResult
        Output of balance_calibrate().

    Returns
    -------
    callable
        New predict function that applies calibration to each prediction.

    Examples
    --------
    >>> cal = balance_calibrate(model.predict, histories)
    >>> calibrated = calibrated_predict_fn(model.predict, cal)
    >>> cf = calibrated(new_history)  # already calibrated
    """
    delta = calibration_result.calibration_factor

    def _calibrated(history: ClaimsHistory) -> float:
        return predict_fn(history) * delta

    return _calibrated
def balance_report(
    predict_fn: Callable[[ClaimsHistory], float],
    histories: list[ClaimsHistory],
    by_n_periods: bool = False,
) -> pl.DataFrame:
    """Generate a portfolio-level balance report.

    Shows actual vs predicted claim frequency and the credibility-factor
    distribution, optionally broken down by number of observed periods
    (to check whether newer or longer-tenured policies carry systematic
    bias).

    Parameters
    ----------
    predict_fn : callable
        Function mapping ClaimsHistory -> credibility_factor.
    histories : list[ClaimsHistory]
        Portfolio to assess.
    by_n_periods : bool
        If True, break down by number of observed periods. Default False.

    Returns
    -------
    pl.DataFrame
        Per-policy rows: policy_id, n_periods, total_exposure,
        actual_frequency, prior_premium, credibility_factor,
        posterior_premium, and residual (actual / posterior). When
        by_n_periods is True, returns one aggregated row per n_periods
        instead.
    """
    rows = []
    for h in histories:
        assert h.exposures is not None
        cf = predict_fn(h)
        posterior = h.prior_premium * cf
        actual_freq = h.claim_frequency
        residual = actual_freq / posterior if posterior > 0 else float("nan")
        rows.append(
            {
                "policy_id": h.policy_id,
                "n_periods": h.n_periods,
                "total_exposure": h.total_exposure,
                "actual_frequency": actual_freq,
                "prior_premium": h.prior_premium,
                "credibility_factor": cf,
                "posterior_premium": posterior,
                "residual": residual,
            }
        )
    df = pl.DataFrame(rows)

    if by_n_periods:
        return (
            df.group_by("n_periods")
            .agg(
                [
                    pl.col("actual_frequency").mean().alias("mean_actual"),
                    pl.col("posterior_premium").mean().alias("mean_posterior"),
                    pl.col("credibility_factor").mean().alias("mean_cf"),
                    pl.col("residual").mean().alias("mean_residual"),
                    pl.len().alias("n_policies"),
                ]
            )
            .sort("n_periods")
        )

    return df
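As a sketch of what the by_n_periods=True branch produces, here is the same
group_by/agg pattern run on a hand-built toy frame (all column values are
illustrative)::

    import polars as pl

    df = pl.DataFrame(
        {
            "n_periods": [1, 1, 3, 3],
            "actual_frequency": [0.0, 0.5, 0.2, 0.4],
            "posterior_premium": [0.30, 0.40, 0.25, 0.35],
            "credibility_factor": [0.9, 1.1, 0.8, 1.2],
            "residual": [0.00, 1.25, 0.80, 1.14],
        }
    )
    summary = (
        df.group_by("n_periods")
        .agg(
            pl.col("actual_frequency").mean().alias("mean_actual"),
            pl.col("posterior_premium").mean().alias("mean_posterior"),
            pl.col("credibility_factor").mean().alias("mean_cf"),
            pl.col("residual").mean().alias("mean_residual"),
            pl.len().alias("n_policies"),
        )
        .sort("n_periods")
    )
    # mean_residual drifting away from 1.0 for small n_periods would
    # flag systematic bias against newer policies.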
def credibility_factor(posterior_premium: float, prior_premium: float) -> float:
    """Compute the multiplicative credibility factor.

    The credibility factor CF satisfies:

        posterior_premium = prior_premium * CF

    Parameters
    ----------
    posterior_premium : float
        The experience-adjusted (a posteriori) premium.
    prior_premium : float
        The a priori (GLM-based) premium.

    Returns
    -------
    float
        Credibility factor. Values > 1 indicate the policy is worse than
        the GLM base rate; values < 1 indicate better-than-average experience.

    Raises
    ------
    ValueError
        If prior_premium is zero or negative.
    """
    if prior_premium <= 0.0:
        raise ValueError(f"prior_premium must be positive, got {prior_premium}")
    return posterior_premium / prior_premium
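A one-line worked example of the identity, with illustrative numbers::

    # Experience-adjusted premium 0.12 against a GLM prior of 0.10:
    # the policy runs 20% worse than its base rate.
    credibility_factor(posterior_premium=0.12, prior_premium=0.10)  # ~ 1.2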
def seniority_weights(
    n_periods: int,
    p: float,
    q: float,
    exposures: list[float] | None = None,
) -> np.ndarray:
    """Compute seniority (recency) weights for claims history periods.

    In the dynamic Poisson-gamma model, the effective weight on period t's
    claims decays geometrically with age. This function computes the
    relative weight assigned to each period in a t-period history when
    fitting or predicting.

    The weight assigned to period s (0-indexed from oldest) in a history
    of length t is proportional to (p*q)^(t-1-s). The most recent period
    receives weight 1; the oldest receives weight (p*q)^(t-1).

    Parameters
    ----------
    n_periods : int
        Number of observation periods.
    p : float
        Transition parameter controlling how strongly the current state
        reverts to the mean. Must be in (0, 1].
    q : float
        Decay parameter controlling how quickly older claims are
        downweighted. Must be in (0, 1].
    exposures : list[float] or None
        Per-period exposure weights. If None, uniform exposure is assumed.

    Returns
    -------
    np.ndarray
        Array of shape (n_periods,) with seniority weights, normalised
        to sum to 1. Most recent period is last.

    Examples
    --------
    >>> seniority_weights(3, p=0.9, q=0.8)
    array([0.2316..., 0.3217..., 0.4467...])  # most recent gets highest weight
    """
    if not (0 < p <= 1.0):
        raise ValueError(f"p must be in (0, 1], got {p}")
    if not (0 < q <= 1.0):
        raise ValueError(f"q must be in (0, 1], got {q}")

    decay = p * q
    # Weights for periods 0, 1, ..., n-1 (oldest to most recent)
    raw = np.array([decay ** (n_periods - 1 - s) for s in range(n_periods)])

    if exposures is not None:
        raw = raw * np.asarray(exposures, dtype=float)

    total = raw.sum()
    if total == 0.0:
        return np.ones(n_periods) / n_periods
    return raw / total
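Reproducing the docstring example by hand, and showing how per-period
exposures interact with the geometric decay (the exposure numbers are
illustrative)::

    import numpy as np

    decay = 0.9 * 0.8  # p * q = 0.72
    raw = np.array([decay**2, decay**1, decay**0])  # oldest -> most recent
    print(raw / raw.sum())  # ~ [0.2316, 0.3217, 0.4467]

    # A heavily exposed old period can outweigh recency:
    raw_exp = raw * np.array([5.0, 1.0, 1.0])
    print(raw_exp / raw_exp.sum())  # ~ [0.601, 0.167, 0.232]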
def exposure_weighted_mean(
    counts: list[int],
    exposures: list[float],
) -> float:
    """Compute exposure-weighted mean claim frequency.

    Parameters
    ----------
    counts : list[int]
        Claim counts per period.
    exposures : list[float]
        Exposure (e.g., years on risk) per period.

    Returns
    -------
    float
        Total claims divided by total exposure. Returns 0.0 if total
        exposure is zero.
    """
    total_exp = sum(exposures)
    if total_exp == 0.0:
        return 0.0
    return sum(counts) / total_exp
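For example::

    # 1 claim over 0.5 years on risk, then 0 claims over 1.5 years:
    # 1 claim across 2 exposure-years -> frequency 0.5.
    exposure_weighted_mean([1, 0], [0.5, 1.5])  # -> 0.5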
def history_sufficient_stat(
    history: ClaimsHistory,
    theta_ref: float | None = None,
) -> float:
    """Compute the log-likelihood sufficient statistic for a claims history.

    For Poisson models with a reference intensity theta_ref, the sufficient
    statistic is the log-likelihood of the observed claim counts, up to an
    additive constant (the -log(Y_t!) term, which does not depend on theta):

        L(Y; theta) = sum_t [ Y_t * log(theta * e_t) - theta * e_t ]

    When theta_ref is None, uses the empirical frequency as reference.

    This is the input to the surrogate model's g(.) function.

    Parameters
    ----------
    history : ClaimsHistory
        The policy's claims history.
    theta_ref : float or None
        Reference intensity (claims per exposure unit). If None, uses the
        empirical claim frequency from the history.

    Returns
    -------
    float
        Log-likelihood sufficient statistic.
    """
    assert history.exposures is not None
    if theta_ref is None:
        theta_ref = history.claim_frequency
    if theta_ref == 0.0:
        theta_ref = 1e-6  # avoid log(0) for all-zero histories

    log_lik = 0.0
    for y_t, e_t in zip(history.claim_counts, history.exposures):
        if e_t <= 0.0:
            continue  # zero-exposure periods contribute nothing
        rate = theta_ref * e_t
        log_lik += y_t * np.log(rate) - rate
    return log_lik
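A hand computation mirroring the loop above, for an illustrative
three-period history with unit exposures::

    import numpy as np

    counts, exposures = [1, 0, 2], [1.0, 1.0, 1.0]
    theta_ref = sum(counts) / sum(exposures)  # empirical frequency = 1.0
    log_lik = sum(
        y * np.log(theta_ref * e) - theta_ref * e
        for y, e in zip(counts, exposures)
    )
    print(log_lik)  # -3.0: the log(1.0) terms vanish and the rates sum to 3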