insurance_credibility.experience
Individual policy-level Bayesian posterior experience rating.
Four progressive model tiers for updating a priori GLM premiums with claims history:
- StaticCredibilityModel — Bühlmann-Straub at individual policy level
- DynamicPoissonGammaModel — State-space model with seniority weighting (Ahn et al. 2023)
- SurrogateModel — IS-based surrogate for intractable posteriors (Calcetero et al. 2024)
- DeepAttentionModel — Neural attention credibility (Wüthrich 2024) [requires torch]
All models share the same interface::

    model.fit(histories)         # histories: list[ClaimsHistory]
    cf = model.predict(history)  # returns credibility factor (float)
    posterior = prior * cf
Quick start::

    from insurance_credibility.experience import ClaimsHistory, StaticCredibilityModel
    from insurance_credibility.experience import balance_calibrate

    histories = [
        ClaimsHistory("POL001", periods=[1, 2, 3], claim_counts=[0, 1, 0],
                      exposures=[1.0, 1.0, 0.8], prior_premium=450.0),
        ClaimsHistory("POL002", periods=[1, 2, 3], claim_counts=[2, 1, 2],
                      exposures=[1.0, 1.0, 1.0], prior_premium=450.0),
    ]

    model = StaticCredibilityModel()
    model.fit(histories)
    cf = model.predict(histories[0])
1""" 2Individual policy-level Bayesian posterior experience rating. 3 4Four progressive model tiers for updating a priori GLM premiums with 5claims history: 6 71. StaticCredibilityModel — Bühlmann-Straub at individual policy level 82. DynamicPoissonGammaModel — State-space model with seniority weighting (Ahn et al. 2023) 93. SurrogateModel — IS-based surrogate for intractable posteriors (Calcetero et al. 2024) 104. DeepAttentionModel — Neural attention credibility (Wüthrich 2024) [requires torch] 11 12All models share the same interface: 13 model.fit(histories) # list[ClaimsHistory] 14 cf = model.predict(history) # returns credibility factor (float) 15 posterior = prior * cf 16 17Quick start:: 18 19 from insurance_credibility.experience import ClaimsHistory, StaticCredibilityModel 20 from insurance_credibility.experience import balance_calibrate 21 22 histories = [ 23 ClaimsHistory("POL001", periods=[1, 2, 3], claim_counts=[0, 1, 0], 24 exposures=[1.0, 1.0, 0.8], prior_premium=450.0), 25 ClaimsHistory("POL002", periods=[1, 2, 3], claim_counts=[2, 1, 2], 26 exposures=[1.0, 1.0, 1.0], prior_premium=450.0), 27 ] 28 29 model = StaticCredibilityModel() 30 model.fit(histories) 31 cf = model.predict(histories[0]) 32""" 33 34from ._types import CalibrationResult, ClaimsHistory 35from .calibration import ( 36 apply_calibration, 37 balance_calibrate, 38 balance_report, 39 calibrated_predict_fn, 40) 41from .dynamic import DynamicPoissonGammaModel 42from .static import StaticCredibilityModel 43from .surrogate import SurrogateModel 44from .utils import ( 45 credibility_factor, 46 exposure_weighted_mean, 47 history_sufficient_stat, 48 posterior_premium, 49 seniority_weights, 50) 51 52__all__ = [ 53 # Data types 54 "ClaimsHistory", 55 "CalibrationResult", 56 # Models 57 "StaticCredibilityModel", 58 "DynamicPoissonGammaModel", 59 "SurrogateModel", 60 "DeepAttentionModel", 61 # Calibration 62 "balance_calibrate", 63 "apply_calibration", 64 "calibrated_predict_fn", 65 
"balance_report", 66 # Utilities 67 "credibility_factor", 68 "posterior_premium", 69 "seniority_weights", 70 "exposure_weighted_mean", 71 "history_sufficient_stat", 72] 73 74 75def __getattr__(name: str): 76 """Lazy import for optional torch-dependent classes.""" 77 if name == "DeepAttentionModel": 78 from .attention import DeepAttentionModel 79 80 return DeepAttentionModel 81 raise AttributeError(f"module 'insurance_credibility.experience' has no attribute {name!r}")
The ``ClaimsHistory`` input type (``insurance_credibility.experience._types``)::

    from dataclasses import dataclass
    from typing import Optional


    @dataclass
    class ClaimsHistory:
        """Claims history for a single policy.

        This is the fundamental input type for all experience rating models.
        Each instance represents one policy's observed claims sequence across
        one or more periods (typically policy years).

        Parameters
        ----------
        policy_id : str
            Unique identifier for the policy.
        periods : list[int]
            Year indices for each observation period. Typically [1, 2, 3, ...].
            Must be non-empty. Periods need not be contiguous but must be unique
            and in ascending order.
        claim_counts : list[int]
            Observed claim count in each period. Must have the same length as
            ``periods``. Non-negative integers.
        claim_amounts : list[float] or None
            Aggregate claim amount (incurred) in each period. Set to None for
            frequency-only models. When provided, must have the same length as
            ``periods``.
        exposures : list[float]
            Risk exposure in each period (e.g., years on risk, vehicle-years).
            Must have the same length as ``periods``. Defaults to 1.0 for each
            period if not supplied.
        prior_premium : float
            The a priori (GLM-based) expected annual claim cost for the next
            period. This is the base rate that experience rating will adjust.
            Must be strictly positive.

        Examples
        --------
        >>> history = ClaimsHistory(
        ...     policy_id="POL001",
        ...     periods=[1, 2, 3],
        ...     claim_counts=[0, 1, 0],
        ...     exposures=[1.0, 1.0, 0.8],
        ...     prior_premium=450.0,
        ... )
        """

        policy_id: str
        periods: list[int]
        claim_counts: list[int]
        claim_amounts: Optional[list[float]] = None
        exposures: Optional[list[float]] = None
        prior_premium: float = 1.0

        def __post_init__(self) -> None:
            self._validate()
            # Normalise exposures to a list if not provided
            if self.exposures is None:
                self.exposures = [1.0] * len(self.periods)

        def _validate(self) -> None:
            if len(self.periods) == 0:
                raise ValueError("periods must be non-empty")
            if len(self.claim_counts) != len(self.periods):
                raise ValueError(
                    f"claim_counts length ({len(self.claim_counts)}) must match "
                    f"periods length ({len(self.periods)})"
                )
            if self.claim_amounts is not None and len(self.claim_amounts) != len(
                self.periods
            ):
                raise ValueError(
                    f"claim_amounts length ({len(self.claim_amounts)}) must match "
                    f"periods length ({len(self.periods)})"
                )
            exposures = self.exposures
            if exposures is not None:
                if len(exposures) != len(self.periods):
                    raise ValueError(
                        f"exposures length ({len(exposures)}) must match "
                        f"periods length ({len(self.periods)})"
                    )
                if any(e <= 0.0 for e in exposures):
                    raise ValueError("All exposures must be strictly positive")
            if any(c < 0 for c in self.claim_counts):
                raise ValueError("claim_counts must be non-negative")
            if self.prior_premium <= 0.0:
                raise ValueError("prior_premium must be strictly positive")
            if len(self.periods) != len(set(self.periods)):
                raise ValueError("periods must be unique")

        @property
        def n_periods(self) -> int:
            """Number of observation periods."""
            return len(self.periods)

        @property
        def total_claims(self) -> int:
            """Total claim count across all periods."""
            return sum(self.claim_counts)

        @property
        def total_exposure(self) -> float:
            """Total exposure across all periods."""
            assert self.exposures is not None
            return sum(self.exposures)

        @property
        def claim_frequency(self) -> float:
            """Observed claim frequency (claims per unit exposure).

            Returns 0.0 if total exposure is zero (guard only — exposures
            are validated to be positive).
            """
            total_exp = self.total_exposure
            if total_exp == 0.0:
                return 0.0
            return self.total_claims / total_exp

        @property
        def exposure_weighted_counts(self) -> list[float]:
            """Claim counts adjusted by exposure: Y_t / e_t per period."""
            assert self.exposures is not None
            return [c / e for c, e in zip(self.claim_counts, self.exposures)]
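The derived quantities are easy to check by hand. A standalone sketch of the frequency arithmetic, mirroring the properties above in plain Python rather than importing the package:

```python
# Toy policy from the quick start: 1 claim over 2.8 exposure-years.
claim_counts = [0, 1, 0]
exposures = [1.0, 1.0, 0.8]

total_claims = sum(claim_counts)                 # total_claims
total_exposure = sum(exposures)                  # total_exposure
claim_frequency = total_claims / total_exposure  # claims per exposure-year
exposure_weighted = [c / e for c, e in zip(claim_counts, exposures)]  # Y_t / e_t

print(total_claims, total_exposure, round(claim_frequency, 3))
print(exposure_weighted)  # [0.0, 1.0, 0.0]
```

Note the final period contributes more per claim (1 / 0.8 = 1.25) when it carries less exposure, which is exactly why the models weight by exposure rather than by raw counts.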
The calibration summary type, also from ``_types``::

    @dataclass
    class CalibrationResult:
        """Output of the balance calibration step.

        Parameters
        ----------
        calibration_factor : float
            Multiplicative factor applied to all posterior premiums to restore
            portfolio balance. Values close to 1.0 indicate the model was
            already approximately balanced before calibration.
        sum_actual : float
            Sum of observed claim counts (weighted by exposure) over the
            calibration portfolio.
        sum_predicted : float
            Sum of posterior premiums (weighted by exposure) before calibration.
        n_policies : int
            Number of policies in the calibration portfolio.
        """

        calibration_factor: float
        sum_actual: float
        sum_predicted: float
        n_policies: int

        @property
        def relative_bias(self) -> float:
            """Relative bias before calibration: (predicted - actual) / actual."""
            if self.sum_actual == 0.0:
                return float("nan")
            return (self.sum_predicted - self.sum_actual) / self.sum_actual
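``CalibrationResult`` only stores the summaries. Assuming the factor is chosen so that rescaled predictions sum to the actuals (i.e. factor = sum_actual / sum_predicted — an inference from the description above, since the ``balance_calibrate`` source is not shown here), the arithmetic looks like:

```python
# Toy portfolio totals: observed claims vs. pre-calibration posterior premiums.
sum_actual = 120.0     # exposure-weighted observed claims
sum_predicted = 132.0  # exposure-weighted posterior premiums before calibration

# Assumed balance condition: predictions rescaled to match actuals in total.
calibration_factor = sum_actual / sum_predicted
relative_bias = (sum_predicted - sum_actual) / sum_actual  # 10% over-prediction

rescaled = sum_predicted * calibration_factor  # back to the actual total
print(round(calibration_factor, 4), round(relative_bias, 4))
```

A factor below 1.0, as here, means the posterior premiums collectively over-predicted and are scaled down; individual relativities between policies are untouched.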
The Bühlmann-Straub model (``insurance_credibility.experience.static``)::

    from typing import Optional

    import numpy as np
    import polars as pl

    from ._types import ClaimsHistory


    class StaticCredibilityModel:
        """Bühlmann-Straub credibility at individual policy level.

        Fits the structural parameter kappa from a portfolio of policy histories
        using the method of moments estimator. Predicts a multiplicative
        credibility factor for each policy.

        The model assumes:
            Y_{t} | Theta ~ Poisson(mu * Theta * e_t)
            Theta ~ distribution with E[Theta]=1, Var[Theta]=tau^2

        This gives the credibility approximation:
            E[Theta | Y_{1:t}] ≈ omega_t * Y_bar / mu + (1 - omega_t)

        Parameters
        ----------
        kappa : float or None
            Known credibility coefficient kappa = sigma^2 / tau^2. If None,
            kappa is estimated from the portfolio during fit(). Set explicitly
            to share a kappa estimated from a larger portfolio.
        min_kappa : float
            Lower bound on the estimated kappa. Prevents omega from reaching 1
            when the portfolio is very homogeneous. Default 0.1.
        max_kappa : float
            Upper bound on the estimated kappa. Prevents degenerate credibility
            for very heterogeneous portfolios. Default 1000.0.

        Attributes
        ----------
        kappa_ : float
            Fitted credibility coefficient (after calling fit()).
        within_variance_ : float
            Estimated within-policy variance component sigma^2.
        between_variance_ : float
            Estimated between-policy variance component tau^2.
        portfolio_mean_ : float
            Exposure-weighted mean claim frequency across the portfolio.
        is_fitted_ : bool
            True after fit() has been called.

        Examples
        --------
        >>> histories = [ClaimsHistory("P1", [1,2,3], [0,1,0], prior_premium=400.0),
        ...              ClaimsHistory("P2", [1,2,3], [2,1,2], prior_premium=400.0)]
        >>> model = StaticCredibilityModel()
        >>> model.fit(histories)
        StaticCredibilityModel(kappa=...)
        >>> cf = model.predict(histories[0])
        >>> posterior = histories[0].prior_premium * cf
        """

        def __init__(
            self,
            kappa: Optional[float] = None,
            min_kappa: float = 0.1,
            max_kappa: float = 1000.0,
        ) -> None:
            self.kappa = kappa
            self.min_kappa = min_kappa
            self.max_kappa = max_kappa

            # Set after fit()
            self.kappa_: Optional[float] = None
            self.within_variance_: Optional[float] = None
            self.between_variance_: Optional[float] = None
            self.portfolio_mean_: Optional[float] = None
            self.is_fitted_: bool = False

        def fit(self, histories: list[ClaimsHistory]) -> "StaticCredibilityModel":
            """Estimate structural parameters from a portfolio of policy histories.

            Uses the method of moments estimator for Bühlmann-Straub:
            - within-policy variance estimated from period-to-period fluctuations
            - between-policy variance estimated from cross-sectional spread

            Parameters
            ----------
            histories : list[ClaimsHistory]
                Portfolio of policy histories. Must contain at least 2 policies
                with at least 1 period each for parameter estimation.

            Returns
            -------
            StaticCredibilityModel
                self (fitted model).

            Raises
            ------
            ValueError
                If fewer than 2 histories are provided, or if all histories have
                only 1 period (insufficient for within-variance estimation).
            """
            if len(histories) < 2:
                raise ValueError(
                    "At least 2 policy histories are required to estimate kappa"
                )

            if self.kappa is not None:
                # Use provided kappa, just estimate portfolio mean
                self.kappa_ = self.kappa
                self.within_variance_ = float("nan")
                self.between_variance_ = float("nan")
                self.portfolio_mean_ = self._portfolio_mean(histories)
                self.is_fitted_ = True
                return self

            self.kappa_, self.within_variance_, self.between_variance_ = (
                self._estimate_kappa(histories)
            )
            self.portfolio_mean_ = self._portfolio_mean(histories)
            self.is_fitted_ = True
            return self

        def predict(self, history: ClaimsHistory) -> float:
            """Compute the credibility factor for a single policy.

            Parameters
            ----------
            history : ClaimsHistory
                The policy's claims history. The prior_premium field is used as
                the a priori rate mu.

            Returns
            -------
            float
                Credibility factor CF in (0, inf). Multiply by prior_premium to
                obtain the posterior premium.
                CF = omega_t * (Y_bar / mu) + (1 - omega_t)

            Raises
            ------
            RuntimeError
                If fit() has not been called.
            """
            self._check_fitted()
            assert self.kappa_ is not None
            assert self.exposures_ok(history)

            assert history.exposures is not None
            t = history.total_exposure  # effective credibility weight
            omega_t = t / (t + self.kappa_)

            mu = history.prior_premium  # a priori rate (expected frequency)
            y_bar = history.claim_frequency  # empirical frequency

            # Credibility factor: omega * (empirical / prior) + (1 - omega)
            if mu <= 0.0:
                return 1.0

            cf = omega_t * (y_bar / mu) + (1.0 - omega_t)
            return max(cf, 0.0)  # guard against negative values from rounding

        def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
            """Score a batch of policies and return a Polars DataFrame.

            Parameters
            ----------
            histories : list[ClaimsHistory]
                Policies to score.

            Returns
            -------
            pl.DataFrame
                Columns: policy_id, prior_premium, credibility_factor,
                posterior_premium.
            """
            self._check_fitted()
            rows = []
            for h in histories:
                cf = self.predict(h)
                rows.append(
                    {
                        "policy_id": h.policy_id,
                        "prior_premium": h.prior_premium,
                        "credibility_factor": cf,
                        "posterior_premium": h.prior_premium * cf,
                    }
                )
            return pl.DataFrame(rows)

        def credibility_weight(self, history: ClaimsHistory) -> float:
            """Return the Bühlmann credibility weight omega_t for a policy.

            Parameters
            ----------
            history : ClaimsHistory
                The policy's claims history.

            Returns
            -------
            float
                omega_t = t / (t + kappa) in [0, 1], where t is total exposure.
            """
            self._check_fitted()
            assert self.kappa_ is not None
            assert history.exposures is not None
            t = history.total_exposure
            return t / (t + self.kappa_)

        # ------------------------------------------------------------------
        # Private helpers
        # ------------------------------------------------------------------

        def _estimate_kappa(
            self, histories: list[ClaimsHistory]
        ) -> tuple[float, float, float]:
            """Bühlmann-Straub method of moments estimation.

            Returns (kappa, within_variance, between_variance).
            """
            # Compute per-policy exposure-weighted mean frequencies
            assert all(h.exposures is not None for h in histories)
            n = len(histories)

            exposures = [np.array(h.exposures, dtype=float) for h in histories]
            counts = [np.array(h.claim_counts, dtype=float) for h in histories]
            rates = [c / e for c, e in zip(counts, exposures)]  # Y_t / e_t

            total_exp = [float(e.sum()) for e in exposures]
            y_bars = [float(c.sum() / t) for c, t in zip(counts, total_exp)]

            grand_total_exp = sum(total_exp)
            grand_mean = sum(
                c.sum() for c in counts
            ) / grand_total_exp  # E_i[Y_bar_i]

            # --- Within-policy variance estimate (sigma^2) ---
            # sigma^2 = E[Var(Y_t | Theta)] = mu (for Poisson)
            # MoM estimator: average exposure-weighted within-period variance
            within_num = 0.0
            within_denom = 0.0
            for rate_i, exp_i, y_bar_i in zip(rates, exposures, y_bars):
                t_i = len(rate_i)
                if t_i < 2:
                    continue
                for t in range(t_i):
                    within_num += exp_i[t] * (rate_i[t] - y_bar_i) ** 2
                    within_denom += 1
            if within_denom == 0 or within_num == 0:
                # Fallback: use portfolio mean as within-variance estimate
                sigma2 = grand_mean
            else:
                # Unbiased estimator: divide by (T_i - 1) per policy
                total_periods = sum(len(r) for r in rates)
                denom2 = total_periods - n
                if denom2 <= 0:
                    sigma2 = grand_mean
                else:
                    sigma2 = within_num / denom2

            # --- Between-policy variance estimate (tau^2) ---
            # tau^2 = Var(Theta)
            # Between-policy sum of squares
            c_const = grand_total_exp - sum(t**2 for t in total_exp) / grand_total_exp
            if c_const <= 0:
                tau2 = max(sigma2 / 10.0, 1e-6)
            else:
                between_ss = sum(
                    t_i * (y_bar_i - grand_mean) ** 2
                    for t_i, y_bar_i in zip(total_exp, y_bars)
                )
                tau2 = max((between_ss - (n - 1) * sigma2) / c_const, 1e-8)

            kappa = sigma2 / tau2
            kappa = float(np.clip(kappa, self.min_kappa, self.max_kappa))
            return kappa, float(sigma2), float(tau2)

        def _portfolio_mean(self, histories: list[ClaimsHistory]) -> float:
            total_claims = sum(h.total_claims for h in histories)
            total_exp = sum(h.total_exposure for h in histories)
            if total_exp == 0.0:
                return 0.0
            return total_claims / total_exp

        def _check_fitted(self) -> None:
            if not self.is_fitted_:
                raise RuntimeError(
                    "Model has not been fitted. Call fit() before predict()."
                )

        @staticmethod
        def exposures_ok(history: ClaimsHistory) -> bool:
            return history.exposures is not None and len(history.exposures) > 0

        def __repr__(self) -> str:
            if self.is_fitted_:
                return f"StaticCredibilityModel(kappa={self.kappa_:.4f})"
            return "StaticCredibilityModel(unfitted)"
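The credibility factor in ``predict`` is a convex blend between the policy's own frequency ratio and 1. A standalone numeric sketch of that blend, with toy values rather than the fitted estimator:

```python
# Toy structural parameters (illustrative, not fitted from data).
kappa = 2.5           # sigma^2 / tau^2, the structural coefficient
mu = 0.40             # a priori rate
total_exposure = 3.0  # t, the effective credibility weight

# Bühlmann credibility weight: how much the policy's own experience counts.
omega = total_exposure / (total_exposure + kappa)

y_bar = 2 / 3.0  # observed frequency: 2 claims over 3 exposure-years
cf = omega * (y_bar / mu) + (1.0 - omega)

# More exposure -> omega -> 1 and cf approaches y_bar / mu;
# no data (t = 0) -> omega = 0 and the prior is kept unchanged (cf = 1).
print(round(omega, 4), round(cf, 4))
```

Because the blend is convex, the factor always lies between 1 and the raw ratio y_bar / mu, which is the shrinkage behaviour credibility theory is built around.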
The dynamic Poisson-gamma model (``insurance_credibility.experience.dynamic``)::

    from typing import Optional

    import numpy as np
    import polars as pl
    from scipy import optimize

    from ._types import ClaimsHistory


    class DynamicPoissonGammaModel:
        """Poisson-gamma conjugate state-space model with seniority weighting.

        The model fits two decay parameters (p, q) from the portfolio using
        empirical Bayes maximum likelihood. At prediction time, it runs the
        forward recursion to compute the posterior state (alpha_t, beta_t)
        and returns the corresponding posterior mean as a credibility factor.

        Parameters
        ----------
        p0 : float
            Initial guess for the state-reversion parameter p. Must be in (0, 1].
        q0 : float
            Initial guess for the decay parameter q. Must be in (0, 1].
        alpha0 : float
            Initial shape parameter of the prior Gamma distribution for Theta.
            Must be > 0. Default 1.0 (prior mode at 1 with matching scale).
        beta0_multiplier : float
            The initial rate parameter beta_0 is set to alpha0 * beta0_multiplier
            per unit of a priori rate. This keeps the prior mean at 1.0.
            Default 1.0.
        bounds : tuple
            (lower, upper) bounds for p and q during optimisation.
            Default ((0.01, 0.99), (0.01, 0.99)).

        Attributes
        ----------
        p_ : float
            Fitted state-reversion parameter (after fit()).
        q_ : float
            Fitted decay parameter (after fit()).
        loglik_ : float
            Log-likelihood at the fitted parameters.
        is_fitted_ : bool
            True after fit() has been called.

        Examples
        --------
        >>> import numpy as np
        >>> rng = np.random.default_rng(42)
        >>> histories = [
        ...     ClaimsHistory(f"P{i}", [1,2,3],
        ...                   rng.poisson(1.2, size=3).tolist(),
        ...                   prior_premium=400.0)
        ...     for i in range(100)
        ... ]
        >>> model = DynamicPoissonGammaModel()
        >>> model.fit(histories)
        DynamicPoissonGammaModel(p=..., q=...)
        """

        def __init__(
            self,
            p0: float = 0.5,
            q0: float = 0.8,
            alpha0: float = 1.0,
            beta0_multiplier: float = 1.0,
            bounds: tuple = ((0.01, 0.99), (0.01, 0.99)),
        ) -> None:
            self.p0 = p0
            self.q0 = q0
            self.alpha0 = alpha0
            self.beta0_multiplier = beta0_multiplier
            self.bounds = bounds

            self.p_: Optional[float] = None
            self.q_: Optional[float] = None
            self.loglik_: Optional[float] = None
            self.is_fitted_: bool = False

        def fit(
            self, histories: list[ClaimsHistory], verbose: bool = False
        ) -> "DynamicPoissonGammaModel":
            """Fit p and q by empirical Bayes MLE on the portfolio log-likelihood.

            The log-likelihood sums over all policies and all periods. Within each
            policy, the marginal Y_t is Negative Binomial with state-dependent
            shape and mean parameters derived from the forward recursion.

            Parameters
            ----------
            histories : list[ClaimsHistory]
                Training portfolio. At least 10 policies recommended for stable
                parameter estimation.
            verbose : bool
                If True, print optimisation progress.

            Returns
            -------
            DynamicPoissonGammaModel
                self (fitted model).
            """
            if len(histories) < 2:
                raise ValueError(
                    "At least 2 policy histories are required for fitting"
                )

            def neg_loglik(params: np.ndarray) -> float:
                p, q = float(params[0]), float(params[1])
                total_ll = 0.0
                for history in histories:
                    total_ll += self._policy_loglik(history, p, q)
                return -total_ll

            x0 = np.array([self.p0, self.q0])
            bounds_list = [self.bounds[0], self.bounds[1]]

            result = optimize.minimize(
                neg_loglik,
                x0,
                method="L-BFGS-B",
                bounds=bounds_list,
                options={"maxiter": 500, "ftol": 1e-9, "gtol": 1e-6},
            )

            if verbose and not result.success:
                print(f"Optimisation warning: {result.message}")

            self.p_ = float(result.x[0])
            self.q_ = float(result.x[1])
            self.loglik_ = float(-result.fun)
            self.is_fitted_ = True
            return self

        def predict(self, history: ClaimsHistory) -> float:
            """Compute the credibility factor for a single policy.

            Runs the forward recursion using the fitted (p, q) parameters to
            propagate the Gamma posterior through the observed claim sequence.
            The credibility factor is the ratio of the posterior mean to the
            a priori rate.

            Parameters
            ----------
            history : ClaimsHistory
                The policy's claims history.

            Returns
            -------
            float
                Credibility factor CF = mu_post / mu_prior. Multiply by
                prior_premium to obtain the posterior premium.

            Raises
            ------
            RuntimeError
                If fit() has not been called.
            """
            self._check_fitted()
            assert self.p_ is not None
            assert self.q_ is not None

            alpha_t, beta_t = self._forward_recursion(
                history, self.p_, self.q_
            )
            mu = history.prior_premium
            mu_post = (alpha_t / beta_t) * mu
            cf = mu_post / mu
            return max(cf, 0.0)

        def predict_posterior_params(
            self, history: ClaimsHistory
        ) -> tuple[float, float]:
            """Return the posterior Gamma parameters (alpha, beta) after recursion.

            The posterior distribution of Theta_{t+1} | Y_{1:t} is:
                Gamma(alpha_{t+1}, beta_{t+1})
            with mean alpha_{t+1} / beta_{t+1}.

            This is useful for uncertainty quantification: the posterior variance
            is alpha / beta^2.

            Parameters
            ----------
            history : ClaimsHistory
                The policy's claims history.

            Returns
            -------
            tuple[float, float]
                (alpha_{t+1}, beta_{t+1}) — shape and rate of posterior Gamma.
            """
            self._check_fitted()
            assert self.p_ is not None
            assert self.q_ is not None
            return self._forward_recursion(history, self.p_, self.q_)

        def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
            """Score a batch of policies and return a Polars DataFrame.

            Parameters
            ----------
            histories : list[ClaimsHistory]
                Policies to score.

            Returns
            -------
            pl.DataFrame
                Columns: policy_id, prior_premium, credibility_factor,
                posterior_premium, posterior_alpha, posterior_beta,
                posterior_variance.
            """
            self._check_fitted()
            rows = []
            for h in histories:
                cf = self.predict(h)
                alpha_t, beta_t = self.predict_posterior_params(h)
                rows.append(
                    {
                        "policy_id": h.policy_id,
                        "prior_premium": h.prior_premium,
                        "credibility_factor": cf,
                        "posterior_premium": h.prior_premium * cf,
                        "posterior_alpha": alpha_t,
                        "posterior_beta": beta_t,
                        "posterior_variance": alpha_t / (beta_t**2),
                    }
                )
            return pl.DataFrame(rows)

        # ------------------------------------------------------------------
        # Core recursion (also used in log-likelihood)
        # ------------------------------------------------------------------

        def _forward_recursion(
            self,
            history: ClaimsHistory,
            p: float,
            q: float,
        ) -> tuple[float, float]:
            """Run the Gamma state recursion through the claims history.

            Returns the posterior parameters (alpha, beta) after observing
            all periods in ``history``. The initial prior is set so that the
            prior mean of Theta is 1 (i.e., alpha_0 / beta_0 = 1).

            Parameters
            ----------
            history : ClaimsHistory
                Policy's claims history.
            p : float
                State-reversion parameter.
            q : float
                Decay parameter.

            Returns
            -------
            tuple[float, float]
                (alpha_{T+1}, beta_{T+1}) — the one-step-ahead posterior state.
            """
            assert history.exposures is not None
            mu = history.prior_premium

            # Initialise prior: Theta ~ Gamma(alpha0, beta0) with E[Theta]=1
            alpha = self.alpha0
            beta = self.alpha0 * self.beta0_multiplier  # alpha0 / beta0 = 1

            for y_t, e_t in zip(history.claim_counts, history.exposures):
                # Bayesian update: observe Y_t ~ Poi(mu * e_t * Theta)
                # Posterior: Theta | Y_t ~ Gamma(alpha + Y_t, beta + mu*e_t)
                alpha_post = alpha + float(y_t)
                beta_post = beta + mu * e_t

                # State transition to next period
                beta_next = q * beta_post
                alpha_next = p * q * alpha_post + (1.0 - p) * beta_next

                # Guard against numerical underflow
                alpha = max(alpha_next, 1e-10)
                beta = max(beta_next, 1e-10)

            return float(alpha), float(beta)

        def _policy_loglik(
            self,
            history: ClaimsHistory,
            p: float,
            q: float,
        ) -> float:
            """Compute the marginal log-likelihood for a single policy.

            The marginal distribution of Y_t given Y_{1:t-1} is Negative Binomial.
            The shape parameter r_t = alpha_t and the mean is mu * e_t * alpha_t / beta_t.

            Summing these marginal log-likelihoods over all periods gives the
            observed-data log-likelihood, tractable via the forward recursion.
352 """ 353 assert history.exposures is not None 354 mu = history.prior_premium 355 356 alpha = self.alpha0 357 beta = self.alpha0 * self.beta0_multiplier 358 359 total_ll = 0.0 360 for y_t, e_t in zip(history.claim_counts, history.exposures): 361 # Marginal: Y_t | Y_{1:t-1} ~ NegBin(r=alpha, mu=alpha/beta * mu*e_t) 362 r_t = alpha 363 mu_t = (alpha / beta) * mu * e_t 364 total_ll += _negbin_logpmf(int(y_t), r_t, mu_t) 365 366 # Update state 367 alpha_post = alpha + float(y_t) 368 beta_post = beta + mu * e_t 369 beta_next = q * beta_post 370 alpha_next = p * q * alpha_post + (1.0 - p) * beta_next 371 372 alpha = max(alpha_next, 1e-10) 373 beta = max(beta_next, 1e-10) 374 375 return total_ll 376 377 def _check_fitted(self) -> None: 378 if not self.is_fitted_: 379 raise RuntimeError( 380 "Model has not been fitted. Call fit() before predict()." 381 ) 382 383 def __repr__(self) -> str: 384 if self.is_fitted_: 385 return ( 386 f"DynamicPoissonGammaModel(p={self.p_:.4f}, q={self.q_:.4f})" 387 ) 388 return "DynamicPoissonGammaModel(unfitted)"
Poisson-gamma conjugate state-space model with seniority weighting.
The model fits two decay parameters (p, q) from the portfolio using empirical Bayes maximum likelihood. At prediction time, it runs the forward recursion to compute the posterior state (alpha_t, beta_t) and returns the corresponding posterior mean as a credibility factor.
Parameters
p0 : float Initial guess for the state-reversion parameter p. Must be in (0, 1]. q0 : float Initial guess for the decay parameter q. Must be in (0, 1]. alpha0 : float Initial shape parameter of the prior Gamma distribution for Theta. Must be > 0. Default 1.0 (prior mode at 1 with matching scale). beta0_multiplier : float The initial rate parameter beta_0 is set to alpha0 * beta0_multiplier per unit of a priori rate. This keeps the prior mean at 1.0. Default 1.0. bounds : tuple (lower, upper) bounds for p and q during optimisation. Default ((0.01, 0.99), (0.01, 0.99)).
Attributes
p_ : float Fitted state-reversion parameter (after fit()). q_ : float Fitted decay parameter (after fit()). loglik_ : float Log-likelihood at the fitted parameters. is_fitted_ : bool True after fit() has been called.
Examples
>>> import numpy as np
>>> rng = np.random.default_rng(42)
>>> histories = [
... ClaimsHistory(f"P{i}", [1,2,3],
... rng.poisson(1.2, size=3).tolist(),
... prior_premium=400.0)
... for i in range(100)
... ]
>>> model = DynamicPoissonGammaModel()
>>> model.fit(histories)
DynamicPoissonGammaModel(p=..., q=...)
    def __init__(
        self,
        p0: float = 0.5,
        q0: float = 0.8,
        alpha0: float = 1.0,
        beta0_multiplier: float = 1.0,
        bounds: tuple = ((0.01, 0.99), (0.01, 0.99)),
    ) -> None:
        self.p0 = p0
        self.q0 = q0
        self.alpha0 = alpha0
        self.beta0_multiplier = beta0_multiplier
        self.bounds = bounds

        self.p_: Optional[float] = None
        self.q_: Optional[float] = None
        self.loglik_: Optional[float] = None
        self.is_fitted_: bool = False
    def fit(
        self, histories: list[ClaimsHistory], verbose: bool = False
    ) -> "DynamicPoissonGammaModel":
        """Fit p and q by empirical Bayes MLE on the portfolio log-likelihood.

        The log-likelihood sums over all policies and all periods. Within
        each policy, the marginal Y_t is Negative Binomial with
        state-dependent shape and mean parameters derived from the forward
        recursion.

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Training portfolio. At least 10 policies are recommended for
            stable parameter estimation.
        verbose : bool
            If True, print a warning when the optimiser fails to converge.

        Returns
        -------
        DynamicPoissonGammaModel
            self (fitted model).
        """
        if len(histories) < 2:
            raise ValueError(
                "At least 2 policy histories are required for fitting"
            )

        def neg_loglik(params: np.ndarray) -> float:
            p, q = float(params[0]), float(params[1])
            total_ll = 0.0
            for history in histories:
                total_ll += self._policy_loglik(history, p, q)
            return -total_ll

        x0 = np.array([self.p0, self.q0])
        bounds_list = [self.bounds[0], self.bounds[1]]

        result = optimize.minimize(
            neg_loglik,
            x0,
            method="L-BFGS-B",
            bounds=bounds_list,
            options={"maxiter": 500, "ftol": 1e-9, "gtol": 1e-6},
        )

        if verbose and not result.success:
            print(f"Optimisation warning: {result.message}")

        self.p_ = float(result.x[0])
        self.q_ = float(result.x[1])
        self.loglik_ = float(-result.fun)
        self.is_fitted_ = True
        return self
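The fitting objective relies on a module-level helper `_negbin_logpmf(y, r, mu)` that is not shown in this excerpt. A minimal stdlib sketch of such a helper, assuming the (shape r, mean mu) parameterisation the recursion uses — the name and signature here are illustrative, not necessarily the module's:

```python
import math

def negbin_logpmf(y: int, r: float, mu: float) -> float:
    # Negative Binomial in the (r, p) form with p = r / (r + mu),
    # so that E[Y] = mu and Var[Y] = mu + mu**2 / r.
    return (
        math.lgamma(y + r) - math.lgamma(r) - math.lgamma(y + 1)
        + r * math.log(r / (r + mu))
        + y * math.log(mu / (r + mu))
    )

# Sanity check: the PMF should sum to ~1 over a wide support
total = sum(math.exp(negbin_logpmf(y, r=2.0, mu=3.0)) for y in range(500))
```

As alpha accumulates over periods, r grows and the marginal distribution approaches its Poisson limit, which is the expected behaviour for a well-observed policy.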
    def predict(self, history: ClaimsHistory) -> float:
        """Compute the credibility factor for a single policy.

        Runs the forward recursion using the fitted (p, q) parameters to
        propagate the Gamma posterior through the observed claim sequence.
        The credibility factor is the ratio of the posterior mean to the
        a priori rate.

        Parameters
        ----------
        history : ClaimsHistory
            The policy's claims history.

        Returns
        -------
        float
            Credibility factor CF = mu_post / mu_prior. Multiply by
            prior_premium to obtain the posterior premium.

        Raises
        ------
        RuntimeError
            If fit() has not been called.
        """
        self._check_fitted()
        assert self.p_ is not None
        assert self.q_ is not None

        alpha_t, beta_t = self._forward_recursion(history, self.p_, self.q_)
        # The prior mean of Theta is 1, so the posterior mean alpha/beta
        # is mu_post / mu_prior directly (the a priori rate cancels).
        cf = alpha_t / beta_t
        return max(cf, 0.0)
    def predict_posterior_params(
        self, history: ClaimsHistory
    ) -> tuple[float, float]:
        """Return the posterior Gamma parameters (alpha, beta) after recursion.

        The posterior distribution of Theta_{t+1} | Y_{1:t} is:
            Gamma(alpha_{t+1}, beta_{t+1})
        with mean alpha_{t+1} / beta_{t+1}.

        This is useful for uncertainty quantification: the posterior
        variance is alpha / beta**2.

        Parameters
        ----------
        history : ClaimsHistory
            The policy's claims history.

        Returns
        -------
        tuple[float, float]
            (alpha_{t+1}, beta_{t+1}) — shape and rate of the posterior Gamma.
        """
        self._check_fitted()
        assert self.p_ is not None
        assert self.q_ is not None
        return self._forward_recursion(history, self.p_, self.q_)
    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
        """Score a batch of policies and return a Polars DataFrame.

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Policies to score.

        Returns
        -------
        pl.DataFrame
            Columns: policy_id, prior_premium, credibility_factor,
            posterior_premium, posterior_alpha, posterior_beta,
            posterior_variance.
        """
        self._check_fitted()
        rows = []
        for h in histories:
            cf = self.predict(h)
            alpha_t, beta_t = self.predict_posterior_params(h)
            rows.append(
                {
                    "policy_id": h.policy_id,
                    "prior_premium": h.prior_premium,
                    "credibility_factor": cf,
                    "posterior_premium": h.prior_premium * cf,
                    "posterior_alpha": alpha_t,
                    "posterior_beta": beta_t,
                    "posterior_variance": alpha_t / (beta_t**2),
                }
            )
        return pl.DataFrame(rows)
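The forward recursion is easy to sanity-check in isolation: when p = q = 1 the state transition is the identity, so the recursion must reduce to the static Gamma-Poisson conjugate posterior. A self-contained sketch (plain lists instead of ClaimsHistory; the function name is illustrative):

```python
def forward_recursion(claim_counts, exposures, mu, p, q, alpha0=1.0, beta0=1.0):
    """Gamma state recursion: conjugate update, then (p, q) transition."""
    alpha, beta = alpha0, beta0
    for y_t, e_t in zip(claim_counts, exposures):
        # Conjugate update: Theta | Y_t ~ Gamma(alpha + Y_t, beta + mu * e_t)
        alpha_post = alpha + float(y_t)
        beta_post = beta + mu * e_t
        # Transition: q discounts past information, p controls mean reversion
        beta = q * beta_post
        alpha = p * q * alpha_post + (1.0 - p) * beta
    return alpha, beta

counts, exps, mu = [0, 1, 0], [1.0, 1.0, 0.8], 0.9
# With p = q = 1 this equals the static posterior
# Gamma(alpha0 + sum(counts), beta0 + mu * sum(exps))
a, b = forward_recursion(counts, exps, mu, p=1.0, q=1.0)
```

With q < 1 the rate beta is discounted each period, so older observations carry progressively less weight — the seniority-weighting behaviour the model is named for.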
class SurrogateModel:
    """Surrogate model for large-portfolio Bayesian experience rating.

    Fits an approximate g(.) function on a sub-portfolio using importance
    sampling, then applies it to the full portfolio analytically.

    The posterior premium for policy i is:
        mu_post_i = mu_prior_i * exp(g(L_i, n_i))
    where L_i is the log-likelihood sufficient statistic and n_i is the
    number of periods observed.

    g(.) is a polynomial in L and n. With poly_degree=1 (the default):
        g(L, n) = theta_0 + theta_1 * L + theta_2 * n
    With poly_degree=2, the quadratic terms L**2 and n**2 and the
    interaction L * n are added.

    This is flexible enough to capture Bühlmann-Straub as a special case
    (linear in L = Y_bar) while allowing non-linear responses.

    Parameters
    ----------
    prior_model : callable or None
        Function mapping ClaimsHistory -> float, returning the a priori
        premium for any policy. If None, uses history.prior_premium
        directly. This should be a GLM or similar model already fitted on
        the portfolio.
    n_is_samples : int
        Number of importance sampling draws from the prior. More samples
        give a better IS estimate but are slower. Default 2000.
    subsample_frac : float
        Fraction of the portfolio to use in the sub-portfolio for fitting
        g. Default 0.10 (10%). Minimum 20 policies regardless of fraction.
    poly_degree : int
        Polynomial degree for g(.). Degree 1 gives linear g; degree 2
        allows a quadratic response. Default 1.
    random_state : int or None
        Random seed for IS sampling and sub-portfolio selection.
    sufficient_stat_fn : callable or None
        Custom function mapping ClaimsHistory -> float for the sufficient
        statistic. If None, uses the log-likelihood under Poisson(theta_ref).

    Attributes
    ----------
    theta_ : np.ndarray
        Fitted WLS coefficients for g(.).
    theta_ref_ : float
        Reference Poisson intensity used in the sufficient statistic.
    is_fitted_ : bool
        True after fit() has been called.
    """

    def __init__(
        self,
        prior_model: Optional[Callable[[ClaimsHistory], float]] = None,
        n_is_samples: int = 2000,
        subsample_frac: float = 0.10,
        poly_degree: int = 1,
        random_state: Optional[int] = None,
        sufficient_stat_fn: Optional[Callable[[ClaimsHistory], float]] = None,
    ) -> None:
        self.prior_model = prior_model
        self.n_is_samples = n_is_samples
        self.subsample_frac = subsample_frac
        self.poly_degree = poly_degree
        self.random_state = random_state
        self.sufficient_stat_fn = sufficient_stat_fn

        self.theta_: Optional[np.ndarray] = None
        self.theta_ref_: Optional[float] = None
        self.is_fitted_: bool = False
        self._rng: Optional[np.random.Generator] = None

    def fit(self, histories: list[ClaimsHistory]) -> "SurrogateModel":
        """Fit the surrogate g(.) function on a representative sub-portfolio.

        Steps:
        1. Estimate the reference intensity theta_ref as the portfolio
           mean claim frequency.
        2. Draw a random sub-portfolio of the training policies.
        3. Draw IS samples Theta_k ~ prior once (shared across all policies).
        4. For each sub-portfolio policy, compute the IS-based Bayesian
           premium.
        5. Fit g(.) by WLS, minimising the weighted squared
           log-credibility error.

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Full training portfolio.

        Returns
        -------
        SurrogateModel
            self (fitted model).
        """
        self._rng = np.random.default_rng(self.random_state)

        # Estimate reference intensity
        total_claims = sum(h.total_claims for h in histories)
        total_exp = sum(h.total_exposure for h in histories)
        self.theta_ref_ = total_claims / total_exp if total_exp > 0 else 1.0

        # Select sub-portfolio
        n_sub = max(20, int(len(histories) * self.subsample_frac))
        n_sub = min(n_sub, len(histories))
        idx = self._rng.choice(len(histories), size=n_sub, replace=False)
        sub_histories = [histories[i] for i in idx]

        # Draw IS samples from the prior once, shared across policies.
        # Weakly informative prior: Theta ~ Gamma(1, 1), so E[Theta] = 1.
        prior_alpha = 1.0
        prior_beta = 1.0
        theta_samples = self._rng.gamma(
            prior_alpha, 1.0 / prior_beta, size=self.n_is_samples
        )

        # Compute IS-based posterior premiums for the sub-portfolio
        sub_L = []
        sub_n = []
        sub_log_cf = []
        sub_weights = []

        for hist in sub_histories:
            mu_prior = self._get_prior(hist)
            L_i = self._sufficient_stat(hist)
            n_i = float(hist.n_periods)

            mu_post_is, se_i = self._is_posterior(hist, theta_samples, mu_prior)

            if mu_prior <= 0.0 or mu_post_is <= 0.0:
                continue
            log_cf_i = np.log(mu_post_is / mu_prior)
            w_i = 1.0 / max(se_i**2, 1e-8)  # precision weight from the IS SE

            sub_L.append(L_i)
            sub_n.append(n_i)
            sub_log_cf.append(log_cf_i)
            sub_weights.append(w_i)

        if len(sub_L) < 4:
            # Degenerate case: not enough data, return the identity (CF = 1)
            self.theta_ = np.zeros(self._n_features())
            self.is_fitted_ = True
            return self

        # Design matrix for g(L, n)
        X = self._design_matrix(np.array(sub_L), np.array(sub_n))
        y = np.array(sub_log_cf)
        w = np.array(sub_weights)
        w = w / w.sum()

        # WLS normal equations: (X^T W X) theta = X^T W y with W = diag(w)
        Xw = X * w[:, None]
        XtWX = Xw.T @ X
        XtWy = Xw.T @ y

        try:
            self.theta_ = np.linalg.solve(XtWX, XtWy)
        except np.linalg.LinAlgError:
            self.theta_ = np.linalg.lstsq(XtWX, XtWy, rcond=None)[0]

        self.is_fitted_ = True
        return self

    def predict(self, history: ClaimsHistory) -> float:
        """Compute the credibility factor for a single policy.

        Parameters
        ----------
        history : ClaimsHistory
            The policy's claims history.

        Returns
        -------
        float
            Credibility factor CF = exp(g(L, n)). Multiply by prior_premium
            to obtain the posterior premium.

        Raises
        ------
        RuntimeError
            If fit() has not been called.
        """
        self._check_fitted()
        assert self.theta_ is not None

        L = self._sufficient_stat(history)
        n = float(history.n_periods)
        x = self._design_matrix(np.array([L]), np.array([n]))[0]
        log_cf = float(x @ self.theta_)
        return float(np.exp(log_cf))

    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
        """Score a batch of policies and return a Polars DataFrame.

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Policies to score.

        Returns
        -------
        pl.DataFrame
            Columns: policy_id, prior_premium, credibility_factor,
            posterior_premium, sufficient_stat.
        """
        self._check_fitted()
        rows = []
        for h in histories:
            cf = self.predict(h)
            rows.append(
                {
                    "policy_id": h.policy_id,
                    "prior_premium": h.prior_premium,
                    "credibility_factor": cf,
                    "posterior_premium": h.prior_premium * cf,
                    "sufficient_stat": self._sufficient_stat(h),
                }
            )
        return pl.DataFrame(rows)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _is_posterior(
        self,
        history: ClaimsHistory,
        theta_samples: np.ndarray,
        mu_prior: float,
    ) -> tuple[float, float]:
        """Estimate the Bayesian posterior premium via importance sampling.

        Computes E[mu(Theta) | Y_{1:n}] using self-normalised IS:
            E_hat = sum_k w_k * mu(Theta_k) / sum_k w_k
        where w_k = exp(log-likelihood of Y_{1:n} | Theta_k) and
        Theta_k ~ prior (Gamma(1, 1) in this implementation).

        Returns (posterior_mean, standard_error).
        """
        assert history.exposures is not None
        log_weights = np.zeros(len(theta_samples))
        for y_t, e_t in zip(history.claim_counts, history.exposures):
            # Y_t | Theta ~ Poi(mu_prior * e_t * Theta). The log(y!) term is
            # constant in Theta and cancels under self-normalisation.
            rate = mu_prior * e_t * theta_samples
            log_weights += y_t * np.log(rate + 1e-300) - rate

        # Stabilise: subtract the max log-weight before exponentiating
        log_weights -= log_weights.max()
        weights = np.exp(log_weights)
        weights /= weights.sum()

        # Posterior premium: E[mu_prior * Theta | data] = mu_prior * E[Theta | data]
        post_mean = float(mu_prior * np.sum(weights * theta_samples))

        # Effective sample size for the SE calculation
        ess = 1.0 / np.sum(weights**2)
        if ess < 2:
            se = abs(post_mean) * 0.5  # pessimistic SE when IS degenerates
        else:
            # Monte Carlo SE
            mean_theta = np.sum(weights * theta_samples)
            var_theta = np.sum(weights * (theta_samples - mean_theta) ** 2)
            se = float(mu_prior * np.sqrt(var_theta / max(ess, 1)))

        return post_mean, se

    def _sufficient_stat(self, history: ClaimsHistory) -> float:
        if self.sufficient_stat_fn is not None:
            return self.sufficient_stat_fn(history)
        return history_sufficient_stat(history, theta_ref=self.theta_ref_)

    def _get_prior(self, history: ClaimsHistory) -> float:
        if self.prior_model is not None:
            return self.prior_model(history)
        return history.prior_premium

    def _design_matrix(self, L: np.ndarray, n: np.ndarray) -> np.ndarray:
        """Build the polynomial design matrix for g(L, n)."""
        cols = [np.ones(len(L))]
        if self.poly_degree >= 1:
            cols.extend([L, n])
        if self.poly_degree >= 2:
            cols.extend([L**2, n**2, L * n])
        return np.column_stack(cols)

    def _n_features(self) -> int:
        if self.poly_degree >= 2:
            return 6
        if self.poly_degree >= 1:
            return 3
        return 1

    def _check_fitted(self) -> None:
        if not self.is_fitted_:
            raise RuntimeError(
                "Model has not been fitted. Call fit() before predict()."
            )

    def __repr__(self) -> str:
        if self.is_fitted_:
            return (
                f"SurrogateModel(poly_degree={self.poly_degree}, "
                f"n_is_samples={self.n_is_samples})"
            )
        return f"SurrogateModel(poly_degree={self.poly_degree}, unfitted)"
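Step 5 above is a weighted least-squares fit of log-credibility against the features. For a single feature the WLS solution has a closed form, which this toy sketch uses; the real fit is the multi-feature normal-equation solve, and the data here are invented for illustration:

```python
def wls_line(x, y, w):
    """Closed-form weighted least squares for y ~ t0 + t1 * x."""
    sw = sum(w)
    xbar = sum(wi * xi for wi, xi in zip(w, x)) / sw
    ybar = sum(wi * yi for wi, yi in zip(w, y)) / sw
    sxx = sum(wi * (xi - xbar) ** 2 for wi, xi in zip(w, x))
    sxy = sum(wi * (xi - xbar) * (yi - ybar) for wi, xi, yi in zip(w, x, y))
    t1 = sxy / sxx  # weighted slope
    return ybar - t1 * xbar, t1

# Exactly linear log-CFs are recovered whatever the precision weights
L = [0.5, 1.0, 2.0, 3.0]
log_cf = [0.2 + 0.3 * Li for Li in L]            # theta0 = 0.2, theta1 = 0.3
t0, t1 = wls_line(L, log_cf, w=[4.0, 1.0, 2.0, 0.5])
```

The precision weights 1 / se**2 mean that policies whose IS estimate is noisy (small effective sample size) pull less on the fitted g.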
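The IS estimator behind _is_posterior can be validated against the conjugate closed form: with a Gamma(1, 1) prior and Poisson counts, the exact posterior mean of Theta is (1 + sum Y) / (1 + sum rate). A stdlib sketch of the same self-normalised scheme (illustrative names; the module itself uses numpy rather than the random module):

```python
import math
import random

def is_posterior_mean(counts, rates, n_samples=100_000, seed=7):
    """Self-normalised IS estimate of E[Theta | Y] under a Gamma(1, 1) prior."""
    rng = random.Random(seed)
    thetas = [rng.gammavariate(1.0, 1.0) for _ in range(n_samples)]
    # Log-weights: Poisson log-likelihood, dropping the log(y!) term,
    # which is constant in Theta and cancels on normalisation.
    log_w = [
        sum(y * math.log(r * th) - r * th for y, r in zip(counts, rates))
        for th in thetas
    ]
    m = max(log_w)                       # stabilise before exponentiating
    w = [math.exp(lw - m) for lw in log_w]
    s = sum(w)
    return sum(wi * th for wi, th in zip(w, thetas)) / s

counts, rates = [1, 0, 2], [0.9, 0.9, 0.72]
exact = (1.0 + sum(counts)) / (1.0 + sum(rates))  # conjugate posterior mean
est = is_posterior_mean(counts, rates)
```

The max-log-weight subtraction is the same stabilisation trick the module applies; without it, exponentiating raw log-likelihoods underflows for long histories.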
def balance_calibrate(
    predict_fn: Callable[[ClaimsHistory], float],
    histories: list[ClaimsHistory],
    exposure_weighted: bool = True,
) -> CalibrationResult:
    """Compute the portfolio-level balance calibration factor.

    The calibration factor delta satisfies:
        sum_i [delta * CF_i * mu_prior_i * e_i] = sum_i [Y_i]
    where Y_i is policy i's total claim count, i.e. the calibrated
    posterior premiums reproduce the observed claims in aggregate.

    For multiplicative models this gives:
        delta = sum(actual_rate * weight) / sum(posterior * weight)

    This function computes delta and returns it as a CalibrationResult.
    Apply delta to all posterior premiums to restore portfolio balance.

    Parameters
    ----------
    predict_fn : callable
        Function mapping ClaimsHistory -> float, returning the credibility
        factor (not the posterior premium). Typically model.predict.
    histories : list[ClaimsHistory]
        Calibration portfolio. Typically the same data used for fitting,
        but can be a hold-out period for forward-looking calibration.
    exposure_weighted : bool
        If True, weight each policy by its total exposure when computing
        the balance check (the actual side then sums to total claims).
        If False, use unweighted sums of rates. Default True.

    Returns
    -------
    CalibrationResult
        Contains calibration_factor, sum_actual, sum_predicted, n_policies.

    Examples
    --------
    >>> model = StaticCredibilityModel().fit(histories)
    >>> result = balance_calibrate(model.predict, histories)
    >>> print(f"Calibration factor: {result.calibration_factor:.4f}")
    >>> # Apply to predictions:
    >>> posterior = prior * model.predict(h) * result.calibration_factor
    """
    sum_actual = 0.0
    sum_predicted = 0.0

    for h in histories:
        assert h.exposures is not None
        cf = predict_fn(h)
        posterior = h.prior_premium * cf

        weight = h.total_exposure if exposure_weighted else 1.0

        # Actual side: claim frequency (total_claims / total_exposure);
        # with exposure weighting this contributes the policy's total claims.
        actual_rate = h.claim_frequency
        sum_actual += actual_rate * weight
        sum_predicted += posterior * weight

    if sum_predicted <= 0.0:
        calibration_factor = 1.0
    else:
        calibration_factor = sum_actual / sum_predicted

    return CalibrationResult(
        calibration_factor=float(calibration_factor),
        sum_actual=float(sum_actual),
        sum_predicted=float(sum_predicted),
        n_policies=len(histories),
    )
def apply_calibration(
    credibility_factor: float,
    calibration_result: CalibrationResult,
) -> float:
    """Apply a calibration factor to a single credibility factor.

    Parameters
    ----------
    credibility_factor : float
        The raw credibility factor from a model's predict().
    calibration_result : CalibrationResult
        Output of balance_calibrate().

    Returns
    -------
    float
        Calibrated credibility factor = CF * delta.
    """
    return credibility_factor * calibration_result.calibration_factor
def calibrated_predict_fn(
    predict_fn: Callable[[ClaimsHistory], float],
    calibration_result: CalibrationResult,
) -> Callable[[ClaimsHistory], float]:
    """Wrap a predict function to apply calibration automatically.

    Parameters
    ----------
    predict_fn : callable
        Original predict function (ClaimsHistory -> credibility_factor).
    calibration_result : CalibrationResult
        Output of balance_calibrate().

    Returns
    -------
    callable
        New predict function that applies calibration to each prediction.

    Examples
    --------
    >>> cal = balance_calibrate(model.predict, histories)
    >>> calibrated = calibrated_predict_fn(model.predict, cal)
    >>> cf = calibrated(new_history)  # already calibrated
    """
    delta = calibration_result.calibration_factor

    def _calibrated(history: ClaimsHistory) -> float:
        return predict_fn(history) * delta

    return _calibrated
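The wrapper is a plain closure over the calibration factor. A tiny standalone illustration of the same pattern (raw_predict and the 0.96 factor are made up):

```python
# Hypothetical stand-ins: a raw predictor and a bare calibration factor
# (the calibration_factor field of a CalibrationResult).
def raw_predict(history):
    return 1.25  # pretend credibility factor

def make_calibrated(predict_fn, delta):
    # Same closure shape as calibrated_predict_fn: delta is captured once
    # at wrap time, so a later re-calibration needs a fresh wrapper.
    def _calibrated(history):
        return predict_fn(history) * delta
    return _calibrated

calibrated = make_calibrated(raw_predict, 0.96)
cf = calibrated(None)  # 1.25 * 0.96 = 1.2
```

Capturing delta as a float (rather than holding the CalibrationResult) keeps the wrapper independent of any later mutation of the result object.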
def balance_report(
    predict_fn: Callable[[ClaimsHistory], float],
    histories: list[ClaimsHistory],
    by_n_periods: bool = False,
) -> pl.DataFrame:
    """Generate a portfolio-level balance report.

    Shows the actual vs predicted claim frequency, and the credibility
    factor distribution, optionally broken down by number of observed
    periods (to check whether newer or longer-tenured policies have
    systematic bias).

    Parameters
    ----------
    predict_fn : callable
        Function mapping ClaimsHistory -> credibility_factor.
    histories : list[ClaimsHistory]
        Portfolio to assess.
    by_n_periods : bool
        If True, break down by number of observed periods. Default False.

    Returns
    -------
    pl.DataFrame
        Summary statistics: policy_id, n_periods, actual_frequency,
        prior_premium, credibility_factor, posterior_premium,
        and residual (actual / posterior).
    """
    rows = []
    for h in histories:
        assert h.exposures is not None
        cf = predict_fn(h)
        posterior = h.prior_premium * cf
        actual_freq = h.claim_frequency
        residual = actual_freq / posterior if posterior > 0 else float("nan")
        rows.append(
            {
                "policy_id": h.policy_id,
                "n_periods": h.n_periods,
                "total_exposure": h.total_exposure,
                "actual_frequency": actual_freq,
                "prior_premium": h.prior_premium,
                "credibility_factor": cf,
                "posterior_premium": posterior,
                "residual": residual,
            }
        )
    df = pl.DataFrame(rows)

    if by_n_periods:
        return (
            df.group_by("n_periods")
            .agg(
                [
                    pl.col("actual_frequency").mean().alias("mean_actual"),
                    pl.col("posterior_premium").mean().alias("mean_posterior"),
                    pl.col("credibility_factor").mean().alias("mean_cf"),
                    pl.col("residual").mean().alias("mean_residual"),
                    pl.len().alias("n_policies"),
                ]
            )
            .sort("n_periods")
        )

    return df
def credibility_factor(posterior_premium: float, prior_premium: float) -> float:
    """Compute the multiplicative credibility factor.

    The credibility factor CF satisfies:

        posterior_premium = prior_premium * CF

    Parameters
    ----------
    posterior_premium : float
        The experience-adjusted (a posteriori) premium.
    prior_premium : float
        The a priori (GLM-based) premium.

    Returns
    -------
    float
        Credibility factor. Values > 1 indicate the policy is worse than
        the GLM base rate; values < 1 indicate better-than-average experience.

    Raises
    ------
    ValueError
        If prior_premium is zero or negative.
    """
    if prior_premium <= 0.0:
        raise ValueError(f"prior_premium must be positive, got {prior_premium}")
    return posterior_premium / prior_premium
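A quick numeric check of the identity, with made-up premium figures:

```python
prior = 450.0      # a priori GLM premium (made-up figure)
posterior = 517.5  # experience-adjusted premium (made-up figure)

cf = posterior / prior   # 1.15: worse-than-base experience
recovered = prior * cf   # identity: posterior = prior * CF
```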
def seniority_weights(
    n_periods: int,
    p: float,
    q: float,
    exposures: list[float] | None = None,
) -> np.ndarray:
    """Compute seniority (recency) weights for claims history periods.

    In the dynamic Poisson-gamma model, the effective weight on period t's
    claims decays geometrically with age. This function computes the
    relative weight assigned to each period in a t-period history when
    fitting or predicting.

    The weight assigned to period s (0-indexed from oldest) in a history
    of length t is proportional to (p*q)^(t-1-s). Before normalisation,
    the most recent period receives weight 1; the oldest receives
    weight (p*q)^(t-1).

    Parameters
    ----------
    n_periods : int
        Number of observation periods.
    p : float
        Transition parameter controlling how strongly the current state
        reverts to mean. Must be in (0, 1].
    q : float
        Decay parameter controlling how quickly older claims are
        downweighted. Must be in (0, 1].
    exposures : list[float] or None
        Per-period exposure weights. If None, uniform exposure is assumed.

    Returns
    -------
    np.ndarray
        Array of shape (n_periods,) with seniority weights, normalised
        to sum to 1. Most recent period is last.

    Examples
    --------
    >>> seniority_weights(3, p=0.9, q=0.8)
    array([0.2315..., 0.3216..., 0.4467...])  # most recent gets highest weight
    """
    if not (0 < p <= 1.0):
        raise ValueError(f"p must be in (0, 1], got {p}")
    if not (0 < q <= 1.0):
        raise ValueError(f"q must be in (0, 1], got {q}")

    decay = p * q
    # Weights for periods 0, 1, ..., n-1 (oldest to most recent)
    raw = np.array([decay ** (n_periods - 1 - s) for s in range(n_periods)])

    if exposures is not None:
        raw = raw * np.asarray(exposures, dtype=float)

    total = raw.sum()
    if total == 0.0:
        return np.ones(n_periods) / n_periods
    return raw / total
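The geometric decay is easy to recompute standalone. This sketch reproduces the docstring example's setting (p = 0.9, q = 0.8, three periods, uniform exposure) directly from the (p*q)^(t-1-s) formula:

```python
import numpy as np

# Seniority weights for a 3-period history with p = 0.9, q = 0.8.
p, q, n = 0.9, 0.8, 3
decay = p * q  # 0.72
raw = decay ** np.arange(n - 1, -1, -1)  # oldest -> newest: [0.72^2, 0.72, 1]
weights = raw / raw.sum()

# Weights are normalised and strictly increasing toward the newest period.
assert np.isclose(weights.sum(), 1.0)
assert np.all(np.diff(weights) > 0)
```

Smaller p*q concentrates weight on recent periods; p*q = 1 recovers uniform weights.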
def exposure_weighted_mean(
    counts: list[int],
    exposures: list[float],
) -> float:
    """Compute exposure-weighted mean claim frequency.

    Parameters
    ----------
    counts : list[int]
        Claim counts per period.
    exposures : list[float]
        Exposure (e.g., years on risk) per period.

    Returns
    -------
    float
        Total claims divided by total exposure. Returns 0.0 if total
        exposure is zero.
    """
    total_exp = sum(exposures)
    if total_exp == 0.0:
        return 0.0
    return sum(counts) / total_exp
def history_sufficient_stat(
    history: ClaimsHistory,
    theta_ref: float | None = None,
) -> float:
    """Compute the log-likelihood sufficient statistic for a claims history.

    For Poisson models with a reference intensity theta_ref, the sufficient
    statistic is the log-likelihood of the observed claim counts:

        L(Y; theta) = sum_t [ Y_t * log(theta * e_t) - theta * e_t ]

    When theta_ref is None, uses the empirical frequency as reference.

    This is the input to the surrogate model's g(.) function.

    Parameters
    ----------
    history : ClaimsHistory
        The policy's claims history.
    theta_ref : float or None
        Reference intensity (claims per exposure unit). If None, uses the
        empirical claim frequency from the history.

    Returns
    -------
    float
        Log-likelihood sufficient statistic.
    """
    assert history.exposures is not None
    if theta_ref is None:
        theta_ref = history.claim_frequency
        if theta_ref == 0.0:
            theta_ref = 1e-6  # avoid log(0)

    log_lik = 0.0
    for y_t, e_t in zip(history.claim_counts, history.exposures):
        if e_t <= 0.0:
            # Zero-exposure periods carry no information; skip them
            # rather than evaluate log(0).
            continue
        rate = theta_ref * e_t
        log_lik += y_t * np.log(rate) - rate
    return float(log_lik)
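The statistic drops only the data-dependent -log(y_t!) term of the full Poisson log-pmf, so the two differ by a constant in theta. A standalone check of that relation, on made-up counts, exposures, and reference intensity:

```python
import math

# Made-up history: three periods, reference intensity 0.4 claims/exposure.
theta_ref = 0.4
counts = [0, 2, 1]
exposures = [1.0, 1.0, 0.5]

# Sufficient statistic: sum_t [ y_t * log(theta * e_t) - theta * e_t ]
l_stat = sum(y * math.log(theta_ref * e) - theta_ref * e
             for y, e in zip(counts, exposures))

# Full Poisson log-pmf adds the data-only term -log(y!) = -lgamma(y + 1).
l_full = sum(y * math.log(theta_ref * e) - theta_ref * e - math.lgamma(y + 1)
             for y, e in zip(counts, exposures))

assert math.isclose(l_full, l_stat - sum(math.lgamma(y + 1) for y in counts))
```

Because the -log(y!) terms do not depend on theta, either quantity yields the same posterior over theta; the sufficient statistic is simply cheaper to carry around.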