
insurance_credibility.experience

Individual policy-level Bayesian posterior experience rating.

Four progressive model tiers for updating a priori GLM premiums with claims history:

  1. StaticCredibilityModel — Bühlmann-Straub at individual policy level
  2. DynamicPoissonGammaModel — State-space model with seniority weighting (Ahn et al. 2023)
  3. SurrogateModel — IS-based surrogate for intractable posteriors (Calcetero et al. 2024)
  4. DeepAttentionModel — Neural attention credibility (Wüthrich 2024) [requires torch]

All models share the same interface:

    model.fit(histories)           # list[ClaimsHistory]
    cf = model.predict(history)    # returns credibility factor (float)
    posterior = prior * cf

Quick start::

from insurance_credibility.experience import ClaimsHistory, StaticCredibilityModel
from insurance_credibility.experience import balance_calibrate

histories = [
    ClaimsHistory("POL001", periods=[1, 2, 3], claim_counts=[0, 1, 0],
                  exposures=[1.0, 1.0, 0.8], prior_premium=450.0),
    ClaimsHistory("POL002", periods=[1, 2, 3], claim_counts=[2, 1, 2],
                  exposures=[1.0, 1.0, 1.0], prior_premium=450.0),
]

model = StaticCredibilityModel()
model.fit(histories)
cf = model.predict(histories[0])
from ._types import CalibrationResult, ClaimsHistory
from .calibration import (
    apply_calibration,
    balance_calibrate,
    balance_report,
    calibrated_predict_fn,
)
from .dynamic import DynamicPoissonGammaModel
from .static import StaticCredibilityModel
from .surrogate import SurrogateModel
from .utils import (
    credibility_factor,
    exposure_weighted_mean,
    history_sufficient_stat,
    posterior_premium,
    seniority_weights,
)

__all__ = [
    # Data types
    "ClaimsHistory",
    "CalibrationResult",
    # Models
    "StaticCredibilityModel",
    "DynamicPoissonGammaModel",
    "SurrogateModel",
    "DeepAttentionModel",
    # Calibration
    "balance_calibrate",
    "apply_calibration",
    "calibrated_predict_fn",
    "balance_report",
    # Utilities
    "credibility_factor",
    "posterior_premium",
    "seniority_weights",
    "exposure_weighted_mean",
    "history_sufficient_stat",
]


def __getattr__(name: str):
    """Lazy import for optional torch-dependent classes."""
    if name == "DeepAttentionModel":
        from .attention import DeepAttentionModel

        return DeepAttentionModel
    raise AttributeError(f"module 'insurance_credibility.experience' has no attribute {name!r}")
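The `__getattr__` hook above uses PEP 562 module-level attribute access, so torch is only imported when `DeepAttentionModel` is actually requested. A minimal standalone sketch of the same pattern, using a synthetic module rather than the real package:

```python
import sys
import types

# PEP 562 sketch: a module-level __getattr__ resolves names lazily, so an
# optional heavy dependency is imported only when the attribute is accessed.
mod = types.ModuleType("demo_experience")


def _module_getattr(name: str):
    if name == "DeepAttentionModel":
        # Stand-in for "from .attention import DeepAttentionModel"
        return type("DeepAttentionModel", (), {})
    raise AttributeError(f"module 'demo_experience' has no attribute {name!r}")


# Assigning into the module's __dict__ is what PEP 562 looks up
mod.__getattr__ = _module_getattr
sys.modules["demo_experience"] = mod

import demo_experience

cls = demo_experience.DeepAttentionModel  # resolved lazily on first access
```

A failed optional import inside the real hook would surface as an `ImportError` at attribute-access time, not at package-import time, which is the point of the pattern.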
@dataclass
class ClaimsHistory:
from dataclasses import dataclass
from typing import Optional


@dataclass
class ClaimsHistory:
    """Claims history for a single policy.

    This is the fundamental input type for all experience rating models.
    Each instance represents one policy's observed claims sequence across
    one or more periods (typically policy years).

    Parameters
    ----------
    policy_id : str
        Unique identifier for the policy.
    periods : list[int]
        Year indices for each observation period. Typically [1, 2, 3, ...].
        Must be non-empty. Periods need not be contiguous but must be unique
        and in ascending order.
    claim_counts : list[int]
        Observed claim count in each period. Must have the same length as
        ``periods``. Non-negative integers.
    claim_amounts : list[float] or None
        Aggregate claim amount (incurred) in each period. Set to None for
        frequency-only models. When provided, must have the same length as
        ``periods``.
    exposures : list[float]
        Risk exposure in each period (e.g., years on risk, vehicle-years).
        Must have the same length as ``periods``. Defaults to 1.0 for each
        period if not supplied.
    prior_premium : float
        The a priori (GLM-based) expected annual claim cost for the next
        period. This is the base rate that experience rating will adjust.
        Must be strictly positive.

    Examples
    --------
    >>> history = ClaimsHistory(
    ...     policy_id="POL001",
    ...     periods=[1, 2, 3],
    ...     claim_counts=[0, 1, 0],
    ...     exposures=[1.0, 1.0, 0.8],
    ...     prior_premium=450.0,
    ... )
    """

    policy_id: str
    periods: list[int]
    claim_counts: list[int]
    claim_amounts: Optional[list[float]] = None
    exposures: Optional[list[float]] = None
    prior_premium: float = 1.0

    def __post_init__(self) -> None:
        self._validate()
        # Normalise exposures to a list if not provided
        if self.exposures is None:
            self.exposures = [1.0] * len(self.periods)

    def _validate(self) -> None:
        if len(self.periods) == 0:
            raise ValueError("periods must be non-empty")
        if len(self.claim_counts) != len(self.periods):
            raise ValueError(
                f"claim_counts length ({len(self.claim_counts)}) must match "
                f"periods length ({len(self.periods)})"
            )
        if self.claim_amounts is not None and len(self.claim_amounts) != len(
            self.periods
        ):
            raise ValueError(
                f"claim_amounts length ({len(self.claim_amounts)}) must match "
                f"periods length ({len(self.periods)})"
            )
        exposures = self.exposures
        if exposures is not None:
            if len(exposures) != len(self.periods):
                raise ValueError(
                    f"exposures length ({len(exposures)}) must match "
                    f"periods length ({len(self.periods)})"
                )
            if any(e <= 0.0 for e in exposures):
                raise ValueError("All exposures must be strictly positive")
        if any(c < 0 for c in self.claim_counts):
            raise ValueError("claim_counts must be non-negative")
        if self.prior_premium <= 0.0:
            raise ValueError("prior_premium must be strictly positive")
        # Strictly ascending implies unique, matching the documented contract
        if any(a >= b for a, b in zip(self.periods, self.periods[1:])):
            raise ValueError("periods must be unique and in ascending order")

    @property
    def n_periods(self) -> int:
        """Number of observation periods."""
        return len(self.periods)

    @property
    def total_claims(self) -> int:
        """Total claim count across all periods."""
        return sum(self.claim_counts)

    @property
    def total_exposure(self) -> float:
        """Total exposure across all periods."""
        assert self.exposures is not None
        return sum(self.exposures)

    @property
    def claim_frequency(self) -> float:
        """Observed claim frequency (claims per unit exposure).

        Returns 0.0 if total exposure is zero (guard only — exposures
        are validated to be positive).
        """
        total_exp = self.total_exposure
        if total_exp == 0.0:
            return 0.0
        return self.total_claims / total_exp

    @property
    def exposure_weighted_counts(self) -> list[float]:
        """Claim counts adjusted by exposure: Y_t / e_t per period."""
        assert self.exposures is not None
        return [c / e for c, e in zip(self.claim_counts, self.exposures)]
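The derived statistics above are simple exposure-weighted sums. As a sanity check, here is the same arithmetic as standalone functions operating on plain lists rather than the dataclass:

```python
# Standalone sketch of ClaimsHistory's derived statistics, mirroring the
# claim_frequency and exposure_weighted_counts properties defined above.

def claim_frequency(claim_counts: list[int], exposures: list[float]) -> float:
    """Total claims per unit of exposure across all periods."""
    total_exposure = sum(exposures)
    if total_exposure == 0.0:  # guard only; exposures are validated positive
        return 0.0
    return sum(claim_counts) / total_exposure


def exposure_weighted_counts(claim_counts: list[int],
                             exposures: list[float]) -> list[float]:
    """Per-period claim rate Y_t / e_t."""
    return [c / e for c, e in zip(claim_counts, exposures)]


# The POL001 history from the quick start: 1 claim over 2.8 exposure-years
counts, exps = [0, 1, 0], [1.0, 1.0, 0.8]
freq = claim_frequency(counts, exps)            # 1 / 2.8 ≈ 0.357
rates = exposure_weighted_counts(counts, exps)  # [0.0, 1.0, 0.0]
```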


@dataclass
class CalibrationResult:
@dataclass
class CalibrationResult:
    """Output of the balance calibration step.

    Parameters
    ----------
    calibration_factor : float
        Multiplicative factor applied to all posterior premiums to restore
        portfolio balance. Values close to 1.0 indicate the model was
        already approximately balanced before calibration.
    sum_actual : float
        Sum of observed claim counts (weighted by exposure) over the
        calibration portfolio.
    sum_predicted : float
        Sum of posterior premiums (weighted by exposure) before calibration.
    n_policies : int
        Number of policies in the calibration portfolio.
    """

    calibration_factor: float
    sum_actual: float
    sum_predicted: float
    n_policies: int

    @property
    def relative_bias(self) -> float:
        """Relative bias before calibration: (predicted - actual) / actual."""
        if self.sum_actual == 0.0:
            return float("nan")
        return (self.sum_predicted - self.sum_actual) / self.sum_actual
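The arithmetic behind these fields is small: a factor that rescales predicted totals to match actuals, and a relative bias measuring the pre-calibration gap. A minimal sketch of that arithmetic as implied by the docstring above (this is not the library's `balance_calibrate` implementation, whose signature is not shown here):

```python
# Sketch of the balance-calibration arithmetic behind CalibrationResult:
# the factor restores sum(predicted) == sum(actual), and relative_bias
# reports the gap before calibration.

def balance_factor(sum_actual: float, sum_predicted: float) -> float:
    """Multiplicative factor so that factor * sum_predicted == sum_actual."""
    return sum_actual / sum_predicted


def relative_bias(sum_actual: float, sum_predicted: float) -> float:
    """(predicted - actual) / actual, as in CalibrationResult.relative_bias."""
    if sum_actual == 0.0:
        return float("nan")
    return (sum_predicted - sum_actual) / sum_actual


# A portfolio that over-predicts by 10%:
factor = balance_factor(sum_actual=120.0, sum_predicted=132.0)  # ≈ 0.909
bias = relative_bias(sum_actual=120.0, sum_predicted=132.0)     # 0.10
```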


class StaticCredibilityModel:
from typing import Optional

import numpy as np
import polars as pl

from ._types import ClaimsHistory


class StaticCredibilityModel:
    """Bühlmann-Straub credibility at individual policy level.

    Fits the structural parameter kappa from a portfolio of policy histories
    using the method of moments estimator. Predicts a multiplicative
    credibility factor for each policy.

    The model assumes:
        Y_{t} | Theta ~ Poisson(mu * Theta * e_t)
        Theta ~ distribution with E[Theta]=1, Var[Theta]=tau^2

    This gives the credibility approximation:
        E[Theta | Y_{1:t}] ≈ omega_t * Y_bar / mu + (1 - omega_t)

    Parameters
    ----------
    kappa : float or None
        Known credibility coefficient kappa = sigma^2 / tau^2. If None,
        kappa is estimated from the portfolio during fit(). Set explicitly
        to share a kappa estimated from a larger portfolio.
    min_kappa : float
        Lower bound on the estimated kappa. Prevents omega from reaching 1
        when the portfolio is very homogeneous. Default 0.1.
    max_kappa : float
        Upper bound on the estimated kappa. Prevents degenerate credibility
        for very heterogeneous portfolios. Default 1000.0.

    Attributes
    ----------
    kappa_ : float
        Fitted credibility coefficient (after calling fit()).
    within_variance_ : float
        Estimated within-policy variance component sigma^2.
    between_variance_ : float
        Estimated between-policy variance component tau^2.
    portfolio_mean_ : float
        Exposure-weighted mean claim frequency across the portfolio.
    is_fitted_ : bool
        True after fit() has been called.

    Examples
    --------
    >>> histories = [ClaimsHistory("P1", [1,2,3], [0,1,0], prior_premium=400.0),
    ...              ClaimsHistory("P2", [1,2,3], [2,1,2], prior_premium=400.0)]
    >>> model = StaticCredibilityModel()
    >>> model.fit(histories)
    StaticCredibilityModel(kappa=...)
    >>> cf = model.predict(histories[0])
    >>> posterior = histories[0].prior_premium * cf
    """

    def __init__(
        self,
        kappa: Optional[float] = None,
        min_kappa: float = 0.1,
        max_kappa: float = 1000.0,
    ) -> None:
        self.kappa = kappa
        self.min_kappa = min_kappa
        self.max_kappa = max_kappa

        # Set after fit()
        self.kappa_: Optional[float] = None
        self.within_variance_: Optional[float] = None
        self.between_variance_: Optional[float] = None
        self.portfolio_mean_: Optional[float] = None
        self.is_fitted_: bool = False

    def fit(self, histories: list[ClaimsHistory]) -> "StaticCredibilityModel":
        """Estimate structural parameters from a portfolio of policy histories.

        Uses the method of moments estimator for Bühlmann-Straub:
        - within-policy variance estimated from period-to-period fluctuations
        - between-policy variance estimated from cross-sectional spread

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Portfolio of policy histories. Must contain at least 2 policies
            with at least 1 period each for parameter estimation.

        Returns
        -------
        StaticCredibilityModel
            self (fitted model).

        Raises
        ------
        ValueError
            If fewer than 2 histories are provided, or if all histories have
            only 1 period (insufficient for within-variance estimation).
        """
        if len(histories) < 2:
            raise ValueError(
                "At least 2 policy histories are required to estimate kappa"
            )

        if self.kappa is not None:
            # Use provided kappa, just estimate portfolio mean
            self.kappa_ = self.kappa
            self.within_variance_ = float("nan")
            self.between_variance_ = float("nan")
            self.portfolio_mean_ = self._portfolio_mean(histories)
            self.is_fitted_ = True
            return self

        self.kappa_, self.within_variance_, self.between_variance_ = (
            self._estimate_kappa(histories)
        )
        self.portfolio_mean_ = self._portfolio_mean(histories)
        self.is_fitted_ = True
        return self

    def predict(self, history: ClaimsHistory) -> float:
        """Compute the credibility factor for a single policy.

        Parameters
        ----------
        history : ClaimsHistory
            The policy's claims history. The prior_premium field is used as
            the a priori rate mu.

        Returns
        -------
        float
            Credibility factor CF in [0, inf). Multiply by prior_premium to
            obtain the posterior premium.
            CF = omega_t * (Y_bar / mu) + (1 - omega_t)

        Raises
        ------
        RuntimeError
            If fit() has not been called.
        """
        self._check_fitted()
        assert self.kappa_ is not None
        assert self.exposures_ok(history)

        assert history.exposures is not None
        t = history.total_exposure  # effective credibility weight
        omega_t = t / (t + self.kappa_)

        mu = history.prior_premium  # a priori rate (expected frequency)
        y_bar = history.claim_frequency  # empirical frequency

        # Credibility factor: omega * (empirical / prior) + (1 - omega)
        if mu <= 0.0:
            return 1.0

        cf = omega_t * (y_bar / mu) + (1.0 - omega_t)
        return max(cf, 0.0)  # guard against negative values from rounding

    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
        """Score a batch of policies and return a Polars DataFrame.

        Parameters
        ----------
        histories : list[ClaimsHistory]
            Policies to score.

        Returns
        -------
        pl.DataFrame
            Columns: policy_id, prior_premium, credibility_factor,
            posterior_premium.
        """
        self._check_fitted()
        rows = []
        for h in histories:
            cf = self.predict(h)
            rows.append(
                {
                    "policy_id": h.policy_id,
                    "prior_premium": h.prior_premium,
                    "credibility_factor": cf,
                    "posterior_premium": h.prior_premium * cf,
                }
            )
        return pl.DataFrame(rows)

    def credibility_weight(self, history: ClaimsHistory) -> float:
        """Return the Bühlmann credibility weight omega_t for a policy.

        Parameters
        ----------
        history : ClaimsHistory
            The policy's claims history.

        Returns
        -------
        float
            omega_t = t / (t + kappa) in [0, 1], where t is total exposure.
        """
        self._check_fitted()
        assert self.kappa_ is not None
        assert history.exposures is not None
        t = history.total_exposure
        return t / (t + self.kappa_)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _estimate_kappa(
        self, histories: list[ClaimsHistory]
    ) -> tuple[float, float, float]:
        """Bühlmann-Straub method of moments estimation.

        Returns (kappa, within_variance, between_variance).
        """
        # Compute per-policy exposure-weighted mean frequencies
        assert all(h.exposures is not None for h in histories)
        n = len(histories)

        exposures = [np.array(h.exposures, dtype=float) for h in histories]
        counts = [np.array(h.claim_counts, dtype=float) for h in histories]
        rates = [c / e for c, e in zip(counts, exposures)]  # Y_t / e_t

        total_exp = [float(e.sum()) for e in exposures]
        y_bars = [float(c.sum() / t) for c, t in zip(counts, total_exp)]

        grand_total_exp = sum(total_exp)
        grand_mean = sum(
            c.sum() for c in counts
        ) / grand_total_exp  # E_i[Y_bar_i]

        # --- Within-policy variance estimate (sigma^2) ---
        # sigma^2 = E[Var(Y_t | Theta)] = mu (for Poisson)
        # MoM estimator: average exposure-weighted within-period variance
        within_num = 0.0
        within_denom = 0.0
        for rate_i, exp_i, y_bar_i in zip(rates, exposures, y_bars):
            t_i = len(rate_i)
            if t_i < 2:
                continue
            for t in range(t_i):
                within_num += exp_i[t] * (rate_i[t] - y_bar_i) ** 2
                within_denom += 1
        if within_denom == 0 or within_num == 0:
            # Fallback: use portfolio mean as within-variance estimate
            sigma2 = grand_mean
        else:
            # Unbiased estimator: divide by (T_i - 1) per policy
            total_periods = sum(len(r) for r in rates)
            denom2 = total_periods - n
            if denom2 <= 0:
                sigma2 = grand_mean
            else:
                sigma2 = within_num / denom2

        # --- Between-policy variance estimate (tau^2) ---
        # tau^2 = Var(Theta)
        # Between-policy sum of squares
        c_const = grand_total_exp - sum(t**2 for t in total_exp) / grand_total_exp
        if c_const <= 0:
            tau2 = max(sigma2 / 10.0, 1e-6)
        else:
            between_ss = sum(
                t_i * (y_bar_i - grand_mean) ** 2
                for t_i, y_bar_i in zip(total_exp, y_bars)
            )
            tau2 = max((between_ss - (n - 1) * sigma2) / c_const, 1e-8)

        kappa = sigma2 / tau2
        kappa = float(np.clip(kappa, self.min_kappa, self.max_kappa))
        return kappa, float(sigma2), float(tau2)

    def _portfolio_mean(self, histories: list[ClaimsHistory]) -> float:
        total_claims = sum(h.total_claims for h in histories)
        total_exp = sum(h.total_exposure for h in histories)
        if total_exp == 0.0:
            return 0.0
        return total_claims / total_exp

    def _check_fitted(self) -> None:
        if not self.is_fitted_:
            raise RuntimeError(
                "Model has not been fitted. Call fit() before predict()."
            )

    @staticmethod
    def exposures_ok(history: ClaimsHistory) -> bool:
        return history.exposures is not None and len(history.exposures) > 0

    def __repr__(self) -> str:
        if self.is_fitted_:
            return f"StaticCredibilityModel(kappa={self.kappa_:.4f})"
        return "StaticCredibilityModel(unfitted)"
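The prediction step reduces to the two formulas in the docstrings above: omega_t = t / (t + kappa) and CF = omega_t * (Y_bar / mu) + (1 - omega_t). A self-contained numeric sketch of that computation (the kappa and mu values below are arbitrary illustrations, not fitted parameters):

```python
# Worked Bühlmann-Straub prediction, mirroring StaticCredibilityModel.predict:
# omega_t = t / (t + kappa), CF = omega_t * (y_bar / mu) + (1 - omega_t).

def credibility_weight(total_exposure: float, kappa: float) -> float:
    """omega_t in [0, 1]: more exposure pulls the estimate toward the data."""
    return total_exposure / (total_exposure + kappa)


def credibility_factor(total_exposure: float, kappa: float,
                       y_bar: float, mu: float) -> float:
    """Multiplicative adjustment to the a priori rate mu."""
    if mu <= 0.0:
        return 1.0  # degenerate prior: no adjustment
    omega = credibility_weight(total_exposure, kappa)
    return max(omega * (y_bar / mu) + (1.0 - omega), 0.0)


# Policy with 2.8 exposure-years, observed frequency 1/2.8, prior rate 0.25:
omega = credibility_weight(2.8, kappa=5.0)  # 2.8 / 7.8 ≈ 0.359
cf = credibility_factor(2.8, 5.0, y_bar=1 / 2.8, mu=0.25)
posterior_rate = 0.25 * cf
```

Note the blend: with little exposure (omega near 0) the factor stays near 1 and the prior dominates; with long histories (omega near 1) the factor approaches the empirical-to-prior ratio y_bar / mu.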

Bühlmann-Straub credibility at individual policy level.

Fits the structural parameter kappa from a portfolio of policy histories using the method of moments estimator. Predicts a multiplicative credibility factor for each policy.

The model assumes:

Y_{t} | Theta ~ Poisson(mu * Theta * e_t) Theta ~ distribution with E[Theta]=1, Var[Theta]=tau^2

This gives the credibility approximation:

E[Theta | Y_{1:t}] ≈ omega_t * Y_bar / mu + (1 - omega_t)

Parameters

kappa : float or None Known credibility coefficient kappa = sigma^2 / tau^2. If None, kappa is estimated from the portfolio during fit(). Set explicitly to share a kappa estimated from a larger portfolio. min_kappa : float Lower bound on the estimated kappa. Prevents omega from reaching 1 when the portfolio is very homogeneous. Default 0.1. max_kappa : float Upper bound on the estimated kappa. Prevents degenerate credibility for very heterogeneous portfolios. Default 1000.0.

Attributes

kappa_ : float Fitted credibility coefficient (after calling fit()). within_variance_ : float Estimated within-policy variance component sigma^2. between_variance_ : float Estimated between-policy variance component tau^2. portfolio_mean_ : float Exposure-weighted mean claim frequency across the portfolio. is_fitted_ : bool True after fit() has been called.

Examples

>>> histories = [ClaimsHistory("P1", [1,2,3], [0,1,0], prior_premium=400.0),
...              ClaimsHistory("P2", [1,2,3], [2,1,2], prior_premium=400.0)]
>>> model = StaticCredibilityModel()
>>> model.fit(histories)
StaticCredibilityModel(kappa=...)
>>> cf = model.predict(histories[0])
>>> posterior = histories[0].prior_premium * cf
StaticCredibilityModel( kappa: Optional[float] = None, min_kappa: float = 0.1, max_kappa: float = 1000.0)
83    def __init__(
84        self,
85        kappa: Optional[float] = None,
86        min_kappa: float = 0.1,
87        max_kappa: float = 1000.0,
88    ) -> None:
89        self.kappa = kappa
90        self.min_kappa = min_kappa
91        self.max_kappa = max_kappa
92
93        # Set after fit()
94        self.kappa_: Optional[float] = None
95        self.within_variance_: Optional[float] = None
96        self.between_variance_: Optional[float] = None
97        self.portfolio_mean_: Optional[float] = None
98        self.is_fitted_: bool = False
kappa
min_kappa
max_kappa
kappa_: Optional[float]
within_variance_: Optional[float]
between_variance_: Optional[float]
portfolio_mean_: Optional[float]
is_fitted_: bool
def fit( self, histories: list[ClaimsHistory]) -> StaticCredibilityModel:
100    def fit(self, histories: list[ClaimsHistory]) -> "StaticCredibilityModel":
101        """Estimate structural parameters from a portfolio of policy histories.
102
103        Uses the method of moments estimator for Bühlmann-Straub:
104        - within-policy variance estimated from period-to-period fluctuations
105        - between-policy variance estimated from cross-sectional spread
106
107        Parameters
108        ----------
109        histories : list[ClaimsHistory]
110            Portfolio of policy histories. Must contain at least 2 policies
111            with at least 1 period each for parameter estimation.
112
113        Returns
114        -------
115        StaticCredibilityModel
116            self (fitted model).
117
118        Raises
119        ------
120        ValueError
121            If fewer than 2 histories are provided, or if all histories have
122            only 1 period (insufficient for within-variance estimation).
123        """
124        if len(histories) < 2:
125            raise ValueError(
126                "At least 2 policy histories are required to estimate kappa"
127            )
128
129        if self.kappa is not None:
130            # Use provided kappa, just estimate portfolio mean
131            self.kappa_ = self.kappa
132            self.within_variance_ = float("nan")
133            self.between_variance_ = float("nan")
134            self.portfolio_mean_ = self._portfolio_mean(histories)
135            self.is_fitted_ = True
136            return self
137
138        self.kappa_, self.within_variance_, self.between_variance_ = (
139            self._estimate_kappa(histories)
140        )
141        self.portfolio_mean_ = self._portfolio_mean(histories)
142        self.is_fitted_ = True
143        return self

Estimate structural parameters from a portfolio of policy histories.

Uses the method of moments estimator for Bühlmann-Straub:

  • within-policy variance estimated from period-to-period fluctuations
  • between-policy variance estimated from cross-sectional spread

Parameters

histories : list[ClaimsHistory] Portfolio of policy histories. Must contain at least 2 policies with at least 1 period each for parameter estimation.

Returns

StaticCredibilityModel self (fitted model).

Raises

ValueError If fewer than 2 histories are provided, or if all histories have only 1 period (insufficient for within-variance estimation).

def predict( self, history: ClaimsHistory) -> float:
145    def predict(self, history: ClaimsHistory) -> float:
146        """Compute the credibility factor for a single policy.
147
148        Parameters
149        ----------
150        history : ClaimsHistory
151            The policy's claims history. The prior_premium field is used as
152            the a priori rate mu.
153
154        Returns
155        -------
156        float
157            Credibility factor CF in (0, inf). Multiply by prior_premium to
158            obtain the posterior premium.
159            CF = omega_t * (Y_bar / mu) + (1 - omega_t)
160
161        Raises
162        ------
163        RuntimeError
164            If fit() has not been called.
165        """
166        self._check_fitted()
167        assert self.kappa_ is not None
168        assert self.exposures_ok(history)
169
170        assert history.exposures is not None
171        t = history.total_exposure  # effective credibility weight
172        omega_t = t / (t + self.kappa_)
173
174        mu = history.prior_premium  # a priori rate (expected frequency)
175        y_bar = history.claim_frequency  # empirical frequency
176
177        # Credibility factor: omega * (empirical / prior) + (1 - omega)
178        if mu <= 0.0:
179            return 1.0
180
181        cf = omega_t * (y_bar / mu) + (1.0 - omega_t)
182        return max(cf, 0.0)  # guard against negative values from rounding

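As a concrete illustration of the CF formula above, a minimal plain-Python sketch with hypothetical numbers (the helper name `credibility_factor` is an assumption, not part of the module's API):

```python
def credibility_factor(t: float, kappa: float, y_bar: float, mu: float) -> float:
    """CF = omega_t * (Y_bar / mu) + (1 - omega_t), with omega_t = t / (t + kappa)."""
    if mu <= 0.0:
        return 1.0  # degenerate prior: keep the a priori premium unchanged
    omega = t / (t + kappa)
    return max(omega * (y_bar / mu) + (1.0 - omega), 0.0)

# 3 exposure-years, kappa = 2, empirical frequency 50% above the prior rate:
cf = credibility_factor(t=3.0, kappa=2.0, y_bar=1.5, mu=1.0)
# omega = 0.6, so cf = 0.6 * 1.5 + 0.4 = 1.3, i.e. a 30% surcharge on the prior
```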

def predict_batch(self, histories: list[ClaimsHistory]) -> polars.dataframe.frame.DataFrame:
184    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
185        """Score a batch of policies and return a Polars DataFrame.
186
187        Parameters
188        ----------
189        histories : list[ClaimsHistory]
190            Policies to score.
191
192        Returns
193        -------
194        pl.DataFrame
195            Columns: policy_id, prior_premium, credibility_factor,
196            posterior_premium.
197        """
198        self._check_fitted()
199        rows = []
200        for h in histories:
201            cf = self.predict(h)
202            rows.append(
203                {
204                    "policy_id": h.policy_id,
205                    "prior_premium": h.prior_premium,
206                    "credibility_factor": cf,
207                    "posterior_premium": h.prior_premium * cf,
208                }
209            )
210        return pl.DataFrame(rows)


def credibility_weight(self, history: ClaimsHistory) -> float:
212    def credibility_weight(self, history: ClaimsHistory) -> float:
213        """Return the Bühlmann credibility weight omega_t for a policy.
214
215        Parameters
216        ----------
217        history : ClaimsHistory
218            The policy's claims history.
219
220        Returns
221        -------
222        float
223            omega_t = t / (t + kappa) in [0, 1], where t is total exposure.
224        """
225        self._check_fitted()
226        assert self.kappa_ is not None
227        assert history.exposures is not None
228        t = history.total_exposure
229        return t / (t + self.kappa_)

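The weight is 0 at zero exposure and approaches 1 as exposure accumulates; a quick sketch with an assumed kappa (not a fitted value):

```python
def credibility_weight(t: float, kappa: float) -> float:
    """omega_t = t / (t + kappa): 0 at zero exposure, tends to 1 as t grows."""
    return t / (t + kappa)

kappa = 4.0  # assumed structural parameter for illustration
weights = [credibility_weight(t, kappa) for t in (0.0, 1.0, 4.0, 16.0, 64.0)]
# monotone increasing; the weight is exactly 0.5 when t == kappa
```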

@staticmethod
def exposures_ok(history: ClaimsHistory) -> bool:
314    @staticmethod
315    def exposures_ok(history: ClaimsHistory) -> bool:
316        return history.exposures is not None and len(history.exposures) > 0
class DynamicPoissonGammaModel:
 69class DynamicPoissonGammaModel:
 70    """Poisson-gamma conjugate state-space model with seniority weighting.
 71
 72    The model fits the state-transition parameters (p, q) from the portfolio using
 73    empirical Bayes maximum likelihood. At prediction time, it runs the
 74    forward recursion to compute the posterior state (alpha_t, beta_t)
 75    and returns the corresponding posterior mean as a credibility factor.
 76
 77    Parameters
 78    ----------
 79    p0 : float
 80        Initial guess for the state-reversion parameter p. Must be in (0, 1].
 81    q0 : float
 82        Initial guess for the decay parameter q. Must be in (0, 1].
 83    alpha0 : float
 84        Initial shape parameter of the prior Gamma distribution for Theta.
 85        Must be > 0. Default 1.0 (prior mean 1 when beta_0 = alpha_0).
 86    beta0_multiplier : float
 87        The initial rate parameter beta_0 is set to alpha0 * beta0_multiplier
 88        per unit of a priori rate. This keeps the prior mean at 1.0.
 89        Default 1.0.
 90    bounds : tuple
 91        (lower, upper) bounds for p and q during optimisation.
 92        Default ((0.01, 0.99), (0.01, 0.99)).
 93
 94    Attributes
 95    ----------
 96    p_ : float
 97        Fitted state-reversion parameter (after fit()).
 98    q_ : float
 99        Fitted decay parameter (after fit()).
100    loglik_ : float
101        Log-likelihood at the fitted parameters.
102    is_fitted_ : bool
103        True after fit() has been called.
104
105    Examples
106    --------
107    >>> import numpy as np
108    >>> rng = np.random.default_rng(42)
109    >>> histories = [
110    ...     ClaimsHistory(f"P{i}", [1,2,3],
111    ...                   rng.poisson(1.2, size=3).tolist(),
112    ...                   prior_premium=400.0)
113    ...     for i in range(100)
114    ... ]
115    >>> model = DynamicPoissonGammaModel()
116    >>> model.fit(histories)
117    DynamicPoissonGammaModel(p=..., q=...)
118    """
119
120    def __init__(
121        self,
122        p0: float = 0.5,
123        q0: float = 0.8,
124        alpha0: float = 1.0,
125        beta0_multiplier: float = 1.0,
126        bounds: tuple = ((0.01, 0.99), (0.01, 0.99)),
127    ) -> None:
128        self.p0 = p0
129        self.q0 = q0
130        self.alpha0 = alpha0
131        self.beta0_multiplier = beta0_multiplier
132        self.bounds = bounds
133
134        self.p_: Optional[float] = None
135        self.q_: Optional[float] = None
136        self.loglik_: Optional[float] = None
137        self.is_fitted_: bool = False
138
139    def fit(
140        self, histories: list[ClaimsHistory], verbose: bool = False
141    ) -> "DynamicPoissonGammaModel":
142        """Fit p and q by empirical Bayes MLE on the portfolio log-likelihood.
143
144        The log-likelihood sums over all policies and all periods. Within each
145        policy, the marginal Y_t is Negative Binomial with state-dependent
146        shape and mean parameters derived from the forward recursion.
147
148        Parameters
149        ----------
150        histories : list[ClaimsHistory]
151            Training portfolio. At least 10 policies recommended for stable
152            parameter estimation.
153        verbose : bool
154            If True, print optimisation progress.
155
156        Returns
157        -------
158        DynamicPoissonGammaModel
159            self (fitted model).
160        """
161        if len(histories) < 2:
162            raise ValueError(
163                "At least 2 policy histories are required for fitting"
164            )
165
166        def neg_loglik(params: np.ndarray) -> float:
167            p, q = float(params[0]), float(params[1])
168            total_ll = 0.0
169            for history in histories:
170                total_ll += self._policy_loglik(history, p, q)
171            return -total_ll
172
173        x0 = np.array([self.p0, self.q0])
174        bounds_list = [self.bounds[0], self.bounds[1]]
175
176        result = optimize.minimize(
177            neg_loglik,
178            x0,
179            method="L-BFGS-B",
180            bounds=bounds_list,
181            options={"maxiter": 500, "ftol": 1e-9, "gtol": 1e-6},
182        )
183
184        if verbose and not result.success:
185            print(f"Optimisation warning: {result.message}")
186
187        self.p_ = float(result.x[0])
188        self.q_ = float(result.x[1])
189        self.loglik_ = float(-result.fun)
190        self.is_fitted_ = True
191        return self
192
193    def predict(self, history: ClaimsHistory) -> float:
194        """Compute the credibility factor for a single policy.
195
196        Runs the forward recursion using the fitted (p, q) parameters to
197        propagate the Gamma posterior through the observed claim sequence.
198        The credibility factor is the ratio of the posterior mean to the
199        a priori rate.
200
201        Parameters
202        ----------
203        history : ClaimsHistory
204            The policy's claims history.
205
206        Returns
207        -------
208        float
209            Credibility factor CF = mu_post / mu_prior. Multiply by
210            prior_premium to obtain the posterior premium.
211
212        Raises
213        ------
214        RuntimeError
215            If fit() has not been called.
216        """
217        self._check_fitted()
218        assert self.p_ is not None
219        assert self.q_ is not None
220
221        alpha_t, beta_t = self._forward_recursion(
222            history, self.p_, self.q_
223        )
224        mu = history.prior_premium
225        mu_post = (alpha_t / beta_t) * mu
226        cf = mu_post / mu
227        return max(cf, 0.0)
228
229    def predict_posterior_params(
230        self, history: ClaimsHistory
231    ) -> tuple[float, float]:
232        """Return the posterior Gamma parameters (alpha, beta) after recursion.
233
234        The posterior distribution of Theta_{t+1} | Y_{1:t} is:
235            Gamma(alpha_{t+1}, beta_{t+1})
236        with mean alpha_{t+1} / beta_{t+1}.
237
238        This is useful for uncertainty quantification: the posterior variance
239        is alpha / beta^2.
240
241        Parameters
242        ----------
243        history : ClaimsHistory
244            The policy's claims history.
245
246        Returns
247        -------
248        tuple[float, float]
249            (alpha_{t+1}, beta_{t+1}) — shape and rate of posterior Gamma.
250        """
251        self._check_fitted()
252        assert self.p_ is not None
253        assert self.q_ is not None
254        return self._forward_recursion(history, self.p_, self.q_)
255
256    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
257        """Score a batch of policies and return a Polars DataFrame.
258
259        Parameters
260        ----------
261        histories : list[ClaimsHistory]
262            Policies to score.
263
264        Returns
265        -------
266        pl.DataFrame
267            Columns: policy_id, prior_premium, credibility_factor,
268            posterior_premium, posterior_alpha, posterior_beta,
269            posterior_variance.
270        """
271        self._check_fitted()
272        rows = []
273        for h in histories:
274            cf = self.predict(h)
275            alpha_t, beta_t = self.predict_posterior_params(h)
276            rows.append(
277                {
278                    "policy_id": h.policy_id,
279                    "prior_premium": h.prior_premium,
280                    "credibility_factor": cf,
281                    "posterior_premium": h.prior_premium * cf,
282                    "posterior_alpha": alpha_t,
283                    "posterior_beta": beta_t,
284                    "posterior_variance": alpha_t / (beta_t**2),
285                }
286            )
287        return pl.DataFrame(rows)
288
289    # ------------------------------------------------------------------
290    # Core recursion (also used in log-likelihood)
291    # ------------------------------------------------------------------
292
293    def _forward_recursion(
294        self,
295        history: ClaimsHistory,
296        p: float,
297        q: float,
298    ) -> tuple[float, float]:
299        """Run the Gamma state recursion through the claims history.
300
301        Returns the posterior parameters (alpha, beta) after observing
302        all periods in ``history``. The initial prior is set so that the
303        prior mean of Theta is 1 (i.e., alpha_0 / beta_0 = 1).
304
305        Parameters
306        ----------
307        history : ClaimsHistory
308            Policy's claims history.
309        p : float
310            State-reversion parameter.
311        q : float
312            Decay parameter.
313
314        Returns
315        -------
316        tuple[float, float]
317            (alpha_{T+1}, beta_{T+1}) — the one-step-ahead posterior state.
318        """
319        assert history.exposures is not None
320        mu = history.prior_premium
321
322        # Initialise prior: Theta ~ Gamma(alpha0, beta0) with E[Theta]=1
323        alpha = self.alpha0
324        beta = self.alpha0 * self.beta0_multiplier  # alpha0 / beta0 = 1
325
326        for y_t, e_t in zip(history.claim_counts, history.exposures):
327            # Bayesian update: observe Y_t ~ Poi(mu * e_t * Theta)
328            # Posterior: Theta | Y_t ~ Gamma(alpha + Y_t, beta + mu*e_t)
329            alpha_post = alpha + float(y_t)
330            beta_post = beta + mu * e_t
331
332            # State transition to next period (Ahn et al. 2023, eq. 2.4)
333            # beta_next = q * beta_{t|t}  — posterior rate, not double-counted
334            beta_next = q * beta_post
335            alpha_next = p * q * alpha_post + (1.0 - p) * beta_next
336
337            # Guard against numerical underflow
338            alpha = max(alpha_next, 1e-10)
339            beta = max(beta_next, 1e-10)
340
341        return float(alpha), float(beta)
342
343    def _policy_loglik(
344        self,
345        history: ClaimsHistory,
346        p: float,
347        q: float,
348    ) -> float:
349        """Compute the marginal log-likelihood for a single policy.
350
351        The marginal distribution of Y_t given Y_{1:t-1} is Negative Binomial.
352        The shape parameter r_t = alpha_t and the mean is mu * e_t * alpha_t / beta_t.
353
354        Summing these one-step-ahead log-likelihoods over all periods gives
355        the policy's marginal log-likelihood, tractable via the forward recursion.
356        """
357        assert history.exposures is not None
358        mu = history.prior_premium
359
360        alpha = self.alpha0
361        beta = self.alpha0 * self.beta0_multiplier
362
363        total_ll = 0.0
364        for y_t, e_t in zip(history.claim_counts, history.exposures):
365            # Marginal: Y_t | Y_{1:t-1} ~ NegBin(r=alpha, mu=alpha/beta * mu*e_t)
366            r_t = alpha
367            mu_t = (alpha / beta) * mu * e_t
368            total_ll += _negbin_logpmf(int(y_t), r_t, mu_t)
369
370            # Update state (Ahn et al. 2023, eq. 2.4)
371            alpha_post = alpha + float(y_t)
372            beta_post = beta + mu * e_t
373            # beta_next = q * beta_{t|t}  — posterior rate only, no double-count
374            beta_next = q * beta_post
375            alpha_next = p * q * alpha_post + (1.0 - p) * beta_next
376
377            alpha = max(alpha_next, 1e-10)
378            beta = max(beta_next, 1e-10)
379
380        return total_ll
381
382    def _check_fitted(self) -> None:
383        if not self.is_fitted_:
384            raise RuntimeError(
385                "Model has not been fitted. Call fit() before predict()."
386            )
387
388    def __repr__(self) -> str:
389        if self.is_fitted_:
390            return (
391                f"DynamicPoissonGammaModel(p={self.p_:.4f}, q={self.q_:.4f})"
392            )
393        return "DynamicPoissonGammaModel(unfitted)"
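
The state recursion at the core of the model can be sketched as a standalone function. This is a simplified restatement of `_forward_recursion` above under the default alpha0 = beta0 = 1; the function name is hypothetical:

```python
def gamma_forward_recursion(claims, exposures, mu, p, q, alpha0=1.0):
    """Propagate the Gamma(alpha, beta) state through a claim sequence.

    Each period applies the conjugate Poisson-Gamma update, then the
    (p, q) seniority-weighted state transition.
    """
    alpha, beta = alpha0, alpha0  # default multiplier 1: prior mean E[Theta] = 1
    for y_t, e_t in zip(claims, exposures):
        alpha_post = alpha + float(y_t)   # observe Y_t ~ Poi(mu * e_t * Theta)
        beta_post = beta + mu * e_t
        beta = max(q * beta_post, 1e-10)  # decay the posterior rate
        alpha = max(p * q * alpha_post + (1.0 - p) * beta, 1e-10)
    return alpha, beta

# A claim-free history pulls the posterior mean (the credibility factor) below 1:
a, b = gamma_forward_recursion([0, 0, 0], [1.0, 1.0, 1.0], mu=1.0, p=0.5, q=0.8)
cf = a / b
```

Because q discounts the accumulated rate each period, older observations carry less weight than recent ones, which is the seniority effect the fitted (p, q) control.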


class SurrogateModel:
 38class SurrogateModel:
 39    """Surrogate model for large-portfolio Bayesian experience rating.
 40
 41    Fits an approximate g(.) function on a sub-portfolio using importance
 42    sampling, then applies it to the full portfolio analytically.
 43
 44    The posterior premium for policy i is:
 45        mu_post_i = mu_prior_i * exp(g(L_i, n_i))
 46    where L_i is the log-likelihood sufficient statistic and n_i is the
 47    number of periods observed.
 48
 49    The form of g(.) is a polynomial in L and n:
 50        g(L, n) = theta_0 + theta_1 * L + theta_2 * n + theta_3 * L * n
 51
 52    This is flexible enough to capture Bühlmann-Straub as a special case
 53    (linear in L = Y_bar) while allowing non-linear responses.
 54
 55    Parameters
 56    ----------
 57    prior_model : callable or None
 58        Function mapping ClaimsHistory -> float, returning the a priori
 59        premium for any policy. If None, uses history.prior_premium directly.
 60        This should be a GLM or similar model already fitted on the portfolio.
 61    n_is_samples : int
 62        Number of importance sampling draws from the prior. More samples
 63        give a better IS estimate but are slower. Default 2000.
 64    subsample_frac : float
 65        Fraction of the portfolio to use in the sub-portfolio for fitting g.
 66        Default 0.10 (10%). Minimum 20 policies regardless of fraction.
 67    poly_degree : int
 68        Polynomial degree for g(.). Degree 1 gives linear g; degree 2
 69        allows quadratic response. Default 1.
 70    random_state : int or None
 71        Random seed for IS sampling and sub-portfolio selection.
 72    sufficient_stat_fn : callable or None
 73        Custom function mapping ClaimsHistory -> float for the sufficient
 74        statistic. If None, uses the log-likelihood under Poisson(theta_ref).
 75
 76    Attributes
 77    ----------
 78    theta_ : np.ndarray
 79        Fitted WLS coefficients for g(.).
 80    theta_ref_ : float
 81        Reference Poisson intensity used in the sufficient statistic.
 82    is_fitted_ : bool
 83        True after fit() has been called.
 84    """
 85
 86    def __init__(
 87        self,
 88        prior_model: Optional[Callable[[ClaimsHistory], float]] = None,
 89        n_is_samples: int = 2000,
 90        subsample_frac: float = 0.10,
 91        poly_degree: int = 1,
 92        random_state: Optional[int] = None,
 93        sufficient_stat_fn: Optional[Callable[[ClaimsHistory], float]] = None,
 94    ) -> None:
 95        self.prior_model = prior_model
 96        self.n_is_samples = n_is_samples
 97        self.subsample_frac = subsample_frac
 98        self.poly_degree = poly_degree
 99        self.random_state = random_state
100        self.sufficient_stat_fn = sufficient_stat_fn
101
102        self.theta_: Optional[np.ndarray] = None
103        self.theta_ref_: Optional[float] = None
104        self.is_fitted_: bool = False
105        self._rng: Optional[np.random.Generator] = None
106
107    def fit(self, histories: list[ClaimsHistory]) -> "SurrogateModel":
108        """Fit the surrogate g(.) function on a representative sub-portfolio.
109
110        Steps:
111        1. Draw a sub-portfolio (random stratified selection by n_periods).
112        2. Estimate reference intensity theta_ref as portfolio mean frequency.
113        3. Draw IS samples Theta_k ~ prior once (shared across all policies).
114        4. For each sub-portfolio policy, compute IS-based Bayesian premium.
115        5. Fit g(.) by WLS minimising weighted squared log-credibility error.
116
117        Parameters
118        ----------
119        histories : list[ClaimsHistory]
120            Full training portfolio.
121
122        Returns
123        -------
124        SurrogateModel
125            self (fitted model).
126        """
127        self._rng = np.random.default_rng(self.random_state)
128
129        # Estimate reference intensity
130        total_claims = sum(h.total_claims for h in histories)
131        total_exp = sum(h.total_exposure for h in histories)
132        self.theta_ref_ = total_claims / total_exp if total_exp > 0 else 1.0
133
134        # Select sub-portfolio
135        n_sub = max(20, int(len(histories) * self.subsample_frac))
136        n_sub = min(n_sub, len(histories))
137        idx = self._rng.choice(len(histories), size=n_sub, replace=False)
138        sub_histories = [histories[i] for i in idx]
139
140        # Draw IS samples from prior: Theta_k ~ Gamma(alpha_prior, beta_prior)
141        # We use a weakly informative prior: Theta ~ Gamma(1, 1) -> E[Theta] = 1
142        prior_alpha = 1.0
143        prior_beta = 1.0
144        theta_samples = self._rng.gamma(
145            prior_alpha, 1.0 / prior_beta, size=self.n_is_samples
146        )
147
148        # Compute IS-based posterior premiums for sub-portfolio
149        sub_L = []
150        sub_n = []
151        sub_log_cf = []
152        sub_weights = []
153
154        for hist in sub_histories:
155            mu_prior = self._get_prior(hist)
156            L_i = self._sufficient_stat(hist)
157            n_i = float(hist.n_periods)
158
159            mu_post_is, se_i = self._is_posterior(hist, theta_samples, mu_prior)
160
161            if mu_prior <= 0.0 or mu_post_is <= 0.0:
162                continue
163            log_cf_i = np.log(mu_post_is / mu_prior)
164            w_i = 1.0 / max(se_i**2, 1e-8)
165
166            sub_L.append(L_i)
167            sub_n.append(n_i)
168            sub_log_cf.append(log_cf_i)
169            sub_weights.append(w_i)
170
171        if len(sub_L) < 4:
172            # Degenerate case: not enough data, return identity (CF = 1)
173            self.theta_ = np.zeros(self._n_features())
174            self.is_fitted_ = True
175            return self
176
177        # Design matrix for g(L, n)
178        X = self._design_matrix(np.array(sub_L), np.array(sub_n))
179        y = np.array(sub_log_cf)
180        w = np.array(sub_weights)
181        w = w / w.sum()
182
183        # WLS: solve (X^T W X) theta = X^T W y, with W = diag(w)
184        # (apply the weights row-wise instead of forming the n x n diagonal)
185        XtWX = X.T @ (w[:, None] * X)
186        XtWy = X.T @ (w * y)
187
188        try:
189            self.theta_ = np.linalg.solve(XtWX, XtWy)
190        except np.linalg.LinAlgError:
191            self.theta_ = np.linalg.lstsq(XtWX, XtWy, rcond=None)[0]
192
193        self.is_fitted_ = True
194        return self
195
196    def predict(self, history: ClaimsHistory) -> float:
197        """Compute the credibility factor for a single policy.
198
199        Parameters
200        ----------
201        history : ClaimsHistory
202            The policy's claims history.
203
204        Returns
205        -------
206        float
207            Credibility factor CF = exp(g(L, n)). Multiply by prior_premium
208            to obtain the posterior premium.
209
210        Raises
211        ------
212        RuntimeError
213            If fit() has not been called.
214        """
215        self._check_fitted()
216        assert self.theta_ is not None
217
218        L = self._sufficient_stat(history)
219        n = float(history.n_periods)
220        x = self._design_matrix(np.array([L]), np.array([n]))[0]
221        log_cf = float(x @ self.theta_)
222        return float(np.exp(log_cf))
223
224    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
225        """Score a batch of policies and return a Polars DataFrame.
226
227        Parameters
228        ----------
229        histories : list[ClaimsHistory]
230            Policies to score.
231
232        Returns
233        -------
234        pl.DataFrame
235            Columns: policy_id, prior_premium, credibility_factor,
236            posterior_premium, sufficient_stat.
237        """
238        self._check_fitted()
239        rows = []
240        for h in histories:
241            cf = self.predict(h)
242            rows.append(
243                {
244                    "policy_id": h.policy_id,
245                    "prior_premium": h.prior_premium,
246                    "credibility_factor": cf,
247                    "posterior_premium": h.prior_premium * cf,
248                    "sufficient_stat": self._sufficient_stat(h),
249                }
250            )
251        return pl.DataFrame(rows)
252
253    # ------------------------------------------------------------------
254    # Private helpers
255    # ------------------------------------------------------------------
256
257    def _is_posterior(
258        self,
259        history: ClaimsHistory,
260        theta_samples: np.ndarray,
261        mu_prior: float,
262    ) -> tuple[float, float]:
263        """Estimate the Bayesian posterior premium via importance sampling.
264
265        Computes E[mu(Theta) | Y_{1:n}] using self-normalised IS:
266            E_hat = sum_k w_k * mu(Theta_k) / sum_k w_k
267        where w_k = exp(log-likelihood of Y_{1:n} | Theta_k) and
268        Theta_k ~ prior (Gamma(1,1) in this implementation).
269
270        Returns (posterior_mean, standard_error).
271        """
272        assert history.exposures is not None
273        log_weights = np.zeros(len(theta_samples))
274        for y_t, e_t in zip(history.claim_counts, history.exposures):
275            # Y_t | Theta ~ Poi(mu_prior * e_t * Theta)
276            rate = mu_prior * e_t * theta_samples
277            # Poisson log PMF: y*log(rate) - rate; log(y!) cancels on normalising
278            log_weights += y_t * np.log(rate + 1e-300) - rate
279
280        # Stabilise: subtract max log weight
281        log_weights -= log_weights.max()
282        weights = np.exp(log_weights)
283        weights /= weights.sum()
284
285        # Posterior premium: E[mu_prior * Theta | data] = mu_prior * E[Theta | data]
286        post_mean = float(mu_prior * np.sum(weights * theta_samples))
287
288        # Effective sample size for SE calculation
289        ess = 1.0 / np.sum(weights**2)
290        if ess < 2:
291            se = abs(post_mean) * 0.5  # pessimistic SE
292        else:
293            # Monte Carlo SE
294            mean_theta = np.sum(weights * theta_samples)
295            var_theta = np.sum(weights * (theta_samples - mean_theta) ** 2)
296            se = float(mu_prior * np.sqrt(var_theta / max(ess, 1)))
297
298        return post_mean, se
299
300    def _sufficient_stat(self, history: ClaimsHistory) -> float:
301        if self.sufficient_stat_fn is not None:
302            return self.sufficient_stat_fn(history)
303        return history_sufficient_stat(history, theta_ref=self.theta_ref_)
304
305    def _get_prior(self, history: ClaimsHistory) -> float:
306        if self.prior_model is not None:
307            return self.prior_model(history)
308        return history.prior_premium
309
310    def _design_matrix(self, L: np.ndarray, n: np.ndarray) -> np.ndarray:
311        """Build polynomial design matrix for g(L, n)."""
312        cols = [np.ones(len(L))]
313        if self.poly_degree >= 1:
314            cols.extend([L, n])
315        if self.poly_degree >= 2:
316            cols.extend([L**2, n**2, L * n])
317        return np.column_stack(cols)
318
319    def _n_features(self) -> int:
320        if self.poly_degree >= 2:
321            return 6
322        if self.poly_degree >= 1:
323            return 3
324        return 1
325
326    def _check_fitted(self) -> None:
327        if not self.is_fitted_:
328            raise RuntimeError(
329                "Model has not been fitted. Call fit() before predict()."
330            )
331
332    def __repr__(self) -> str:
333        if self.is_fitted_:
334            return (
335                f"SurrogateModel(poly_degree={self.poly_degree}, "
336                f"n_is_samples={self.n_is_samples})"
337            )
338        return f"SurrogateModel(poly_degree={self.poly_degree}, unfitted)"

Surrogate model for large-portfolio Bayesian experience rating.

Fits an approximate g(.) function on a sub-portfolio using importance sampling, then applies it to the full portfolio analytically.

The posterior premium for policy i is:

mu_post_i = mu_prior_i * exp(g(L_i, n_i))

where L_i is the log-likelihood sufficient statistic and n_i is the number of periods observed.

The form of g(.) is a polynomial in L and n. With poly_degree=1 (the default): g(L, n) = theta_0 + theta_1 * L + theta_2 * n. With poly_degree=2, the quadratic terms L^2, n^2 and L * n are added.

This is flexible enough to capture Bühlmann-Straub as a special case (linear in L = Y_bar) while allowing non-linear responses.
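With a fitted coefficient vector, scoring a policy reduces to a dot product followed by an exponential. A minimal sketch with hypothetical degree-1 coefficients (the theta values below are illustrative, not fitted):

```python
import numpy as np

# Illustrative degree-1 coefficients theta = (theta_0, theta_1, theta_2);
# a real theta_ comes out of the WLS fit.
theta = np.array([0.05, 0.02, -0.01])

def surrogate_cf(L: float, n: float) -> float:
    """CF = exp(g(L, n)) with g linear in (1, L, n)."""
    x = np.array([1.0, L, n])
    return float(np.exp(x @ theta))

cf = surrogate_cf(L=-3.2, n=3.0)   # credibility factor for one policy
posterior = 450.0 * cf             # posterior premium = prior * CF
```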

Parameters

prior_model : callable or None
    Function mapping ClaimsHistory -> float, returning the a priori
    premium for any policy. If None, uses history.prior_premium directly.
    This should be a GLM or similar model already fitted on the portfolio.
n_is_samples : int
    Number of importance sampling draws from the prior. More samples give
    a better IS estimate but are slower. Default 2000.
subsample_frac : float
    Fraction of the portfolio to use in the sub-portfolio for fitting g.
    Default 0.10 (10%). Minimum 20 policies regardless of fraction.
poly_degree : int
    Polynomial degree for g(.). Degree 1 gives linear g; degree 2 allows
    quadratic response. Default 1.
random_state : int or None
    Random seed for IS sampling and sub-portfolio selection.
sufficient_stat_fn : callable or None
    Custom function mapping ClaimsHistory -> float for the sufficient
    statistic. If None, uses the log-likelihood under Poisson(theta_ref).

Attributes

theta_ : np.ndarray
    Fitted WLS coefficients for g(.).
theta_ref_ : float
    Reference Poisson intensity used in the sufficient statistic.
is_fitted_ : bool
    True after fit() has been called.

SurrogateModel( prior_model: Optional[Callable[[ClaimsHistory], float]] = None, n_is_samples: int = 2000, subsample_frac: float = 0.1, poly_degree: int = 1, random_state: Optional[int] = None, sufficient_stat_fn: Optional[Callable[[ClaimsHistory], float]] = None)
 86    def __init__(
 87        self,
 88        prior_model: Optional[Callable[[ClaimsHistory], float]] = None,
 89        n_is_samples: int = 2000,
 90        subsample_frac: float = 0.10,
 91        poly_degree: int = 1,
 92        random_state: Optional[int] = None,
 93        sufficient_stat_fn: Optional[Callable[[ClaimsHistory], float]] = None,
 94    ) -> None:
 95        self.prior_model = prior_model
 96        self.n_is_samples = n_is_samples
 97        self.subsample_frac = subsample_frac
 98        self.poly_degree = poly_degree
 99        self.random_state = random_state
100        self.sufficient_stat_fn = sufficient_stat_fn
101
102        self.theta_: Optional[np.ndarray] = None
103        self.theta_ref_: Optional[float] = None
104        self.is_fitted_: bool = False
105        self._rng: Optional[np.random.Generator] = None
prior_model
n_is_samples
subsample_frac
poly_degree
random_state
sufficient_stat_fn
theta_: Optional[numpy.ndarray]
theta_ref_: Optional[float]
is_fitted_: bool
def fit( self, histories: list[ClaimsHistory]) -> SurrogateModel:
107    def fit(self, histories: list[ClaimsHistory]) -> "SurrogateModel":
108        """Fit the surrogate g(.) function on a representative sub-portfolio.
109
110        Steps:
111        1. Draw a sub-portfolio (random stratified selection by n_periods).
112        2. Estimate reference intensity theta_ref as portfolio mean frequency.
113        3. Draw IS samples Theta_k ~ prior once (shared across all policies).
114        4. For each sub-portfolio policy, compute IS-based Bayesian premium.
115        5. Fit g(.) by WLS minimising weighted squared log-credibility error.
116
117        Parameters
118        ----------
119        histories : list[ClaimsHistory]
120            Full training portfolio.
121
122        Returns
123        -------
124        SurrogateModel
125            self (fitted model).
126        """
127        self._rng = np.random.default_rng(self.random_state)
128
129        # Estimate reference intensity
130        total_claims = sum(h.total_claims for h in histories)
131        total_exp = sum(h.total_exposure for h in histories)
132        self.theta_ref_ = total_claims / total_exp if total_exp > 0 else 1.0
133
134        # Select sub-portfolio
135        n_sub = max(20, int(len(histories) * self.subsample_frac))
136        n_sub = min(n_sub, len(histories))
137        idx = self._rng.choice(len(histories), size=n_sub, replace=False)
138        sub_histories = [histories[i] for i in idx]
139
140        # Draw IS samples from prior: Theta_k ~ Gamma(alpha_prior, beta_prior)
141        # We use a weakly informative prior: Theta ~ Gamma(1, 1) -> E[Theta] = 1
142        prior_alpha = 1.0
143        prior_beta = 1.0
144        theta_samples = self._rng.gamma(
145            prior_alpha, 1.0 / prior_beta, size=self.n_is_samples
146        )
147
148        # Compute IS-based posterior premiums for sub-portfolio
149        sub_L = []
150        sub_n = []
151        sub_log_cf = []
152        sub_weights = []
153
154        for hist in sub_histories:
155            mu_prior = self._get_prior(hist)
156            L_i = self._sufficient_stat(hist)
157            n_i = float(hist.n_periods)
158
159            mu_post_is, se_i = self._is_posterior(hist, theta_samples, mu_prior)
160
161            if mu_prior <= 0.0 or mu_post_is <= 0.0:
162                continue
163            log_cf_i = np.log(mu_post_is / mu_prior)
164            w_i = 1.0 / max(se_i**2, 1e-8)
165
166            sub_L.append(L_i)
167            sub_n.append(n_i)
168            sub_log_cf.append(log_cf_i)
169            sub_weights.append(w_i)
170
171        if len(sub_L) < 4:
172            # Degenerate case: not enough data, return identity (CF = 1)
173            self.theta_ = np.zeros(self._n_features())
174            self.is_fitted_ = True
175            return self
176
177        # Design matrix for g(L, n)
178        X = self._design_matrix(np.array(sub_L), np.array(sub_n))
179        y = np.array(sub_log_cf)
180        w = np.array(sub_weights)
181        w = w / w.sum()
182
183        # WLS: solve (X^T W X) theta = X^T W y
184        W = np.diag(w)
185        XtWX = X.T @ W @ X
186        XtWy = X.T @ (w * y)
187
188        try:
189            self.theta_ = np.linalg.solve(XtWX, XtWy)
190        except np.linalg.LinAlgError:
191            self.theta_ = np.linalg.lstsq(XtWX, XtWy, rcond=None)[0]
192
193        self.is_fitted_ = True
194        return self

Fit the surrogate g(.) function on a representative sub-portfolio.

Steps:

  1. Draw a sub-portfolio (random stratified selection by n_periods).
  2. Estimate reference intensity theta_ref as portfolio mean frequency.
  3. Draw IS samples Theta_k ~ prior once (shared across all policies).
  4. For each sub-portfolio policy, compute IS-based Bayesian premium.
  5. Fit g(.) by WLS minimising weighted squared log-credibility error.
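The IS-plus-WLS core of steps 3-5 can be sketched in plain NumPy on toy data (Gamma(1, 1) prior as in this implementation; the total claim count stands in for the sufficient statistic, and the WLS weights are taken as uniform):

```python
import numpy as np

rng = np.random.default_rng(0)

# Toy sub-portfolio: per-policy claim counts, exposures, a priori frequency.
policies = [
    {"counts": [0, 1, 0], "exposures": [1.0, 1.0, 0.8], "mu_prior": 0.30},
    {"counts": [2, 1, 2], "exposures": [1.0, 1.0, 1.0], "mu_prior": 0.30},
    {"counts": [0, 0, 0], "exposures": [1.0, 1.0, 1.0], "mu_prior": 0.30},
    {"counts": [1, 0], "exposures": [0.5, 1.0], "mu_prior": 0.30},
]

# Step 3: one shared set of prior draws Theta_k ~ Gamma(1, 1).
theta_k = rng.gamma(1.0, 1.0, size=2000)

rows = []
for pol in policies:
    # Step 4: self-normalised IS estimate of E[mu_prior * Theta | data].
    log_w = np.zeros_like(theta_k)
    for y_t, e_t in zip(pol["counts"], pol["exposures"]):
        rate = pol["mu_prior"] * e_t * theta_k
        log_w += y_t * np.log(rate + 1e-300) - rate  # Poisson log-PMF sans log(y!)
    w = np.exp(log_w - log_w.max())
    w /= w.sum()
    mu_post = pol["mu_prior"] * float(np.sum(w * theta_k))

    L = float(sum(pol["counts"]))                    # stand-in sufficient statistic
    n = float(len(pol["counts"]))
    rows.append((L, n, np.log(mu_post / pol["mu_prior"])))

# Step 5: fit g(L, n) = t0 + t1*L + t2*n by least squares (uniform weights).
X = np.array([[1.0, L, n] for L, n, _ in rows])
y = np.array([log_cf for _, _, log_cf in rows])
theta_hat, *_ = np.linalg.lstsq(X, y, rcond=None)
```

The heavy-claims policy ends up with a positive log credibility factor and the claim-free policy with a negative one, which is what the surrogate then generalises across the portfolio.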

Parameters

histories : list[ClaimsHistory]
    Full training portfolio.

Returns

SurrogateModel
    self (fitted model).

def predict( self, history: ClaimsHistory) -> float:
196    def predict(self, history: ClaimsHistory) -> float:
197        """Compute the credibility factor for a single policy.
198
199        Parameters
200        ----------
201        history : ClaimsHistory
202            The policy's claims history.
203
204        Returns
205        -------
206        float
207            Credibility factor CF = exp(g(L, n)). Multiply by prior_premium
208            to obtain the posterior premium.
209
210        Raises
211        ------
212        RuntimeError
213            If fit() has not been called.
214        """
215        self._check_fitted()
216        assert self.theta_ is not None
217
218        L = self._sufficient_stat(history)
219        n = float(history.n_periods)
220        x = self._design_matrix(np.array([L]), np.array([n]))[0]
221        log_cf = float(x @ self.theta_)
222        return float(np.exp(log_cf))

Compute the credibility factor for a single policy.

Parameters

history : ClaimsHistory
    The policy's claims history.

Returns

float
    Credibility factor CF = exp(g(L, n)). Multiply by prior_premium to
    obtain the posterior premium.

Raises

RuntimeError
    If fit() has not been called.

def predict_batch( self, histories: list[ClaimsHistory]) -> polars.dataframe.frame.DataFrame:
224    def predict_batch(self, histories: list[ClaimsHistory]) -> pl.DataFrame:
225        """Score a batch of policies and return a Polars DataFrame.
226
227        Parameters
228        ----------
229        histories : list[ClaimsHistory]
230            Policies to score.
231
232        Returns
233        -------
234        pl.DataFrame
235            Columns: policy_id, prior_premium, credibility_factor,
236            posterior_premium, sufficient_stat.
237        """
238        self._check_fitted()
239        rows = []
240        for h in histories:
241            cf = self.predict(h)
242            rows.append(
243                {
244                    "policy_id": h.policy_id,
245                    "prior_premium": h.prior_premium,
246                    "credibility_factor": cf,
247                    "posterior_premium": h.prior_premium * cf,
248                    "sufficient_stat": self._sufficient_stat(h),
249                }
250            )
251        return pl.DataFrame(rows)

Score a batch of policies and return a Polars DataFrame.

Parameters

histories : list[ClaimsHistory]
    Policies to score.

Returns

pl.DataFrame
    Columns: policy_id, prior_premium, credibility_factor,
    posterior_premium, sufficient_stat.

DeepAttentionModel
def balance_calibrate( predict_fn: Callable[[ClaimsHistory], float], histories: list[ClaimsHistory], exposure_weighted: bool = True) -> CalibrationResult:
28def balance_calibrate(
29    predict_fn: Callable[[ClaimsHistory], float],
30    histories: list[ClaimsHistory],
31    exposure_weighted: bool = True,
32) -> CalibrationResult:
33    """Compute the portfolio-level balance calibration factor.
34
35    The calibration factor delta satisfies:
36        sum_i [delta * CF_i * mu_prior_i * e_i] = sum_i [Y_i * e_i]
37
38    For multiplicative models:
39        delta = sum(actual * exposure) / sum(posterior * exposure)
40
41    This function computes delta and returns it as a CalibrationResult.
42    Apply delta to all posterior premiums to restore portfolio balance.
43
44    Parameters
45    ----------
46    predict_fn : callable
47        Function mapping ClaimsHistory -> float, returning the credibility
48        factor (not the posterior premium). Typically model.predict.
49    histories : list[ClaimsHistory]
50        Calibration portfolio. Typically the same data used for fitting,
51        but can be a hold-out period for forward-looking calibration.
52    exposure_weighted : bool
53        If True, weight each policy by its total exposure when computing
54        the balance check. If False, use simple sums. Default True.
55
56    Returns
57    -------
58    CalibrationResult
59        Contains calibration_factor, sum_actual, sum_predicted, n_policies.
60
61    Examples
62    --------
63    >>> model = StaticCredibilityModel().fit(histories)
64    >>> result = balance_calibrate(model.predict, histories)
65    >>> print(f"Calibration factor: {result.calibration_factor:.4f}")
66    >>> # Apply to predictions:
67    >>> posterior = prior * model.predict(h) * result.calibration_factor
68    """
69    sum_actual = 0.0
70    sum_predicted = 0.0
71
72    for h in histories:
73        assert h.exposures is not None
74        cf = predict_fn(h)
75        posterior = h.prior_premium * cf
76
77        if exposure_weighted:
78            weight = h.total_exposure
79        else:
80            weight = 1.0
81
 82        # Actual side: exposure-weighted claim frequency. With
 83        # weight = total_exposure this reduces to the policy's total claims.
84        actual_rate = h.claim_frequency  # total_claims / total_exposure
85        sum_actual += actual_rate * weight
86        sum_predicted += posterior * weight
87
88    if sum_predicted <= 0.0:
89        calibration_factor = 1.0
90    else:
91        calibration_factor = sum_actual / sum_predicted
92
93    return CalibrationResult(
94        calibration_factor=float(calibration_factor),
95        sum_actual=float(sum_actual),
96        sum_predicted=float(sum_predicted),
97        n_policies=len(histories),
98    )

Compute the portfolio-level balance calibration factor.

The calibration factor delta satisfies:

sum_i [delta * CF_i * mu_prior_i * e_i] = sum_i [Y_i * e_i]

For multiplicative models:

delta = sum(actual * exposure) / sum(posterior * exposure)

This function computes delta and returns it as a CalibrationResult. Apply delta to all posterior premiums to restore portfolio balance.
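The delta computation itself is a single ratio. A pure-Python sketch on toy numbers (exposure-weighted form; the portfolio triples are illustrative):

```python
# Toy portfolio rows: (actual claim frequency, posterior premium, total exposure).
portfolio = [
    (0.10, 0.12, 3.0),
    (0.40, 0.35, 2.5),
    (0.00, 0.05, 1.0),
]

sum_actual = sum(freq * expo for freq, _, expo in portfolio)
sum_predicted = sum(post * expo for _, post, expo in portfolio)

# delta rescales every posterior premium so the portfolio totals balance.
delta = sum_actual / sum_predicted if sum_predicted > 0 else 1.0
```

Multiplying each posterior premium by delta restores the balance identity exactly on this portfolio.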

Parameters

predict_fn : callable
    Function mapping ClaimsHistory -> float, returning the credibility
    factor (not the posterior premium). Typically model.predict.
histories : list[ClaimsHistory]
    Calibration portfolio. Typically the same data used for fitting, but
    can be a hold-out period for forward-looking calibration.
exposure_weighted : bool
    If True, weight each policy by its total exposure when computing the
    balance check. If False, use simple sums. Default True.

Returns

CalibrationResult
    Contains calibration_factor, sum_actual, sum_predicted, n_policies.

Examples

>>> model = StaticCredibilityModel().fit(histories)
>>> result = balance_calibrate(model.predict, histories)
>>> print(f"Calibration factor: {result.calibration_factor:.4f}")
>>> # Apply to predictions:
>>> posterior = prior * model.predict(h) * result.calibration_factor
def apply_calibration( credibility_factor: float, calibration_result: CalibrationResult) -> float:
101def apply_calibration(
102    credibility_factor: float,
103    calibration_result: CalibrationResult,
104) -> float:
105    """Apply a calibration factor to a single credibility factor.
106
107    Parameters
108    ----------
109    credibility_factor : float
110        The raw credibility factor from a model's predict().
111    calibration_result : CalibrationResult
112        Output of balance_calibrate().
113
114    Returns
115    -------
116    float
117        Calibrated credibility factor = CF * delta.
118    """
119    return credibility_factor * calibration_result.calibration_factor

Apply a calibration factor to a single credibility factor.

Parameters

credibility_factor : float
    The raw credibility factor from a model's predict().
calibration_result : CalibrationResult
    Output of balance_calibrate().

Returns

float
    Calibrated credibility factor = CF * delta.

def calibrated_predict_fn( predict_fn: Callable[[ClaimsHistory], float], calibration_result: CalibrationResult) -> Callable[[ClaimsHistory], float]:
122def calibrated_predict_fn(
123    predict_fn: Callable[[ClaimsHistory], float],
124    calibration_result: CalibrationResult,
125) -> Callable[[ClaimsHistory], float]:
126    """Wrap a predict function to apply calibration automatically.
127
128    Parameters
129    ----------
130    predict_fn : callable
131        Original predict function (ClaimsHistory -> credibility_factor).
132    calibration_result : CalibrationResult
133        Output of balance_calibrate().
134
135    Returns
136    -------
137    callable
138        New predict function that applies calibration to each prediction.
139
140    Examples
141    --------
142    >>> cal = balance_calibrate(model.predict, histories)
143    >>> calibrated = calibrated_predict_fn(model.predict, cal)
144    >>> cf = calibrated(new_history)  # already calibrated
145    """
146    delta = calibration_result.calibration_factor
147
148    def _calibrated(history: ClaimsHistory) -> float:
149        return predict_fn(history) * delta
150
151    return _calibrated

Wrap a predict function to apply calibration automatically.

Parameters

predict_fn : callable
    Original predict function (ClaimsHistory -> credibility_factor).
calibration_result : CalibrationResult
    Output of balance_calibrate().

Returns

callable
    New predict function that applies calibration to each prediction.

Examples

>>> cal = balance_calibrate(model.predict, histories)
>>> calibrated = calibrated_predict_fn(model.predict, cal)
>>> cf = calibrated(new_history)  # already calibrated
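The wrapper is a plain closure over delta, so any callable with the same shape can be wrapped. A minimal standalone sketch (make_calibrated and base_predict are hypothetical names, not part of this module):

```python
from typing import Callable

def make_calibrated(
    predict_fn: Callable[[object], float], delta: float
) -> Callable[[object], float]:
    """Return a predictor whose output is scaled by the calibration factor."""
    def _calibrated(history: object) -> float:
        return predict_fn(history) * delta
    return _calibrated

def base_predict(history: object) -> float:
    return 1.10  # stand-in for a model's raw credibility factor

calibrated = make_calibrated(base_predict, delta=0.95)
```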
def balance_report( predict_fn: Callable[[ClaimsHistory], float], histories: list[ClaimsHistory], by_n_periods: bool = False) -> polars.dataframe.frame.DataFrame:
154def balance_report(
155    predict_fn: Callable[[ClaimsHistory], float],
156    histories: list[ClaimsHistory],
157    by_n_periods: bool = False,
158) -> pl.DataFrame:
159    """Generate a portfolio-level balance report.
160
161    Shows the actual vs predicted claim frequency, and the credibility
162    factor distribution, optionally broken down by number of observed
163    periods (to check whether newer or longer-tenured policies have
164    systematic bias).
165
166    Parameters
167    ----------
168    predict_fn : callable
169        Function mapping ClaimsHistory -> credibility_factor.
170    histories : list[ClaimsHistory]
171        Portfolio to assess.
172    by_n_periods : bool
173        If True, break down by number of observed periods. Default False.
174
175    Returns
176    -------
177    pl.DataFrame
178        Summary statistics: policy_id, n_periods, actual_frequency,
179        prior_premium, credibility_factor, posterior_premium,
180        and residual (actual / posterior).
181    """
182    rows = []
183    for h in histories:
184        assert h.exposures is not None
185        cf = predict_fn(h)
186        posterior = h.prior_premium * cf
187        actual_freq = h.claim_frequency
188        residual = actual_freq / posterior if posterior > 0 else float("nan")
189        rows.append(
190            {
191                "policy_id": h.policy_id,
192                "n_periods": h.n_periods,
193                "total_exposure": h.total_exposure,
194                "actual_frequency": actual_freq,
195                "prior_premium": h.prior_premium,
196                "credibility_factor": cf,
197                "posterior_premium": posterior,
198                "residual": residual,
199            }
200        )
201    df = pl.DataFrame(rows)
202
203    if by_n_periods:
204        return (
205            df.group_by("n_periods")
206            .agg(
207                [
208                    pl.col("actual_frequency").mean().alias("mean_actual"),
209                    pl.col("posterior_premium").mean().alias("mean_posterior"),
210                    pl.col("credibility_factor").mean().alias("mean_cf"),
211                    pl.col("residual").mean().alias("mean_residual"),
212                    pl.len().alias("n_policies"),
213                ]
214            )
215            .sort("n_periods")
216        )
217
218    return df

Generate a portfolio-level balance report.

Shows the actual vs predicted claim frequency, and the credibility factor distribution, optionally broken down by number of observed periods (to check whether newer or longer-tenured policies have systematic bias).

Parameters

predict_fn : callable
    Function mapping ClaimsHistory -> credibility_factor.
histories : list[ClaimsHistory]
    Portfolio to assess.
by_n_periods : bool
    If True, break down by number of observed periods. Default False.

Returns

pl.DataFrame
    Summary statistics: policy_id, n_periods, actual_frequency,
    prior_premium, credibility_factor, posterior_premium, and
    residual (actual / posterior).

def credibility_factor(posterior_premium: float, prior_premium: float) -> float:
16def credibility_factor(posterior_premium: float, prior_premium: float) -> float:
17    """Compute the multiplicative credibility factor.
18
19    The credibility factor CF satisfies:
20        posterior_premium = prior_premium * CF
21
22    Parameters
23    ----------
24    posterior_premium : float
25        The experience-adjusted (a posteriori) premium.
26    prior_premium : float
27        The a priori (GLM-based) premium.
28
29    Returns
30    -------
31    float
32        Credibility factor. Values > 1 indicate the policy is worse than
33        the GLM base rate; values < 1 indicate better-than-average experience.
34
35    Raises
36    ------
37    ValueError
38        If prior_premium is zero or negative.
39    """
40    if prior_premium <= 0.0:
41        raise ValueError(f"prior_premium must be positive, got {prior_premium}")
42    return posterior_premium / prior_premium

Compute the multiplicative credibility factor.

The credibility factor CF satisfies:

posterior_premium = prior_premium * CF

Parameters

posterior_premium : float
    The experience-adjusted (a posteriori) premium.
prior_premium : float
    The a priori (GLM-based) premium.

Returns

float
    Credibility factor. Values > 1 indicate the policy is worse than the
    GLM base rate; values < 1 indicate better-than-average experience.

Raises

ValueError
    If prior_premium is zero or negative.

def posterior_premium( prior_premium: float, cf: float, calibration_factor: float = 1.0) -> float:
45def posterior_premium(
46    prior_premium: float, cf: float, calibration_factor: float = 1.0
47) -> float:
48    """Compute the posterior premium from a credibility factor.
49
50    Parameters
51    ----------
52    prior_premium : float
53        The a priori (GLM-based) premium.
54    cf : float
55        Credibility factor (posterior / prior ratio).
56    calibration_factor : float
57        Optional portfolio-level calibration factor from balance_calibrate().
58        Default 1.0 (no calibration applied).
59
60    Returns
61    -------
62    float
63        Posterior premium after experience adjustment and optional calibration.
64    """
65    return prior_premium * cf * calibration_factor

Compute the posterior premium from a credibility factor.

Parameters

prior_premium : float
    The a priori (GLM-based) premium.
cf : float
    Credibility factor (posterior / prior ratio).
calibration_factor : float
    Optional portfolio-level calibration factor from balance_calibrate().
    Default 1.0 (no calibration applied).

Returns

float
    Posterior premium after experience adjustment and optional calibration.
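posterior_premium inverts credibility_factor up to the calibration factor; a quick round trip with toy numbers:

```python
prior = 450.0
posterior = 495.0

cf = posterior / prior            # credibility_factor: 495 / 450 = 1.1
uncalibrated = prior * cf         # recovers the posterior exactly
calibrated = prior * cf * 0.97    # with an illustrative portfolio delta
```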

def seniority_weights( n_periods: int, p: float, q: float, exposures: list[float] | None = None) -> numpy.ndarray:
 68def seniority_weights(
 69    n_periods: int,
 70    p: float,
 71    q: float,
 72    exposures: list[float] | None = None,
 73) -> np.ndarray:
 74    """Compute seniority (recency) weights for claims history periods.
 75
 76    In the dynamic Poisson-gamma model, the effective weight on period t's
 77    claims decays geometrically with age. This function computes the
 78    relative weight assigned to each period in a t-period history when
 79    fitting or predicting.
 80
 81    The weight assigned to period s (0-indexed from oldest) in a history
 82    of length t is proportional to (p*q)^(t-1-s). The most recent period
 83    receives weight 1; the oldest receives weight (p*q)^(t-1).
 84
 85    Parameters
 86    ----------
 87    n_periods : int
 88        Number of observation periods.
 89    p : float
 90        Transition parameter controlling how strongly the current state
 91        reverts to mean. Must be in (0, 1].
 92    q : float
 93        Decay parameter controlling how quickly older claims are
 94        downweighted. Must be in (0, 1].
 95    exposures : list[float] or None
 96        Per-period exposure weights. If None, uniform exposure is assumed.
 97
 98    Returns
 99    -------
100    np.ndarray
101        Array of shape (n_periods,) with seniority weights, normalised
102        to sum to 1. Most recent period is last.
103
104    Examples
105    --------
106    >>> seniority_weights(3, p=0.9, q=0.8)
107    array([0.2315..., 0.3216..., 0.4467...])  # most recent gets highest weight
108    """
109    if not (0 < p <= 1.0):
110        raise ValueError(f"p must be in (0, 1], got {p}")
111    if not (0 < q <= 1.0):
112        raise ValueError(f"q must be in (0, 1], got {q}")
113
114    decay = p * q
115    # Weights for periods 0, 1, ..., n-1 (oldest to most recent)
116    raw = np.array([decay ** (n_periods - 1 - s) for s in range(n_periods)])
117
118    if exposures is not None:
119        raw = raw * np.asarray(exposures, dtype=float)
120
121    total = raw.sum()
122    if total == 0.0:
123        return np.ones(n_periods) / n_periods
124    return raw / total

Compute seniority (recency) weights for claims history periods.

In the dynamic Poisson-gamma model, the effective weight on period t's claims decays geometrically with age. This function computes the relative weight assigned to each period in a t-period history when fitting or predicting.

The weight assigned to period s (0-indexed from oldest) in a history of length t is proportional to (p*q)^(t-1-s). The most recent period receives weight 1; the oldest receives weight (p*q)^(t-1).
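The decay can be checked by hand. A NumPy sketch reproducing the weights for n_periods=3, p=0.9, q=0.8 (so decay = p*q = 0.72), without exposure weighting:

```python
import numpy as np

n_periods, p, q = 3, 0.9, 0.8
decay = p * q  # 0.72

# Oldest period (s = 0) gets decay**(n_periods - 1); most recent gets 1.
raw = np.array([decay ** (n_periods - 1 - s) for s in range(n_periods)])
weights = raw / raw.sum()
# weights is approximately [0.2316, 0.3217, 0.4467]: most recent is highest
```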

Parameters

n_periods : int
    Number of observation periods.
p : float
    Transition parameter controlling how strongly the current state
    reverts to mean. Must be in (0, 1].
q : float
    Decay parameter controlling how quickly older claims are
    downweighted. Must be in (0, 1].
exposures : list[float] or None
    Per-period exposure weights. If None, uniform exposure is assumed.

Returns

np.ndarray
    Array of shape (n_periods,) with seniority weights, normalised to
    sum to 1. Most recent period is last.

Examples

>>> seniority_weights(3, p=0.9, q=0.8)
array([0.2315..., 0.3216..., 0.4467...])  # most recent gets highest weight
def exposure_weighted_mean(counts: list[int], exposures: list[float]) -> float:
127def exposure_weighted_mean(
128    counts: list[int],
129    exposures: list[float],
130) -> float:
131    """Compute exposure-weighted mean claim frequency.
132
133    Parameters
134    ----------
135    counts : list[int]
136        Claim counts per period.
137    exposures : list[float]
138        Exposure (e.g., years on risk) per period.
139
140    Returns
141    -------
142    float
143        Total claims divided by total exposure. Returns 0.0 if total
144        exposure is zero.
145    """
146    total_exp = sum(exposures)
147    if total_exp == 0.0:
148        return 0.0
149    return sum(counts) / total_exp

Compute exposure-weighted mean claim frequency.

Parameters

counts : list[int]
    Claim counts per period.
exposures : list[float]
    Exposure (e.g., years on risk) per period.

Returns

float
    Total claims divided by total exposure. Returns 0.0 if total
    exposure is zero.

def history_sufficient_stat( history: ClaimsHistory, theta_ref: float | None = None) -> float:
152def history_sufficient_stat(
153    history: ClaimsHistory,
154    theta_ref: float | None = None,
155) -> float:
156    """Compute the log-likelihood sufficient statistic for a claims history.
157
158    For Poisson models with a reference intensity theta_ref, the sufficient
159    statistic is the log-likelihood of the observed claim counts:
160
161        L(Y; theta) = sum_t [ Y_t * log(theta * e_t) - theta * e_t ]
162
163    When theta_ref is None, uses the empirical frequency as reference.
164
165    This is the input to the surrogate model's g(.) function.
166
167    Parameters
168    ----------
169    history : ClaimsHistory
170        The policy's claims history.
171    theta_ref : float or None
172        Reference intensity (claims per exposure unit). If None, uses the
173        empirical claim frequency from the history.
174
175    Returns
176    -------
177    float
178        Log-likelihood sufficient statistic.
179    """
180    assert history.exposures is not None
181    if theta_ref is None:
182        theta_ref = history.claim_frequency
183        if theta_ref == 0.0:
184            theta_ref = 1e-6  # avoid log(0)
185
186    log_lik = 0.0
187    for y_t, e_t in zip(history.claim_counts, history.exposures):
188        rate = theta_ref * e_t
189        log_lik += y_t * np.log(rate) - rate
190    return log_lik

Compute the log-likelihood sufficient statistic for a claims history.

For Poisson models with a reference intensity theta_ref, the sufficient statistic is the log-likelihood of the observed claim counts:

L(Y; theta) = sum_t [ Y_t * log(theta * e_t) - theta * e_t ]

When theta_ref is None, uses the empirical frequency as reference.

This is the input to the surrogate model's g(.) function.

Parameters

history : ClaimsHistory
    The policy's claims history.
theta_ref : float or None
    Reference intensity (claims per exposure unit). If None, uses the
    empirical claim frequency from the history.

Returns

float
    Log-likelihood sufficient statistic.
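A self-contained sketch of this log-likelihood on toy data (theta_ref fixed rather than estimated):

```python
import math

counts = [0, 1, 0]
exposures = [1.0, 1.0, 0.8]
theta_ref = 0.5  # illustrative reference intensity (claims per exposure unit)

# L(Y; theta) = sum_t [ Y_t * log(theta * e_t) - theta * e_t ]
log_lik = sum(
    y * math.log(theta_ref * e) - theta_ref * e
    for y, e in zip(counts, exposures)
)
```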