insurance_frequency_severity.dependent
Dependent frequency-severity neural two-part model for insurance pricing.
The core idea is multi-task learning: a single shared encoder trunk processes covariates and produces a latent representation that feeds both a Poisson frequency head and a Gamma severity head. Gradients from both losses flow through the trunk simultaneously, so it learns features that are jointly informative for frequency and severity. That shared information is where the implicit frequency-severity dependence lives.
On top of the latent dependence you can add the explicit Garrido-Genest-Schulz conditional covariate (log μ += γ·N), which gives a semi-analytical pure premium correction and a directly interpretable dependence parameter.
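To see what the explicit term buys you, note that when the claim count is Poisson the correction has a closed form. This is a sketch of the standard Garrido-Genest-Schulz argument, not a transcription of ``PurePremiumEstimator.analytical``: if N ~ Poisson(λt) and E[Ȳ | x, N] = μ0(x)·exp(γ·N), then

    E[S] = E[N · μ0 · exp(γ·N)] = λt · μ0 · exp(γ) · exp(λt·(exp(γ) − 1)),

which collapses to the independent product λt·μ0 when γ = 0; a positive γ inflates the premium beyond what separate frequency and severity models would give.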
This subpackage is distinct from the Sarmanov copula approach in the parent package. Use Sarmanov when you need an analytical bivariate density and a simple omega parameter for a regulator; use this subpackage when you have a large dataset, suspect nonlinear interactions, and want a single model that learns both tasks jointly.
Public API
----------
Model
~~~~~
``DependentFreqSevNet`` – PyTorch nn.Module (shared trunk + heads)
``SharedTrunkConfig`` – dataclass for trunk hyperparameters
``FrequencyHead`` – Poisson head module
``SeverityHead`` – Gamma head module

Training
~~~~~~~~
``JointLoss`` – Poisson + Gamma NLL with configurable balancing
``DependentFSTrainer`` – training loop with early stopping and LR scheduling

Wrapper
~~~~~~~
``DependentFSModel`` – sklearn-compatible estimator (fit/predict/score)

Premium
~~~~~~~
``PurePremiumEstimator`` – Monte Carlo + optional MGF analytical correction

Diagnostics
~~~~~~~~~~~
``DependentFSDiagnostics`` – Lorenz, calibration, dependence tests, latent corr

Data
~~~~
``FreqSevDataset`` – PyTorch Dataset with exposure handling
``prepare_features`` – numeric encoding helper

Benchmarks
~~~~~~~~~~
``make_dependent_claims`` – synthetic claims with known γ dependence
``make_independent_claims`` – synthetic independent baseline
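A minimal end-to-end sketch of how these pieces fit together. The exact return signature of ``make_dependent_claims`` (and its ``n_policies`` keyword) is assumed here to be the four arrays that ``DependentFSModel.fit`` consumes; check its docstring before copying::

    from insurance_frequency_severity.dependent import DependentFSModel, make_dependent_claims

    # Hypothetical unpacking; adjust to the generator's real return format.
    X, n_claims, avg_severity, exposure = make_dependent_claims(n_policies=50_000)

    model = DependentFSModel(use_explicit_gamma=True, val_fraction=0.1)
    model.fit(X, n_claims, avg_severity, exposure)

    print(model.gamma_)                                # fitted dependence parameter
    premium = model.predict_pure_premium(X, exposure)  # expected loss per unit exposure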
1""" 2insurance_frequency_severity.dependent 3======================================= 4Dependent frequency-severity neural two-part model for insurance pricing. 5 6The core idea is multi-task learning: a single shared encoder trunk processes 7covariates and produces a latent representation that feeds both a Poisson 8frequency head and a Gamma severity head. Gradients from both losses flow 9through the trunk simultaneously, so it learns features that are jointly 10informative for frequency *and* severity. That shared information is where the 11implicit frequency-severity dependence lives. 12 13On top of the latent dependence you can add the explicit Garrido-Genest-Schulz 14conditional covariate (log μ += γ·N), which gives a semi-analytical pure 15premium correction and a directly interpretable dependence parameter. 16 17This subpackage is distinct from the Sarmanov copula approach in the parent 18package. Use Sarmanov when you need an analytical bivariate density and a 19simple omega parameter for a regulator; use this subpackage when you have a 20large dataset, suspect nonlinear interactions, and want a single model that 21learns both tasks jointly. 22 23Public API 24---------- 25Model 26~~~~~ 27``DependentFreqSevNet`` – PyTorch nn.Module (shared trunk + heads) 28``SharedTrunkConfig`` – dataclass for trunk hyperparameters 29``FrequencyHead`` – Poisson head module 30``SeverityHead`` – Gamma head module 31 32Training 33~~~~~~~~ 34``JointLoss`` – Poisson + Gamma NLL with configurable balancing 35``DependentFSTrainer`` – training loop with early stopping and LR scheduling 36 37Wrapper 38~~~~~~~ 39``DependentFSModel`` – sklearn-compatible estimator (fit/predict/score) 40 41Premium 42~~~~~~~ 43``PurePremiumEstimator`` – Monte Carlo + optional MGF analytical correction 44 45Diagnostics 46~~~~~~~~~~~ 47``DependentFSDiagnostics`` – Lorenz, calibration, dependence tests, latent corr 48 49Data 50~~~~ 51``FreqSevDataset`` – PyTorch Dataset with exposure handling 52``prepare_features`` – numeric encoding helper 53 54Benchmarks 55~~~~~~~~~~ 56``make_dependent_claims`` – synthetic claims with known γ dependence 57``make_independent_claims`` – synthetic independent baseline 58""" 59 60from insurance_frequency_severity.dependent.model import ( 61 DependentFreqSevNet, 62 FrequencyHead, 63 SeverityHead, 64 SharedTrunkConfig, 65) 66from insurance_frequency_severity.dependent.training import DependentFSTrainer, JointLoss 67from insurance_frequency_severity.dependent.wrapper import DependentFSModel 68from insurance_frequency_severity.dependent.premium import PurePremiumEstimator 69from insurance_frequency_severity.dependent.diagnostics import DependentFSDiagnostics 70from insurance_frequency_severity.dependent.data import FreqSevDataset, prepare_features 71from insurance_frequency_severity.dependent.benchmarks import make_dependent_claims, make_independent_claims 72 73__all__ = [ 74 "DependentFreqSevNet", 75 "FrequencyHead", 76 "SeverityHead", 77 "SharedTrunkConfig", 78 "JointLoss", 79 "DependentFSTrainer", 80 "DependentFSModel", 81 "PurePremiumEstimator", 82 "DependentFSDiagnostics", 83 "FreqSevDataset", 84 "prepare_features", 85 "make_dependent_claims", 86 "make_independent_claims", 87]
class DependentFreqSevNet(nn.Module):
    """Shared-trunk frequency-severity neural network.

    Architecture::

        x ∈ R^p → SharedTrunk → h ∈ R^d_latent
                                  │
                  ┌───────────────┴──────────────┐
            FrequencyHead                  SeverityHead
            log λ + log t                  log μ [+ γ·N]
                  │                              │
            Poisson loss                    Gamma loss
                  └──────── joint loss ─────────┘

    Both Poisson and Gamma gradients flow through the shared trunk. This is the
    mechanism by which the model captures frequency-severity dependence in the
    latent space.

    Parameters
    ----------
    in_features:
        Number of input features after preprocessing.
    trunk_config:
        Architecture for the shared encoder. See ``SharedTrunkConfig``.
    use_explicit_gamma:
        Whether to include the γ·N conditional covariate term in the severity
        head. Recommended: ``True`` when you want an interpretable dependence
        parameter and a semi-analytical pure premium.

    Attributes
    ----------
    trunk:
        The shared ``SharedTrunk`` module.
    freq_head:
        ``FrequencyHead`` module.
    sev_head:
        ``SeverityHead`` module.
    log_phi:
        Learnable log of the Gamma dispersion parameter φ. Initialised to 0
        (φ=1). Shared across observations (one scalar per model).
    """

    def __init__(
        self,
        in_features: int,
        trunk_config: Optional[SharedTrunkConfig] = None,
        use_explicit_gamma: bool = True,
    ) -> None:
        super().__init__()
        if trunk_config is None:
            trunk_config = SharedTrunkConfig()
        self.trunk_config = trunk_config
        self.use_explicit_gamma = use_explicit_gamma
        self.trunk = SharedTrunk(in_features, trunk_config)
        self.freq_head = FrequencyHead(trunk_config.latent_dim)
        self.sev_head = SeverityHead(trunk_config.latent_dim, use_explicit_gamma)
        # Log Gamma dispersion: φ = exp(log_phi). One scalar for the whole model.
        self.log_phi = nn.Parameter(torch.zeros(1))

    @property
    def phi(self) -> Tensor:
        """Gamma dispersion parameter φ = exp(log φ)."""
        return torch.exp(self.log_phi)

    @property
    def gamma(self) -> Optional[Tensor]:
        """Explicit dependence parameter γ (None if not used)."""
        if self.use_explicit_gamma:
            return self.sev_head.gamma
        return None

    def forward(
        self,
        x: Tensor,
        log_exposure: Tensor,
        n_claims: Optional[Tensor] = None,
    ) -> Tuple[Tensor, Tensor, Tensor]:
        """Forward pass.

        Parameters
        ----------
        x:
            Feature matrix, shape (batch, in_features).
        log_exposure:
            Log exposure, shape (batch,) or (batch, 1).
        n_claims:
            Realised claim counts, shape (batch,). Only needed when
            ``use_explicit_gamma=True``.

        Returns
        -------
        log_lambda: shape (batch,)
            Log Poisson rate (frequency × exposure).
        log_mu: shape (batch,)
            Log expected average severity.
        phi: shape (1,)
            Gamma dispersion parameter.
        """
        h = self.trunk(x)
        log_lambda = self.freq_head(h, log_exposure)
        log_mu = self.sev_head(h, n_claims)
        return log_lambda, log_mu, self.phi

    def latent(self, x: Tensor) -> Tensor:
        """Return the shared latent representation h for diagnostic purposes.

        Parameters
        ----------
        x:
            Feature matrix, shape (batch, in_features).

        Returns
        -------
        Tensor of shape (batch, latent_dim).
        """
        self.eval()
        with torch.no_grad():
            return self.trunk(x)

    def count_parameters(self) -> dict:
        """Return a breakdown of trainable parameter counts by component."""
        def _count(module: nn.Module) -> int:
            return sum(p.numel() for p in module.parameters() if p.requires_grad)

        return {
            "trunk": _count(self.trunk),
            "freq_head": _count(self.freq_head),
            "sev_head": _count(self.sev_head),
            "log_phi": 1,
            "total": _count(self),
        }

    def extra_repr(self) -> str:
        cfg = self.trunk_config
        gamma_str = f", gamma={self.sev_head.gamma.item():.4f}" if self.use_explicit_gamma else ""
        return (
            f"hidden_dims={cfg.hidden_dims}, latent_dim={cfg.latent_dim}, "
            f"phi={self.phi.item():.4f}{gamma_str}"
        )
Shared-trunk frequency-severity neural network.
Architecture::
x ∈ R^p → SharedTrunk → h ∈ R^d_latent
                          │
          ┌───────────────┴──────────────┐
    FrequencyHead                  SeverityHead
    log λ + log t                  log μ [+ γ·N]
          │                              │
    Poisson loss                    Gamma loss
          └──────── joint loss ─────────┘
Both Poisson and Gamma gradients flow through the shared trunk. This is the mechanism by which the model captures frequency-severity dependence in the latent space.
Parameters
in_features:
Number of input features after preprocessing.
trunk_config:
Architecture for the shared encoder. See SharedTrunkConfig.
use_explicit_gamma:
Whether to include the γ·N conditional covariate term in the severity
head. Recommended: True when you want an interpretable dependence
parameter and a semi-analytical pure premium.
Attributes
trunk:
The shared SharedTrunk module.
freq_head:
FrequencyHead module.
sev_head:
SeverityHead module.
log_phi:
Learnable log of the Gamma dispersion parameter φ. Initialised to 0
(φ=1). Shared across observations (one scalar per model).
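A small forward-pass sketch with the default trunk configuration; the synthetic tensors only illustrate the shapes documented above::

    import torch

    net = DependentFreqSevNet(in_features=12)          # default SharedTrunkConfig()
    x = torch.randn(8, 12)                             # batch of 8 policies
    log_exposure = torch.log(torch.rand(8) + 0.5)      # log years-at-risk
    n_claims = torch.tensor([0., 1., 0., 2., 0., 0., 1., 0.])

    log_lambda, log_mu, phi = net(x, log_exposure, n_claims)
    expected_claims = torch.exp(log_lambda)            # includes the exposure offset
    expected_severity = torch.exp(log_mu)              # conditional on the supplied N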
``phi`` (property) – Gamma dispersion parameter φ = exp(log φ).
``gamma`` (property) – Explicit dependence parameter γ (None if not used).
``forward`` – Forward pass.
Parameters
x:
Feature matrix, shape (batch, in_features).
log_exposure:
Log exposure, shape (batch,) or (batch, 1).
n_claims:
Realised claim counts, shape (batch,). Only needed when
use_explicit_gamma=True.
Returns
log_lambda: shape (batch,)
    Log Poisson rate (frequency × exposure).
log_mu: shape (batch,)
    Log expected average severity.
phi: shape (1,)
    Gamma dispersion parameter.
``latent`` – Return the shared latent representation h for diagnostic purposes.
Parameters
x: Feature matrix, shape (batch, in_features).
Returns
Tensor of shape (batch, latent_dim).
``count_parameters`` – Return a breakdown of trainable parameter counts by component.
``extra_repr`` – One-line summary of hidden_dims, latent_dim, φ and (when used) γ, shown in ``repr(model)``.
class FrequencyHead(nn.Module):
    """Poisson frequency head.

    Maps latent h to log λ and adds the log exposure offset so that
    the output is log(expected_claims) = log λ + log t.

    Parameters
    ----------
    latent_dim:
        Dimension of the latent representation coming from the trunk.
    """

    def __init__(self, latent_dim: int) -> None:
        super().__init__()
        self.linear = nn.Linear(latent_dim, 1)
        nn.init.zeros_(self.linear.weight)
        nn.init.zeros_(self.linear.bias)

    def forward(self, h: Tensor, log_exposure: Tensor) -> Tensor:
        """
        Parameters
        ----------
        h:
            Latent representation, shape (batch, latent_dim).
        log_exposure:
            Log of the exposure (time at risk), shape (batch,) or (batch, 1).

        Returns
        -------
        log_lambda: Tensor of shape (batch,).
            log(λ_i · t_i) — the Poisson rate including exposure.
        """
        log_exposure = log_exposure.view(-1)
        log_lambda_x = self.linear(h).squeeze(-1)
        return log_lambda_x + log_exposure
Poisson frequency head.
Maps latent h to log λ and adds the log exposure offset so that the output is log(expected_claims) = log λ + log t.
Parameters
latent_dim: Dimension of the latent representation coming from the trunk.
``forward``
Parameters
h:
    Latent representation, shape (batch, latent_dim).
log_exposure:
    Log of the exposure (time at risk), shape (batch,) or (batch, 1).
Returns
log_lambda: Tensor of shape (batch,).
    log(λ_i · t_i) — the Poisson rate including exposure.
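Because exposure enters as an additive offset on the log scale, the head's predicted expected claim count scales linearly with time at risk; a quick check::

    import torch

    head = FrequencyHead(latent_dim=16)                 # latent_dim is arbitrary here
    h = torch.randn(4, 16)
    one_year = head(h, torch.log(torch.ones(4)))
    two_years = head(h, torch.log(torch.full((4,), 2.0)))
    assert torch.allclose(torch.exp(two_years), 2.0 * torch.exp(one_year))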
class SeverityHead(nn.Module):
    """Gamma severity head.

    Maps latent h to log μ (the expected average severity given x). Optionally
    includes an explicit dependence term γ·N where N is the realised claim count
    and γ is a learnable scalar.

    Parameters
    ----------
    latent_dim:
        Dimension of the latent representation coming from the trunk.
    use_explicit_gamma:
        If True, a scalar γ is learned and the forward pass accepts an
        ``n_claims`` argument. When γ=0 at convergence, the latent dependence
        is sufficient to describe the joint structure.
    """

    def __init__(
        self,
        latent_dim: int,
        use_explicit_gamma: bool = True,
    ) -> None:
        super().__init__()
        self.use_explicit_gamma = use_explicit_gamma
        self.linear = nn.Linear(latent_dim, 1)
        nn.init.zeros_(self.linear.weight)
        nn.init.zeros_(self.linear.bias)
        if use_explicit_gamma:
            # Initialise γ to zero so the model starts at independence.
            self.gamma = nn.Parameter(torch.zeros(1))
        else:
            self.register_parameter("gamma", None)

    def forward(
        self,
        h: Tensor,
        n_claims: Optional[Tensor] = None,
    ) -> Tensor:
        """
        Parameters
        ----------
        h:
            Latent representation, shape (batch, latent_dim).
        n_claims:
            Realised claim counts, shape (batch,). Required when
            ``use_explicit_gamma=True``; ignored otherwise.

        Returns
        -------
        log_mu: Tensor of shape (batch,).
            log(μ_i), the log expected average severity.
        """
        log_mu = self.linear(h).squeeze(-1)
        if self.use_explicit_gamma:
            if n_claims is None:
                raise ValueError(
                    "n_claims must be supplied when use_explicit_gamma=True."
                )
            log_mu = log_mu + self.gamma * n_claims.float()
        return log_mu
Gamma severity head.
Maps latent h to log μ (the expected average severity given x). Optionally includes an explicit dependence term γ·N where N is the realised claim count and γ is a learnable scalar.
Parameters
latent_dim:
Dimension of the latent representation coming from the trunk.
use_explicit_gamma:
If True, a scalar γ is learned and the forward pass accepts an
n_claims argument. When γ=0 at convergence, the latent dependence
is sufficient to describe the joint structure.
``forward``
Parameters
h:
Latent representation, shape (batch, latent_dim).
n_claims:
Realised claim counts, shape (batch,). Required when
use_explicit_gamma=True; ignored otherwise.
Returns
log_mu: Tensor of shape (batch,).
    log(μ_i), the log expected average severity.
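Each additional observed claim multiplies the conditional severity by exp(γ); a sketch (γ is set by hand here purely for illustration)::

    import torch

    head = SeverityHead(latent_dim=16, use_explicit_gamma=True)
    with torch.no_grad():
        head.gamma.fill_(0.2)                            # pretend a fitted value

    h = torch.randn(3, 16)
    mu_0 = torch.exp(head(h, torch.zeros(3)))            # baseline severity, N = 0
    mu_2 = torch.exp(head(h, torch.full((3,), 2.0)))     # severity given N = 2
    assert torch.allclose(mu_2, mu_0 * torch.exp(torch.tensor(2 * 0.2)))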
class JointLoss(nn.Module):
    """Joint Poisson-Gamma negative log-likelihood.

    Poisson NLL for frequency (all rows) plus Gamma NLL for average severity
    (positive-claim rows only).

    Parameters
    ----------
    loss_weight_sev:
        Fixed weight applied to the Gamma loss. If ``auto_balance=True`` this
        is ignored and the weight is updated each batch to equalise the two loss
        scales.
    auto_balance:
        When True, the severity weight is set to
        ``|ℓ_freq| / max(|ℓ_sev|, ε)`` each forward call, so both losses
        contribute roughly equally. Useful when you have no prior intuition
        about the relative scales.
    eps:
        Small constant for numerical stability in auto-balancing.
    """

    def __init__(
        self,
        loss_weight_sev: float = 1.0,
        auto_balance: bool = False,
        eps: float = 1e-8,
    ) -> None:
        super().__init__()
        self.loss_weight_sev = loss_weight_sev
        self.auto_balance = auto_balance
        self.eps = eps

    def forward(
        self,
        log_lambda: Tensor,
        log_mu: Tensor,
        phi: Tensor,
        n_claims: Tensor,
        avg_severity: Tensor,
    ) -> Tuple[Tensor, Tensor, Tensor]:
        """Compute joint loss.

        Parameters
        ----------
        log_lambda:
            Log Poisson rate (including log exposure), shape (batch,).
        log_mu:
            Log expected average severity, shape (batch,).
        phi:
            Gamma dispersion scalar.
        n_claims:
            Observed claim counts, shape (batch,).
        avg_severity:
            Observed average severity (total loss / n_claims), shape (batch,).
            Only rows where n_claims > 0 are used.

        Returns
        -------
        total_loss: scalar Tensor.
        freq_loss: scalar Tensor (for logging).
        sev_loss: scalar Tensor (for logging).
        """
        # -- Poisson NLL (all rows) --
        # ℓ_freq = Σ [n_i·log λ_i - λ_i - log(n_i!)]
        # We drop the log-factorial term (constant w.r.t. parameters).
        lambda_ = torch.exp(log_lambda)
        freq_loss = -torch.mean(n_claims.float() * log_lambda - lambda_)

        # -- Gamma NLL (positive-claim rows only) --
        pos_mask = n_claims > 0
        n_pos = pos_mask.sum()

        if n_pos == 0:
            sev_loss = torch.tensor(0.0, device=log_lambda.device)
        else:
            log_mu_pos = log_mu[pos_mask]
            y_pos = avg_severity[pos_mask]
            n_pos_vals = n_claims[pos_mask].float()
            mu_pos = torch.exp(log_mu_pos)

            # Average severity ȳ_i = Σ Y_ij / n_i follows Gamma with:
            #   shape = n_i / φ, scale = φ·μ_i / n_i  (mean = μ_i)
            # Gamma NLL = -Σ [α·log β - log Γ(α) + (α-1)·log ȳ - β·ȳ]
            # where α = n_i/φ, β = n_i/(φ·μ_i)
            alpha = n_pos_vals / phi
            # Gamma NLL (up to log-Gamma constant):
            #   = Σ [α·log(α/μ) - (α-1)·log(ȳ) + α·ȳ/μ - log Γ(α)]
            # Simplified form (standard Gamma deviance contribution):
            #   nll_i = α·(log μ - log ȳ + ȳ/μ - 1) + log Γ(α) - α·log α
            log_y_pos = torch.log(y_pos + 1e-10)
            ratio = y_pos / (mu_pos + 1e-10)
            # Using Gamma log-likelihood: log p(y|α,β) = α log β - lgamma(α) + (α-1) log y - β y
            # with β = α/μ
            beta = alpha / (mu_pos + 1e-10)
            log_p = (
                alpha * torch.log(beta + 1e-10)
                - torch.lgamma(alpha)
                + (alpha - 1.0) * log_y_pos
                - beta * y_pos
            )
            sev_loss = -torch.mean(log_p)

        # -- Combine --
        if self.auto_balance:
            w = (freq_loss.detach().abs() / (sev_loss.detach().abs() + self.eps)).clamp(0.01, 100.0)
        else:
            w = self.loss_weight_sev

        total_loss = freq_loss + w * sev_loss
        return total_loss, freq_loss.detach(), sev_loss.detach()
Joint Poisson-Gamma negative log-likelihood.
Poisson NLL for frequency (all rows) plus Gamma NLL for average severity (positive-claim rows only).
Parameters
loss_weight_sev:
Fixed weight applied to the Gamma loss. If auto_balance=True this
is ignored and the weight is updated each batch to equalise the two loss
scales.
auto_balance:
When True, the severity weight is set to
|ℓ_freq| / max(|ℓ_sev|, ε) each forward call, so both losses
contribute roughly equally. Useful when you have no prior intuition
about the relative scales.
eps:
Small constant for numerical stability in auto-balancing.
``forward`` – Compute joint loss.
Parameters
log_lambda:
    Log Poisson rate (including log exposure), shape (batch,).
log_mu:
    Log expected average severity, shape (batch,).
phi:
    Gamma dispersion scalar.
n_claims:
    Observed claim counts, shape (batch,).
avg_severity:
    Observed average severity (total loss / n_claims), shape (batch,).
    Only rows where n_claims > 0 are used.
Returns
total_loss: scalar Tensor.
freq_loss: scalar Tensor (for logging).
sev_loss: scalar Tensor (for logging).
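A sketch of one training step wired by hand (the trainer below does exactly this per batch, plus gradient clipping); the synthetic targets are placeholders::

    import torch

    model = DependentFreqSevNet(in_features=12)
    criterion = JointLoss(auto_balance=True)

    x = torch.randn(32, 12)
    log_exposure = torch.zeros(32)                       # one year at risk
    n_claims = torch.poisson(torch.full((32,), 0.2))
    avg_severity = torch.where(n_claims > 0, 1000.0 * torch.rand(32), torch.zeros(32))

    log_lambda, log_mu, phi = model(x, log_exposure, n_claims)
    total, freq_nll, sev_nll = criterion(log_lambda, log_mu, phi, n_claims, avg_severity)
    total.backward()                                     # gradients reach the trunk and both heads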
class DependentFSTrainer:
    """Trains a ``DependentFreqSevNet`` using the joint Poisson-Gamma loss.

    Parameters
    ----------
    model:
        The network to train.
    config:
        Training hyperparameters.
    """

    def __init__(
        self,
        model: DependentFreqSevNet,
        config: Optional[TrainingConfig] = None,
    ) -> None:
        self.model = model
        self.config = config or TrainingConfig()
        self._device = self._resolve_device(self.config.device)
        self.history: Dict[str, List[float]] = {
            "train_loss": [],
            "val_loss": [],
            "freq_loss": [],
            "sev_loss": [],
            "gamma": [],
        }

    @staticmethod
    def _resolve_device(spec: str) -> torch.device:
        if spec == "auto":
            return torch.device("cuda" if torch.cuda.is_available() else "cpu")
        return torch.device(spec)

    def _build_optimizer(self) -> Optimizer:
        cfg = self.config
        if abs(cfg.trunk_lr_multiplier - 1.0) < 1e-9:
            return Adam(
                self.model.parameters(),
                lr=cfg.lr,
                weight_decay=cfg.weight_decay,
            )
        trunk_params = list(self.model.trunk.parameters())
        head_params = (
            list(self.model.freq_head.parameters())
            + list(self.model.sev_head.parameters())
            + [self.model.log_phi]
        )
        return Adam(
            [
                {"params": trunk_params, "lr": cfg.lr * cfg.trunk_lr_multiplier},
                {"params": head_params, "lr": cfg.lr},
            ],
            weight_decay=cfg.weight_decay,
        )

    def _run_epoch(
        self,
        loader: DataLoader,
        criterion: JointLoss,
        optimizer: Optional[Optimizer],
        train: bool,
    ) -> Tuple[float, float, float]:
        """Run one epoch. Returns (mean_loss, mean_freq_loss, mean_sev_loss)."""
        self.model.train(train)
        total_loss = freq_loss_sum = sev_loss_sum = 0.0
        n_batches = 0

        context = torch.enable_grad() if train else torch.no_grad()
        with context:
            for batch in loader:
                x = batch["x"].to(self._device)
                log_exp = batch["log_exposure"].to(self._device)
                n = batch["n_claims"].to(self._device)
                y = batch["avg_severity"].to(self._device)

                log_lambda, log_mu, phi = self.model(x, log_exp, n)
                loss, fl, sl = criterion(log_lambda, log_mu, phi, n, y)

                if train:
                    optimizer.zero_grad()
                    loss.backward()
                    nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=5.0)
                    optimizer.step()

                total_loss += loss.item()
                freq_loss_sum += fl.item()
                sev_loss_sum += sl.item()
                n_batches += 1

        return total_loss / n_batches, freq_loss_sum / n_batches, sev_loss_sum / n_batches

    def fit(
        self,
        train_loader: DataLoader,
        val_loader: Optional[DataLoader] = None,
    ) -> "DependentFSTrainer":
        """Train the model.

        Parameters
        ----------
        train_loader:
            DataLoader yielding batches with keys ``x``, ``log_exposure``,
            ``n_claims``, ``avg_severity``.
        val_loader:
            Optional validation DataLoader for early stopping.

        Returns
        -------
        self
        """
        cfg = self.config
        self.model.to(self._device)
        criterion = JointLoss(
            loss_weight_sev=cfg.loss_weight_sev,
            auto_balance=cfg.auto_balance,
        )
        optimizer = self._build_optimizer()
        scheduler = ReduceLROnPlateau(
            optimizer,
            factor=cfg.lr_reduce_factor,
            patience=cfg.lr_patience,
            min_lr=1e-6,
        )
        early_stop: Optional[_EarlyStoppingState] = None
        if cfg.patience is not None:
            early_stop = _EarlyStoppingState(
                patience=cfg.patience, min_delta=cfg.min_delta
            )

        for epoch in range(1, cfg.max_epochs + 1):
            t0 = time.time()
            train_loss, fl, sl = self._run_epoch(
                train_loader, criterion, optimizer, train=True
            )
            self.history["train_loss"].append(train_loss)
            self.history["freq_loss"].append(fl)
            self.history["sev_loss"].append(sl)
            if self.model.gamma is not None:
                self.history["gamma"].append(self.model.gamma.item())

            val_loss = train_loss
            if val_loader is not None:
                val_loss, _, _ = self._run_epoch(
                    val_loader, criterion, optimizer=None, train=False
                )
                self.history["val_loss"].append(val_loss)

            scheduler.step(val_loss)

            if cfg.verbose and (epoch % 10 == 0 or epoch == 1):
                gamma_str = ""
                if self.model.gamma is not None:
                    gamma_str = f" γ={self.model.gamma.item():.4f}"
                elapsed = time.time() - t0
                logger.info(
                    "Epoch %3d/%d  train=%.4f  val=%.4f  "
                    "freq=%.4f  sev=%.4f%s  (%.1fs)",
                    epoch, cfg.max_epochs,
                    train_loss, val_loss, fl, sl, gamma_str, elapsed,
                )

            if early_stop is not None:
                early_stop.step(val_loss, self.model.state_dict())
                if early_stop.should_stop:
                    if cfg.verbose:
                        logger.info(
                            "Early stopping at epoch %d (best val=%.4f)",
                            epoch, early_stop.best_loss,
                        )
                    break

        # Restore best weights if early stopping was used.
        if early_stop is not None and early_stop.best_state is not None:
            self.model.load_state_dict(
                {k: v.to(self._device) for k, v in early_stop.best_state.items()}
            )

        self.model.eval()
        return self
Trains a DependentFreqSevNet using the joint Poisson-Gamma loss.
Parameters
model:
    The network to train.
config:
    Training hyperparameters.
``fit`` – Train the model.
Parameters
train_loader:
DataLoader yielding batches with keys x, log_exposure,
n_claims, avg_severity.
val_loader:
Optional validation DataLoader for early stopping.
Returns
self
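Driving the trainer directly looks like this (most users will go through ``DependentFSModel`` instead, which builds the loaders for you); ``TrainingConfig`` is left at its defaults because its fields are not documented here::

    from torch.utils.data import DataLoader

    # X, n_claims, avg_severity, exposure: numpy arrays as described under DependentFSModel.fit
    dataset = FreqSevDataset(X, n_claims, avg_severity, exposure)
    train_loader = DataLoader(dataset, batch_size=512, shuffle=True)

    net = DependentFreqSevNet(in_features=X.shape[1])
    trainer = DependentFSTrainer(net)                    # default TrainingConfig()
    trainer.fit(train_loader)                            # no val_loader: early stopping tracks train loss
    print(trainer.history["train_loss"][-1])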
class DependentFSModel(BaseEstimator):
    """Sklearn-compatible dependent frequency-severity neural model.

    Parameters
    ----------
    trunk_config:
        Shared encoder architecture. Defaults to ``SharedTrunkConfig()``.
    training_config:
        Training hyperparameters. Defaults to ``TrainingConfig()``.
    use_explicit_gamma:
        Whether to include the GGS γ·N conditional covariate in the severity
        head. When True, ``gamma_`` is populated after fitting and analytical
        pure premium is available.
    n_mc:
        Number of Monte Carlo samples per policy for ``predict_pure_premium``
        (used when ``method="mc"`` or always if ``use_explicit_gamma=False``).
    val_fraction:
        Fraction of training data held out for early stopping. Set to 0.0 to
        disable validation split (and early stopping).
    batch_size:
        Mini-batch size for training DataLoader.
    random_state:
        Seed for reproducibility of data splitting and MC sampling.

    Attributes
    ----------
    model_:
        Fitted ``DependentFreqSevNet`` (set after ``fit``).
    trainer_:
        Fitted ``DependentFSTrainer`` (holds training history).
    n_features_in_:
        Number of input features.
    gamma_:
        Estimated explicit dependence parameter γ (None if
        ``use_explicit_gamma=False``).
    """

    def __init__(
        self,
        trunk_config: Optional[SharedTrunkConfig] = None,
        training_config: Optional[TrainingConfig] = None,
        use_explicit_gamma: bool = True,
        n_mc: int = 1000,
        val_fraction: float = 0.1,
        batch_size: int = 512,
        random_state: int = 42,
    ) -> None:
        self.trunk_config = trunk_config
        self.training_config = training_config
        self.use_explicit_gamma = use_explicit_gamma
        self.n_mc = n_mc
        self.val_fraction = val_fraction
        self.batch_size = batch_size
        self.random_state = random_state

    # ------------------------------------------------------------------
    # Fit
    # ------------------------------------------------------------------

    def fit(
        self,
        X: np.ndarray,
        n_claims: np.ndarray,
        avg_severity: np.ndarray,
        exposure: np.ndarray,
    ) -> "DependentFSModel":
        """Fit the joint frequency-severity model.

        Parameters
        ----------
        X:
            Feature matrix, shape (n_policies, n_features). Must be numeric
            float32. Use ``prepare_features`` to encode raw DataFrames.
        n_claims:
            Claim count per policy, shape (n_policies,).
        avg_severity:
            Average claim size per policy, shape (n_policies,). Zero for
            policies with zero claims.
        exposure:
            Years at risk per policy, shape (n_policies,). Must be > 0.

        Returns
        -------
        self
        """
        X = np.asarray(X, dtype=np.float32)
        n_claims = np.asarray(n_claims, dtype=np.float32)
        avg_severity = np.asarray(avg_severity, dtype=np.float32)
        exposure = np.asarray(exposure, dtype=np.float32)

        self.n_features_in_ = X.shape[1]
        trunk_config = self.trunk_config or SharedTrunkConfig()
        train_config = self.training_config or TrainingConfig()
        # Propagate batch_size into training config if not set by caller.
        if self.training_config is None:
            train_config = TrainingConfig(
                batch_size=self.batch_size,
                verbose=train_config.verbose,
            )

        self.model_ = DependentFreqSevNet(
            in_features=self.n_features_in_,
            trunk_config=trunk_config,
            use_explicit_gamma=self.use_explicit_gamma,
        )

        dataset = FreqSevDataset(X, n_claims, avg_severity, exposure)

        if self.val_fraction > 0.0:
            train_loader, val_loader = make_train_val_loaders(
                dataset,
                val_fraction=self.val_fraction,
                batch_size=train_config.batch_size,
                seed=self.random_state,
            )
        else:
            train_loader = DataLoader(
                dataset,
                batch_size=train_config.batch_size,
                shuffle=True,
            )
            val_loader = None

        self.trainer_ = DependentFSTrainer(self.model_, train_config)
        self.trainer_.fit(train_loader, val_loader)

        if self.use_explicit_gamma and self.model_.gamma is not None:
            self.gamma_ = self.model_.gamma.item()
        else:
            self.gamma_ = None

        return self

    # ------------------------------------------------------------------
    # Predict helpers
    # ------------------------------------------------------------------

    def _to_tensor(self, arr: np.ndarray, dtype=torch.float32) -> Tensor:
        return torch.tensor(np.asarray(arr, dtype=np.float32), dtype=dtype)

    def _device(self) -> torch.device:
        return next(self.model_.parameters()).device

    def _forward_numpy(
        self,
        X: np.ndarray,
        exposure: np.ndarray,
        n_claims_for_gamma: Optional[np.ndarray] = None,
    ) -> Tuple[Tensor, Tensor, Tensor]:
        """Run forward pass and return (log_lambda, log_mu, phi) as CPU tensors."""
        check_is_fitted(self, "model_")
        self.model_.eval()
        device = self._device()
        x_t = self._to_tensor(X).to(device)
        log_exp_t = self._to_tensor(np.log(exposure.astype(np.float32))).to(device)
        n_t: Optional[Tensor] = None
        if self.use_explicit_gamma:
            # When use_explicit_gamma=True the severity head requires n_claims.
            # Default to zeros (N=0) when not provided — gives baseline severity
            # without the gamma adjustment, which is correct for predict_frequency
            # and predict_severity calls.
            if n_claims_for_gamma is not None:
                n_t = self._to_tensor(n_claims_for_gamma).to(device)
            else:
                n_t = torch.zeros(len(X), dtype=torch.float32, device=device)

        with torch.no_grad():
            log_lambda, log_mu, phi = self.model_(x_t, log_exp_t, n_t)

        return log_lambda.cpu(), log_mu.cpu(), phi.cpu()

    # ------------------------------------------------------------------
    # Public prediction methods
    # ------------------------------------------------------------------

    def predict_frequency(
        self,
        X: np.ndarray,
        exposure: np.ndarray,
    ) -> np.ndarray:
        """Predict expected claim frequency (per unit exposure).

        Parameters
        ----------
        X:
            Feature matrix, shape (n, p).
        exposure:
            Exposure for each policy, shape (n,).

        Returns
        -------
        np.ndarray of shape (n,)
            Expected claims per unit exposure for each policy.
        """
        exposure = np.asarray(exposure, dtype=np.float32)
        log_lambda, _, _ = self._forward_numpy(X, exposure)
        lambda_with_exp = torch.exp(log_lambda).numpy()
        return lambda_with_exp / exposure

    def predict_severity(
        self,
        X: np.ndarray,
        exposure: Optional[np.ndarray] = None,
    ) -> np.ndarray:
        """Predict expected average severity (at N=0 for γ·N term).

        When ``use_explicit_gamma=True``, the severity head includes γ·N. In
        this method N is set to zero, giving the baseline severity
        exp(SevHead(h)). This is the severity for a policy with zero prior
        claims — which is useful for comparing relative risk, but not for
        computing the pure premium (use ``predict_pure_premium`` for that).

        Parameters
        ----------
        X:
            Feature matrix, shape (n, p).
        exposure:
            Needed internally; defaults to all-ones if not provided.

        Returns
        -------
        np.ndarray of shape (n,)
            Expected average severity (baseline, N=0).
        """
        n = len(X)
        if exposure is None:
            exposure = np.ones(n, dtype=np.float32)
        exposure = np.asarray(exposure, dtype=np.float32)
        n_zero = np.zeros(n, dtype=np.float32)
        _, log_mu, _ = self._forward_numpy(X, exposure, n_claims_for_gamma=n_zero)
        return torch.exp(log_mu).numpy()

    def predict_pure_premium(
        self,
        X: np.ndarray,
        exposure: np.ndarray,
        method: str = "auto",
        n_mc: Optional[int] = None,
    ) -> np.ndarray:
        """Predict pure premium (expected total loss per unit exposure).

        Parameters
        ----------
        X:
            Feature matrix, shape (n, p).
        exposure:
            Exposure, shape (n,).
        method:
            ``"auto"`` uses analytical when γ is available, MC otherwise.
            ``"mc"`` forces Monte Carlo regardless.
            ``"analytical"`` uses the GGS MGF formula (requires γ).
        n_mc:
            Override the MC sample size.

        Returns
        -------
        np.ndarray of shape (n,)
            Pure premium per unit exposure.
        """
        check_is_fitted(self, "model_")
        exposure = np.asarray(exposure, dtype=np.float32)
        n = len(X)

        use_analytical = (
            method == "analytical"
            or (method == "auto" and self.use_explicit_gamma)
        )

        estimator = PurePremiumEstimator(
            n_mc=n_mc or self.n_mc,
            seed=self.random_state,
        )

        # Zero out n_claims for the forward pass (base severity, no look-ahead)
        n_zero = np.zeros(n, dtype=np.float32)
        log_lambda, log_mu_base, phi = self._forward_numpy(
            X, exposure, n_claims_for_gamma=n_zero
        )
        exp_t = torch.tensor(exposure, dtype=torch.float32)

        if use_analytical and self.model_.gamma is not None:
            gamma_t = self.model_.gamma.detach().cpu()
            pp = estimator.analytical(log_lambda, log_mu_base, gamma_t, exp_t)
        else:
            pp = estimator.monte_carlo(log_lambda, log_mu_base, phi, exp_t)

        return pp.numpy()

    def predict(self, X: np.ndarray, exposure: Optional[np.ndarray] = None) -> np.ndarray:
        """Sklearn-compatible predict: returns pure premium with unit exposure."""
        if exposure is None:
            exposure = np.ones(len(X), dtype=np.float32)
        return self.predict_pure_premium(X, exposure)

    def latent_repr(self, X: np.ndarray) -> np.ndarray:
        """Return the shared latent representation h for each policy.

        Useful for diagnostics: inspect whether the latent space captures
        meaningful risk structure, or compute correlation between freq-direction
        and sev-direction in h.

        Parameters
        ----------
        X:
            Feature matrix, shape (n, p).

        Returns
        -------
        np.ndarray of shape (n, latent_dim)
        """
        check_is_fitted(self, "model_")
        device = self._device()
        x_t = self._to_tensor(X).to(device)
        h = self.model_.latent(x_t)
        return h.cpu().numpy()

    def score(
        self,
        X: np.ndarray,
        n_claims: np.ndarray,
        avg_severity: np.ndarray,
        exposure: np.ndarray,
    ) -> float:
        """Negative mean Poisson deviance on frequency (higher is better).

        This is the sklearn convention: ``score`` returns a value to maximise.

        The metric is the mean Poisson deviance on frequency predictions, negated
        so that cross_val_score() sees it as a maximisation problem.

        Parameters
        ----------
        X, n_claims, avg_severity, exposure:
            Evaluation data in the same format as ``fit``.

        Returns
        -------
        float
            Negative mean Poisson deviance.
        """
        exposure = np.asarray(exposure, dtype=np.float32)
        freq_pred = self.predict_frequency(X, exposure)
        freq_actual_per_unit = n_claims.astype(np.float32) / exposure

        # Poisson deviance = 2 * Σ [y log(y/μ) - (y - μ)]
        eps = 1e-10
        y = freq_actual_per_unit
        mu = freq_pred + eps
        dev = 2.0 * np.where(y > 0, y * np.log(y / mu) - (y - mu), mu)
        return -float(np.mean(dev))

    def training_history(self) -> dict:
        """Return the training history dict from the trainer."""
        check_is_fitted(self, "trainer_")
        return self.trainer_.history
Sklearn-compatible dependent frequency-severity neural model.
Parameters
trunk_config:
Shared encoder architecture. Defaults to SharedTrunkConfig().
training_config:
Training hyperparameters. Defaults to TrainingConfig().
use_explicit_gamma:
Whether to include the GGS γ·N conditional covariate in the severity
head. When True, gamma_ is populated after fitting and analytical
pure premium is available.
n_mc:
Number of Monte Carlo samples per policy for predict_pure_premium
(used when method="mc" or always if use_explicit_gamma=False).
val_fraction:
Fraction of training data held out for early stopping. Set to 0.0 to
disable validation split (and early stopping).
batch_size:
Mini-batch size for training DataLoader.
random_state:
Seed for reproducibility of data splitting and MC sampling.
Attributes
model_:
Fitted DependentFreqSevNet (set after fit).
trainer_:
Fitted DependentFSTrainer (holds training history).
n_features_in_:
Number of input features.
gamma_:
Estimated explicit dependence parameter γ (None if
use_explicit_gamma=False).
``fit`` – Fit the joint frequency-severity model.
Parameters
X:
Feature matrix, shape (n_policies, n_features). Must be numeric
float32. Use prepare_features to encode raw DataFrames.
n_claims:
Claim count per policy, shape (n_policies,).
avg_severity:
Average claim size per policy, shape (n_policies,). Zero for
policies with zero claims.
exposure:
Years at risk per policy, shape (n_policies,). Must be > 0.
Returns
self
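The four arrays ``fit`` expects can be aggregated from a claim-level table along these lines. A pandas sketch, assuming hypothetical ``claims`` (one row per claim) and ``policies`` (one row per policy) frames::

    import numpy as np
    import pandas as pd

    # claims:   columns ["policy_id", "amount"]          (hypothetical layout)
    # policies: columns ["policy_id", "exposure", ...]   (hypothetical layout)
    counts = claims.groupby("policy_id").size()
    totals = claims.groupby("policy_id")["amount"].sum()

    n_claims = policies["policy_id"].map(counts).fillna(0).to_numpy(np.float32)
    total_loss = policies["policy_id"].map(totals).fillna(0.0).to_numpy(np.float32)
    avg_severity = np.where(n_claims > 0, total_loss / np.maximum(n_claims, 1.0), 0.0)
    exposure = policies["exposure"].to_numpy(np.float32)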
``predict_frequency`` – Predict expected claim frequency (per unit exposure).
Parameters
X:
    Feature matrix, shape (n, p).
exposure:
    Exposure for each policy, shape (n,).
Returns
np.ndarray of shape (n,)
    Expected claims per unit exposure for each policy.
``predict_severity`` – Predict expected average severity (at N=0 for the γ·N term).
When use_explicit_gamma=True, the severity head includes γ·N. In
this method N is set to zero, giving the baseline severity
exp(SevHead(h)). This is the severity for a policy with zero prior
claims — which is useful for comparing relative risk, but not for
computing the pure premium (use predict_pure_premium for that).
Parameters
X:
    Feature matrix, shape (n, p).
exposure:
    Needed internally; defaults to all-ones if not provided.
Returns
np.ndarray of shape (n,)
    Expected average severity (baseline, N=0).
``predict`` – Sklearn-compatible predict: returns pure premium with unit exposure.
``latent_repr`` – Return the shared latent representation h for each policy.
Useful for diagnostics: inspect whether the latent space captures meaningful risk structure, or compute correlation between freq-direction and sev-direction in h.
Parameters
X: Feature matrix, shape (n, p).
Returns
np.ndarray of shape (n, latent_dim)
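The freq-direction vs sev-direction correlation mentioned above can be computed by projecting the latent codes onto each head's linear weights. A diagnostic sketch (not part of ``DependentFSDiagnostics``); ``model`` is a fitted ``DependentFSModel`` and ``X_test`` a feature matrix::

    import numpy as np

    h = model.latent_repr(X_test)                                     # (n, latent_dim)
    w_freq = model.model_.freq_head.linear.weight.detach().cpu().numpy().ravel()
    w_sev = model.model_.sev_head.linear.weight.detach().cpu().numpy().ravel()

    freq_score = h @ w_freq                                           # latent frequency direction
    sev_score = h @ w_sev                                             # latent severity direction
    latent_corr = np.corrcoef(freq_score, sev_score)[0, 1]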
``score`` – Negative mean Poisson deviance on frequency (higher is better).
This is the sklearn convention: score returns a value to maximise.
The metric is the mean Poisson deviance on frequency predictions, negated so that cross_val_score() sees it as a maximisation problem.
Parameters
X, n_claims, avg_severity, exposure:
Evaluation data in the same format as fit.
Returns
float
    Negative mean Poisson deviance.
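As a worked example of the underlying metric: a policy with one observed claim per unit exposure and a predicted frequency of 0.5 contributes d = 2·[1·log(1/0.5) − (1 − 0.5)] ≈ 0.386, while a claim-free policy contributes d = 2·μ; ``score`` returns minus the mean of these contributions.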
``training_history`` – Return the training history dict from the trainer.
class PurePremiumEstimator:

    def __init__(
        self,
        n_mc: int = 1000,
        seed: int = 42,
        device: str = "cpu",
    ) -> None:
        self.n_mc = n_mc
        self.seed = seed
        self.device = torch.device(device)
Compute per-policy pure premium predictions from model outputs.
This class is designed to be called after a forward pass of
DependentFreqSevNet. It does not hold a reference to the model;
the caller passes tensors directly.
Parameters
n_mc:
Number of Monte Carlo samples per policy for the MC estimator. 1000 gives reasonable accuracy; use 5000 for narrow confidence intervals.
seed:
Random seed for reproducibility of MC samples.
device:
Torch device for MC computations.
74 def monte_carlo( 75 self, 76 log_lambda: Tensor, 77 log_mu: Tensor, 78 phi: Tensor, 79 exposure: Tensor, 80 ) -> Tensor: 81 """Monte Carlo pure premium estimate. 82 83 Samples N_i ~ Poisson(λ_i · t_i / t_i) [unit-rate, then N_i is the 84 count] and Y_ij ~ Gamma(N_i/φ, φ·μ_i/N_i) for each realisation. The 85 aggregate pure premium per unit exposure is then mean(N·Ȳ) / t. 86 87 This is fully general — it does not assume any particular dependence 88 structure and works whether or not explicit γ is used. 89 90 Parameters 91 ---------- 92 log_lambda: 93 Log Poisson rate (including log exposure), shape (n,). 94 log_mu: 95 Log expected severity, shape (n,). 96 phi: 97 Gamma dispersion parameter, shape (1,) or scalar. 98 exposure: 99 Exposure, shape (n,). Used to normalise the result to a per-unit 100 pure premium. 101 102 Returns 103 ------- 104 pp: Tensor of shape (n,) 105 Pure premium (expected total loss per unit exposure) for each policy. 106 """ 107 torch.manual_seed(self.seed) 108 n_policies = log_lambda.shape[0] 109 lambda_ = torch.exp(log_lambda).cpu() # shape (n,) 110 mu = torch.exp(log_mu).cpu() # shape (n,) 111 phi_val = phi.cpu().squeeze().item() 112 exp_val = exposure.cpu() 113 114 # Shape: (n_mc, n_policies) 115 # Sample Poisson counts for each MC iteration and policy. 116 poisson_dist = torch.distributions.Poisson( 117 lambda_.unsqueeze(0).expand(self.n_mc, -1) 118 ) 119 n_samp = poisson_dist.sample() # (n_mc, n_policies) 120 121 # Sample Gamma severities only where n_samp > 0. 122 # Shape = alpha of Gamma. We need to handle n_samp=0 carefully. 123 positive_mask = n_samp > 0 124 total_loss = torch.zeros(self.n_mc, n_policies) 125 126 # Vectorised: concentration = n_samp / phi; rate = n_samp / (phi * mu) 127 # mean of Gamma = concentration / rate = mu. 128 # For zero claims, loss = 0. 129 alpha = n_samp.float() / phi_val 130 rate = n_samp.float() / (phi_val * mu.unsqueeze(0) + 1e-10) 131 132 # Sample from Gamma where positive; leave zeros elsewhere. 133 # We clamp alpha and rate to be > 0 to avoid numerical errors. 134 safe_alpha = alpha.clamp(min=1e-6) 135 safe_rate = rate.clamp(min=1e-6) 136 gamma_dist = torch.distributions.Gamma(safe_alpha, safe_rate) 137 y_samp = gamma_dist.sample() # (n_mc, n_policies) 138 139 # Total loss = N * avg_severity (with N * avg_sev = sum of claims) 140 # n_samp * y_samp = total claim amount for that realisation 141 total_loss = torch.where(positive_mask, n_samp.float() * y_samp, torch.zeros_like(y_samp)) 142 143 # Pure premium = E[total_loss] / exposure 144 mean_total_loss = total_loss.mean(dim=0) # (n_policies,) 145 pp = mean_total_loss / exp_val 146 return pp.to(self.device)
Monte Carlo pure premium estimate.
Samples N_i ~ Poisson(λ_i), where λ_i = exp(log_lambda_i) already includes the exposure t_i, and, given N_i > 0, the average severity Ȳ_i ~ Gamma(shape = N_i/φ, scale = φ·μ_i/N_i) for each realisation. The pure premium per unit exposure is then mean(N·Ȳ) / t.
This is fully general — it does not assume any particular dependence structure and works whether or not explicit γ is used.
Parameters
log_lambda:
Log Poisson rate (including log exposure), shape (n,).
log_mu:
Log expected severity, shape (n,).
phi:
Gamma dispersion parameter, shape (1,) or scalar.
exposure:
Exposure, shape (n,). Used to normalise the result to a per-unit pure premium.
Returns
pp: Tensor of shape (n,)
Pure premium (expected total loss per unit exposure) for each policy.
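A small self-contained sketch with hand-made tensors; in practice ``log_lambda``, ``log_mu`` and ``phi`` come from a forward pass of ``DependentFreqSevNet``, and the values below are purely illustrative:

>>> import torch
>>> est = PurePremiumEstimator(n_mc=2000, seed=0)
>>> log_lambda = torch.log(torch.tensor([0.05, 0.20]))   # rate already includes exposure
>>> log_mu = torch.log(torch.tensor([3000.0, 2500.0]))
>>> phi = torch.tensor([1.5])
>>> exposure = torch.tensor([1.0, 0.5])
>>> pp = est.monte_carlo(log_lambda, log_mu, phi, exposure)   # shape (2,), per unit exposure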
148 def analytical( 149 self, 150 log_lambda: Tensor, 151 log_mu_base: Tensor, 152 gamma: Tensor, 153 exposure: Tensor, 154 ) -> Tensor: 155 """Semi-analytical pure premium for the GGS conditional covariate model. 156 157 Uses the closed form from Garrido-Genest-Schulz (2016) / NeurFS Theorem 1: 158 159 E[Z | x] = exp(log_mu_base + γ) · exp(λ(eᵞ − 1)) · λ 160 161 where log_mu_base = SevHead(h) (without the γ·N term), λ = exp(log_lambda) 162 is the Poisson rate for the POLICY (not per unit exposure — i.e. λ = rate·t). 163 164 This formula is exact when γ is the only dependence and frequency is 165 Poisson. When the shared trunk also contributes implicit dependence, this 166 is an approximation. 167 168 Parameters 169 ---------- 170 log_lambda: 171 Log Poisson rate including exposure, shape (n,). 172 log_mu_base: 173 Log expected severity WITHOUT the γ·N adjustment, shape (n,). 174 gamma: 175 The scalar dependence parameter γ. 176 exposure: 177 Policy exposure, shape (n,). 178 179 Returns 180 ------- 181 pp: Tensor of shape (n,) 182 Pure premium per unit exposure. 183 """ 184 lambda_ = torch.exp(log_lambda) # includes exposure 185 g = gamma.squeeze() 186 eg = torch.exp(g) 187 188 # E[total_loss | x] = E[N·Y | x] 189 # = exp(log_mu_base + γ) · exp(λ(eᵞ - 1)) · λ 190 log_pp_times_exp = ( 191 log_mu_base + g 192 + lambda_ * (eg - 1.0) 193 + torch.log(lambda_ + 1e-10) 194 ) 195 total_loss = torch.exp(log_pp_times_exp) 196 pp = total_loss / exposure 197 return pp
Semi-analytical pure premium for the GGS conditional covariate model.
Uses the closed form from Garrido-Genest-Schulz (2016) / NeurFS Theorem 1:
E[Z | x] = exp(log_mu_base + γ) · exp(λ(eᵞ − 1)) · λ
where log_mu_base = SevHead(h) (without the γ·N term), λ = exp(log_lambda) is the Poisson rate for the POLICY (not per unit exposure — i.e. λ = rate·t).
This formula is exact when γ is the only dependence and frequency is Poisson. When the shared trunk also contributes implicit dependence, this is an approximation.
Parameters
log_lambda:
Log Poisson rate including exposure, shape (n,).
log_mu_base:
Log expected severity WITHOUT the γ·N adjustment, shape (n,).
gamma:
The scalar dependence parameter γ.
exposure:
Policy exposure, shape (n,).
Returns
pp: Tensor of shape (n,)
Pure premium per unit exposure.
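A worked check of the formula (a sketch; the numbers are illustrative): with λ = 0.1, γ = −0.15 and a baseline severity of 3000, the correction shaves roughly 15% off the independent premium λ·μ = 300:

>>> import torch
>>> est = PurePremiumEstimator()
>>> est.analytical(
...     log_lambda=torch.log(torch.tensor([0.1])),
...     log_mu_base=torch.log(torch.tensor([3000.0])),
...     gamma=torch.tensor(-0.15),
...     exposure=torch.tensor([1.0]),
... )                                    # ≈ 254.6, vs 300 under independence, because γ < 0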
199 def confidence_interval( 200 self, 201 log_lambda: Tensor, 202 log_mu: Tensor, 203 phi: Tensor, 204 exposure: Tensor, 205 alpha: float = 0.05, 206 ) -> tuple: 207 """Bootstrap confidence interval on the pure premium via MC. 208 209 Returns the (lower, point_estimate, upper) pure premiums, shape (n, 3). 210 211 Parameters 212 ---------- 213 alpha: 214 Significance level. 0.05 gives 95% CI. 215 """ 216 torch.manual_seed(self.seed) 217 n_policies = log_lambda.shape[0] 218 lambda_ = torch.exp(log_lambda).cpu() 219 mu = torch.exp(log_mu).cpu() 220 phi_val = phi.cpu().squeeze().item() 221 exp_val = exposure.cpu() 222 223 poisson_dist = torch.distributions.Poisson( 224 lambda_.unsqueeze(0).expand(self.n_mc, -1) 225 ) 226 n_samp = poisson_dist.sample() 227 228 alpha_g = n_samp.float() / phi_val 229 rate_g = n_samp.float() / (phi_val * mu.unsqueeze(0) + 1e-10) 230 safe_alpha = alpha_g.clamp(min=1e-6) 231 safe_rate = rate_g.clamp(min=1e-6) 232 gamma_dist = torch.distributions.Gamma(safe_alpha, safe_rate) 233 y_samp = gamma_dist.sample() 234 235 total_loss = torch.where( 236 n_samp > 0, n_samp.float() * y_samp, torch.zeros_like(y_samp) 237 ) 238 pp_mc = total_loss / exp_val.unsqueeze(0) # (n_mc, n_policies) 239 240 lo = pp_mc.quantile(alpha / 2, dim=0) 241 mid = pp_mc.mean(dim=0) 242 hi = pp_mc.quantile(1 - alpha / 2, dim=0) 243 return lo, mid, hi
Bootstrap confidence interval on the pure premium via MC.
Returns a tuple ``(lower, point_estimate, upper)`` of per-policy pure premiums, each of shape (n,).
Parameters
alpha:
Significance level. 0.05 gives 95% CI.
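Continuing the ``monte_carlo`` sketch above (same illustrative tensors):

>>> lo, mid, hi = est.confidence_interval(log_lambda, log_mu, phi, exposure, alpha=0.05)
>>> lo.shape, mid.shape, hi.shape        # each of shape (n_policies,)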
class DependentFSDiagnostics:

    def __init__(
        self,
        model: DependentFSModel,
        X: np.ndarray,
        n_claims: np.ndarray,
        avg_severity: np.ndarray,
        exposure: np.ndarray,
    ) -> None:
        self.model = model
        self.X = np.asarray(X, dtype=np.float32)
        self.n_claims = np.asarray(n_claims, dtype=np.float32)
        self.avg_severity = np.asarray(avg_severity, dtype=np.float32)
        self.exposure = np.asarray(exposure, dtype=np.float32)
Diagnostic tools for a fitted DependentFSModel.
Parameters
model:
A fitted DependentFSModel.
X:
Feature matrix used for evaluation, shape (n, p).
n_claims:
Observed claim counts, shape (n,).
avg_severity:
Observed average severity, shape (n,).
exposure:
Policy exposure, shape (n,).
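A typical diagnostic pass on held-out data, sketched with the variable names used in the earlier examples:

>>> diag = DependentFSDiagnostics(model, X_test, n_claims_test, avg_sev_test, exposure_test)
>>> diag.gini_summary()    # {'gini_frequency': ..., 'gini_pure_premium': ..., 'gini_severity': ...}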
71 def lorenz_curve( 72 self, 73 target: str = "frequency", 74 n_groups: int = 100, 75 ) -> Tuple[np.ndarray, np.ndarray, float]: 76 """Compute the concentration (Lorenz) curve for frequency or severity. 77 78 Policies are sorted by their predicted risk (ascending), and we plot 79 the cumulative proportion of actual losses vs. cumulative proportion of 80 policies. A perfect model has all concentration at the right; a random 81 model gives the 45-degree diagonal. 82 83 Parameters 84 ---------- 85 target: 86 ``"frequency"`` or ``"severity"`` or ``"pure_premium"``. 87 n_groups: 88 Number of groups for binning the predictions. 89 90 Returns 91 ------- 92 cum_exposure: np.ndarray 93 Cumulative exposure fraction (x-axis). 94 cum_loss: np.ndarray 95 Cumulative loss fraction (y-axis). 96 gini: float 97 Normalised Gini coefficient (2·AUC − 1). 98 """ 99 if target == "frequency": 100 pred = self.model.predict_frequency(self.X, self.exposure) 101 actual = self.n_claims / self.exposure 102 elif target == "severity": 103 pos = self.n_claims > 0 104 if pos.sum() == 0: 105 raise ValueError("No positive-claim rows for severity Lorenz curve.") 106 pred = self.model.predict_severity(self.X[pos], self.exposure[pos]) 107 actual = self.avg_severity[pos] 108 elif target == "pure_premium": 109 pred = self.model.predict_pure_premium(self.X, self.exposure) 110 actual = self.n_claims * self.avg_severity / self.exposure 111 else: 112 raise ValueError(f"Unknown target '{target}'. Use 'frequency', 'severity', or 'pure_premium'.") 113 114 order = np.argsort(pred) 115 actual_sorted = actual[order] 116 exp_sorted = self.exposure[order] if target != "severity" else np.ones_like(actual_sorted) 117 118 cum_exp = np.cumsum(exp_sorted) / exp_sorted.sum() 119 cum_loss = np.cumsum(actual_sorted * exp_sorted) / (actual_sorted * exp_sorted).sum() 120 121 cum_exp = np.concatenate([[0.0], cum_exp]) 122 cum_loss = np.concatenate([[0.0], cum_loss]) 123 124 auc = (np.trapezoid if hasattr(np, "trapezoid") else np.trapz)(cum_loss, cum_exp) 125 gini = 2.0 * auc - 1.0 126 return cum_exp, cum_loss, gini
Compute the concentration (Lorenz) curve for frequency or severity.
Policies are sorted by their predicted risk (ascending), and we plot the cumulative proportion of actual losses vs. cumulative proportion of policies. A perfect model has all concentration at the right; a random model gives the 45-degree diagonal.
Parameters
target:
"frequency" or "severity" or "pure_premium".
n_groups:
Number of groups for binning the predictions.
Returns
cum_exposure: np.ndarray
Cumulative exposure fraction (x-axis).
cum_loss: np.ndarray
Cumulative loss fraction (y-axis).
gini: float
Normalised Gini coefficient (2·AUC − 1).
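For example (a sketch using the ``diag`` object from above):

>>> cum_exp, cum_loss, gini = diag.lorenz_curve(target="pure_premium")
>>> gini                                  # 2·AUC − 1; higher means sharper risk segmentation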
128 def gini_summary(self) -> Dict[str, float]: 129 """Return Gini coefficients for frequency, severity, and pure premium.""" 130 result = {} 131 for t in ("frequency", "pure_premium"): 132 _, _, g = self.lorenz_curve(target=t) 133 result[f"gini_{t}"] = g 134 pos = self.n_claims > 0 135 if pos.sum() >= 10: 136 _, _, g_sev = self.lorenz_curve(target="severity") 137 result["gini_severity"] = g_sev 138 return result
Return Gini coefficients for frequency, severity, and pure premium.
144 def calibration( 145 self, 146 target: str = "frequency", 147 n_deciles: int = 10, 148 ) -> Dict[str, np.ndarray]: 149 """Calibration of predicted vs observed in n_deciles risk buckets. 150 151 Parameters 152 ---------- 153 target: 154 ``"frequency"`` or ``"pure_premium"``. 155 n_deciles: 156 Number of equally-sized quantile buckets. 157 158 Returns 159 ------- 160 dict with keys ``pred_mean``, ``obs_mean``, ``bucket_edge``. 161 """ 162 if target == "frequency": 163 pred = self.model.predict_frequency(self.X, self.exposure) 164 obs = self.n_claims / self.exposure 165 elif target == "pure_premium": 166 pred = self.model.predict_pure_premium(self.X, self.exposure) 167 obs = self.n_claims * self.avg_severity / self.exposure 168 else: 169 raise ValueError(f"Unknown target '{target}'.") 170 171 quantiles = np.linspace(0, 100, n_deciles + 1) 172 edges = np.percentile(pred, quantiles) 173 pred_means = [] 174 obs_means = [] 175 for lo, hi in zip(edges[:-1], edges[1:]): 176 mask = (pred >= lo) & (pred <= hi) 177 if mask.sum() == 0: 178 continue 179 pred_means.append(pred[mask].mean()) 180 obs_means.append(obs[mask].mean()) 181 return { 182 "pred_mean": np.array(pred_means), 183 "obs_mean": np.array(obs_means), 184 "bucket_edge": edges, 185 }
Calibration of predicted vs observed in n_deciles risk buckets.
Parameters
target:
"frequency" or "pure_premium".
n_deciles:
Number of equally-sized quantile buckets.
Returns
dict with keys pred_mean, obs_mean, bucket_edge.
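Illustrative usage, continuing the diagnostic sketch above:

>>> cal = diag.calibration(target="frequency", n_deciles=10)
>>> np.c_[cal["pred_mean"], cal["obs_mean"]]   # the two columns should track each other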
191 def dependence_test( 192 self, 193 n_bootstrap: int = 200, 194 seed: int = 42, 195 ) -> Dict[str, float]: 196 """Test H₀: γ = 0 (frequency-severity independence via explicit term). 197 198 This is only meaningful when ``use_explicit_gamma=True``. It returns 199 the estimated γ, its bootstrap standard error, and an approximate 200 z-statistic. 201 202 The test does NOT account for the implicit latent dependence from the 203 shared trunk — that is captured in the latent correlation analysis 204 instead. 205 206 Parameters 207 ---------- 208 n_bootstrap: 209 Number of bootstrap resamples for the standard error. 210 seed: 211 Random seed. 212 213 Returns 214 ------- 215 dict with keys ``gamma``, ``gamma_se``, ``z_stat``, ``p_value``. 216 """ 217 if not self.model.use_explicit_gamma: 218 return {"gamma": None, "gamma_se": None, "z_stat": None, "p_value": None, 219 "note": "use_explicit_gamma=False; no γ to test."} 220 221 gamma_hat = self.model.gamma_ 222 223 # Bootstrap the γ estimate. 224 rng = np.random.default_rng(seed) 225 n = len(self.X) 226 gamma_boots = [] 227 for _ in range(n_bootstrap): 228 idx = rng.integers(0, n, size=n) 229 m_boot = DependentFSModel( 230 trunk_config=self.model.trunk_config, 231 training_config=self.model.training_config, 232 use_explicit_gamma=True, 233 val_fraction=0.0, 234 random_state=seed, 235 ) 236 # Fit with a short training (5 epochs) to get bootstrap distribution 237 import copy 238 from insurance_frequency_severity.dependent.training import TrainingConfig 239 fast_tc = TrainingConfig(max_epochs=5, verbose=False, auto_balance=True) 240 m_boot.training_config = fast_tc 241 try: 242 m_boot.fit( 243 self.X[idx], 244 self.n_claims[idx], 245 self.avg_severity[idx], 246 self.exposure[idx], 247 ) 248 gamma_boots.append(m_boot.gamma_) 249 except Exception: 250 pass 251 252 gamma_boots = np.array(gamma_boots) 253 se = gamma_boots.std() if len(gamma_boots) > 1 else np.nan 254 z = gamma_hat / se if se > 0 else np.nan 255 p_value = 2.0 * (1.0 - stats.norm.cdf(abs(z))) if not np.isnan(z) else np.nan 256 257 return { 258 "gamma": gamma_hat, 259 "gamma_se": se, 260 "z_stat": z, 261 "p_value": p_value, 262 "n_bootstrap": len(gamma_boots), 263 }
Test H₀: γ = 0 (frequency-severity independence via explicit term).
This is only meaningful when use_explicit_gamma=True. It returns
the estimated γ, its bootstrap standard error, and an approximate
z-statistic.
The test does NOT account for the implicit latent dependence from the shared trunk — that is captured in the latent correlation analysis instead.
Parameters
n_bootstrap:
Number of bootstrap resamples for the standard error.
seed:
Random seed.
Returns
dict with keys gamma, gamma_se, z_stat, p_value.
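An illustrative call: since each resample refits the model, a small ``n_bootstrap`` is usually enough for a first look (a sketch, not a tuned setting):

>>> res = diag.dependence_test(n_bootstrap=50, seed=0)
>>> res["gamma"], res["gamma_se"], res["p_value"]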
269 def latent_correlation(self) -> Dict[str, object]: 270 """Correlation structure in the shared latent space. 271 272 Computes: 273 - Correlation matrix of latent dimensions h₁, …, h_d 274 - Correlation of each latent dim with observed frequency and severity 275 - Number of latent dims with |corr| > 0.1 with freq and with sev 276 277 Returns 278 ------- 279 dict with keys ``latent_corr``, ``freq_corr``, ``sev_corr``. 280 """ 281 h = self.model.latent_repr(self.X) # (n, d) 282 freq_actual = self.n_claims / (self.exposure + 1e-10) 283 284 pos = self.n_claims > 0 285 sev_actual = np.where(pos, self.avg_severity, np.nan) 286 287 # Latent-latent correlation matrix 288 latent_corr = np.corrcoef(h.T) # (d, d) 289 290 # Latent → frequency correlation 291 freq_corr = np.array([ 292 np.corrcoef(h[:, j], freq_actual)[0, 1] for j in range(h.shape[1]) 293 ]) 294 295 # Latent → severity correlation (positive claims only) 296 sev_corr_vals = [] 297 for j in range(h.shape[1]): 298 valid = ~np.isnan(sev_actual) & (sev_actual > 0) 299 if valid.sum() > 5: 300 sev_corr_vals.append(np.corrcoef(h[valid, j], sev_actual[valid])[0, 1]) 301 else: 302 sev_corr_vals.append(np.nan) 303 sev_corr = np.array(sev_corr_vals) 304 305 return { 306 "latent_corr": latent_corr, 307 "freq_corr": freq_corr, 308 "sev_corr": sev_corr, 309 "n_freq_active": int((np.abs(freq_corr) > 0.1).sum()), 310 "n_sev_active": int((np.abs(sev_corr[~np.isnan(sev_corr)]) > 0.1).sum()), 311 }
Correlation structure in the shared latent space.
Computes:
- Correlation matrix of latent dimensions h₁, …, h_d
- Correlation of each latent dim with observed frequency and severity
- Number of latent dims with |corr| > 0.1 with freq and with sev
Returns
dict with keys latent_corr, freq_corr, sev_corr.
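For example, continuing the diagnostic sketch above:

>>> lc = diag.latent_correlation()
>>> lc["n_freq_active"], lc["n_sev_active"]    # latent dims with |corr| > 0.1
>>> lc["latent_corr"].shape                    # (latent_dim, latent_dim)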
317 def vs_independent( 318 self, 319 X_val: Optional[np.ndarray] = None, 320 n_claims_val: Optional[np.ndarray] = None, 321 avg_severity_val: Optional[np.ndarray] = None, 322 exposure_val: Optional[np.ndarray] = None, 323 n_mc: int = 500, 324 ) -> Dict[str, float]: 325 """Compare dependent model vs a naive independent (GLM-style) baseline. 326 327 The independent baseline ignores the shared trunk: it predicts pure 328 premium as predict_frequency * predict_severity (i.e. assumes E[Z]=E[N]·E[Y]). 329 The dependent model uses ``predict_pure_premium`` (MC or analytical). 330 331 Returns mean squared error and mean absolute error for both. 332 333 Parameters 334 ---------- 335 X_val, n_claims_val, avg_severity_val, exposure_val: 336 Held-out evaluation data. If None, uses the data passed to the 337 constructor. 338 n_mc: 339 MC samples for the dependent model. 340 """ 341 X = X_val if X_val is not None else self.X 342 n_c = n_claims_val if n_claims_val is not None else self.n_claims 343 a_s = avg_severity_val if avg_severity_val is not None else self.avg_severity 344 exp = exposure_val if exposure_val is not None else self.exposure 345 346 X = np.asarray(X, dtype=np.float32) 347 n_c = np.asarray(n_c, dtype=np.float32) 348 a_s = np.asarray(a_s, dtype=np.float32) 349 exp = np.asarray(exp, dtype=np.float32) 350 351 actual_pp = n_c * a_s / (exp + 1e-10) 352 353 # Dependent model prediction 354 dep_pp = self.model.predict_pure_premium(X, exp, n_mc=n_mc) 355 356 # Independent baseline: freq × sev 357 freq = self.model.predict_frequency(X, exp) 358 sev = self.model.predict_severity(X, exp) 359 indep_pp = freq * sev 360 361 results = {} 362 for name, pred in [("dependent", dep_pp), ("independent", indep_pp)]: 363 err = pred - actual_pp 364 results[f"{name}_mse"] = float(np.mean(err ** 2)) 365 results[f"{name}_mae"] = float(np.mean(np.abs(err))) 366 # Poisson deviance on pure premium 367 eps = 1e-10 368 y, mu = actual_pp + eps, pred + eps 369 dev = 2.0 * np.where(actual_pp > 0, y * np.log(y / mu) - (y - mu), mu - y) 370 results[f"{name}_mean_deviance"] = float(np.mean(dev)) 371 372 results["mse_reduction_pct"] = ( 373 100.0 * (results["independent_mse"] - results["dependent_mse"]) 374 / (results["independent_mse"] + 1e-10) 375 ) 376 return results
Compare dependent model vs a naive independent (GLM-style) baseline.
The independent baseline ignores the shared trunk: it predicts pure
premium as predict_frequency * predict_severity (i.e. assumes E[Z]=E[N]·E[Y]).
The dependent model uses predict_pure_premium (MC or analytical).
Returns mean squared error and mean absolute error for both.
Parameters
X_val, n_claims_val, avg_severity_val, exposure_val:
Held-out evaluation data. If None, uses the data passed to the constructor.
n_mc:
MC samples for the dependent model.
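Illustrative call with the defaults, which fall back to the data passed to the constructor:

>>> comp = diag.vs_independent(n_mc=500)
>>> comp["mse_reduction_pct"]                  # > 0 means the dependent premium fits better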
382 def plot_lorenz( 383 self, 384 target: str = "frequency", 385 ax=None, 386 ): 387 """Plot the Lorenz concentration curve. 388 389 Parameters 390 ---------- 391 target: 392 ``"frequency"``, ``"severity"``, or ``"pure_premium"``. 393 ax: 394 Matplotlib axes to plot on. If None, a new figure is created. 395 396 Returns 397 ------- 398 fig, ax 399 """ 400 try: 401 import matplotlib.pyplot as plt 402 except ImportError: 403 raise ImportError( 404 "matplotlib is required for plotting. " 405 "Install with: pip install matplotlib" 406 ) 407 408 cum_exp, cum_loss, gini = self.lorenz_curve(target=target) 409 if ax is None: 410 fig, ax = plt.subplots(figsize=(6, 5)) 411 else: 412 fig = ax.figure 413 414 ax.plot(cum_exp, cum_loss, label=f"Model (Gini={gini:.3f})") 415 ax.plot([0, 1], [0, 1], "k--", label="Random") 416 ax.set_xlabel("Cumulative exposure fraction") 417 ax.set_ylabel(f"Cumulative {target} fraction") 418 ax.set_title(f"Lorenz curve — {target}") 419 ax.legend() 420 return fig, ax
Plot the Lorenz concentration curve.
Parameters
target:
"frequency", "severity", or "pure_premium".
ax:
Matplotlib axes to plot on. If None, a new figure is created.
Returns
fig, ax
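A sketch, assuming matplotlib is installed (the output filename is illustrative):

>>> fig, ax = diag.plot_lorenz(target="pure_premium")
>>> fig.savefig("lorenz_pure_premium.png", dpi=150)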
422 def plot_calibration( 423 self, 424 target: str = "frequency", 425 n_deciles: int = 10, 426 ax=None, 427 ): 428 """Plot calibration: predicted vs observed in decile buckets. 429 430 Returns 431 ------- 432 fig, ax 433 """ 434 try: 435 import matplotlib.pyplot as plt 436 except ImportError: 437 raise ImportError("matplotlib is required for plotting.") 438 439 cal = self.calibration(target=target, n_deciles=n_deciles) 440 if ax is None: 441 fig, ax = plt.subplots(figsize=(6, 5)) 442 else: 443 fig = ax.figure 444 445 ax.scatter(cal["pred_mean"], cal["obs_mean"], zorder=3) 446 lo = min(cal["pred_mean"].min(), cal["obs_mean"].min()) 447 hi = max(cal["pred_mean"].max(), cal["obs_mean"].max()) 448 ax.plot([lo, hi], [lo, hi], "k--") 449 ax.set_xlabel("Predicted (decile mean)") 450 ax.set_ylabel("Observed (decile mean)") 451 ax.set_title(f"Calibration — {target}") 452 return fig, ax
Plot calibration: predicted vs observed in decile buckets.
Returns
fig, ax
454 def plot_training_history(self, model: Optional[DependentFSModel] = None, ax=None): 455 """Plot training and validation loss curves. 456 457 Parameters 458 ---------- 459 model: 460 The fitted model. If None, uses ``self.model``. 461 462 Returns 463 ------- 464 fig, ax 465 """ 466 try: 467 import matplotlib.pyplot as plt 468 except ImportError: 469 raise ImportError("matplotlib is required for plotting.") 470 471 m = model or self.model 472 hist = m.training_history() 473 if ax is None: 474 fig, ax = plt.subplots(figsize=(8, 4)) 475 else: 476 fig = ax.figure 477 478 ax.plot(hist["train_loss"], label="Train loss") 479 if any(v != 0 for v in hist.get("val_loss", [])): 480 ax.plot(hist["val_loss"], label="Val loss") 481 ax.set_xlabel("Epoch") 482 ax.set_ylabel("Joint loss") 483 ax.set_title("Training history") 484 ax.legend() 485 return fig, ax
Plot training and validation loss curves.
Parameters
model:
The fitted model. If None, uses self.model.
Returns
fig, ax
class FreqSevDataset(Dataset):

    def __init__(
        self,
        X: np.ndarray,
        n_claims: np.ndarray,
        avg_severity: np.ndarray,
        exposure: np.ndarray,
    ) -> None:
        n = len(X)
        if not (len(n_claims) == len(avg_severity) == len(exposure) == n):
            raise ValueError("All arrays must have the same length.")
        if np.any(exposure <= 0):
            raise ValueError("All exposure values must be strictly positive.")

        self.x = torch.tensor(X, dtype=torch.float32)
        self.n_claims = torch.tensor(n_claims, dtype=torch.float32)
        self.avg_severity = torch.tensor(avg_severity, dtype=torch.float32)
        self.log_exposure = torch.tensor(
            np.log(exposure.astype(np.float32)), dtype=torch.float32
        )

    def __len__(self) -> int:
        return len(self.x)

    def __getitem__(self, idx: int) -> dict:
        return {
            "x": self.x[idx],
            "log_exposure": self.log_exposure[idx],
            "n_claims": self.n_claims[idx],
            "avg_severity": self.avg_severity[idx],
        }
PyTorch Dataset for frequency-severity data.
Each item is a dict with keys x, log_exposure, n_claims,
avg_severity.
Parameters
X:
Feature matrix, shape (n, p). Should be float32.
n_claims:
Claim counts, shape (n,). Integer or float.
avg_severity:
Average severity (total loss / n_claims), shape (n,). Zero for policies with no claims.
exposure:
Exposure (years at risk), shape (n,). Must be > 0.
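A minimal sketch of wrapping arrays for a PyTorch ``DataLoader`` (array names are illustrative):

>>> from torch.utils.data import DataLoader
>>> ds = FreqSevDataset(X_tr, n_claims_tr, avg_sev_tr, exposure_tr)
>>> loader = DataLoader(ds, batch_size=256, shuffle=True)
>>> batch = next(iter(loader))
>>> batch["x"].shape, batch["n_claims"].shape   # (256, p), (256,)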
158 @classmethod 159 def from_dataframe( 160 cls, 161 df: pd.DataFrame, 162 feature_cols: List[str], 163 n_claims_col: str = "n_claims", 164 avg_severity_col: str = "avg_severity", 165 exposure_col: str = "exposure", 166 ) -> "FreqSevDataset": 167 """Construct from a Pandas DataFrame (features already numeric). 168 169 Parameters 170 ---------- 171 df: 172 DataFrame with at least the feature columns plus target columns. 173 feature_cols: 174 Ordered list of feature column names. These are used as-is; 175 call ``prepare_features`` first if you need encoding. 176 n_claims_col, avg_severity_col, exposure_col: 177 Column names for the target variables. 178 """ 179 X = df[feature_cols].to_numpy(dtype=np.float32) 180 n_claims = df[n_claims_col].to_numpy(dtype=np.float32) 181 avg_sev = df[avg_severity_col].to_numpy(dtype=np.float32) 182 exposure = df[exposure_col].to_numpy(dtype=np.float32) 183 return cls(X, n_claims, avg_sev, exposure)
Construct from a Pandas DataFrame (features already numeric).
Parameters
df:
DataFrame with at least the feature columns plus target columns.
feature_cols:
Ordered list of feature column names. These are used as-is;
call prepare_features first if you need encoding.
n_claims_col, avg_severity_col, exposure_col:
Column names for the target variables.
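For the synthetic benchmark frames this is a one-liner, since the default column names match (a sketch):

>>> feats = [f"feature_{i}" for i in range(5)]
>>> ds = FreqSevDataset.from_dataframe(df_train, feature_cols=feats)
>>> len(ds)                                     # same number of rows as df_train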
38def prepare_features( 39 df: pd.DataFrame, 40 numeric_cols: List[str], 41 categorical_cols: Optional[List[str]] = None, 42 transformer: Optional[ColumnTransformer] = None, 43) -> Tuple[np.ndarray, ColumnTransformer]: 44 """Encode a DataFrame into a numeric matrix. 45 46 Numeric columns are standardised (zero mean, unit variance). Categorical 47 columns are one-hot encoded (unknown categories at inference are set to 48 all-zero rows, not an error). 49 50 Parameters 51 ---------- 52 df: 53 Input data. Must contain all columns in ``numeric_cols`` and 54 ``categorical_cols``. 55 numeric_cols: 56 Names of numeric/continuous columns. 57 categorical_cols: 58 Names of categorical columns. Pass ``None`` or ``[]`` if there are no 59 categoricals. 60 transformer: 61 A fitted ``ColumnTransformer`` from a previous call to this function. 62 Pass this when encoding held-out or test data to ensure the same 63 encoding is applied. When ``None``, a new transformer is fitted on 64 ``df``. 65 66 Returns 67 ------- 68 X: np.ndarray of shape (n, n_features_out) 69 Encoded feature matrix. 70 transformer: ColumnTransformer 71 Fitted transformer (reuse for test data). 72 73 Examples 74 -------- 75 >>> X_train, ct = prepare_features(df_train, numeric_cols=["age", "value"], 76 ... categorical_cols=["vehicle_class"]) 77 >>> X_test, _ = prepare_features(df_test, numeric_cols=["age", "value"], 78 ... categorical_cols=["vehicle_class"], 79 ... transformer=ct) 80 """ 81 categorical_cols = categorical_cols or [] 82 83 if transformer is None: 84 transformers = [] 85 if numeric_cols: 86 transformers.append( 87 ("num", StandardScaler(), numeric_cols) 88 ) 89 if categorical_cols: 90 transformers.append( 91 ( 92 "cat", 93 OneHotEncoder(handle_unknown="ignore", sparse_output=False), 94 categorical_cols, 95 ) 96 ) 97 transformer = ColumnTransformer(transformers, remainder="drop") 98 transformer.fit(df) 99 100 X = transformer.transform(df).astype(np.float32) 101 return X, transformer
Encode a DataFrame into a numeric matrix.
Numeric columns are standardised (zero mean, unit variance). Categorical columns are one-hot encoded (unknown categories at inference are set to all-zero rows, not an error).
Parameters
df:
Input data. Must contain all columns in numeric_cols and
categorical_cols.
numeric_cols:
Names of numeric/continuous columns.
categorical_cols:
Names of categorical columns. Pass None or [] if there are no
categoricals.
transformer:
A fitted ColumnTransformer from a previous call to this function.
Pass this when encoding held-out or test data to ensure the same
encoding is applied. When None, a new transformer is fitted on
df.
Returns
X: np.ndarray of shape (n, n_features_out)
Encoded feature matrix.
transformer: ColumnTransformer
Fitted transformer (reuse for test data).
Examples
>>> X_train, ct = prepare_features(df_train, numeric_cols=["age", "value"],
... categorical_cols=["vehicle_class"])
>>> X_test, _ = prepare_features(df_test, numeric_cols=["age", "value"],
... categorical_cols=["vehicle_class"],
... transformer=ct)
37def make_dependent_claims( 38 n_policies: int = 10_000, 39 gamma: float = -0.15, 40 base_freq: float = 0.08, 41 base_sev: float = 3_000.0, 42 phi: float = 1.5, 43 n_features: int = 5, 44 seed: int = 42, 45 test_fraction: float = 0.2, 46) -> Tuple[pd.DataFrame, pd.DataFrame]: 47 """Generate synthetic motor insurance claims with known frequency-severity dependence. 48 49 Data generating process 50 ----------------------- 51 For each policy i with covariates xᵢ and exposure tᵢ: 52 53 1. log λᵢ = log(tᵢ) + β₀ + Xᵢ · β_freq (Poisson frequency) 54 2. Nᵢ ~ Poisson(λᵢ · tᵢ) 55 3. log μᵢ = α₀ + Xᵢ · β_sev + γ · Nᵢ (GGS conditional severity) 56 4. If Nᵢ > 0: Ȳᵢ ~ Gamma(shape=Nᵢ/φ, mean=μᵢ); else Ȳᵢ = 0 57 58 The parameter γ is the true dependence parameter the model should recover. 59 Negative γ (default −0.15) means higher-claim-count policies have lower 60 average severity — the pattern typically found in UK motor. 61 62 Parameters 63 ---------- 64 n_policies: 65 Number of policies in the full dataset (train + test combined). 66 gamma: 67 True dependence parameter. γ=0 gives independence; γ<0 gives negative 68 frequency-severity correlation (typical for motor). 69 base_freq: 70 Baseline claim frequency (λ at x=0, t=1). 71 base_sev: 72 Baseline average severity (μ at x=0, N=0), in pounds. 73 phi: 74 Gamma dispersion parameter φ. Higher values give more severity 75 variability. 76 n_features: 77 Number of synthetic covariates. First half affect frequency, second 78 half affect severity; they all share the same feature matrix so the 79 trunk has genuine heterogeneity to exploit. 80 seed: 81 Random seed. 82 test_fraction: 83 Fraction of data to hold out as test set. 84 85 Returns 86 ------- 87 df_train, df_test : pd.DataFrame 88 DataFrames with columns: 89 ``feature_0``, …, ``feature_{n_features-1}``, 90 ``exposure``, ``n_claims``, ``avg_severity``, ``total_loss``, 91 ``true_lambda``, ``true_mu``. 
92 93 Examples 94 -------- 95 >>> df_train, df_test = make_dependent_claims(n_policies=20_000, gamma=-0.15) 96 >>> df_train.head() 97 """ 98 rng = np.random.default_rng(seed) 99 100 # --- Covariates --- 101 X = rng.standard_normal((n_policies, n_features)).astype(np.float32) 102 103 # --- Exposure: mix of short- and full-year policies --- 104 exposure = rng.choice([0.25, 0.5, 0.75, 1.0, 1.0, 1.0], size=n_policies).astype(np.float32) 105 106 # --- Regression coefficients --- 107 n_freq_feats = max(1, n_features // 2) 108 n_sev_feats = n_features - n_freq_feats 109 beta_freq = rng.uniform(-0.3, 0.3, size=n_freq_feats).astype(np.float32) 110 beta_sev = rng.uniform(-0.2, 0.2, size=n_sev_feats).astype(np.float32) 111 112 # --- Frequency --- 113 log_lambda_base = np.log(base_freq) + X[:, :n_freq_feats] @ beta_freq 114 lambda_ = np.exp(log_lambda_base) * exposure 115 n_claims = rng.poisson(lambda_).astype(np.float32) 116 117 # --- Conditional severity --- 118 log_mu = np.log(base_sev) + X[:, n_freq_feats:] @ beta_sev + gamma * n_claims 119 mu = np.exp(log_mu) 120 121 avg_severity = np.zeros(n_policies, dtype=np.float32) 122 pos = n_claims > 0 123 if pos.sum() > 0: 124 alpha = n_claims[pos] / phi 125 rate = n_claims[pos] / (phi * mu[pos] + 1e-10) 126 avg_severity[pos] = rng.gamma(shape=alpha, scale=1.0 / rate).astype(np.float32) 127 128 total_loss = n_claims * avg_severity 129 130 feature_cols = [f"feature_{i}" for i in range(n_features)] 131 df = pd.DataFrame(X, columns=feature_cols) 132 df["exposure"] = exposure 133 df["n_claims"] = n_claims 134 df["avg_severity"] = avg_severity 135 df["total_loss"] = total_loss 136 df["true_lambda"] = lambda_ / exposure # per unit exposure 137 df["true_mu"] = mu 138 139 # --- Split --- 140 n_test = max(1, int(n_policies * test_fraction)) 141 df_test = df.iloc[-n_test:].copy().reset_index(drop=True) 142 df_train = df.iloc[:-n_test].copy().reset_index(drop=True) 143 144 return df_train, df_test
Generate synthetic motor insurance claims with known frequency-severity dependence.
Data generating process
For each policy i with covariates xᵢ and exposure tᵢ:
1. log λᵢ = β₀ + Xᵢ · β_freq (Poisson frequency per unit exposure)
2. Nᵢ ~ Poisson(λᵢ · tᵢ)
3. log μᵢ = α₀ + Xᵢ · β_sev + γ · Nᵢ (GGS conditional severity)
4. If Nᵢ > 0: Ȳᵢ ~ Gamma(shape=Nᵢ/φ, mean=μᵢ); else Ȳᵢ = 0
The parameter γ is the true dependence parameter the model should recover. Negative γ (default −0.15) means higher-claim-count policies have lower average severity — the pattern typically found in UK motor.
Parameters
n_policies:
Number of policies in the full dataset (train + test combined).
gamma:
True dependence parameter. γ=0 gives independence; γ<0 gives negative frequency-severity correlation (typical for motor).
base_freq:
Baseline claim frequency (λ at x=0, t=1).
base_sev:
Baseline average severity (μ at x=0, N=0), in pounds.
phi:
Gamma dispersion parameter φ. Higher values give more severity variability.
n_features:
Number of synthetic covariates. First half affect frequency, second half affect severity; they all share the same feature matrix so the trunk has genuine heterogeneity to exploit.
seed:
Random seed.
test_fraction:
Fraction of data to hold out as test set.
Returns
df_train, df_test : pd.DataFrame
DataFrames with columns:
feature_0, …, feature_{n_features-1},
exposure, n_claims, avg_severity, total_loss,
true_lambda, true_mu.
Examples
>>> df_train, df_test = make_dependent_claims(n_policies=20_000, gamma=-0.15)
>>> df_train.head()
147def make_independent_claims( 148 n_policies: int = 10_000, 149 base_freq: float = 0.08, 150 base_sev: float = 3_000.0, 151 phi: float = 1.5, 152 n_features: int = 5, 153 seed: int = 42, 154 test_fraction: float = 0.2, 155) -> Tuple[pd.DataFrame, pd.DataFrame]: 156 """Generate synthetic claims with γ=0 (frequency-severity independence). 157 158 Identical to ``make_dependent_claims`` with ``gamma=0``. Use this as the 159 null comparison to verify the model does not overfit spurious dependence. 160 161 Parameters 162 ---------- 163 n_policies, base_freq, base_sev, phi, n_features, seed, test_fraction: 164 Same as ``make_dependent_claims``. 165 166 Returns 167 ------- 168 df_train, df_test : pd.DataFrame 169 """ 170 return make_dependent_claims( 171 n_policies=n_policies, 172 gamma=0.0, 173 base_freq=base_freq, 174 base_sev=base_sev, 175 phi=phi, 176 n_features=n_features, 177 seed=seed, 178 test_fraction=test_fraction, 179 )
Generate synthetic claims with γ=0 (frequency-severity independence).
Identical to make_dependent_claims with gamma=0. Use this as the
null comparison to verify the model does not overfit spurious dependence.
Parameters
n_policies, base_freq, base_sev, phi, n_features, seed, test_fraction:
Same as make_dependent_claims.
Returns
df_train, df_test : pd.DataFrame
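A quick sanity check on the null data (a sketch): among policies with at least one claim, the observed claim count and average severity should be roughly uncorrelated when γ = 0:

>>> df_tr0, _ = make_independent_claims(n_policies=20_000)
>>> pos = df_tr0["n_claims"] > 0
>>> df_tr0.loc[pos, "n_claims"].corr(df_tr0.loc[pos, "avg_severity"])   # close to 0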