
insurance_credibility

insurance-credibility: Credibility models for UK non-life insurance pricing.

Two subpackages covering the full credibility toolkit:

classical
    Bühlmann-Straub (1970) group credibility and its hierarchical extension (Jewell 1975) for nested structures (scheme → book, sector → district → area).

experience
    Individual policy-level Bayesian experience rating. Four model tiers: static Bühlmann-Straub, dynamic Poisson-gamma state-space, IS-surrogate, and deep attention (Wüthrich 2024).

Quick start::

    # Group-level credibility (scheme pricing)
    from insurance_credibility import BuhlmannStraub
    bs = BuhlmannStraub()
    bs.fit(df, group_col="scheme", period_col="year",
           loss_col="loss_rate", weight_col="exposure")

    # Individual policy experience rating
    from insurance_credibility import ClaimsHistory, StaticCredibilityModel
    model = StaticCredibilityModel()
    model.fit(histories)
    cf = model.predict(history)
  1"""
  2insurance-credibility: Credibility models for UK non-life insurance pricing.
  3
  4Two subpackages covering the full credibility toolkit:
  5
  6classical
  7    Bühlmann-Straub (1970) group credibility and its hierarchical extension
  8    (Jewell 1975) for nested structures (scheme → book, sector → district → area).
  9
 10experience
 11    Individual policy-level Bayesian experience rating. Four model tiers:
 12    static Bühlmann-Straub, dynamic Poisson-gamma state-space, IS-surrogate,
 13    and deep attention (Wüthrich 2024).
 14
 15Quick start::
 16
 17    # Group-level credibility (scheme pricing)
 18    from insurance_credibility import BuhlmannStraub
 19    bs = BuhlmannStraub()
 20    bs.fit(df, group_col="scheme", period_col="year",
 21           loss_col="loss_rate", weight_col="exposure")
 22
 23    # Individual policy experience rating
 24    from insurance_credibility import ClaimsHistory, StaticCredibilityModel
 25    model = StaticCredibilityModel()
 26    model.fit(histories)
 27    cf = model.predict(history)
 28"""
 29
 30# Classical credibility
 31from .classical import BuhlmannStraub, HierarchicalBuhlmannStraub, LevelResult
 32
 33# Experience rating data types
 34from .experience import CalibrationResult, ClaimsHistory
 35
 36# Experience rating models
 37from .experience import (
 38    DynamicPoissonGammaModel,
 39    StaticCredibilityModel,
 40    SurrogateModel,
 41)
 42
 43# Experience rating calibration
 44from .experience import (
 45    apply_calibration,
 46    balance_calibrate,
 47    balance_report,
 48    calibrated_predict_fn,
 49)
 50
 51# Experience rating utilities
 52from .experience import (
 53    credibility_factor,
 54    exposure_weighted_mean,
 55    history_sufficient_stat,
 56    posterior_premium,
 57    seniority_weights,
 58)
 59
 60from importlib.metadata import version, PackageNotFoundError
 61
 62try:
 63    __version__ = version("insurance-credibility")
 64except PackageNotFoundError:
 65    __version__ = "0.0.0"  # not installed
 66
 67__all__ = [
 68    # Classical
 69    "BuhlmannStraub",
 70    "HierarchicalBuhlmannStraub",
 71    "LevelResult",
 72    # Experience — data types
 73    "ClaimsHistory",
 74    "CalibrationResult",
 75    # Experience — models
 76    "StaticCredibilityModel",
 77    "DynamicPoissonGammaModel",
 78    "SurrogateModel",
 79    "DeepAttentionModel",
 80    # Experience — calibration
 81    "balance_calibrate",
 82    "apply_calibration",
 83    "calibrated_predict_fn",
 84    "balance_report",
 85    # Experience — utilities
 86    "credibility_factor",
 87    "posterior_premium",
 88    "seniority_weights",
 89    "exposure_weighted_mean",
 90    "history_sufficient_stat",
 91    # Meta
 92    "__version__",
 93]
 94
 95
 96def __getattr__(name: str):
 97    """Lazy import for optional torch-dependent classes."""
 98    if name == "DeepAttentionModel":
 99        from .experience.attention import DeepAttentionModel
100
101        return DeepAttentionModel
102    raise AttributeError(f"module 'insurance_credibility' has no attribute {name!r}")
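The module-level __getattr__ above defers importing the torch-dependent DeepAttentionModel until it is first accessed, so the rest of the package imports without torch installed. A brief usage sketch (attribute access only; constructor arguments are whatever the class itself requires)::

    import insurance_credibility as ic

    print(ic.__version__)         # from package metadata; "0.0.0" if not installed
    cls = ic.DeepAttentionModel   # first access triggers the lazy torch-backed import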
class BuhlmannStraub:
class BuhlmannStraub:
    """
    Bühlmann-Straub credibility model with non-parametric structural parameter
    estimation.

    Fit the model to a panel dataset - one row per (group, period) - where
    each row has a loss rate and an exposure weight. The model estimates the
    structural parameters v (within-group variance) and a (between-group
    variance) from the data itself, then computes a credibility factor Z_i
    for each group.

    The result is a credibility premium for each group that blends its own
    weighted-average loss rate with the portfolio mean, with the blend governed
    entirely by the ratio k = v/a and the group's total exposure.

    Parameters
    ----------
    truncate_a : bool, default True
        If True, truncate a_hat at zero when the between-group variance
        estimate is negative. This sets k → ∞ and Z_i → 0 for all groups,
        so every group gets the collective mean. If False, a negative a_hat
        raises an error instead.

    Examples
    --------
    >>> import polars as pl
    >>> from insurance_credibility import BuhlmannStraub
    >>>
    >>> df = pl.DataFrame({
    ...     "scheme": ["A", "A", "A", "B", "B", "B", "C", "C", "C"],
    ...     "year":   [2021, 2022, 2023] * 3,
    ...     "loss_rate": [0.55, 0.60, 0.58, 0.80, 0.75, 0.82, 0.40, 0.42, 0.38],
    ...     "exposure": [1000, 1200, 1100, 300, 350, 320, 5000, 4800, 5200],
    ... })
    >>> bs = BuhlmannStraub()
    >>> bs.fit(df, group_col="scheme", period_col="year",
    ...        loss_col="loss_rate", weight_col="exposure")
    >>> bs.summary()
    """

    def __init__(self, truncate_a: bool = True) -> None:
        self.truncate_a = truncate_a

        # Fitted attributes - set by .fit()
        self._mu_hat: Optional[float] = None
        self._v_hat: Optional[float] = None
        self._a_hat: Optional[float] = None
        self._k: Optional[float] = None
        self._z: Optional[pl.DataFrame] = None
        self._premiums: Optional[pl.DataFrame] = None
        self._fitted = False

    # ------------------------------------------------------------------
    # Scikit-learn style fit
    # ------------------------------------------------------------------

    def fit(
        self,
        data: Union[pl.DataFrame, "pd.DataFrame"],  # type: ignore[name-defined]
        group_col: str = "group",
        period_col: str = "period",
        loss_col: str = "loss",
        weight_col: str = "weight",
    ) -> "BuhlmannStraub":
        """
        Estimate structural parameters and credibility factors from panel data.

        Parameters
        ----------
        data:
            A Polars DataFrame (preferred) or pandas DataFrame (converted
            internally) with one row per (group, period). Columns need not be
            in any particular order.
        group_col:
            Column identifying the group (e.g. scheme ID, territory, NCD class).
        period_col:
            Column identifying the time period (e.g. accident year). Used only
            to count periods per group; the ordering does not matter.
        loss_col:
            Column containing the loss rate or loss ratio. This should be losses
            per unit of exposure - not total losses. If you have total losses,
            divide by the exposure column before fitting.
        weight_col:
            Column containing the exposure weight (e.g. earned car years,
            earned premium, policy count). Must be strictly positive.

        Returns
        -------
        self
            Returns the fitted estimator so calls can be chained.
        """
        data = _to_polars(data)

        validate_panel_data(data, group_col, period_col, loss_col, weight_col)
        check_duplicate_periods(data, group_col, period_col)

        self._group_col = group_col
        self._period_col = period_col
        self._loss_col = loss_col
        self._weight_col = weight_col

        # Build group-level summary: w_i, X_bar_i, T_i
        groups = self._build_group_summary(data, group_col, period_col, loss_col, weight_col)
        self._groups = groups

        # Estimate structural parameters
        mu_hat, v_hat, a_hat_raw = self._estimate_structural_params(
            data, groups, group_col, period_col, loss_col, weight_col
        )

        # Handle negative a_hat
        if a_hat_raw <= 0:
            if self.truncate_a:
                warnings.warn(
                    f"Between-group variance estimate a_hat = {a_hat_raw:.6g} ≤ 0. "
                    "Truncating to zero. This means the model finds no detectable "
                    "heterogeneity between groups - all groups will receive the "
                    "collective mean as their credibility premium (Z_i = 0). "
                    "Consider whether your data genuinely lacks between-group "
                    "variation, or whether you have too few groups to estimate a reliably.",
                    stacklevel=2,
                )
                a_hat = 0.0
                k = np.inf
            else:
                raise ValueError(
                    f"Between-group variance estimate a_hat = {a_hat_raw:.6g} ≤ 0. "
                    "Set truncate_a=True to handle this automatically."
                )
        else:
            a_hat = a_hat_raw
            k = v_hat / a_hat

        # Credibility factors
        w = groups["w_i"].to_numpy()
        if np.isinf(k):
            z_values = np.zeros(len(groups))
        else:
            z_values = w / (w + k)

        group_ids = groups["group"].to_list()
        x_bar = groups["x_bar_i"].to_numpy()

        # z_ is a Polars DataFrame with columns ["group", "Z"]
        z_df = pl.DataFrame({
            "group": group_ids,
            "Z": z_values,
        })

        # premiums_ is a Polars DataFrame with a "group" column (no index)
        premiums = pl.DataFrame({
            "group": group_ids,
            "exposure": w,
            "observed_mean": x_bar,
            "Z": z_values,
            "credibility_premium": z_values * x_bar + (1 - z_values) * mu_hat,
            "complement": np.full(len(groups), mu_hat),
        })

        self._mu_hat = float(mu_hat)
        self._v_hat = float(v_hat)
        self._a_hat = float(a_hat)
        self._a_hat_raw = float(a_hat_raw)
        self._k = float(k) if not np.isinf(k) else np.inf
        self._z = z_df
        self._premiums = premiums
        self._fitted = True

        return self

    # ------------------------------------------------------------------
    # Structural parameter properties
    # ------------------------------------------------------------------

    @property
    def mu_hat_(self) -> float:
        """
        Collective mean - the grand weighted average loss rate across all groups.

        This is the complement: what every group's credibility premium regresses
        toward as exposure shrinks.
        """
        self._check_fitted()
        return self._mu_hat  # type: ignore[return-value]

    @property
    def v_hat_(self) -> float:
        """
        EPV - Expected Value of the Process Variance.

        Measures within-group year-to-year volatility, averaged across groups.
        High v_hat means individual group experience is noisy - lean on the
        collective mean.
        """
        self._check_fitted()
        return self._v_hat  # type: ignore[return-value]

    @property
    def a_hat_(self) -> float:
        """
        VHM - Variance of Hypothetical Means.

        Measures how much the true underlying loss rates differ between groups.
        High a_hat means groups are genuinely heterogeneous - trust their own
        experience. Truncated at zero if truncate_a=True.
        """
        self._check_fitted()
        return self._a_hat  # type: ignore[return-value]

    @property
    def k_(self) -> float:
        """
        Bühlmann's k - the credibility parameter.

        k = v / a. This is the exposure a group needs to achieve Z = 0.5 (equal
        weight on own experience and collective mean). A group with exposure w_i
        has Z_i = w_i / (w_i + k).

        Returns np.inf when a_hat is zero (all groups get the collective mean).
        """
        self._check_fitted()
        return self._k  # type: ignore[return-value]

    @property
    def z_(self) -> pl.DataFrame:
        """
        Credibility factors - a Polars DataFrame with columns ["group", "Z"].

        Z_i = w_i / (w_i + k), ranging from 0 (no credibility, use collective
        mean) to 1 (full credibility, use group's own experience).

        To look up a specific group::

            bs.z_.filter(pl.col("group") == "Motor Guild")["Z"][0]

        To find the group with the highest Z::

            bs.z_.sort("Z", descending=True)["group"][0]
        """
        self._check_fitted()
        return self._z  # type: ignore[return-value]

    @property
    def premiums_(self) -> pl.DataFrame:
        """
        Credibility premiums - one row per group, as a Polars DataFrame.

        Columns:

        - ``group``: group identifier
        - ``exposure``: total exposure weight for the group
        - ``observed_mean``: exposure-weighted average loss rate
        - ``Z``: credibility factor
        - ``credibility_premium``: Z * observed_mean + (1 - Z) * mu_hat
        - ``complement``: mu_hat (the collective mean complement)
        """
        self._check_fitted()
        return self._premiums  # type: ignore[return-value]

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------

    def summary(self) -> pl.DataFrame:
        """
        Return a formatted summary of the fit.

        Structural parameters are printed to stdout; the returned table holds
        the per-group credibility results, formatted for an actuarial audience.

        Returns
        -------
        pl.DataFrame
            Per-group results.
        """
        self._check_fitted()

        print("Bühlmann-Straub Credibility Model")
        print("=" * 42)
        print(f"  Collective mean    mu  = {self._mu_hat:.6g}")
        print(f"  Process variance   v   = {self._v_hat:.6g}   (EPV, within-group)")
        print(f"  Between-group var  a   = {self._a_hat:.6g}   (VHM, between-group)")
        if np.isinf(self._k):
            print("  Credibility param  k   = inf   (a ≤ 0, all Z = 0)")
        else:
            print(f"  Credibility param  k   = {self._k:.6g}   (v / a)")
        print()
        print("  Interpretation: a group needs exposure = k to achieve Z = 0.50")
        if self._a_hat_raw <= 0:
            print(f"  (raw a_hat before truncation: {self._a_hat_raw:.6g})")
        print()

        tbl = self._premiums.rename({
            "exposure": "Exposure",
            "observed_mean": "Obs. Mean",
            "Z": "Z",
            "credibility_premium": "Cred. Premium",
            "complement": "Complement",
        })
        return tbl

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _build_group_summary(
        data: pl.DataFrame,
        group_col: str,
        period_col: str,
        loss_col: str,
        weight_col: str,
    ) -> pl.DataFrame:
        """
        Compute per-group summary statistics needed for parameter estimation.

        Returns a Polars DataFrame with columns:
            group   : group identifier (same type as group_col)
            w_i     : total exposure (sum of weights)
            x_bar_i : exposure-weighted mean loss rate
            T_i     : number of periods
        """
        groups = (
            data.group_by(group_col)
            .agg([
                pl.col(weight_col).sum().alias("w_i"),
                (
                    (pl.col(weight_col) * pl.col(loss_col)).sum()
                    / pl.col(weight_col).sum()
                ).alias("x_bar_i"),
                pl.len().alias("T_i"),
            ])
            .rename({group_col: "group"})
            .sort("group")
        )
        return groups

    @staticmethod
    def _estimate_structural_params(
        data: pl.DataFrame,
        groups: pl.DataFrame,
        group_col: str,
        period_col: str,
        loss_col: str,
        weight_col: str,
    ) -> tuple[float, float, float]:
        """
        Compute unbiased estimates of mu, v, and a.

        These are the standard Bühlmann-Straub (1970) non-parametric estimators,
        as presented in Bühlmann & Gisler (2005), Chapter 4.

        Parameters
        ----------
        data:
            Full panel data (Polars DataFrame).
        groups:
            Per-group summary from _build_group_summary.

        Returns
        -------
        mu_hat, v_hat, a_hat_raw
            a_hat_raw may be negative - the caller decides how to handle it.
        """
        w = groups["w_i"].to_numpy()
        x_bar = groups["x_bar_i"].to_numpy()
        T = groups["T_i"].to_numpy()
        group_ids = groups["group"].to_list()
        r = len(groups)

        # Collective mean: grand weighted average
        w_total = w.sum()
        mu_hat = (w * x_bar).sum() / w_total

        # v_hat: within-group weighted mean squared deviation
        # v_hat = (1 / sum_i(T_i - 1)) * sum_i sum_j [ w_{ij} * (X_{ij} - X_bar_i)^2 ]
        denom_v = (T - 1).sum()  # degrees of freedom for v

        # Join x_bar_i into the full data, then compute the weighted squared
        # deviation; a join is used rather than a Python loop for efficiency
        x_bar_lf = pl.DataFrame({
            group_col: group_ids,
            "_x_bar_i": x_bar,
        }).lazy()

        numerator_v = (
            data.lazy()
            .join(x_bar_lf, on=group_col, how="left")
            .with_columns(
                (
                    pl.col(weight_col) * (pl.col(loss_col) - pl.col("_x_bar_i")) ** 2
                ).alias("_sq_dev")
            )
            .select(pl.col("_sq_dev").sum())
            .collect()
            ["_sq_dev"][0]
        )

        v_hat = float(numerator_v) / denom_v if denom_v > 0 else 0.0

        # a_hat: between-group variance
        # c = w_total - sum_i(w_i^2) / w_total
        # s2 = sum_i [ w_i * (x_bar_i - mu_hat)^2 ]
        # a_hat = (s2 - (r - 1) * v_hat) / c
        c = w_total - (w ** 2).sum() / w_total
        s2 = (w * (x_bar - mu_hat) ** 2).sum()
        a_hat_raw = (s2 - (r - 1) * v_hat) / c

        return float(mu_hat), float(v_hat), float(a_hat_raw)

    def _check_fitted(self) -> None:
        if not self._fitted:
            raise RuntimeError(
                "Model has not been fitted. Call .fit() first."
            )

    def __repr__(self) -> str:
        if not self._fitted:
            return "BuhlmannStraub(not fitted)"
        return (
            f"BuhlmannStraub("
            f"mu={self._mu_hat:.4g}, "
            f"v={self._v_hat:.4g}, "
            f"a={self._a_hat:.4g}, "
            f"k={self._k:.4g})"
        )

Bühlmann-Straub credibility model with non-parametric structural parameter estimation.

Fit the model to a panel dataset - one row per (group, period) - where each row has a loss rate and an exposure weight. The model estimates the structural parameters v (within-group variance) and a (between-group variance) from the data itself, then computes a credibility factor Z_i for each group.

The result is a credibility premium for each group that blends its own weighted-average loss rate with the portfolio mean, with the blend governed entirely by the ratio k = v/a and the group's total exposure.
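The blend can be reproduced by hand. A minimal sketch with purely illustrative values (none of these numbers come from the package)::

    # Hand-computed Bühlmann-Straub blend for one group
    v_hat = 0.012    # within-group (process) variance, EPV
    a_hat = 0.0006   # between-group variance, VHM
    mu_hat = 0.58    # collective mean loss rate
    w_i = 1_200      # group's total exposure
    x_bar_i = 0.62   # group's exposure-weighted mean loss rate

    k = v_hat / a_hat      # credibility parameter: 20.0
    z_i = w_i / (w_i + k)  # credibility factor: ~0.984
    premium = z_i * x_bar_i + (1 - z_i) * mu_hat  # ~0.619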

Parameters

truncate_a : bool, default True
    If True, truncate a_hat at zero when the between-group variance estimate is negative. This sets k → ∞ and Z_i → 0 for all groups, so every group gets the collective mean. If False, a negative a_hat raises an error instead.
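A brief sketch of both settings, assuming flat_df is a hypothetical panel whose groups share one underlying loss rate, so the a_hat estimate comes out non-positive::

    # Default: warns, truncates a_hat to zero; all Z_i = 0 and every
    # group's premium equals the collective mean.
    bs = BuhlmannStraub(truncate_a=True).fit(
        flat_df, group_col="scheme", period_col="year",
        loss_col="loss_rate", weight_col="exposure",
    )

    # Strict: refuse to truncate and raise instead.
    try:
        BuhlmannStraub(truncate_a=False).fit(
            flat_df, group_col="scheme", period_col="year",
            loss_col="loss_rate", weight_col="exposure",
        )
    except ValueError as err:
        print(err)  # "Between-group variance estimate a_hat = ... ≤ 0. ..."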

Examples

>>> import polars as pl
>>> from insurance_credibility import BuhlmannStraub
>>>
>>> df = pl.DataFrame({
...     "scheme": ["A", "A", "A", "B", "B", "B", "C", "C", "C"],
...     "year":   [2021, 2022, 2023] * 3,
...     "loss_rate": [0.55, 0.60, 0.58, 0.80, 0.75, 0.82, 0.40, 0.42, 0.38],
...     "exposure": [1000, 1200, 1100, 300, 350, 320, 5000, 4800, 5200],
... })
>>> bs = BuhlmannStraub()
>>> bs.fit(df, group_col="scheme", period_col="year",
...        loss_col="loss_rate", weight_col="exposure")
>>> bs.summary()
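After fitting, the structural parameters and per-group tables are available as fitted attributes; a short follow-on sketch for the example above::

    bs.mu_hat_     # collective mean loss rate
    bs.k_          # Bühlmann's k = v / a
    bs.z_          # Polars DataFrame with columns ["group", "Z"]
    bs.premiums_   # per-group credibility premiums
    tbl = bs.summary()  # prints the parameters, returns the per-group table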
BuhlmannStraub(truncate_a: bool = True)
truncate_a
def fit(self, data: Union[pl.DataFrame, 'pd.DataFrame'], group_col: str = 'group', period_col: str = 'period', loss_col: str = 'loss', weight_col: str = 'weight') -> BuhlmannStraub:

Estimate structural parameters and credibility factors from panel data.

Parameters

data:
    A Polars DataFrame (preferred) or pandas DataFrame (converted internally) with one row per (group, period). Columns need not be in any particular order.
group_col:
    Column identifying the group (e.g. scheme ID, territory, NCD class).
period_col:
    Column identifying the time period (e.g. accident year). Used only to count periods per group; the ordering does not matter.
loss_col:
    Column containing the loss rate or loss ratio. This should be losses per unit of exposure - not total losses. If you have total losses, divide by the exposure column before fitting.
weight_col:
    Column containing the exposure weight (e.g. earned car years, earned premium, policy count). Must be strictly positive.

Returns

self
    Returns the fitted estimator so calls can be chained.
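Because fit returns self, construction, fitting, and result access can be chained; a sketch reusing the df from the class example::

    premiums = (
        BuhlmannStraub()
        .fit(df, group_col="scheme", period_col="year",
             loss_col="loss_rate", weight_col="exposure")
        .premiums_
    )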

mu_hat_: float

Collective mean - the grand weighted average loss rate across all groups.

This is the complement: what every group's credibility premium regresses toward as exposure shrinks.

v_hat_: float

EPV - Expected Value of the Process Variance.

Measures within-group year-to-year volatility, averaged across groups. High v_hat means individual group experience is noisy - lean on the collective mean.

a_hat_: float

VHM - Variance of Hypothetical Means.

Measures how much the true underlying loss rates differ between groups. High a_hat means groups are genuinely heterogeneous - trust their own experience. Truncated at zero if truncate_a=True.

k_: float

Bühlmann's k - the credibility parameter.

k = v / a. This is the exposure a group needs to achieve Z = 0.5 (equal weight on own experience and collective mean). A group with exposure w_i has Z_i = w_i / (w_i + k).

Returns np.inf when a_hat is zero (all groups get the collective mean).
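A worked example with illustrative numbers (not package output)::

    # If v = 0.01 and a = 0.0004, then k = v / a = 25 exposure units.
    k = 0.01 / 0.0004          # 25.0
    z_small = 25 / (25 + k)    # 0.5 - equal weight on own experience
    z_large = 100 / (100 + k)  # 0.8 - four times k gives 80% credibility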

z_: polars.dataframe.frame.DataFrame

Credibility factors - a Polars DataFrame with columns ["group", "Z"].

Z_i = w_i / (w_i + k), ranging from 0 (no credibility, use collective mean) to 1 (full credibility, use group's own experience).

To look up a specific group::

    bs.z_.filter(pl.col("group") == "Motor Guild")["Z"][0]

To find the group with the highest Z::

    bs.z_.sort("Z", descending=True)["group"][0]

premiums_: polars.dataframe.frame.DataFrame

Credibility premiums - one row per group, as a Polars DataFrame.

Columns:

  • group: group identifier
  • exposure: total exposure weight for the group
  • observed_mean: exposure-weighted average loss rate
  • Z: credibility factor
  • credibility_premium: Z * observed_mean + (1 - Z) * mu_hat
  • complement: mu_hat (the collective mean complement)
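Since credibility_premium is derived from the other columns, it can be recomputed as an audit check; a sketch assuming a fitted model bs::

    import polars as pl

    diff = bs.premiums_.with_columns(
        (
            pl.col("Z") * pl.col("observed_mean")
            + (1 - pl.col("Z")) * pl.col("complement")
        ).alias("recomputed")
    ).select(
        (pl.col("credibility_premium") - pl.col("recomputed")).abs().max()
    )
    # diff should be ~0 up to floating-point error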
def summary(self) -> polars.dataframe.frame.DataFrame:

Return a formatted summary of the fit.

Structural parameters are printed to stdout; the returned table holds the per-group credibility results, formatted for an actuarial audience.

Returns

pl.DataFrame
    Per-group results.

class HierarchicalBuhlmannStraub:
class HierarchicalBuhlmannStraub:
    """
    Multi-level Bühlmann-Straub credibility model for nested group structures.

    This model is appropriate when groups are arranged in a strict hierarchy -
    every observation belongs to exactly one parent at each level. Typical uses:

    - Geographic: postcode sector → district → area
    - Organisational: risk → scheme → portfolio
    - Time: accident quarter → accident year → underwriting year

    Parameters
    ----------
    level_cols:
        List of column names defining the hierarchy from outermost (top) to
        innermost (bottom). For a three-level model, provide three names.
        Example: ``["area", "district", "sector"]``.
    truncate_a:
        Whether to truncate negative between-group variance estimates at zero.
        Passed through to the BuhlmannStraub model at each level.

    Examples
    --------
    Geographic three-level model::

        >>> model = HierarchicalBuhlmannStraub(
        ...     level_cols=["region", "district", "sector"]
        ... )
        >>> model.fit(
        ...     data=df,
        ...     period_col="year",
        ...     loss_col="loss_rate",
        ...     weight_col="exposure",
        ... )
        >>> model.premiums_at("sector")   # sector-level credibility premiums
        >>> model.level_results_["district"].k   # k at district level

    Two-level scheme model (equivalent to BuhlmannStraub with a book-level prior)::

        >>> model = HierarchicalBuhlmannStraub(level_cols=["book", "scheme"])
        >>> model.fit(df, period_col="year", loss_col="loss_rate", weight_col="exposure")
    """

    def __init__(
        self,
        level_cols: List[str],
        truncate_a: bool = True,
    ) -> None:
        if len(level_cols) < 2:
            raise ValueError(
                "At least two levels are required for a hierarchical model. "
                "For a single-level model, use BuhlmannStraub."
            )
        self.level_cols = level_cols
        self.truncate_a = truncate_a
        self._fitted = False
        self._level_results: Optional[Dict[str, LevelResult]] = None
        self._bottom_premiums: Optional[pl.DataFrame] = None

    def fit(
        self,
        data: Union[pl.DataFrame, "pd.DataFrame"],  # type: ignore[name-defined]
        period_col: str = "period",
        loss_col: str = "loss",
        weight_col: str = "weight",
    ) -> "HierarchicalBuhlmannStraub":
        """
        Fit the hierarchical model to panel data.

        Parameters
        ----------
        data:
            Panel DataFrame (Polars preferred, pandas accepted) with one row
            per (leaf node, period). Must contain all columns in ``level_cols``
            plus ``period_col``, ``loss_col``, and ``weight_col``.
        period_col:
            Column identifying the time period.
        loss_col:
            Column containing the per-unit loss rate.
        weight_col:
            Column containing exposure weights (must be strictly positive).

        Returns
        -------
        self
        """
        data = _to_polars(data)

        required_cols = set(self.level_cols) | {period_col, loss_col, weight_col}
        missing = required_cols - set(data.columns)
        if missing:
            raise ValueError(f"Columns not found in data: {missing}")

        self._period_col = period_col
        self._loss_col = loss_col
        self._weight_col = weight_col

        # Validate that hierarchy is strict (no cross-level ambiguity)
        self._validate_hierarchy(data)

        # Bottom-up: estimate variance components at each level
        level_results: Dict[str, LevelResult] = {}
        bottom_level = self.level_cols[-1]

        # At the innermost level, fit BuhlmannStraub with the leaf node as group
        # and each period as an observation
        innermost_fit = self._fit_innermost_level(data, bottom_level, period_col, loss_col, weight_col)
        level_results[bottom_level] = innermost_fit

        # For each higher level, aggregate upward and fit B-S treating each node
        # at that level as a "group", with each child's exposure-weighted mean
        # loss rate as one "observation" (raw aggregated data rather than the
        # children's credibility premiums, for cleaner estimation)
        for depth in range(len(self.level_cols) - 2, -1, -1):
            parent_col = self.level_cols[depth]
            child_col = self.level_cols[depth + 1]
            level_result = self._fit_upper_level(
                data,
                parent_col,
                child_col,
                loss_col,
                weight_col,
                lower_v=level_results[child_col].v_hat,
            )
            level_results[parent_col] = level_result

        self._level_results = level_results

        # Top-down: compute final blended premiums at each level
        self._bottom_premiums = self._compute_top_down_premiums(data, level_results)
        self._fitted = True
        return self

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------

    @property
    def level_results_(self) -> Dict[str, LevelResult]:
        """
        Dictionary mapping level name to LevelResult with fitted parameters.
        """
        self._check_fitted()
        return self._level_results  # type: ignore[return-value]

    @property
    def premiums_(self) -> pl.DataFrame:
        """
        Top-down blended credibility premiums at the bottom (leaf) level.

        Unlike ``premiums_at(self.level_cols[-1])``, whose complement is the
        leaf-level collective mean, each leaf's complement here is its parent's
        blended premium, propagated down from the top of the hierarchy.
        """
        self._check_fitted()
        return self._bottom_premiums  # type: ignore[return-value]

    def premiums_at(self, level: str) -> pl.DataFrame:
        """
        Return the credibility premiums at a specified level of the hierarchy.

        These are the per-level fit results: the complement at each level is
        that level's own collective mean, not the top-down blended parent
        premium (see ``premiums_``).

        Parameters
        ----------
        level:
            One of the column names from ``level_cols``.

        Returns
        -------
        pl.DataFrame
            With a ``group`` column plus ``exposure``, ``observed_mean``,
            ``Z``, ``credibility_premium``, and ``complement`` columns.
        """
        self._check_fitted()
        if level not in self.level_cols:
            raise ValueError(
                f"'{level}' is not one of the fitted levels: {self.level_cols}"
            )
        return self._level_results[level].premiums

    def summary(self) -> None:
        """Print structural parameters at each level."""
        self._check_fitted()
        print("Hierarchical Bühlmann-Straub Credibility Model")
        print("=" * 50)
        print(f"Levels (outer → inner): {' → '.join(self.level_cols)}")
        print()
        for level in self.level_cols:
            lr = self._level_results[level]
            print(f"  Level: {level}")
            print(f"    mu  = {lr.mu_hat:.6g}")
            print(f"    v   = {lr.v_hat:.6g}   (EPV within-node)")
            print(f"    a   = {lr.a_hat:.6g}   (VHM between-node)")
            k_str = "inf" if np.isinf(lr.k) else f"{lr.k:.6g}"
            print(f"    k   = {k_str}")
            print()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _validate_hierarchy(self, data: pl.DataFrame) -> None:
        """Check that the hierarchy is strict - each child belongs to exactly one parent."""
        for depth in range(len(self.level_cols) - 1):
            parent = self.level_cols[depth]
            child = self.level_cols[depth + 1]
            parents_per_child = (
                data.select([parent, child])
                .unique()
                .group_by(child)
                .agg(pl.col(parent).n_unique().alias("n_parents"))
            )
            ambiguous = parents_per_child.filter(pl.col("n_parents") > 1)
            if ambiguous.height > 0:
                examples = ambiguous[child].to_list()[:3]
                raise ValueError(
                    f"Hierarchy is not strict at level '{child}' → '{parent}': "
                    f"{ambiguous.height} child node(s) appear under multiple parents. "
                    f"Examples: {examples}"
                )

    def _fit_innermost_level(
        self,
        data: pl.DataFrame,
        leaf_col: str,
        period_col: str,
        loss_col: str,
        weight_col: str,
    ) -> LevelResult:
        """Fit B-S at the innermost level using raw period data."""
        bs = BuhlmannStraub(truncate_a=self.truncate_a)
        bs.fit(
            data,
            group_col=leaf_col,
            period_col=period_col,
            loss_col=loss_col,
            weight_col=weight_col,
        )
        return LevelResult(
            level_name=leaf_col,
            mu_hat=bs.mu_hat_,
            v_hat=bs.v_hat_,
            a_hat=bs.a_hat_,
            k=bs.k_,
            z=bs.z_,
            premiums=bs.premiums_,
        )

    def _fit_upper_level(
        self,
        data: pl.DataFrame,
        parent_col: str,
        child_col: str,
        loss_col: str,
        weight_col: str,
        lower_v: float,
    ) -> LevelResult:
        """
        Fit B-S at a higher level by aggregating child-level data.

        Each child node is treated as one "period" within its parent node,
        using the child's total exposure as weight and its exposure-weighted
        mean loss rate as the observation.

        In the Jewell (1975) / actuar-compatible formulation, v at level L
        equals a at level L+1, propagating variance components up the
        hierarchy. ``lower_v`` carries that value but is currently unused:
        v at this level is re-estimated directly from the aggregated
        child-level data.
        """
        # Aggregate data to child level (summing over periods)
        child_summary = (
            data.select([parent_col, child_col, weight_col, loss_col])
            .group_by([parent_col, child_col])
            .agg([
                pl.col(weight_col).sum().alias("exposure"),
                (
                    (pl.col(weight_col) * pl.col(loss_col)).sum()
                    / pl.col(weight_col).sum()
                ).alias("loss_rate"),
            ])
        )

        # Fit B-S at parent level, treating each child as one "observation".
        # The period col here is child_col, loss col is loss_rate, weight is exposure
        bs = BuhlmannStraub(truncate_a=self.truncate_a)
        bs.fit(
            child_summary,
            group_col=parent_col,
            period_col=child_col,
            loss_col="loss_rate",
            weight_col="exposure",
        )

        return LevelResult(
            level_name=parent_col,
            mu_hat=bs.mu_hat_,
            v_hat=bs.v_hat_,
            a_hat=bs.a_hat_,
            k=bs.k_,
            z=bs.z_,
            premiums=bs.premiums_,
        )

    def _compute_top_down_premiums(
        self,
        data: pl.DataFrame,
        level_results: Dict[str, LevelResult],
    ) -> pl.DataFrame:
        """
        Compute the final blended premiums by propagating the hierarchy top-down.

        At the topmost level, the complement is the grand mean. At each level
        below, the complement for a node is the credibility premium of its parent.
        The final output is a premium for each leaf node.

        Blending formula at each level:
            P_node = Z_node * X̄_node + (1 - Z_node) * P_parent
        """
        top_level = self.level_cols[0]
        top_lr = level_results[top_level]

        # Start: parent premiums map from group id → credibility_premium at top level
        # Represented as a pl.DataFrame with [group, parent_premium] for joining
        parent_premiums_df = top_lr.premiums.select([
            pl.col("group"),
            pl.col("credibility_premium").alias("parent_premium"),
        ])

        # Work downward, level by level
        for depth in range(1, len(self.level_cols)):
            current_level = self.level_cols[depth]
            parent_level = self.level_cols[depth - 1]
            current_lr = level_results[current_level]

            # Map: child node id → parent node id (unique pairs)
            child_to_parent = (
                data.select([current_level, parent_level])
                .unique()
                .rename({current_level: "group", parent_level: "parent_id"})
            )

            # Join child→parent map with parent premiums to get P_parent for each child
            child_with_parent_premium = (
                child_to_parent
                .join(
                    parent_premiums_df.rename({"group": "parent_id"}),
                    on="parent_id",
                    how="left",
                )
            )

            # Join current level premiums (Z, observed_mean) with parent premium
            blended = (
                current_lr.premiums.select(["group", "Z", "observed_mean"])
                .join(child_with_parent_premium.select(["group", "parent_premium"]), on="group", how="left")
                .with_columns(
                    (
                        pl.col("Z") * pl.col("observed_mean")
                        + (1 - pl.col("Z")) * pl.col("parent_premium")
                    ).alias("blended_premium")
                )
                .select(["group", "blended_premium"])
                .rename({"blended_premium": "parent_premium"})
            )

            parent_premiums_df = blended

        # Build final DataFrame: start from bottom-level premiums, replace credibility_premium
        bottom_level = self.level_cols[-1]
        bottom_lr = level_results[bottom_level]
        result = (
            bottom_lr.premiums
            .drop("credibility_premium")
            .join(
                parent_premiums_df.rename({"parent_premium": "credibility_premium"}),
                on="group",
                how="left",
            )
        )
        return result

    def _check_fitted(self) -> None:
        if not self._fitted:
            raise RuntimeError(
                "Model has not been fitted. Call .fit() first."
            )

    def __repr__(self) -> str:
        levels_str = " → ".join(self.level_cols)
        if not self._fitted:
            return f"HierarchicalBuhlmannStraub(levels=[{levels_str}], not fitted)"
        return f"HierarchicalBuhlmannStraub(levels=[{levels_str}])"

Multi-level Bühlmann-Straub credibility model for nested group structures.

This model is appropriate when groups are arranged in a strict hierarchy - every observation belongs to exactly one parent at each level (a data-layout sketch follows the list below). Typical uses:

  • Geographic: postcode sector → district → area
  • Organisational: risk → scheme → portfolio
  • Time: accident quarter → accident year → underwriting year
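A minimal sketch of panel data with a strict three-level geographic hierarchy (illustrative values; each sector sits under exactly one district, each district under one area, with one row per sector and year)::

    import polars as pl

    df = pl.DataFrame({
        "area":      ["N", "N", "N", "N", "S", "S", "S", "S"],
        "district":  ["N1", "N1", "N2", "N2", "S1", "S1", "S2", "S2"],
        "sector":    ["N1 1", "N1 1", "N2 1", "N2 1",
                      "S1 1", "S1 1", "S2 1", "S2 1"],
        "year":      [2022, 2023] * 4,
        "loss_rate": [0.52, 0.61, 0.48, 0.55, 0.66, 0.58, 0.49, 0.62],
        "exposure":  [800, 650, 900, 700, 500, 550, 750, 600],
    })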

Parameters

level_cols:
    List of column names defining the hierarchy from outermost (top) to innermost (bottom). For a three-level model, provide three names. Example: ["area", "district", "sector"].
truncate_a:
    Whether to truncate negative between-group variance estimates at zero. Passed through to the BuhlmannStraub model at each level.

Examples

Geographic three-level model::

    >>> model = HierarchicalBuhlmannStraub(
    ...     level_cols=["region", "district", "sector"]
    ... )
    >>> model.fit(
    ...     data=df,
    ...     period_col="year",
    ...     loss_col="loss_rate",
    ...     weight_col="exposure",
    ... )
    >>> model.premiums_at("sector")   # sector-level credibility premiums
    >>> model.level_results_["district"].k   # k at district level

Two-level scheme model (equivalent to BuhlmannStraub with a book-level prior)::

    >>> model = HierarchicalBuhlmannStraub(level_cols=["book", "scheme"])
    >>> model.fit(df, period_col="year", loss_col="loss_rate", weight_col="exposure")
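Once fitted, per-level parameters and the blended leaf premiums are available; a usage sketch for the geographic example::

    model.summary()                  # prints mu, v, a, k at each level
    model.premiums_                  # leaf table with top-down blended premiums
    model.premiums_at("district")    # per-level fit results at district level
    model.level_results_["region"].a_hat  # between-node variance at the top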
HierarchicalBuhlmannStraub(level_cols: List[str], truncate_a: bool = True)
level_cols
truncate_a
def fit(self, data: Union[pl.DataFrame, 'pd.DataFrame'], period_col: str = 'period', loss_col: str = 'loss', weight_col: str = 'weight') -> HierarchicalBuhlmannStraub:

Fit the hierarchical model to panel data.

Parameters

data:
    Panel DataFrame (Polars preferred, pandas accepted) with one row per (leaf node, period). Must contain all columns in level_cols plus period_col, loss_col, and weight_col.
period_col:
    Column identifying the time period.
loss_col:
    Column containing the per-unit loss rate.
weight_col:
    Column containing exposure weights (must be strictly positive).

Returns

self

level_results_: Dict[str, LevelResult]

Dictionary mapping level name to LevelResult with fitted parameters.

premiums_: polars.dataframe.frame.DataFrame

Top-down blended credibility premiums at the bottom (leaf) level of the hierarchy.

Unlike premiums_at(self.level_cols[-1]), whose complement is the leaf-level collective mean, each leaf's complement here is its parent's blended premium, propagated down from the top of the hierarchy.

def premiums_at(self, level: str) -> polars.dataframe.frame.DataFrame:
256    def premiums_at(self, level: str) -> pl.DataFrame:
257        """
258        Return the credibility premiums at a specified level of the hierarchy.
259
260        Parameters
261        ----------
262        level:
263            One of the column names from ``level_cols``.
264
265        Returns
266        -------
267        pl.DataFrame
268            With a ``group`` column plus ``exposure``, ``observed_mean``,
269            ``Z``, ``credibility_premium``, and ``complement`` columns.
270        """
271        self._check_fitted()
272        if level not in self.level_cols:
273            raise ValueError(
274                f"'{level}' is not one of the fitted levels: {self.level_cols}"
275            )
276        return self._level_results[level].premiums

def summary(self) -> None:
278    def summary(self) -> None:
279        """Print structural parameters at each level."""
280        self._check_fitted()
281        print("Hierarchical Bühlmann-Straub Credibility Model")
282        print("=" * 50)
283        print(f"Levels (outer → inner): {' → '.join(self.level_cols)}")
284        print()
285        for level in self.level_cols:
286            lr = self._level_results[level]
287            print(f"  Level: {level}")
288            print(f"    mu  = {lr.mu_hat:.6g}")
289            print(f"    v   = {lr.v_hat:.6g}   (EPV within-node)")
290            print(f"    a   = {lr.a_hat:.6g}   (VHM between-node)")
291            k_str = "inf" if np.isinf(lr.k) else f"{lr.k:.6g}"
292            print(f"    k   = {k_str}")
293            print()

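Once fitted, the structural parameters and blended premiums can be read
back; a short sketch continuing the hypothetical ``hbs`` from the fit
example above::

    hbs.summary()                           # mu, v, a, k at each level
    district = hbs.premiums_at("district")  # Z, credibility_premium, complement
    leaf = hbs.premiums_                    # == premiums_at(level_cols[-1])
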
class LevelResult:
54class LevelResult:
55    """
56    Fitted parameters and premiums for one level of the hierarchy.
57
58    Attributes
59    ----------
60    level_name:
61        The column name that identifies this level (e.g. "district").
62    mu_hat:
63        Collective mean at this level.
64    v_hat:
65        Within-node variance at this level (EPV).
66    a_hat:
67        Between-node variance at this level (VHM). Zero if truncated.
68    k:
69        Bühlmann's k = v / a at this level.
70    z:
71        Credibility factors as a pl.DataFrame with columns ["group", "Z"].
72    premiums:
73        Credibility premiums as a pl.DataFrame with a "group" column.
74    """
75
76    def __init__(
77        self,
78        level_name: str,
79        mu_hat: float,
80        v_hat: float,
81        a_hat: float,
82        k: float,
83        z: pl.DataFrame,
84        premiums: pl.DataFrame,
85    ) -> None:
86        self.level_name = level_name
87        self.mu_hat = mu_hat
88        self.v_hat = v_hat
89        self.a_hat = a_hat
90        self.k = k
91        self.z = z
92        self.premiums = premiums
93
94    def __repr__(self) -> str:
95        return (
96            f"LevelResult(level='{self.level_name}', "
97            f"mu={self.mu_hat:.4g}, v={self.v_hat:.4g}, "
98            f"a={self.a_hat:.4g}, k={self.k:.4g})"
99        )

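The fields of a LevelResult are tied together by the standard
Bühlmann-Straub identities (inferred here from k = v / a and the column
names documented in premiums_at()); a sketch using the hypothetical
fitted ``hbs`` from the examples above::

    lr = hbs.level_results_["district"]
    lr.k            # v_hat / a_hat (inf when a_hat is truncated to zero)
    # In lr.premiums: Z = exposure / (exposure + k), and
    # credibility_premium = Z * observed_mean + (1 - Z) * complement
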
@dataclass
class ClaimsHistory:
 15@dataclass
 16class ClaimsHistory:
 17    """Claims history for a single policy.
 18
 19    This is the fundamental input type for all experience rating models.
 20    Each instance represents one policy's observed claims sequence across
 21    one or more periods (typically policy years).
 22
 23    Parameters
 24    ----------
 25    policy_id : str
 26        Unique identifier for the policy.
 27    periods : list[int]
 28        Year indices for each observation period. Typically [1, 2, 3, ...].
 29        Must be non-empty. Periods need not be contiguous but must be unique;
 30        ascending order is expected (only uniqueness is validated).
 31    claim_counts : list[int]
 32        Observed claim count in each period. Must have the same length as
 33        ``periods``. Non-negative integers.
 34    claim_amounts : list[float] or None
 35        Aggregate claim amount (incurred) in each period. Set to None for
 36        frequency-only models. When provided, must have the same length as
 37        ``periods``.
 38    exposures : list[float]
 39        Risk exposure in each period (e.g., years on risk, vehicle-years).
 40        Must have the same length as ``periods``. Defaults to 1.0 for each
 41        period if not supplied.
 42    prior_premium : float
 43        The a priori (GLM-based) expected annual claim cost for the next
 44        period. This is the base rate that experience rating will adjust.
 45        Must be strictly positive.
 46
 47    Examples
 48    --------
 49    >>> history = ClaimsHistory(
 50    ...     policy_id="POL001",
 51    ...     periods=[1, 2, 3],
 52    ...     claim_counts=[0, 1, 0],
 53    ...     exposures=[1.0, 1.0, 0.8],
 54    ...     prior_premium=450.0,
 55    ... )
 56    """
 57
 58    policy_id: str
 59    periods: list[int]
 60    claim_counts: list[int]
 61    claim_amounts: Optional[list[float]] = None
 62    exposures: Optional[list[float]] = None
 63    prior_premium: float = 1.0
 64
 65    def __post_init__(self) -> None:
 66        self._validate()
 67        # Normalise exposures to a list if not provided
 68        if self.exposures is None:
 69            self.exposures = [1.0] * len(self.periods)
 70
 71    def _validate(self) -> None:
 72        if len(self.periods) == 0:
 73            raise ValueError("periods must be non-empty")
 74        if len(self.claim_counts) != len(self.periods):
 75            raise ValueError(
 76                f"claim_counts length ({len(self.claim_counts)}) must match "
 77                f"periods length ({len(self.periods)})"
 78            )
 79        if self.claim_amounts is not None and len(self.claim_amounts) != len(
 80            self.periods
 81        ):
 82            raise ValueError(
 83                f"claim_amounts length ({len(self.claim_amounts)}) must match "
 84                f"periods length ({len(self.periods)})"
 85            )
 86        exposures = self.exposures
 87        if exposures is not None:
 88            if len(exposures) != len(self.periods):
 89                raise ValueError(
 90                    f"exposures length ({len(exposures)}) must match "
 91                    f"periods length ({len(self.periods)})"
 92                )
 93            if any(e <= 0.0 for e in exposures):
 94                raise ValueError("All exposures must be strictly positive")
 95        if any(c < 0 for c in self.claim_counts):
 96            raise ValueError("claim_counts must be non-negative")
 97        if self.prior_premium <= 0.0:
 98            raise ValueError("prior_premium must be strictly positive")
 99        if len(self.periods) != len(set(self.periods)):
100            raise ValueError("periods must be unique")
101
102    @property
103    def n_periods(self) -> int:
104        """Number of observation periods."""
105        return len(self.periods)
106
107    @property
108    def total_claims(self) -> int:
109        """Total claim count across all periods."""
110        return sum(self.claim_counts)
111
112    @property
113    def total_exposure(self) -> float:
114        """Total exposure across all periods."""
115        assert self.exposures is not None
116        return sum(self.exposures)
117
118    @property
119    def claim_frequency(self) -> float:
120        """Observed claim frequency (claims per unit exposure).
121
122        Returns 0.0 if total exposure is zero (guard only — exposures
123        are validated to be positive).
124        """
125        total_exp = self.total_exposure
126        if total_exp == 0.0:
127            return 0.0
128        return self.total_claims / total_exp
129
130    @property
131    def exposure_weighted_counts(self) -> list[float]:
132        """Claim counts adjusted by exposure: Y_t / e_t per period."""
133        assert self.exposures is not None
134        return [c / e for c, e in zip(self.claim_counts, self.exposures)]

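A short sketch of the derived properties, reusing the docstring example::

    h = ClaimsHistory(
        policy_id="POL001",
        periods=[1, 2, 3],
        claim_counts=[0, 1, 0],
        exposures=[1.0, 1.0, 0.8],
        prior_premium=450.0,
    )
    h.n_periods                 # 3
    h.total_claims              # 1
    h.total_exposure            # 2.8
    h.claim_frequency           # 1 / 2.8 ≈ 0.357
    h.exposure_weighted_counts  # [0.0, 1.0, 0.0]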

@dataclass
class CalibrationResult:
137@dataclass
138class CalibrationResult:
139    """Output of the balance calibration step.
140
141    Parameters
142    ----------
143    calibration_factor : float
144        Multiplicative factor applied to all posterior premiums to restore
145        portfolio balance. Values close to 1.0 indicate the model was
146        already approximately balanced before calibration.
147    sum_actual : float
148        Sum of observed claim counts (weighted by exposure) over the
149        calibration portfolio.
150    sum_predicted : float
151        Sum of posterior premiums (weighted by exposure) before calibration.
152    n_policies : int
153        Number of policies in the calibration portfolio.
154    """
155
156    calibration_factor: float
157    sum_actual: float
158    sum_predicted: float
159    n_policies: int
160
161    @property
162    def relative_bias(self) -> float:
163        """Relative bias before calibration: (predicted - actual) / actual."""
164        if self.sum_actual == 0.0:
165            return float("nan")
166        return (self.sum_predicted - self.sum_actual) / self.sum_actual

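A sketch with illustrative numbers (the balancing factor is presumably
sum_actual / sum_predicted, which is consistent with relative_bias)::

    cr = CalibrationResult(
        calibration_factor=0.96,   # 480 / 500
        sum_actual=480.0,
        sum_predicted=500.0,
        n_policies=1000,
    )
    cr.relative_bias   # (500 - 480) / 480 ≈ 0.0417, i.e. ~4% over-prediction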

class StaticCredibilityModel:
 32class StaticCredibilityModel:
 33    """Bühlmann-Straub credibility at individual policy level.
 34
 35    Fits the structural parameter kappa from a portfolio of policy histories
 36    using the method of moments estimator. Predicts a multiplicative
 37    credibility factor for each policy.
 38
 39    The model assumes:
 40        Y_{t} | Theta ~ Poisson(mu * Theta * e_t)
 41        Theta ~ distribution with E[Theta]=1, Var[Theta]=tau^2
 42
 43    This gives the credibility approximation:
 44        E[Theta | Y_{1:t}] ≈ omega_t * Y_bar / mu + (1 - omega_t)
 45
 46    Parameters
 47    ----------
 48    kappa : float or None
 49        Known credibility coefficient kappa = sigma^2 / tau^2. If None,
 50        kappa is estimated from the portfolio during fit(). Set explicitly
 51        to share a kappa estimated from a larger portfolio.
 52    min_kappa : float
 53        Lower bound on the estimated kappa. Prevents omega from reaching 1
 54        when the portfolio is very heterogeneous (tau^2 large). Default 0.1.
 55    max_kappa : float
 56        Upper bound on the estimated kappa. Prevents omega collapsing to 0
 57        for very homogeneous portfolios (tau^2 small). Default 1000.0.
 58
 59    Attributes
 60    ----------
 61    kappa_ : float
 62        Fitted credibility coefficient (after calling fit()).
 63    within_variance_ : float
 64        Estimated within-policy variance component sigma^2.
 65    between_variance_ : float
 66        Estimated between-policy variance component tau^2.
 67    portfolio_mean_ : float
 68        Exposure-weighted mean claim frequency across the portfolio.
 69    is_fitted_ : bool
 70        True after fit() has been called.
 71
 72    Examples
 73    --------
 74    >>> histories = [ClaimsHistory("P1", [1,2,3], [0,1,0], prior_premium=400.0),
 75    ...              ClaimsHistory("P2", [1,2,3], [2,1,2], prior_premium=400.0)]
 76    >>> model = StaticCredibilityModel()
 77    >>> model.fit(histories)
 78    StaticCredibilityModel(kappa=...)
 79    >>> cf = model.predict(histories[0])
 80    >>> posterior = histories[0].prior_premium * cf
 81    """
 82
 83    def __init__(
 84        self,
 85        kappa: Optional[float] = None,
 86        min_kappa: float = 0.1,
 87        max_kappa: float = 1000.0,
 88    ) -> None:
 89        self.kappa = kappa
 90        self.min_kappa = min_kappa
 91        self.max_kappa = max_kappa
 92
 93        # Set after fit()
 94        self.kappa_: Optional[float] = None
 95        self.within_variance_: Optional[float] = None
 96        self.between_variance_: Optional[float] = None
 97        self.portfolio_mean_: Optional[float] = None
 98        self.is_fitted_: bool = False
 99
100    def fit(self, histories: list[ClaimsHistory]) -> "StaticCredibilityModel":
101        """Estimate structural parameters from a portfolio of policy histories.
102
103        Uses the method of moments estimator for Bühlmann-Straub:
104        - within-policy variance estimated from period-to-period fluctuations
105        - between-policy variance estimated from cross-sectional spread
106
107        Parameters
108        ----------
109        histories : list[ClaimsHistory]
110            Portfolio of policy histories. Must contain at least 2 policies
111            with at least 1 period each for parameter estimation.
112
113        Returns
114        -------
115        StaticCredibilityModel
116            self (fitted model).
117
118        Raises
119        ------
120        ValueError
121            If fewer than 2 histories are provided. Single-period-only
122            portfolios do not raise; a within-variance fallback is used.
123        """
124        if len(histories) < 2:
125            raise ValueError(
126                "At least 2 policy histories are required to estimate kappa"
127            )
128
129        if self.kappa is not None:
130            # Use provided kappa, just estimate portfolio mean
131            self.kappa_ = self.kappa
132            self.within_variance_ = float("nan")
133            self.between_variance_ = float("nan")
134            self.portfolio_mean_ = self._portfolio_mean(histories)
135            self.is_fitted_ = True
136            return self
137
138        self.kappa_, self.within_variance_, self.between_variance_ = (
139            self._estimate_kappa(histories)
140        )
141        self.portfolio_mean_ = self._portfolio_mean(histories)
142        self.is_fitted_ = True
143        return self
144
145    def predict(self, history: ClaimsHistory) -> float:
146        """Compute the credibility factor for a single policy.
147
148        Parameters
149        ----------
150        history : ClaimsHistory
151            The policy's claims history. The prior_premium field is used as
152            the a priori rate mu.
153
154        Returns
155        -------
156        float
157            Credibility factor CF in (0, inf). Multiply by prior_premium to
158            obtain the posterior premium.
159            CF = omega_t * (Y_bar / mu) + (1 - omega_t)
160
161        Raises
162        ------
163        RuntimeError
164            If fit() has not been called.
165        """
166        self._check_fitted()
167        assert self.kappa_ is not None
168        assert self.exposures_ok(history)
169
170        assert history.exposures is not None
171        t = history.total_exposure  # total exposure drives the credibility weight
172        omega_t = t / (t + self.kappa_)
173
174        mu = history.prior_premium  # a priori rate (expected frequency)
175        y_bar = history.claim_frequency  # empirical frequency
176
177        # Credibility factor: omega * (empirical / prior) + (1 - omega)
178        if mu <= 0.0:
179            return 1.0
180
181        cf = omega_t * (y_bar / mu) + (1.0 - omega_t)
182        return max(cf, 0.0)  # guard against negative values from rounding
183
184    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
185        """Score a batch of policies and return a Polars DataFrame.
186
187        Parameters
188        ----------
189        histories : list[ClaimsHistory]
190            Policies to score.
191
192        Returns
193        -------
194        pl.DataFrame
195            Columns: policy_id, prior_premium, credibility_factor,
196            posterior_premium.
197        """
198        self._check_fitted()
199        rows = []
200        for h in histories:
201            cf = self.predict(h)
202            rows.append(
203                {
204                    "policy_id": h.policy_id,
205                    "prior_premium": h.prior_premium,
206                    "credibility_factor": cf,
207                    "posterior_premium": h.prior_premium * cf,
208                }
209            )
210        return pl.DataFrame(rows)
211
212    def credibility_weight(self, history: ClaimsHistory) -> float:
213        """Return the Bühlmann credibility weight omega_t for a policy.
214
215        Parameters
216        ----------
217        history : ClaimsHistory
218            The policy's claims history.
219
220        Returns
221        -------
222        float
223            omega_t = t / (t + kappa) in [0, 1], where t is total exposure.
224        """
225        self._check_fitted()
226        assert self.kappa_ is not None
227        assert history.exposures is not None
228        t = history.total_exposure
229        return t / (t + self.kappa_)
230
231    # ------------------------------------------------------------------
232    # Private helpers
233    # ------------------------------------------------------------------
234
235    def _estimate_kappa(
236        self, histories: list[ClaimsHistory]
237    ) -> tuple[float, float, float]:
238        """Bühlmann-Straub method of moments estimation.
239
240        Returns (kappa, within_variance, between_variance).
241        """
242        # Compute per-policy exposure-weighted mean frequencies
243        assert all(h.exposures is not None for h in histories)
244        n = len(histories)
245
246        exposures = [np.array(h.exposures, dtype=float) for h in histories]
247        counts = [np.array(h.claim_counts, dtype=float) for h in histories]
248        rates = [c / e for c, e in zip(counts, exposures)]  # Y_t / e_t
249
250        total_exp = [float(e.sum()) for e in exposures]
251        y_bars = [float((c.sum()) / t) for c, t in zip(counts, total_exp)]
252
253        grand_total_exp = sum(total_exp)
254        grand_mean = sum(
255            c.sum() for c in counts
256        ) / grand_total_exp  # exposure-weighted grand mean
257
258        # --- Within-policy variance estimate (sigma^2) ---
259        # sigma^2 = E[Var(Y_t | Theta)] = mu (for Poisson)
260        # MoM estimator: average exposure-weighted within-period variance
261        within_num = 0.0
262        within_denom = 0.0
263        for i, (rate_i, exp_i, y_bar_i) in enumerate(
264            zip(rates, exposures, y_bars)
265        ):
266            t_i = len(rate_i)
267            if t_i < 2:
268                continue
269            for t in range(t_i):
270                within_num += exp_i[t] * (rate_i[t] - y_bar_i) ** 2
271                within_denom += 1
272        if within_denom == 0 or within_num == 0:
273            # Fallback: use portfolio mean (Poisson: variance equals mean)
274            sigma2 = grand_mean
275        else:
276            # Unbiased estimator: divide by (T_i - 1) per policy
277            total_periods = sum(len(r) for r in rates)
278            denom2 = total_periods - n
279            if denom2 <= 0:
280                sigma2 = grand_mean
281            else:
282                sigma2 = within_num / denom2
283
284        # --- Between-policy variance estimate (tau^2) ---
285        # tau^2 = Var(Theta)
286        # Between-policy sum of squares
287        c_const = grand_total_exp - sum(t**2 for t in total_exp) / grand_total_exp
288        if c_const <= 0:
289            tau2 = max(sigma2 / 10.0, 1e-6)
290        else:
291            between_ss = sum(
292                t_i * (y_bar_i - grand_mean) ** 2
293                for t_i, y_bar_i in zip(total_exp, y_bars)
294            )
295            tau2 = max((between_ss - (n - 1) * sigma2) / c_const, 1e-8)
296
297        kappa = sigma2 / tau2
298        kappa = float(np.clip(kappa, self.min_kappa, self.max_kappa))
299        return kappa, float(sigma2), float(tau2)
300
301    def _portfolio_mean(self, histories: list[ClaimsHistory]) -> float:
302        total_claims = sum(h.total_claims for h in histories)
303        total_exp = sum(h.total_exposure for h in histories)
304        if total_exp == 0.0:
305            return 0.0
306        return total_claims / total_exp
307
308    def _check_fitted(self) -> None:
309        if not self.is_fitted_:
310            raise RuntimeError(
311                "Model has not been fitted. Call fit() before predict()."
312            )
313
314    @staticmethod
315    def exposures_ok(history: ClaimsHistory) -> bool:
316        return history.exposures is not None and len(history.exposures) > 0
317
318    def __repr__(self) -> str:
319        if self.is_fitted_:
320            return f"StaticCredibilityModel(kappa={self.kappa_:.4f})"
321        return "StaticCredibilityModel(unfitted)"
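
The prediction is cheap to verify by hand. A minimal sketch with a fixed
kappa (note that predict() uses ``prior_premium`` as the a priori
frequency mu, so the priors below are on the claim-frequency scale)::

    from insurance_credibility import ClaimsHistory, StaticCredibilityModel

    h1 = ClaimsHistory("P1", [1, 2, 3], [0, 1, 0], prior_premium=0.25)
    h2 = ClaimsHistory("P2", [1, 2, 3], [2, 1, 2], prior_premium=0.25)
    model = StaticCredibilityModel(kappa=5.0).fit([h1, h2])

    omega = h1.total_exposure / (h1.total_exposure + 5.0)   # 3 / 8
    cf = omega * (h1.claim_frequency / 0.25) + (1.0 - omega)
    assert abs(cf - model.predict(h1)) < 1e-12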

class DynamicPoissonGammaModel:
 69class DynamicPoissonGammaModel:
 70    """Poisson-gamma conjugate state-space model with seniority weighting.
 71
 72    The model fits the state-evolution parameters (p, q) from the portfolio using
 73    empirical Bayes maximum likelihood. At prediction time, it runs the
 74    forward recursion to compute the posterior state (alpha_t, beta_t)
 75    and returns the corresponding posterior mean as a credibility factor.
 76
 77    Parameters
 78    ----------
 79    p0 : float
 80        Initial guess for the state-reversion parameter p. Must be in (0, 1].
 81    q0 : float
 82        Initial guess for the decay parameter q. Must be in (0, 1].
 83    alpha0 : float
 84        Initial shape parameter of the prior Gamma distribution for Theta.
 85        Must be > 0. Default 1.0 (prior mean 1 when beta0_multiplier is 1).
 86    beta0_multiplier : float
 87        The initial rate parameter beta_0 is set to alpha0 * beta0_multiplier
 88        so the prior mean of Theta is 1 / beta0_multiplier. Keep the
 89        default of 1.0 for a prior mean of 1.0.
 90    bounds : tuple
 91        (lower, upper) bounds for p and q during optimisation.
 92        Default ((0.01, 0.99), (0.01, 0.99)).
 93
 94    Attributes
 95    ----------
 96    p_ : float
 97        Fitted state-reversion parameter (after fit()).
 98    q_ : float
 99        Fitted decay parameter (after fit()).
100    loglik_ : float
101        Log-likelihood at the fitted parameters.
102    is_fitted_ : bool
103        True after fit() has been called.
104
105    Examples
106    --------
107    >>> import numpy as np
108    >>> rng = np.random.default_rng(42)
109    >>> histories = [
110    ...     ClaimsHistory(f"P{i}", [1,2,3],
111    ...                   rng.poisson(1.2, size=3).tolist(),
112    ...                   prior_premium=400.0)
113    ...     for i in range(100)
114    ... ]
115    >>> model = DynamicPoissonGammaModel()
116    >>> model.fit(histories)
117    DynamicPoissonGammaModel(p=..., q=...)
118    """
119
120    def __init__(
121        self,
122        p0: float = 0.5,
123        q0: float = 0.8,
124        alpha0: float = 1.0,
125        beta0_multiplier: float = 1.0,
126        bounds: tuple = ((0.01, 0.99), (0.01, 0.99)),
127    ) -> None:
128        self.p0 = p0
129        self.q0 = q0
130        self.alpha0 = alpha0
131        self.beta0_multiplier = beta0_multiplier
132        self.bounds = bounds
133
134        self.p_: Optional[float] = None
135        self.q_: Optional[float] = None
136        self.loglik_: Optional[float] = None
137        self.is_fitted_: bool = False
138
139    def fit(
140        self, histories: list[ClaimsHistory], verbose: bool = False
141    ) -> "DynamicPoissonGammaModel":
142        """Fit p and q by empirical Bayes MLE on the portfolio log-likelihood.
143
144        The log-likelihood sums over all policies and all periods. Within each
145        policy, the marginal Y_t is Negative Binomial with state-dependent
146        shape and mean parameters derived from the forward recursion.
147
148        Parameters
149        ----------
150        histories : list[ClaimsHistory]
151            Training portfolio. At least 10 policies recommended for stable
152            parameter estimation.
153        verbose : bool
154            If True, print optimisation progress.
155
156        Returns
157        -------
158        DynamicPoissonGammaModel
159            self (fitted model).
160        """
161        if len(histories) < 2:
162            raise ValueError(
163                "At least 2 policy histories are required for fitting"
164            )
165
166        def neg_loglik(params: np.ndarray) -> float:
167            p, q = float(params[0]), float(params[1])
168            total_ll = 0.0
169            for history in histories:
170                total_ll += self._policy_loglik(history, p, q)
171            return -total_ll
172
173        x0 = np.array([self.p0, self.q0])
174        bounds_list = [self.bounds[0], self.bounds[1]]
175
176        result = optimize.minimize(
177            neg_loglik,
178            x0,
179            method="L-BFGS-B",
180            bounds=bounds_list,
181            options={"maxiter": 500, "ftol": 1e-9, "gtol": 1e-6},
182        )
183
184        if verbose and not result.success:
185            print(f"Optimisation warning: {result.message}")
186
187        self.p_ = float(result.x[0])
188        self.q_ = float(result.x[1])
189        self.loglik_ = float(-result.fun)
190        self.is_fitted_ = True
191        return self
192
193    def predict(self, history: ClaimsHistory) -> float:
194        """Compute the credibility factor for a single policy.
195
196        Runs the forward recursion using the fitted (p, q) parameters to
197        propagate the Gamma posterior through the observed claim sequence.
198        The credibility factor is the ratio of the posterior mean to the
199        a priori rate.
200
201        Parameters
202        ----------
203        history : ClaimsHistory
204            The policy's claims history.
205
206        Returns
207        -------
208        float
209            Credibility factor CF = mu_post / mu_prior. Multiply by
210            prior_premium to obtain the posterior premium.
211
212        Raises
213        ------
214        RuntimeError
215            If fit() has not been called.
216        """
217        self._check_fitted()
218        assert self.p_ is not None
219        assert self.q_ is not None
220
221        alpha_t, beta_t = self._forward_recursion(
222            history, self.p_, self.q_
223        )
224        mu = history.prior_premium
225        mu_post = (alpha_t / beta_t) * mu
226        cf = mu_post / mu
227        return max(cf, 0.0)
228
229    def predict_posterior_params(
230        self, history: ClaimsHistory
231    ) -> tuple[float, float]:
232        """Return the posterior Gamma parameters (alpha, beta) after recursion.
233
234        The posterior distribution of Theta_{t+1} | Y_{1:t} is:
235            Gamma(alpha_{t+1}, beta_{t+1})
236        with mean alpha_{t+1} / beta_{t+1}.
237
238        This is useful for uncertainty quantification: the posterior variance
239        is alpha / beta^2.
240
241        Parameters
242        ----------
243        history : ClaimsHistory
244            The policy's claims history.
245
246        Returns
247        -------
248        tuple[float, float]
249            (alpha_{t+1}, beta_{t+1}) — shape and rate of posterior Gamma.
250        """
251        self._check_fitted()
252        assert self.p_ is not None
253        assert self.q_ is not None
254        return self._forward_recursion(history, self.p_, self.q_)
255
256    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
257        """Score a batch of policies and return a Polars DataFrame.
258
259        Parameters
260        ----------
261        histories : list[ClaimsHistory]
262            Policies to score.
263
264        Returns
265        -------
266        pl.DataFrame
267            Columns: policy_id, prior_premium, credibility_factor,
268            posterior_premium, posterior_alpha, posterior_beta,
269            posterior_variance.
270        """
271        self._check_fitted()
272        rows = []
273        for h in histories:
274            cf = self.predict(h)
275            alpha_t, beta_t = self.predict_posterior_params(h)
276            rows.append(
277                {
278                    "policy_id": h.policy_id,
279                    "prior_premium": h.prior_premium,
280                    "credibility_factor": cf,
281                    "posterior_premium": h.prior_premium * cf,
282                    "posterior_alpha": alpha_t,
283                    "posterior_beta": beta_t,
284                    "posterior_variance": alpha_t / (beta_t**2),
285                }
286            )
287        return pl.DataFrame(rows)
288
289    # ------------------------------------------------------------------
290    # Core recursion (also used in log-likelihood)
291    # ------------------------------------------------------------------
292
293    def _forward_recursion(
294        self,
295        history: ClaimsHistory,
296        p: float,
297        q: float,
298    ) -> tuple[float, float]:
299        """Run the Gamma state recursion through the claims history.
300
301        Returns the posterior parameters (alpha, beta) after observing
302        all periods in ``history``. With the default beta0_multiplier, the
303        initial prior mean of Theta is 1 (i.e., alpha_0 / beta_0 = 1).
304
305        Parameters
306        ----------
307        history : ClaimsHistory
308            Policy's claims history.
309        p : float
310            State-reversion parameter.
311        q : float
312            Decay parameter.
313
314        Returns
315        -------
316        tuple[float, float]
317            (alpha_{T+1}, beta_{T+1}) — the one-step-ahead posterior state.
318        """
319        assert history.exposures is not None
320        mu = history.prior_premium
321
322        # Initialise prior: Theta ~ Gamma(alpha0, beta0); E[Theta] = 1 at defaults
323        alpha = self.alpha0
324        beta = self.alpha0 * self.beta0_multiplier  # = beta0
325
326        for y_t, e_t in zip(history.claim_counts, history.exposures):
327            # Bayesian update: observe Y_t ~ Poi(mu * e_t * Theta)
328            # Posterior: Theta | Y_t ~ Gamma(alpha + Y_t, beta + mu*e_t)
329            alpha_post = alpha + float(y_t)
330            beta_post = beta + mu * e_t
331
332            # State transition to next period (Ahn et al. 2023, eq. 2.4)
333            # beta_next = q * beta_{t|t}  — posterior rate, not double-counted
334            beta_next = q * beta_post
335            alpha_next = p * q * alpha_post + (1.0 - p) * beta_next
336
337            # Guard against numerical underflow
338            alpha = max(alpha_next, 1e-10)
339            beta = max(beta_next, 1e-10)
340
341        return float(alpha), float(beta)
342
343    def _policy_loglik(
344        self,
345        history: ClaimsHistory,
346        p: float,
347        q: float,
348    ) -> float:
349        """Compute the marginal log-likelihood for a single policy.
350
351        The marginal distribution of Y_t given Y_{1:t-1} is Negative Binomial.
352        The shape parameter r_t = alpha_t and the mean is mu * e_t * alpha_t / beta_t.
353
354        Summing these one-step-ahead marginal log-likelihoods over all periods
355        gives the policy's marginal log-likelihood via the forward recursion.
356        """
357        assert history.exposures is not None
358        mu = history.prior_premium
359
360        alpha = self.alpha0
361        beta = self.alpha0 * self.beta0_multiplier
362
363        total_ll = 0.0
364        for y_t, e_t in zip(history.claim_counts, history.exposures):
365            # Marginal: Y_t | Y_{1:t-1} ~ NegBin(r=alpha, mu=alpha/beta * mu*e_t)
366            r_t = alpha
367            mu_t = (alpha / beta) * mu * e_t
368            total_ll += _negbin_logpmf(int(y_t), r_t, mu_t)
369
370            # Update state (Ahn et al. 2023, eq. 2.4)
371            alpha_post = alpha + float(y_t)
372            beta_post = beta + mu * e_t
373            # beta_next = q * beta_{t|t}  — posterior rate only, no double-count
374            beta_next = q * beta_post
375            alpha_next = p * q * alpha_post + (1.0 - p) * beta_next
376
377            alpha = max(alpha_next, 1e-10)
378            beta = max(beta_next, 1e-10)
379
380        return total_ll
381
382    def _check_fitted(self) -> None:
383        if not self.is_fitted_:
384            raise RuntimeError(
385                "Model has not been fitted. Call fit() before predict()."
386            )
387
388    def __repr__(self) -> str:
389        if self.is_fitted_:
390            return (
391                f"DynamicPoissonGammaModel(p={self.p_:.4f}, q={self.q_:.4f})"
392            )
393        return "DynamicPoissonGammaModel(unfitted)"
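
One step of the recursion is simple to trace by hand; a sketch with
assumed values standing in for the fitted p_ and q_::

    p, q = 0.6, 0.9              # assumed in place of fitted p_, q_
    alpha, beta = 1.0, 1.0       # default prior: mean alpha / beta = 1
    mu, e_t, y_t = 0.25, 1.0, 1  # a priori rate, exposure, observed count

    # Conjugate update: Theta | Y_t ~ Gamma(alpha + y_t, beta + mu * e_t)
    alpha_post, beta_post = alpha + y_t, beta + mu * e_t
    # State transition (Ahn et al. 2023, eq. 2.4)
    beta_next = q * beta_post
    alpha_next = p * q * alpha_post + (1.0 - p) * beta_next
    alpha_next / beta_next       # one-step-ahead posterior mean of Theta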

Poisson-gamma conjugate state-space model with seniority weighting.

The model fits two decay parameters (p, q) from the portfolio using empirical Bayes maximum likelihood. At prediction time, it runs the forward recursion to compute the posterior state (alpha_t, beta_t) and returns the corresponding posterior mean as a credibility factor.

Parameters

p0 : float Initial guess for the state-reversion parameter p. Must be in (0, 1]. q0 : float Initial guess for the decay parameter q. Must be in (0, 1]. alpha0 : float Initial shape parameter of the prior Gamma distribution for Theta. Must be > 0. Default 1.0 (prior mode at 1 with matching scale). beta0_multiplier : float The initial rate parameter beta_0 is set to alpha0 * beta0_multiplier per unit of a priori rate. This keeps the prior mean at 1.0. Default 1.0. bounds : tuple (lower, upper) bounds for p and q during optimisation. Default ((0.01, 0.99), (0.01, 0.99)).

Attributes

p_ : float Fitted state-reversion parameter (after fit()). q_ : float Fitted decay parameter (after fit()). loglik_ : float Log-likelihood at the fitted parameters. is_fitted_ : bool True after fit() has been called.

Examples

>>> import numpy as np
>>> rng = np.random.default_rng(42)
>>> histories = [
...     ClaimsHistory(f"P{i}", [1,2,3],
...                   rng.poisson(1.2, size=3).tolist(),
...                   prior_premium=400.0)
...     for i in range(100)
... ]
>>> model = DynamicPoissonGammaModel()
>>> model.fit(histories)
DynamicPoissonGammaModel(p=..., q=...)
DynamicPoissonGammaModel( p0: float = 0.5, q0: float = 0.8, alpha0: float = 1.0, beta0_multiplier: float = 1.0, bounds: tuple = ((0.01, 0.99), (0.01, 0.99)))
120    def __init__(
121        self,
122        p0: float = 0.5,
123        q0: float = 0.8,
124        alpha0: float = 1.0,
125        beta0_multiplier: float = 1.0,
126        bounds: tuple = ((0.01, 0.99), (0.01, 0.99)),
127    ) -> None:
128        self.p0 = p0
129        self.q0 = q0
130        self.alpha0 = alpha0
131        self.beta0_multiplier = beta0_multiplier
132        self.bounds = bounds
133
134        self.p_: Optional[float] = None
135        self.q_: Optional[float] = None
136        self.loglik_: Optional[float] = None
137        self.is_fitted_: bool = False
p0
q0
alpha0
beta0_multiplier
bounds
p_: Optional[float]
q_: Optional[float]
loglik_: Optional[float]
is_fitted_: bool
def fit( self, histories: list[ClaimsHistory], verbose: bool = False) -> DynamicPoissonGammaModel:
139    def fit(
140        self, histories: list[ClaimsHistory], verbose: bool = False
141    ) -> "DynamicPoissonGammaModel":
142        """Fit p and q by empirical Bayes MLE on the portfolio log-likelihood.
143
144        The log-likelihood sums over all policies and all periods. Within each
145        policy, the marginal Y_t is Negative Binomial with state-dependent
146        shape and mean parameters derived from the forward recursion.
147
148        Parameters
149        ----------
150        histories : list[ClaimsHistory]
151            Training portfolio. At least 10 policies recommended for stable
152            parameter estimation.
153        verbose : bool
154            If True, print optimisation progress.
155
156        Returns
157        -------
158        DynamicPoissonGammaModel
159            self (fitted model).
160        """
161        if len(histories) < 2:
162            raise ValueError(
163                "At least 2 policy histories are required for fitting"
164            )
165
166        def neg_loglik(params: np.ndarray) -> float:
167            p, q = float(params[0]), float(params[1])
168            total_ll = 0.0
169            for history in histories:
170                total_ll += self._policy_loglik(history, p, q)
171            return -total_ll
172
173        x0 = np.array([self.p0, self.q0])
174        bounds_list = [self.bounds[0], self.bounds[1]]
175
176        result = optimize.minimize(
177            neg_loglik,
178            x0,
179            method="L-BFGS-B",
180            bounds=bounds_list,
181            options={"maxiter": 500, "ftol": 1e-9, "gtol": 1e-6},
182        )
183
184        if verbose and not result.success:
185            print(f"Optimisation warning: {result.message}")
186
187        self.p_ = float(result.x[0])
188        self.q_ = float(result.x[1])
189        self.loglik_ = float(-result.fun)
190        self.is_fitted_ = True
191        return self

Fit p and q by empirical Bayes MLE on the portfolio log-likelihood.

The log-likelihood sums over all policies and all periods. Within each policy, the marginal Y_t is Negative Binomial with state-dependent shape and mean parameters derived from the forward recursion.

Parameters

histories : list[ClaimsHistory] Training portfolio. At least 10 policies recommended for stable parameter estimation. verbose : bool If True, print optimisation progress.

Returns

DynamicPoissonGammaModel self (fitted model).

def predict( self, history: ClaimsHistory) -> float:
193    def predict(self, history: ClaimsHistory) -> float:
194        """Compute the credibility factor for a single policy.
195
196        Runs the forward recursion using the fitted (p, q) parameters to
197        propagate the Gamma posterior through the observed claim sequence.
198        The credibility factor is the ratio of the posterior mean to the
199        a priori rate.
200
201        Parameters
202        ----------
203        history : ClaimsHistory
204            The policy's claims history.
205
206        Returns
207        -------
208        float
209            Credibility factor CF = mu_post / mu_prior. Multiply by
210            prior_premium to obtain the posterior premium.
211
212        Raises
213        ------
214        RuntimeError
215            If fit() has not been called.
216        """
217        self._check_fitted()
218        assert self.p_ is not None
219        assert self.q_ is not None
220
221        alpha_t, beta_t = self._forward_recursion(
222            history, self.p_, self.q_
223        )
224        mu = history.prior_premium
225        mu_post = (alpha_t / beta_t) * mu
226        cf = mu_post / mu
227        return max(cf, 0.0)

Compute the credibility factor for a single policy.

Runs the forward recursion using the fitted (p, q) parameters to propagate the Gamma posterior through the observed claim sequence. The credibility factor is the ratio of the posterior mean to the a priori rate.

Parameters

history : ClaimsHistory The policy's claims history.

Returns

float Credibility factor CF = mu_post / mu_prior. Multiply by prior_premium to obtain the posterior premium.

Raises

RuntimeError If fit() has not been called.

def predict_posterior_params( self, history: ClaimsHistory) -> tuple[float, float]:
229    def predict_posterior_params(
230        self, history: ClaimsHistory
231    ) -> tuple[float, float]:
232        """Return the posterior Gamma parameters (alpha, beta) after recursion.
233
234        The posterior distribution of Theta_{t+1} | Y_{1:t} is:
235            Gamma(alpha_{t+1}, beta_{t+1})
236        with mean alpha_{t+1} / beta_{t+1}.
237
238        This is useful for uncertainty quantification: the posterior variance
239        is alpha / beta^2.
240
241        Parameters
242        ----------
243        history : ClaimsHistory
244            The policy's claims history.
245
246        Returns
247        -------
248        tuple[float, float]
249            (alpha_{t+1}, beta_{t+1}) — shape and rate of posterior Gamma.
250        """
251        self._check_fitted()
252        assert self.p_ is not None
253        assert self.q_ is not None
254        return self._forward_recursion(history, self.p_, self.q_)

Return the posterior Gamma parameters (alpha, beta) after recursion.

The posterior distribution of Theta_{t+1} | Y_{1:t} is: Gamma(alpha_{t+1}, beta_{t+1}) with mean alpha_{t+1} / beta_{t+1}.

This is useful for uncertainty quantification: the posterior variance is alpha / beta^2.

Parameters

history : ClaimsHistory The policy's claims history.

Returns

tuple[float, float] (alpha_{t+1}, beta_{t+1}) — shape and rate of posterior Gamma.

def predict_batch( self, histories: list[ClaimsHistory]) -> polars.dataframe.frame.DataFrame:
256    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
257        """Score a batch of policies and return a Polars DataFrame.
258
259        Parameters
260        ----------
261        histories : list[ClaimsHistory]
262            Policies to score.
263
264        Returns
265        -------
266        pl.DataFrame
267            Columns: policy_id, prior_premium, credibility_factor,
268            posterior_premium, posterior_alpha, posterior_beta,
269            posterior_variance.
270        """
271        self._check_fitted()
272        rows = []
273        for h in histories:
274            cf = self.predict(h)
275            alpha_t, beta_t = self.predict_posterior_params(h)
276            rows.append(
277                {
278                    "policy_id": h.policy_id,
279                    "prior_premium": h.prior_premium,
280                    "credibility_factor": cf,
281                    "posterior_premium": h.prior_premium * cf,
282                    "posterior_alpha": alpha_t,
283                    "posterior_beta": beta_t,
284                    "posterior_variance": alpha_t / (beta_t**2),
285                }
286            )
287        return pl.DataFrame(rows)

Score a batch of policies and return a Polars DataFrame.

Parameters
----------
histories : list[ClaimsHistory]
    Policies to score.

Returns
-------
pl.DataFrame
    Columns: policy_id, prior_premium, credibility_factor, posterior_premium,
    posterior_alpha, posterior_beta, posterior_variance.
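
Because the result is an ordinary Polars frame, downstream triage is a one-liner. A sketch (the 1.2 threshold is illustrative only):

    import polars as pl

    scores = model.predict_batch(histories)
    # Policies whose experience loads the premium by more than 20%,
    # least certain (highest posterior variance) first
    flagged = scores.filter(pl.col("credibility_factor") > 1.2).sort(
        "posterior_variance", descending=True
    )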

class SurrogateModel:
 38class SurrogateModel:
 39    """Surrogate model for large-portfolio Bayesian experience rating.
 40
 41    Fits an approximate g(.) function on a sub-portfolio using importance
 42    sampling, then applies it to the full portfolio analytically.
 43
 44    The posterior premium for policy i is:
 45        mu_post_i = mu_prior_i * exp(g(L_i, n_i))
 46    where L_i is the log-likelihood sufficient statistic and n_i is the
 47    number of periods observed.
 48
 49    The form of g(.) is a polynomial in L and n; with poly_degree=1:
 50        g(L, n) = theta_0 + theta_1 * L + theta_2 * n
 51    while poly_degree=2 adds L^2, n^2 and the interaction L * n. This is
 52    flexible enough to capture Bühlmann-Straub as a special case
 53    (linear in L = Y_bar) while allowing non-linear responses.
 54
 55    Parameters
 56    ----------
 57    prior_model : callable or None
 58        Function mapping ClaimsHistory -> float, returning the a priori
 59        premium for any policy. If None, uses history.prior_premium directly.
 60        This should be a GLM or similar model already fitted on the portfolio.
 61    n_is_samples : int
 62        Number of importance sampling draws from the prior. More samples
 63        give a better IS estimate but are slower. Default 2000.
 64    subsample_frac : float
 65        Fraction of the portfolio to use in the sub-portfolio for fitting g.
 66        Default 0.10 (10%). Minimum 20 policies regardless of fraction.
 67    poly_degree : int
 68        Polynomial degree for g(.). Degree 1 gives linear g; degree 2
 69        allows quadratic response. Default 1.
 70    random_state : int or None
 71        Random seed for IS sampling and sub-portfolio selection.
 72    sufficient_stat_fn : callable or None
 73        Custom function mapping ClaimsHistory -> float for the sufficient
 74        statistic. If None, uses the log-likelihood under Poisson(theta_ref).
 75
 76    Attributes
 77    ----------
 78    theta_ : np.ndarray
 79        Fitted WLS coefficients for g(.).
 80    theta_ref_ : float
 81        Reference Poisson intensity used in the sufficient statistic.
 82    is_fitted_ : bool
 83        True after fit() has been called.
 84    """
 85
 86    def __init__(
 87        self,
 88        prior_model: Optional[Callable[[ClaimsHistory], float]] = None,
 89        n_is_samples: int = 2000,
 90        subsample_frac: float = 0.10,
 91        poly_degree: int = 1,
 92        random_state: Optional[int] = None,
 93        sufficient_stat_fn: Optional[Callable[[ClaimsHistory], float]] = None,
 94    ) -> None:
 95        self.prior_model = prior_model
 96        self.n_is_samples = n_is_samples
 97        self.subsample_frac = subsample_frac
 98        self.poly_degree = poly_degree
 99        self.random_state = random_state
100        self.sufficient_stat_fn = sufficient_stat_fn
101
102        self.theta_: Optional[np.ndarray] = None
103        self.theta_ref_: Optional[float] = None
104        self.is_fitted_: bool = False
105        self._rng: Optional[np.random.Generator] = None
106
107    def fit(self, histories: list[ClaimsHistory]) -> "SurrogateModel":
108        """Fit the surrogate g(.) function on a representative sub-portfolio.
109
110        Steps:
111        1. Draw a random sub-portfolio (uniform sampling without replacement).
112        2. Estimate reference intensity theta_ref as portfolio mean frequency.
113        3. Draw IS samples Theta_k ~ prior once (shared across all policies).
114        4. For each sub-portfolio policy, compute IS-based Bayesian premium.
115        5. Fit g(.) by WLS minimising weighted squared log-credibility error.
116
117        Parameters
118        ----------
119        histories : list[ClaimsHistory]
120            Full training portfolio.
121
122        Returns
123        -------
124        SurrogateModel
125            self (fitted model).
126        """
127        self._rng = np.random.default_rng(self.random_state)
128
129        # Estimate reference intensity
130        total_claims = sum(h.total_claims for h in histories)
131        total_exp = sum(h.total_exposure for h in histories)
132        self.theta_ref_ = total_claims / total_exp if total_exp > 0 else 1.0
133
134        # Select sub-portfolio
135        n_sub = max(20, int(len(histories) * self.subsample_frac))
136        n_sub = min(n_sub, len(histories))
137        idx = self._rng.choice(len(histories), size=n_sub, replace=False)
138        sub_histories = [histories[i] for i in idx]
139
140        # Draw IS samples from prior: Theta_k ~ Gamma(alpha_prior, beta_prior)
141        # We use a weakly informative prior: Theta ~ Gamma(1, 1) -> E[Theta] = 1
142        prior_alpha = 1.0
143        prior_beta = 1.0
144        theta_samples = self._rng.gamma(
145            prior_alpha, 1.0 / prior_beta, size=self.n_is_samples
146        )
147
148        # Compute IS-based posterior premiums for sub-portfolio
149        sub_L = []
150        sub_n = []
151        sub_log_cf = []
152        sub_weights = []
153
154        for hist in sub_histories:
155            mu_prior = self._get_prior(hist)
156            L_i = self._sufficient_stat(hist)
157            n_i = float(hist.n_periods)
158
159            mu_post_is, se_i = self._is_posterior(hist, theta_samples, mu_prior)
160
161            if mu_prior <= 0.0 or mu_post_is <= 0.0:
162                continue
163            log_cf_i = np.log(mu_post_is / mu_prior)
164            w_i = 1.0 / max(se_i**2, 1e-8)
165
166            sub_L.append(L_i)
167            sub_n.append(n_i)
168            sub_log_cf.append(log_cf_i)
169            sub_weights.append(w_i)
170
171        if len(sub_L) < 4:
172            # Degenerate case: not enough data, return identity (CF = 1)
173            self.theta_ = np.zeros(self._n_features())
174            self.is_fitted_ = True
175            return self
176
177        # Design matrix for g(L, n)
178        X = self._design_matrix(np.array(sub_L), np.array(sub_n))
179        y = np.array(sub_log_cf)
180        w = np.array(sub_weights)
181        w = w / w.sum()
182
183        # WLS: solve (X^T W X) theta = X^T W y
184        W = np.diag(w)
185        XtWX = X.T @ W @ X
186        XtWy = X.T @ (w * y)
187
188        try:
189            self.theta_ = np.linalg.solve(XtWX, XtWy)
190        except np.linalg.LinAlgError:
191            self.theta_ = np.linalg.lstsq(XtWX, XtWy, rcond=None)[0]
192
193        self.is_fitted_ = True
194        return self
195
196    def predict(self, history: ClaimsHistory) -> float:
197        """Compute the credibility factor for a single policy.
198
199        Parameters
200        ----------
201        history : ClaimsHistory
202            The policy's claims history.
203
204        Returns
205        -------
206        float
207            Credibility factor CF = exp(g(L, n)). Multiply by prior_premium
208            to obtain the posterior premium.
209
210        Raises
211        ------
212        RuntimeError
213            If fit() has not been called.
214        """
215        self._check_fitted()
216        assert self.theta_ is not None
217
218        L = self._sufficient_stat(history)
219        n = float(history.n_periods)
220        x = self._design_matrix(np.array([L]), np.array([n]))[0]
221        log_cf = float(x @ self.theta_)
222        return float(np.exp(log_cf))
223
224    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
225        """Score a batch of policies and return a Polars DataFrame.
226
227        Parameters
228        ----------
229        histories : list[ClaimsHistory]
230            Policies to score.
231
232        Returns
233        -------
234        pl.DataFrame
235            Columns: policy_id, prior_premium, credibility_factor,
236            posterior_premium, sufficient_stat.
237        """
238        self._check_fitted()
239        rows = []
240        for h in histories:
241            cf = self.predict(h)
242            rows.append(
243                {
244                    "policy_id": h.policy_id,
245                    "prior_premium": h.prior_premium,
246                    "credibility_factor": cf,
247                    "posterior_premium": h.prior_premium * cf,
248                    "sufficient_stat": self._sufficient_stat(h),
249                }
250            )
251        return pl.DataFrame(rows)
252
253    # ------------------------------------------------------------------
254    # Private helpers
255    # ------------------------------------------------------------------
256
257    def _is_posterior(
258        self,
259        history: ClaimsHistory,
260        theta_samples: np.ndarray,
261        mu_prior: float,
262    ) -> tuple[float, float]:
263        """Estimate the Bayesian posterior premium via importance sampling.
264
265        Computes E[mu(Theta) | Y_{1:n}] using self-normalised IS:
266            E_hat = sum_k w_k * mu(Theta_k) / sum_k w_k
267        where w_k = exp(log-likelihood of Y_{1:n} | Theta_k) and
268        Theta_k ~ prior (Gamma(1,1) in this implementation).
269
270        Returns (posterior_mean, standard_error).
271        """
272        assert history.exposures is not None
273        log_weights = np.zeros(len(theta_samples))
274        for y_t, e_t in zip(history.claim_counts, history.exposures):
275            # Y_t | Theta ~ Poi(mu_prior * e_t * Theta)
276            rate = mu_prior * e_t * theta_samples
277            # Poisson log PMF: y*log(rate) - rate - log(y!)
278            log_weights += y_t * np.log(rate + 1e-300) - rate
279
280        # Stabilise: subtract max log weight
281        log_weights -= log_weights.max()
282        weights = np.exp(log_weights)
283        weights /= weights.sum()
284
285        # Posterior premium: E[mu_prior * Theta | data] = mu_prior * E[Theta | data]
286        post_mean = float(mu_prior * np.sum(weights * theta_samples))
287
288        # Effective sample size for SE calculation
289        ess = 1.0 / np.sum(weights**2)
290        if ess < 2:
291            se = abs(post_mean) * 0.5  # pessimistic SE
292        else:
293            # Monte Carlo SE
294            mean_theta = np.sum(weights * theta_samples)
295            var_theta = np.sum(weights * (theta_samples - mean_theta) ** 2)
296            se = float(mu_prior * np.sqrt(var_theta / max(ess, 1)))
297
298        return post_mean, se
299
300    def _sufficient_stat(self, history: ClaimsHistory) -> float:
301        if self.sufficient_stat_fn is not None:
302            return self.sufficient_stat_fn(history)
303        return history_sufficient_stat(history, theta_ref=self.theta_ref_)
304
305    def _get_prior(self, history: ClaimsHistory) -> float:
306        if self.prior_model is not None:
307            return self.prior_model(history)
308        return history.prior_premium
309
310    def _design_matrix(self, L: np.ndarray, n: np.ndarray) -> np.ndarray:
311        """Build polynomial design matrix for g(L, n)."""
312        cols = [np.ones(len(L))]
313        if self.poly_degree >= 1:
314            cols.extend([L, n])
315        if self.poly_degree >= 2:
316            cols.extend([L**2, n**2, L * n])
317        return np.column_stack(cols)
318
319    def _n_features(self) -> int:
320        if self.poly_degree >= 2:
321            return 6
322        if self.poly_degree >= 1:
323            return 3
324        return 1
325
326    def _check_fitted(self) -> None:
327        if not self.is_fitted_:
328            raise RuntimeError(
329                "Model has not been fitted. Call fit() before predict()."
330            )
331
332    def __repr__(self) -> str:
333        if self.is_fitted_:
334            return (
335                f"SurrogateModel(poly_degree={self.poly_degree}, "
336                f"n_is_samples={self.n_is_samples})"
337            )
338        return f"SurrogateModel(poly_degree={self.poly_degree}, unfitted)"

Surrogate model for large-portfolio Bayesian experience rating.

Fits an approximate g(.) function on a sub-portfolio using importance sampling, then applies it to the full portfolio analytically.

The posterior premium for policy i is:

    mu_post_i = mu_prior_i * exp(g(L_i, n_i))

where L_i is the log-likelihood sufficient statistic and n_i is the number of periods observed.

g(.) is a polynomial in L and n. With poly_degree=1:

    g(L, n) = theta_0 + theta_1 * L + theta_2 * n

and poly_degree=2 adds the quadratic and interaction terms L^2, n^2 and L * n. This is flexible enough to capture Bühlmann-Straub as a special case (linear in L = Y_bar) while allowing non-linear responses.

Parameters
----------
prior_model : callable or None
    Function mapping ClaimsHistory -> float, returning the a priori premium
    for any policy. If None, uses history.prior_premium directly. This
    should be a GLM or similar model already fitted on the portfolio.
n_is_samples : int
    Number of importance sampling draws from the prior. More samples give a
    better IS estimate but are slower. Default 2000.
subsample_frac : float
    Fraction of the portfolio to use in the sub-portfolio for fitting g.
    Default 0.10 (10%). Minimum 20 policies regardless of fraction.
poly_degree : int
    Polynomial degree for g(.). Degree 1 gives linear g; degree 2 allows
    quadratic response. Default 1.
random_state : int or None
    Random seed for IS sampling and sub-portfolio selection.
sufficient_stat_fn : callable or None
    Custom function mapping ClaimsHistory -> float for the sufficient
    statistic. If None, uses the log-likelihood under Poisson(theta_ref).

Attributes
----------
theta_ : np.ndarray
    Fitted WLS coefficients for g(.).
theta_ref_ : float
    Reference Poisson intensity used in the sufficient statistic.
is_fitted_ : bool
    True after fit() has been called.
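
A construction sketch (the glm_prior callable is hypothetical; any ClaimsHistory -> float function works, and histories is a training portfolio as elsewhere on this page):

    from insurance_credibility import SurrogateModel

    def glm_prior(h):
        # Stand-in for a fitted GLM's predicted frequency
        return h.prior_premium

    surrogate = SurrogateModel(
        prior_model=glm_prior,
        n_is_samples=4000,   # tighter IS estimates on the sub-portfolio
        poly_degree=2,       # allow curvature in g(L, n)
        random_state=7,
    ).fit(histories)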

SurrogateModel(prior_model: Optional[Callable[[ClaimsHistory], float]] = None, n_is_samples: int = 2000, subsample_frac: float = 0.1, poly_degree: int = 1, random_state: Optional[int] = None, sufficient_stat_fn: Optional[Callable[[ClaimsHistory], float]] = None)

def fit(self, histories: list[ClaimsHistory]) -> SurrogateModel:

Fit the surrogate g(.) function on a representative sub-portfolio.

Steps:

  1. Draw a random sub-portfolio (uniform sampling without replacement).
  2. Estimate reference intensity theta_ref as portfolio mean frequency.
  3. Draw IS samples Theta_k ~ prior once (shared across all policies).
  4. For each sub-portfolio policy, compute IS-based Bayesian premium.
  5. Fit g(.) by WLS minimising weighted squared log-credibility error
     (see the sketch after this entry).

Parameters
----------
histories : list[ClaimsHistory]
    Full training portfolio.

Returns
-------
SurrogateModel
    self (fitted model).
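
Step 5 reduces to ordinary weighted least squares on (L_i, n_i, log CF_i) triples. A self-contained sketch of just that step on synthetic inputs (illustrative numbers; mirrors the degree-1 design matrix used internally):

    import numpy as np

    rng = np.random.default_rng(0)
    L = rng.normal(size=50)                        # sufficient statistics
    n = rng.integers(1, 8, size=50).astype(float)  # periods observed
    log_cf = 0.3 * L - 0.05 * n + rng.normal(scale=0.1, size=50)
    w = np.full(50, 1.0 / 50)                      # normalised IS-precision weights

    X = np.column_stack([np.ones(50), L, n])       # degree-1 design: [1, L, n]
    # Normal equations (X^T W X) theta = X^T W y
    theta = np.linalg.solve(X.T @ (w[:, None] * X), X.T @ (w * log_cf))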

def predict(self, history: ClaimsHistory) -> float:

Compute the credibility factor for a single policy.

Parameters
----------
history : ClaimsHistory
    The policy's claims history.

Returns
-------
float
    Credibility factor CF = exp(g(L, n)). Multiply by prior_premium to
    obtain the posterior premium.

Raises
------
RuntimeError
    If fit() has not been called.

def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:

Score a batch of policies and return a Polars DataFrame.

Parameters
----------
histories : list[ClaimsHistory]
    Policies to score.

Returns
-------
pl.DataFrame
    Columns: policy_id, prior_premium, credibility_factor, posterior_premium,
    sufficient_stat.

DeepAttentionModel
def balance_calibrate(predict_fn: Callable[[ClaimsHistory], float], histories: list[ClaimsHistory], exposure_weighted: bool = True) -> CalibrationResult:
28def balance_calibrate(
29    predict_fn: Callable[[ClaimsHistory], float],
30    histories: list[ClaimsHistory],
31    exposure_weighted: bool = True,
32) -> CalibrationResult:
33    """Compute the portfolio-level balance calibration factor.
34
35    The calibration factor delta satisfies:
36        sum_i [delta * CF_i * mu_prior_i * e_i] = sum_i [Y_i * e_i]
37
38    For multiplicative models:
39        delta = sum(actual * exposure) / sum(posterior * exposure)
40
41    This function computes delta and returns it as a CalibrationResult.
42    Apply delta to all posterior premiums to restore portfolio balance.
43
44    Parameters
45    ----------
46    predict_fn : callable
47        Function mapping ClaimsHistory -> float, returning the credibility
48        factor (not the posterior premium). Typically model.predict.
49    histories : list[ClaimsHistory]
50        Calibration portfolio. Typically the same data used for fitting,
51        but can be a hold-out period for forward-looking calibration.
52    exposure_weighted : bool
53        If True, weight each policy by its total exposure when computing
54        the balance check. If False, use simple sums. Default True.
55
56    Returns
57    -------
58    CalibrationResult
59        Contains calibration_factor, sum_actual, sum_predicted, n_policies.
60
61    Examples
62    --------
63    >>> model = StaticCredibilityModel().fit(histories)
64    >>> result = balance_calibrate(model.predict, histories)
65    >>> print(f"Calibration factor: {result.calibration_factor:.4f}")
66    >>> # Apply to predictions:
67    >>> posterior = prior * model.predict(h) * result.calibration_factor
68    """
69    sum_actual = 0.0
70    sum_predicted = 0.0
71
72    for h in histories:
73        assert h.exposures is not None
74        cf = predict_fn(h)
75        posterior = h.prior_premium * cf
76
77        if exposure_weighted:
78            weight = h.total_exposure
79        else:
80            weight = 1.0
81
82        # Actual side of the balance: claim frequency times weight. With
83        # exposure weighting this recovers the policy's total claim count.
84        actual_rate = h.claim_frequency  # total_claims / total_exposure
85        sum_actual += actual_rate * weight
86        sum_predicted += posterior * weight
87
88    if sum_predicted <= 0.0:
89        calibration_factor = 1.0
90    else:
91        calibration_factor = sum_actual / sum_predicted
92
93    return CalibrationResult(
94        calibration_factor=float(calibration_factor),
95        sum_actual=float(sum_actual),
96        sum_predicted=float(sum_predicted),
97        n_policies=len(histories),
98    )

Compute the portfolio-level balance calibration factor.

The calibration factor delta satisfies:

sum_i [delta * CF_i * mu_prior_i * e_i] = sum_i [Y_i * e_i]

For multiplicative models:

delta = sum(actual * exposure) / sum(posterior * exposure)

This function computes delta and returns it as a CalibrationResult. Apply delta to all posterior premiums to restore portfolio balance.

Parameters
----------
predict_fn : callable
    Function mapping ClaimsHistory -> float, returning the credibility
    factor (not the posterior premium). Typically model.predict.
histories : list[ClaimsHistory]
    Calibration portfolio. Typically the same data used for fitting, but
    can be a hold-out period for forward-looking calibration.
exposure_weighted : bool
    If True, weight each policy by its total exposure when computing the
    balance check. If False, use simple sums. Default True.

Returns
-------
CalibrationResult
    Contains calibration_factor, sum_actual, sum_predicted, n_policies.

Examples

>>> model = StaticCredibilityModel().fit(histories)
>>> result = balance_calibrate(model.predict, histories)
>>> print(f"Calibration factor: {result.calibration_factor:.4f}")
>>> # Apply to predictions:
>>> posterior = prior * model.predict(h) * result.calibration_factor
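
A worked illustration of delta: if the exposure-weighted actual claims sum to 950 and the uncalibrated posterior premiums sum to 1000, then delta = 950 / 1000 = 0.95, and every posterior premium is scaled down by 5% to restore balance. In a pipeline this typically chains with calibrated_predict_fn and balance_report (sketch, assuming a fitted model and histories):

    cal = balance_calibrate(model.predict, histories)       # portfolio delta
    scoring_fn = calibrated_predict_fn(model.predict, cal)  # delta baked in
    report = balance_report(scoring_fn, histories, by_n_periods=True)
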
def apply_calibration(credibility_factor: float, calibration_result: CalibrationResult) -> float:
101def apply_calibration(
102    credibility_factor: float,
103    calibration_result: CalibrationResult,
104) -> float:
105    """Apply a calibration factor to a single credibility factor.
106
107    Parameters
108    ----------
109    credibility_factor : float
110        The raw credibility factor from a model's predict().
111    calibration_result : CalibrationResult
112        Output of balance_calibrate().
113
114    Returns
115    -------
116    float
117        Calibrated credibility factor = CF * delta.
118    """
119    return credibility_factor * calibration_result.calibration_factor

Apply a calibration factor to a single credibility factor.

Parameters
----------
credibility_factor : float
    The raw credibility factor from a model's predict().
calibration_result : CalibrationResult
    Output of balance_calibrate().

Returns
-------
float
    Calibrated credibility factor = CF * delta.

def calibrated_predict_fn(predict_fn: Callable[[ClaimsHistory], float], calibration_result: CalibrationResult) -> Callable[[ClaimsHistory], float]:
122def calibrated_predict_fn(
123    predict_fn: Callable[[ClaimsHistory], float],
124    calibration_result: CalibrationResult,
125) -> Callable[[ClaimsHistory], float]:
126    """Wrap a predict function to apply calibration automatically.
127
128    Parameters
129    ----------
130    predict_fn : callable
131        Original predict function (ClaimsHistory -> credibility_factor).
132    calibration_result : CalibrationResult
133        Output of balance_calibrate().
134
135    Returns
136    -------
137    callable
138        New predict function that applies calibration to each prediction.
139
140    Examples
141    --------
142    >>> cal = balance_calibrate(model.predict, histories)
143    >>> calibrated = calibrated_predict_fn(model.predict, cal)
144    >>> cf = calibrated(new_history)  # already calibrated
145    """
146    delta = calibration_result.calibration_factor
147
148    def _calibrated(history: ClaimsHistory) -> float:
149        return predict_fn(history) * delta
150
151    return _calibrated

Wrap a predict function to apply calibration automatically.

Parameters
----------
predict_fn : callable
    Original predict function (ClaimsHistory -> credibility_factor).
calibration_result : CalibrationResult
    Output of balance_calibrate().

Returns
-------
callable
    New predict function that applies calibration to each prediction.

Examples

>>> cal = balance_calibrate(model.predict, histories)
>>> calibrated = calibrated_predict_fn(model.predict, cal)
>>> cf = calibrated(new_history)  # already calibrated
def balance_report(predict_fn: Callable[[ClaimsHistory], float], histories: list[ClaimsHistory], by_n_periods: bool = False) -> pl.DataFrame:
154def balance_report(
155    predict_fn: Callable[[ClaimsHistory], float],
156    histories: list[ClaimsHistory],
157    by_n_periods: bool = False,
158) -> pl.DataFrame:
159    """Generate a portfolio-level balance report.
160
161    Shows the actual vs predicted claim frequency, and the credibility
162    factor distribution, optionally broken down by number of observed
163    periods (to check whether newer or longer-tenured policies have
164    systematic bias).
165
166    Parameters
167    ----------
168    predict_fn : callable
169        Function mapping ClaimsHistory -> credibility_factor.
170    histories : list[ClaimsHistory]
171        Portfolio to assess.
172    by_n_periods : bool
173        If True, break down by number of observed periods. Default False.
174
175    Returns
176    -------
177    pl.DataFrame
178        Per-policy rows: policy_id, n_periods, total_exposure,
179        actual_frequency, prior_premium, credibility_factor,
180        posterior_premium, residual (aggregated by n_periods if requested).
181    """
182    rows = []
183    for h in histories:
184        assert h.exposures is not None
185        cf = predict_fn(h)
186        posterior = h.prior_premium * cf
187        actual_freq = h.claim_frequency
188        residual = actual_freq / posterior if posterior > 0 else float("nan")
189        rows.append(
190            {
191                "policy_id": h.policy_id,
192                "n_periods": h.n_periods,
193                "total_exposure": h.total_exposure,
194                "actual_frequency": actual_freq,
195                "prior_premium": h.prior_premium,
196                "credibility_factor": cf,
197                "posterior_premium": posterior,
198                "residual": residual,
199            }
200        )
201    df = pl.DataFrame(rows)
202
203    if by_n_periods:
204        return (
205            df.group_by("n_periods")
206            .agg(
207                [
208                    pl.col("actual_frequency").mean().alias("mean_actual"),
209                    pl.col("posterior_premium").mean().alias("mean_posterior"),
210                    pl.col("credibility_factor").mean().alias("mean_cf"),
211                    pl.col("residual").mean().alias("mean_residual"),
212                    pl.len().alias("n_policies"),
213                ]
214            )
215            .sort("n_periods")
216        )
217
218    return df

Generate a portfolio-level balance report.

Shows actual vs predicted claim frequency and the credibility factor distribution, optionally broken down by number of observed periods (to check whether newer or longer-tenured policies show systematic bias).

Parameters
----------
predict_fn : callable
    Function mapping ClaimsHistory -> credibility_factor.
histories : list[ClaimsHistory]
    Portfolio to assess.
by_n_periods : bool
    If True, break down by number of observed periods. Default False.

Returns
-------
pl.DataFrame
    Per-policy rows: policy_id, n_periods, total_exposure, actual_frequency,
    prior_premium, credibility_factor, posterior_premium, and residual
    (actual / posterior). With by_n_periods=True, aggregated means by
    n_periods instead.
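
Reading the by_n_periods view: mean_residual close to 1.0 in every tenure bucket indicates no systematic bias, while values persistently above 1.0 for short histories would suggest the model under-credits new business. A sketch (assuming a fitted model and histories):

    report = balance_report(model.predict, histories, by_n_periods=True)
    print(report.select(["n_periods", "mean_residual", "n_policies"]))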

def credibility_factor(posterior_premium: float, prior_premium: float) -> float:
16def credibility_factor(posterior_premium: float, prior_premium: float) -> float:
17    """Compute the multiplicative credibility factor.
18
19    The credibility factor CF satisfies:
20        posterior_premium = prior_premium * CF
21
22    Parameters
23    ----------
24    posterior_premium : float
25        The experience-adjusted (a posteriori) premium.
26    prior_premium : float
27        The a priori (GLM-based) premium.
28
29    Returns
30    -------
31    float
32        Credibility factor. Values > 1 indicate the policy is worse than
33        the GLM base rate; values < 1 indicate better-than-average experience.
34
35    Raises
36    ------
37    ValueError
38        If prior_premium is zero or negative.
39    """
40    if prior_premium <= 0.0:
41        raise ValueError(f"prior_premium must be positive, got {prior_premium}")
42    return posterior_premium / prior_premium

Compute the multiplicative credibility factor.

The credibility factor CF satisfies:

    posterior_premium = prior_premium * CF

Parameters
----------
posterior_premium : float
    The experience-adjusted (a posteriori) premium.
prior_premium : float
    The a priori (GLM-based) premium.

Returns
-------
float
    Credibility factor. Values > 1 indicate the policy is worse than the
    GLM base rate; values < 1 indicate better-than-average experience.

Raises
------
ValueError
    If prior_premium is zero or negative.

def posterior_premium(prior_premium: float, cf: float, calibration_factor: float = 1.0) -> float:
45def posterior_premium(
46    prior_premium: float, cf: float, calibration_factor: float = 1.0
47) -> float:
48    """Compute the posterior premium from a credibility factor.
49
50    Parameters
51    ----------
52    prior_premium : float
53        The a priori (GLM-based) premium.
54    cf : float
55        Credibility factor (posterior / prior ratio).
56    calibration_factor : float
57        Optional portfolio-level calibration factor from balance_calibrate().
58        Default 1.0 (no calibration applied).
59
60    Returns
61    -------
62    float
63        Posterior premium after experience adjustment and optional calibration.
64    """
65    return prior_premium * cf * calibration_factor

Compute the posterior premium from a credibility factor.

Parameters
----------
prior_premium : float
    The a priori (GLM-based) premium.
cf : float
    Credibility factor (posterior / prior ratio).
calibration_factor : float
    Optional portfolio-level calibration factor from balance_calibrate().
    Default 1.0 (no calibration applied).

Returns
-------
float
    Posterior premium after experience adjustment and optional calibration.
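
The two helpers round-trip (illustrative numbers):

    from insurance_credibility import credibility_factor, posterior_premium

    cf = credibility_factor(posterior_premium=132.0, prior_premium=120.0)  # 1.1
    premium = posterior_premium(120.0, cf, calibration_factor=0.97)        # 128.04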

def seniority_weights(n_periods: int, p: float, q: float, exposures: list[float] | None = None) -> np.ndarray:
 68def seniority_weights(
 69    n_periods: int,
 70    p: float,
 71    q: float,
 72    exposures: list[float] | None = None,
 73) -> np.ndarray:
 74    """Compute seniority (recency) weights for claims history periods.
 75
 76    In the dynamic Poisson-gamma model, the effective weight on period t's
 77    claims decays geometrically with age. This function computes the
 78    relative weight assigned to each period in a t-period history when
 79    fitting or predicting.
 80
 81    The weight assigned to period s (0-indexed from oldest) in a history
 82    of length t is proportional to (p*q)^(t-1-s). The most recent period
 83    receives weight 1; the oldest receives weight (p*q)^(t-1).
 84
 85    Parameters
 86    ----------
 87    n_periods : int
 88        Number of observation periods.
 89    p : float
 90        Transition parameter controlling how strongly the current state
 91        reverts to mean. Must be in (0, 1].
 92    q : float
 93        Decay parameter controlling how quickly older claims are
 94        downweighted. Must be in (0, 1].
 95    exposures : list[float] or None
 96        Per-period exposure weights. If None, uniform exposure is assumed.
 97
 98    Returns
 99    -------
100    np.ndarray
101        Array of shape (n_periods,) with seniority weights, normalised
102        to sum to 1. Most recent period is last.
103
104    Examples
105    --------
106    >>> seniority_weights(3, p=0.9, q=0.8)
107    array([0.231..., 0.321..., 0.446...])  # most recent gets highest weight
108    """
109    if not (0 < p <= 1.0):
110        raise ValueError(f"p must be in (0, 1], got {p}")
111    if not (0 < q <= 1.0):
112        raise ValueError(f"q must be in (0, 1], got {q}")
113
114    decay = p * q
115    # Weights for periods 0, 1, ..., n-1 (oldest to most recent)
116    raw = np.array([decay ** (n_periods - 1 - s) for s in range(n_periods)])
117
118    if exposures is not None:
119        raw = raw * np.asarray(exposures, dtype=float)
120
121    total = raw.sum()
122    if total == 0.0:
123        return np.ones(n_periods) / n_periods
124    return raw / total

Compute seniority (recency) weights for claims history periods.

In the dynamic Poisson-gamma model, the effective weight on period t's claims decays geometrically with age. This function computes the relative weight assigned to each period in a t-period history when fitting or predicting.

The weight assigned to period s (0-indexed from oldest) in a history of length t is proportional to (p*q)^(t-1-s). The most recent period receives weight 1; the oldest receives weight (p*q)^(t-1).

Parameters
----------
n_periods : int
    Number of observation periods.
p : float
    Transition parameter controlling how strongly the current state reverts
    to the mean. Must be in (0, 1].
q : float
    Decay parameter controlling how quickly older claims are downweighted.
    Must be in (0, 1].
exposures : list[float] or None
    Per-period exposure weights. If None, uniform exposure is assumed.

Returns
-------
np.ndarray
    Array of shape (n_periods,) with seniority weights, normalised to sum
    to 1. Most recent period is last.

Examples
--------
>>> seniority_weights(3, p=0.9, q=0.8)
array([0.231..., 0.321..., 0.446...])  # most recent gets highest weight
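
The exposures argument rescales the geometric weights by per-period exposure before normalising, so a heavily exposed old period can regain weight (illustrative values):

    from insurance_credibility import seniority_weights

    seniority_weights(3, p=0.9, q=0.8)                             # pure decay, ratio 0.72
    seniority_weights(3, p=0.9, q=0.8, exposures=[2.0, 1.0, 0.5])  # oldest period doubled
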
def exposure_weighted_mean(counts: list[int], exposures: list[float]) -> float:
127def exposure_weighted_mean(
128    counts: list[int],
129    exposures: list[float],
130) -> float:
131    """Compute exposure-weighted mean claim frequency.
132
133    Parameters
134    ----------
135    counts : list[int]
136        Claim counts per period.
137    exposures : list[float]
138        Exposure (e.g., years on risk) per period.
139
140    Returns
141    -------
142    float
143        Total claims divided by total exposure. Returns 0.0 if total
144        exposure is zero.
145    """
146    total_exp = sum(exposures)
147    if total_exp == 0.0:
148        return 0.0
149    return sum(counts) / total_exp

Compute exposure-weighted mean claim frequency.

Parameters
----------
counts : list[int]
    Claim counts per period.
exposures : list[float]
    Exposure (e.g., years on risk) per period.

Returns
-------
float
    Total claims divided by total exposure. Returns 0.0 if total exposure
    is zero.

def history_sufficient_stat(history: ClaimsHistory, theta_ref: float | None = None) -> float:
152def history_sufficient_stat(
153    history: ClaimsHistory,
154    theta_ref: float | None = None,
155) -> float:
156    """Compute the log-likelihood sufficient statistic for a claims history.
157
158    For Poisson models with a reference intensity theta_ref, the sufficient
159    statistic is the log-likelihood of the observed claim counts:
160
161        L(Y; theta) = sum_t [ Y_t * log(theta * e_t) - theta * e_t ]
162
163    When theta_ref is None, uses the empirical frequency as reference.
164
165    This is the input to the surrogate model's g(.) function.
166
167    Parameters
168    ----------
169    history : ClaimsHistory
170        The policy's claims history.
171    theta_ref : float or None
172        Reference intensity (claims per exposure unit). If None, uses the
173        empirical claim frequency from the history.
174
175    Returns
176    -------
177    float
178        Log-likelihood sufficient statistic.
179    """
180    assert history.exposures is not None
181    if theta_ref is None:
182        theta_ref = history.claim_frequency
183        if theta_ref == 0.0:
184            theta_ref = 1e-6  # avoid log(0)
185
186    log_lik = 0.0
187    for y_t, e_t in zip(history.claim_counts, history.exposures):
188        rate = theta_ref * e_t
189        log_lik += y_t * np.log(rate) - rate
190    return log_lik

Compute the log-likelihood sufficient statistic for a claims history.

For Poisson models with a reference intensity theta_ref, the sufficient statistic is the log-likelihood of the observed claim counts:

    L(Y; theta) = sum_t [ Y_t * log(theta * e_t) - theta * e_t ]

When theta_ref is None, uses the empirical frequency as reference.

This is the input to the surrogate model's g(.) function.

Parameters
----------
history : ClaimsHistory
    The policy's claims history.
theta_ref : float or None
    Reference intensity (claims per exposure unit). If None, uses the
    empirical claim frequency from the history.

Returns
-------
float
    Log-likelihood sufficient statistic.
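
This is also the natural hook for SurrogateModel's sufficient_stat_fn. A sketch with a hypothetical length-scaled statistic (theta_ref=0.12 is illustrative):

    from insurance_credibility import SurrogateModel, history_sufficient_stat

    def scaled_stat(h):
        # Average log-likelihood per observed period
        return history_sufficient_stat(h, theta_ref=0.12) / max(h.n_periods, 1)

    model = SurrogateModel(sufficient_stat_fn=scaled_stat, random_state=42)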

__version__ = '0.1.4'