insurance_telematics
1""" 2insurance-telematics 3==================== 4 5End-to-end pipeline from raw telematics trip data to GLM-compatible risk scores 6for UK motor insurance pricing. 7 8Pipeline stages: 9 1. Load — trip_loader.load_trips() 10 2. Clean — preprocessor.clean_trips() 11 3. Score — feature_extractor.extract_trip_features() 12 4. Model — hmm_model.DrivingStateHMM 13 5. Aggregate — risk_aggregator.aggregate_to_driver() 14 6. Price — scoring_pipeline.TelematicsScoringPipeline 15 16Synthetic data for testing and prototyping: 17 trip_simulator.TripSimulator 18 19Academic basis: 20 Jiang & Shi (2024), NAAJ 28(4), pp.822-839 21 Wüthrich (2017), European Actuarial Journal 7, pp.89-108 22""" 23 24from importlib.metadata import version, PackageNotFoundError 25 26try: 27 __version__ = version("insurance-telematics") 28except PackageNotFoundError: 29 __version__ = "0.1.0" 30 31from .trip_loader import load_trips 32from .preprocessor import clean_trips 33from .feature_extractor import extract_trip_features 34from .hmm_model import DrivingStateHMM, ContinuousTimeHMM 35from .risk_aggregator import aggregate_to_driver 36from .scoring_pipeline import TelematicsScoringPipeline, score_trips 37from .trip_simulator import TripSimulator 38 39__all__ = [ 40 "load_trips", 41 "clean_trips", 42 "extract_trip_features", 43 "DrivingStateHMM", 44 "ContinuousTimeHMM", 45 "aggregate_to_driver", 46 "TelematicsScoringPipeline", 47 "score_trips", 48 "TripSimulator", 49]
import os
from pathlib import Path

import polars as pl

# REQUIRED_COLUMNS and OPTIONAL_COLUMNS are module-level constants of
# trip_loader, not shown in this excerpt.


def load_trips(
    path: str | os.PathLike,
    *,
    schema: dict[str, str] | None = None,
) -> pl.DataFrame:
    """
    Load telematics trip data from CSV or Parquet, validate the schema, and
    return a normalised Polars DataFrame.

    Parameters
    ----------
    path:
        Path to a CSV or Parquet file, or a directory containing Parquet
        files (all will be read and concatenated).
    schema:
        Optional column name overrides for non-standard source files.
        Maps source column names to the standard column names. For example:
        ``{"gps_speed": "speed_kmh", "accel_x": "acceleration_ms2"}``.

    Returns
    -------
    pl.DataFrame
        Normalised DataFrame with standard column names. The ``timestamp``
        column is cast to ``Datetime[us, UTC]``. Missing optional columns
        are added with null values. Rows are sorted by ``trip_id`` then
        ``timestamp``.

    Raises
    ------
    ValueError
        If any required column is missing after applying the schema mapping.
    FileNotFoundError
        If the path does not exist.

    Examples
    --------
    >>> df = load_trips("trips.csv")
    >>> df = load_trips("trips.parquet", schema={"gps_speed": "speed_kmh"})
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Path not found: {path}")

    if path.is_dir():
        parquet_files = list(path.glob("*.parquet"))
        if not parquet_files:
            raise ValueError(f"No Parquet files found in directory: {path}")
        df = pl.concat([pl.read_parquet(f) for f in sorted(parquet_files)])
    elif path.suffix.lower() in {".parquet", ".pq"}:
        df = pl.read_parquet(path)
    elif path.suffix.lower() in {".csv", ".tsv"}:
        df = pl.read_csv(path, try_parse_dates=True)
    else:
        raise ValueError(
            f"Unsupported file type '{path.suffix}'. Use CSV or Parquet."
        )

    # Apply column renames from the schema mapping
    if schema:
        df = df.rename({src: dst for src, dst in schema.items() if src in df.columns})

    # Validate required columns
    missing = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing:
        raise ValueError(
            f"Missing required columns: {missing}. "
            f"Use the 'schema' parameter to map source column names."
        )

    # Ensure timestamp is a datetime
    if df["timestamp"].dtype in (pl.Utf8, pl.String):
        df = df.with_columns(
            pl.col("timestamp").str.to_datetime(format=None, strict=False)
        )

    if df["timestamp"].dtype not in (pl.Datetime("us", "UTC"), pl.Datetime("ns", "UTC")):
        # Cast to microsecond UTC
        if hasattr(df["timestamp"].dtype, "time_zone") and df["timestamp"].dtype.time_zone:
            # Timezone-aware: convert the instant to UTC
            df = df.with_columns(
                pl.col("timestamp").dt.convert_time_zone("UTC").dt.cast_time_unit("us")
            )
        else:
            # Naive: assume UTC and attach the timezone
            df = df.with_columns(
                pl.col("timestamp")
                .dt.replace_time_zone("UTC")
                .dt.cast_time_unit("us")
            )

    # Add missing optional columns as nulls
    for col in OPTIONAL_COLUMNS:
        if col not in df.columns:
            if col == "driver_id":
                df = df.with_columns(pl.lit("unknown").alias("driver_id"))
            else:
                df = df.with_columns(pl.lit(None).cast(pl.Float64).alias(col))

    # Enforce numeric types for key columns
    numeric_cols = ["latitude", "longitude", "speed_kmh", "acceleration_ms2", "heading_deg"]
    cast_exprs = [
        pl.col(c).cast(pl.Float64) for c in numeric_cols if c in df.columns
    ]
    if cast_exprs:
        df = df.with_columns(cast_exprs)

    # Sort for deterministic processing
    df = df.sort(["trip_id", "timestamp"])

    return df
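The two timezone branches are easy to confuse. A self-contained Polars snippet (not part of the library) contrasting them:

import polars as pl
from datetime import datetime

df = pl.DataFrame({"ts": [datetime(2024, 1, 1, 12, 0)]})

# Naive timestamps: replace_time_zone attaches UTC without shifting
# the wall-clock time, so 12:00 becomes 12:00 UTC.
print(df.with_columns(pl.col("ts").dt.replace_time_zone("UTC")))

# Aware timestamps: convert_time_zone re-expresses the same instant
# in UTC, so 12:00+01:00 (Paris, winter) becomes 11:00 UTC.
aware = df.with_columns(pl.col("ts").dt.replace_time_zone("Europe/Paris"))
print(aware.with_columns(pl.col("ts").dt.convert_time_zone("UTC")))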
def clean_trips(df: pl.DataFrame) -> pl.DataFrame:
    """
    Clean and enrich a raw telematics DataFrame.

    Applies the following steps per trip, in order:

    1. Remove GPS jump rows (speed > 250 km/h).
    2. Interpolate speed for small gaps (< 5 s within a trip).
    3. Derive acceleration from speed differences if the
       ``acceleration_ms2`` column is null or absent.
    4. Derive jerk (m/s³) as the first difference of acceleration.
    5. Classify each observation into a road type
       (``urban`` / ``rural`` / ``motorway``) from speed.

    Parameters
    ----------
    df:
        Raw trip DataFrame as returned by :func:`~insurance_telematics.load_trips`.
        Must contain ``trip_id``, ``timestamp``, and ``speed_kmh``.

    Returns
    -------
    pl.DataFrame
        Cleaned DataFrame with additional columns:
        ``acceleration_ms2`` (if derived), ``jerk_ms3``, ``road_type``.

    Notes
    -----
    The road type heuristic (urban < 50 km/h, rural 50-100, motorway > 100)
    is a proxy. It works well in aggregate but misclassifies individual
    observations — a momentary slow-down on a motorway reads as urban.
    Trip-level road type statistics (fraction of time in each band) are
    more robust than per-row labels.
    """
    df = _remove_gps_jumps(df)
    df = _interpolate_speed_gaps(df)
    df = _derive_acceleration(df)
    df = _derive_jerk(df)
    df = _classify_road_type(df)
    return df
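The private helpers are not shown in this excerpt. For illustration only, a plausible sketch of _classify_road_type using the speed bands from the Notes; the library's actual implementation may differ:

import polars as pl

def _classify_road_type(df: pl.DataFrame) -> pl.DataFrame:
    # Speed-band heuristic from the Notes: urban < 50 km/h,
    # rural 50-100 km/h, motorway > 100 km/h.
    return df.with_columns(
        pl.when(pl.col("speed_kmh") < 50)
        .then(pl.lit("urban"))
        .when(pl.col("speed_kmh") <= 100)
        .then(pl.lit("rural"))
        .otherwise(pl.lit("motorway"))
        .alias("road_type")
    )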
def extract_trip_features(df: pl.DataFrame) -> pl.DataFrame:
    """
    Compute one summary row per trip from cleaned 1 Hz observations.

    Parameters
    ----------
    df:
        Cleaned trip DataFrame as returned by
        :func:`~insurance_telematics.clean_trips`. Must contain:
        ``trip_id``, ``timestamp``, ``speed_kmh``, ``acceleration_ms2``,
        ``road_type``.

    Returns
    -------
    pl.DataFrame
        One row per ``trip_id`` with columns:

        ``trip_id``, ``distance_km``, ``duration_min``, ``mean_speed_kmh``,
        ``p95_speed_kmh``, ``speed_variation_coeff``, ``harsh_braking_rate``,
        ``harsh_accel_rate``, ``harsh_cornering_rate``, ``speeding_fraction``,
        ``night_driving_fraction``, ``urban_fraction``, ``driver_id``
        (if present in input).

    Notes
    -----
    Distance is estimated as the integral of speed over time at 1 Hz:
    sum(speed_kmh) / 3600. This is a reasonable proxy when GPS coordinates
    are noisy or unavailable for Haversine distance computation.
    """
    _check_required(df)

    has_driver_id = "driver_id" in df.columns
    has_heading = "heading_deg" in df.columns and df["heading_deg"].null_count() < len(df)

    # Compute per-row flags in a single with_columns call for efficiency
    flags = [
        # Harsh braking flag
        (pl.col("acceleration_ms2") < _HARSH_BRAKE_THRESHOLD)
        .cast(pl.Int32)
        .alias("_harsh_brake"),
        # Harsh acceleration flag
        (pl.col("acceleration_ms2") > _HARSH_ACCEL_THRESHOLD)
        .cast(pl.Int32)
        .alias("_harsh_accel"),
        # Night flag
        pl.col("timestamp")
        .dt.hour()
        .is_in(list(_NIGHT_HOURS))
        .cast(pl.Int32)
        .alias("_night"),
        # Urban flag
        (pl.col("road_type") == "urban")
        .cast(pl.Int32)
        .alias("_urban"),
        # Speeding flag — road-type-specific threshold
        pl.when(pl.col("road_type") == "urban")
        .then(pl.col("speed_kmh") > _SPEED_LIMIT_URBAN)
        .when(pl.col("road_type") == "rural")
        .then(pl.col("speed_kmh") > _SPEED_LIMIT_RURAL)
        .otherwise(pl.col("speed_kmh") > _SPEED_LIMIT_MOTORWAY)
        .cast(pl.Int32)
        .alias("_speeding"),
        # Distance increment: speed_kmh / 3600 = km per second at 1 Hz
        (pl.col("speed_kmh") / 3600.0).alias("_dist_km_increment"),
    ]

    if has_heading:
        # Cornering proxy: heading change rate (deg/s) × speed (km/h),
        # proportional to lateral acceleration; calibrated empirically.
        flags.append(
            (
                pl.col("heading_deg")
                .diff()
                .over("trip_id")
                .abs()
                .fill_null(0.0)
                * pl.col("speed_kmh")
                * _HARSH_CORNER_DEG_PER_S_KMPH
                > 1.0
            )
            .cast(pl.Int32)
            .alias("_harsh_corner")
        )
    else:
        flags.append(pl.lit(0).cast(pl.Int32).alias("_harsh_corner"))

    df = df.with_columns(flags)

    if has_driver_id:
        # Carry driver_id through — take the first value (constant per trip)
        agg_extra = [pl.col("driver_id").first().alias("driver_id")]
    else:
        agg_extra = []

    agg = df.group_by("trip_id").agg(
        [
            # Basic trip stats
            pl.len().alias("n_obs"),
            pl.col("_dist_km_increment").sum().alias("distance_km"),
            (pl.len() / 60.0).alias("duration_min"),
            pl.col("speed_kmh").mean().alias("mean_speed_kmh"),
            pl.col("speed_kmh").quantile(0.95).alias("p95_speed_kmh"),
            pl.col("speed_kmh").std().alias("_speed_std"),
            # Event counts (divided by distance below)
            pl.col("_harsh_brake").sum().alias("_n_harsh_brake"),
            pl.col("_harsh_accel").sum().alias("_n_harsh_accel"),
            pl.col("_harsh_corner").sum().alias("_n_harsh_corner"),
            # Fraction-based features
            pl.col("_speeding").mean().alias("speeding_fraction"),
            pl.col("_night").mean().alias("night_driving_fraction"),
            pl.col("_urban").mean().alias("urban_fraction"),
        ]
        + agg_extra
    )

    # Derive final rate features
    agg = agg.with_columns(
        [
            # Speed coefficient of variation — std / mean, guarded against div/0
            (
                pl.col("_speed_std") / (pl.col("mean_speed_kmh") + 1e-6)
            ).alias("speed_variation_coeff"),
            # Event rates per km — clamp distance to a 0.01 km minimum
            (
                pl.col("_n_harsh_brake") / pl.col("distance_km").clip(0.01)
            ).alias("harsh_braking_rate"),
            (
                pl.col("_n_harsh_accel") / pl.col("distance_km").clip(0.01)
            ).alias("harsh_accel_rate"),
            (
                pl.col("_n_harsh_corner") / pl.col("distance_km").clip(0.01)
            ).alias("harsh_cornering_rate"),
        ]
    )

    # Select and order final output columns
    output_cols = [
        "trip_id",
        "distance_km",
        "duration_min",
        "mean_speed_kmh",
        "p95_speed_kmh",
        "speed_variation_coeff",
        "harsh_braking_rate",
        "harsh_accel_rate",
        "harsh_cornering_rate",
        "speeding_fraction",
        "night_driving_fraction",
        "urban_fraction",
    ]
    if has_driver_id:
        output_cols.append("driver_id")

    return agg.select(output_cols).sort("trip_id")
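A quick sanity check of the distance proxy described in the Notes: at 1 Hz each row contributes speed_kmh / 3600 km, so ten minutes at a steady 36 km/h should integrate to 6 km.

import polars as pl

# 600 one-second observations at a constant 36 km/h
df = pl.DataFrame({"trip_id": ["t1"] * 600, "speed_kmh": [36.0] * 600})
dist = df.group_by("trip_id").agg(
    (pl.col("speed_kmh") / 3600.0).sum().alias("distance_km")
)
print(dist)  # 600 * 36 / 3600 = 6.0 km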
class DrivingStateHMM:
    """
    Discrete-time Gaussian HMM for classifying driving trip sequences into
    latent driving states (e.g. cautious / normal / aggressive).

    Wraps ``hmmlearn.GaussianHMM`` with state-ordering normalisation and
    a driver-level feature extraction step.

    Parameters
    ----------
    n_states:
        Number of latent states. Default 3. With three states the literature
        consistently produces a cautious / normal / aggressive partition.
    features:
        Trip-level feature columns to use as HMM observations. Defaults to
        mean speed, speed variation, harsh braking rate, harsh acceleration.
    n_iter:
        Maximum EM iterations for hmmlearn (default 200).
    covariance_type:
        Covariance type passed to GaussianHMM (default "diag").
    random_state:
        Random seed for reproducibility.

    Examples
    --------
    >>> model = DrivingStateHMM(n_states=3)
    >>> model.fit(trip_features_df)
    >>> states = model.predict_states(trip_features_df)
    >>> driver_features = model.driver_state_features(trip_features_df, states)
    """

    def __init__(
        self,
        n_states: int = 3,
        features: list[str] | None = None,
        n_iter: int = 200,
        covariance_type: str = "diag",
        random_state: int = 42,
    ) -> None:
        if not _HMMLEARN_AVAILABLE:
            raise ImportError(
                "hmmlearn is required for DrivingStateHMM. "
                "Install it with: pip install hmmlearn"
            )

        self.n_states = n_states
        self.features = features or _DEFAULT_HMM_FEATURES
        self.n_iter = n_iter
        self.covariance_type = covariance_type
        self.random_state = random_state

        self._model: Optional[hmmlearn_hmm.GaussianHMM] = None
        self._state_order: Optional[np.ndarray] = None  # indices sorted by mean speed
        self.is_fitted: bool = False

    def fit(self, trip_features: pl.DataFrame) -> "DrivingStateHMM":
        """
        Fit the HMM to a DataFrame of trip-level scalar features.

        Each row is treated as one observation. For a per-driver sequence,
        pass only that driver's trips (ordered by timestamp). For a
        portfolio-level fit, pass all trips — hmmlearn handles the full
        sequence as one long chain, which is appropriate for population-level
        state estimation.

        Parameters
        ----------
        trip_features:
            DataFrame from :func:`~insurance_telematics.extract_trip_features`.
            Must contain the columns listed in ``self.features``.

        Returns
        -------
        self
        """
        X = self._to_matrix(trip_features)
        X = self._standardise(X, fit=True)

        model = hmmlearn_hmm.GaussianHMM(
            n_components=self.n_states,
            covariance_type=self.covariance_type,
            n_iter=self.n_iter,
            random_state=self.random_state,
            verbose=False,
        )
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            model.fit(X)

        self._model = model

        # Order states by the mean-speed dimension (feature index 0) so that
        # state 0 is always the lowest-speed (most cautious) state
        speed_means = model.means_[:, 0]
        self._state_order = np.argsort(speed_means)
        self._state_rank = np.empty_like(self._state_order)
        self._state_rank[self._state_order] = np.arange(self.n_states)

        self.is_fitted = True
        return self

    def predict_states(self, trip_features: pl.DataFrame) -> np.ndarray:
        """
        Decode the most likely state sequence for a set of trips.

        Parameters
        ----------
        trip_features:
            Same format as used for ``fit()``.

        Returns
        -------
        np.ndarray of shape (n_trips,)
            Integer state labels 0..n_states-1, where 0 = most cautious
            and n_states-1 = most aggressive.
        """
        self._check_fitted()
        X = self._to_matrix(trip_features)
        X = self._standardise(X, fit=False)
        raw_states = self._model.predict(X)
        # Remap to ordered states
        return self._state_rank[raw_states]

    def predict_state_probs(self, trip_features: pl.DataFrame) -> np.ndarray:
        """
        Compute posterior state probabilities for each trip observation.

        Parameters
        ----------
        trip_features:
            Same format as used for ``fit()``.

        Returns
        -------
        np.ndarray of shape (n_trips, n_states)
            Posterior probabilities, summing to 1.0 across states per row.
            Column ordering matches the ordered states (0 = most cautious).
        """
        self._check_fitted()
        X = self._to_matrix(trip_features)
        X = self._standardise(X, fit=False)
        _, posteriors = self._model.score_samples(X)
        # Reorder columns to match the state ordering
        return posteriors[:, self._state_order]

    def driver_state_features(
        self,
        trip_features: pl.DataFrame,
        states: np.ndarray,
    ) -> pl.DataFrame:
        """
        Compute per-driver HMM-derived features for use as GLM covariates.

        Parameters
        ----------
        trip_features:
            Trip-level features DataFrame. Must contain ``driver_id``.
        states:
            State sequence as returned by :meth:`predict_states`.

        Returns
        -------
        pl.DataFrame
            One row per driver with columns:

            - ``state_{k}_fraction`` for k in 0..n_states-1
            - ``mean_transition_rate`` — average state transitions per km
            - ``state_entropy`` — Shannon entropy of state distribution
            - ``driver_id``

        Notes
        -----
        State fractions are the primary actuarial risk features. Following
        Jiang & Shi (2024), the fraction of time spent in the highest-risk
        state (state ``n_states - 1``) is the most predictive of claim
        frequency.
        """
        if "driver_id" not in trip_features.columns:
            raise ValueError(
                "trip_features must contain 'driver_id' for driver-level aggregation."
            )

        df = trip_features.with_columns(pl.Series("_state", states.tolist()))

        # Count state occurrences per driver
        driver_rows = []
        driver_ids = df["driver_id"].unique().sort().to_list()

        for driver_id in driver_ids:
            driver_df = df.filter(pl.col("driver_id") == driver_id)
            driver_states = driver_df["_state"].to_numpy()
            n_trips = len(driver_states)

            # State fractions
            state_counts = np.bincount(driver_states, minlength=self.n_states)
            state_fractions = state_counts / max(n_trips, 1)

            # Transition rate: number of state changes per km
            n_transitions = np.sum(np.diff(driver_states) != 0)
            total_km = driver_df["distance_km"].sum() if "distance_km" in driver_df.columns else n_trips
            transition_rate = n_transitions / max(total_km, 0.01)

            # State entropy (Shannon)
            state_entropy = float(scipy_entropy(state_fractions))

            row = {"driver_id": driver_id}
            for k in range(self.n_states):
                row[f"state_{k}_fraction"] = float(state_fractions[k])
            row["mean_transition_rate"] = float(transition_rate)
            row["state_entropy"] = float(state_entropy)

            driver_rows.append(row)

        return pl.DataFrame(driver_rows)

    def _to_matrix(self, trip_features: pl.DataFrame) -> np.ndarray:
        """Extract feature columns as a float64 numpy array."""
        missing = [c for c in self.features if c not in trip_features.columns]
        if missing:
            raise ValueError(f"Feature columns missing from DataFrame: {missing}")
        return trip_features.select(self.features).to_numpy().astype(np.float64)

    def _standardise(self, X: np.ndarray, *, fit: bool) -> np.ndarray:
        """Z-score standardise features. Fit mean/std on the first call."""
        if fit:
            self._mean = np.nanmean(X, axis=0)
            self._std = np.nanstd(X, axis=0) + 1e-8
        return (X - self._mean) / self._std

    def _check_fitted(self) -> None:
        if not self.is_fitted:
            raise RuntimeError("Call fit() before predict_states().")
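The rank-mapping used in fit() to pin state 0 to the slowest state is compact enough to misread. A standalone NumPy illustration with invented per-state mean speeds:

import numpy as np

speed_means = np.array([72.1, 31.4, 55.0])   # hypothetical values
order = np.argsort(speed_means)              # raw indices, slowest first: [1, 2, 0]
rank = np.empty_like(order)
rank[order] = np.arange(len(order))          # rank[raw_state] = ordered label

print(rank)                          # [2, 0, 1]
print(rank[np.array([0, 1, 1, 2])])  # raw decode [0, 1, 1, 2] maps to [2, 0, 0, 1]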
class ContinuousTimeHMM:
    """
    Continuous-time HMM for irregularly-sampled telematics observations.

    Uses a generator matrix Q where the transition probability over an
    interval Δt is P(Δt) = expm(Q × Δt). This handles variable trip
    lengths and inter-observation time gaps without resampling.

    Implementation uses an EM algorithm:

    - E-step: forward-backward with time-varying transition matrices P(Δt_i)
    - M-step: update Q via expected transition counts and holding times

    Parameters
    ----------
    n_states:
        Number of latent states (default 3).
    features:
        Feature columns to use as Gaussian emissions per state.
    n_iter:
        Maximum number of EM iterations (default 100).
    tol:
        Convergence tolerance on log-likelihood (default 1e-4).
    random_state:
        Seed for parameter initialisation.

    Notes
    -----
    The M-step update for Q is the ratio of expected transition counts to
    expected holding times: Q_ij (i≠j) ∝ E[N_ij] / E[T_i], where N_ij is
    the expected number of i→j transitions and T_i is the expected total
    holding time in state i.

    Numerical stability: scipy.linalg.expm is used for the matrix
    exponential. For typical generator matrices from driving data (rates on
    the order of 0.01-1.0 transitions/minute), expm is stable and accurate.

    Examples
    --------
    >>> model = ContinuousTimeHMM(n_states=3)
    >>> model.fit(trip_features_df, time_deltas=delta_array)
    >>> states = model.predict_states(trip_features_df, time_deltas=delta_array)
    """

    def __init__(
        self,
        n_states: int = 3,
        features: list[str] | None = None,
        n_iter: int = 100,
        tol: float = 1e-4,
        random_state: int = 42,
    ) -> None:
        if not _SCIPY_AVAILABLE:
            raise ImportError(
                "scipy is required for ContinuousTimeHMM. "
                "Install it with: pip install scipy"
            )

        self.n_states = n_states
        self.features = features or _DEFAULT_HMM_FEATURES
        self.n_iter = n_iter
        self.tol = tol
        self.random_state = random_state

        self.Q_: Optional[np.ndarray] = None       # Generator matrix (n_states × n_states)
        self.means_: Optional[np.ndarray] = None   # Emission means (n_states × n_features)
        self.covars_: Optional[np.ndarray] = None  # Emission variances (diagonal)
        self.pi_: Optional[np.ndarray] = None      # Initial state distribution
        self.is_fitted: bool = False
        self._state_order: Optional[np.ndarray] = None

    def fit(
        self,
        trip_features: pl.DataFrame,
        time_deltas: np.ndarray | None = None,
    ) -> "ContinuousTimeHMM":
        """
        Fit the CTHMM using the EM algorithm.

        Parameters
        ----------
        trip_features:
            Trip-level features DataFrame (one row per trip, not per second).
        time_deltas:
            Array of shape (n_trips,) giving the time interval in minutes
            between consecutive trips for the same driver. Use a large value
            (e.g. 1440.0 = 24 hours) between trips from different sessions.
            If None, all intervals are set to 1.0 (equivalent to a discrete
            HMM with unit time steps).

        Returns
        -------
        self
        """
        X = self._to_matrix(trip_features)
        X = self._standardise(X, fit=True)
        n_obs = X.shape[0]

        if time_deltas is None:
            time_deltas = np.ones(n_obs, dtype=np.float64)
        else:
            time_deltas = np.asarray(time_deltas, dtype=np.float64)
            if len(time_deltas) != n_obs:
                raise ValueError(
                    f"time_deltas length {len(time_deltas)} != n_obs {n_obs}"
                )
        # Clamp to small positive values to avoid expm(Q * 0)
        time_deltas = np.clip(time_deltas, 1e-6, None)

        rng = np.random.default_rng(self.random_state)

        # Initialise parameters
        self.pi_ = np.ones(self.n_states) / self.n_states
        self.Q_ = self._init_generator(rng)
        self.means_, self.covars_ = self._init_emissions(X, rng)

        log_likelihood = -np.inf
        for _ in range(self.n_iter):
            # E-step
            gammas, xis, log_lik_new = self._e_step(X, time_deltas)

            # M-step
            self._m_step(X, gammas, xis, time_deltas)

            # Convergence check
            if abs(log_lik_new - log_likelihood) < self.tol:
                break
            log_likelihood = log_lik_new

        # Order states by the mean of the first feature (speed proxy)
        self._state_order = np.argsort(self.means_[:, 0])
        self._state_rank = np.empty_like(self._state_order)
        self._state_rank[self._state_order] = np.arange(self.n_states)

        self.is_fitted = True
        return self

    def predict_states(
        self,
        trip_features: pl.DataFrame,
        time_deltas: np.ndarray | None = None,
    ) -> np.ndarray:
        """
        Viterbi decoding of the most likely state sequence.

        Parameters
        ----------
        trip_features:
            Same format as used in ``fit()``.
        time_deltas:
            Inter-observation time intervals, same semantics as in ``fit()``.

        Returns
        -------
        np.ndarray of shape (n_trips,)
            State labels 0..n_states-1 with state 0 = most cautious.
        """
        self._check_fitted()
        X = self._to_matrix(trip_features)
        X = self._standardise(X, fit=False)
        n_obs = X.shape[0]

        if time_deltas is None:
            time_deltas = np.ones(n_obs)
        time_deltas = np.clip(np.asarray(time_deltas, dtype=np.float64), 1e-6, None)

        raw_states = self._viterbi(X, time_deltas)
        return self._state_rank[raw_states]

    def driver_state_features(
        self,
        trip_features: pl.DataFrame,
        states: np.ndarray,
    ) -> pl.DataFrame:
        """
        Compute driver-level HMM features. Same interface as
        :meth:`DrivingStateHMM.driver_state_features`.
        """
        # Delegate to the same logic — identical to the discrete case
        if "driver_id" not in trip_features.columns:
            raise ValueError("trip_features must contain 'driver_id'.")

        df = trip_features.with_columns(pl.Series("_state", states.tolist()))
        driver_rows = []
        driver_ids = df["driver_id"].unique().sort().to_list()

        for driver_id in driver_ids:
            driver_df = df.filter(pl.col("driver_id") == driver_id)
            driver_states = driver_df["_state"].to_numpy()
            n_trips = len(driver_states)

            state_counts = np.bincount(driver_states, minlength=self.n_states)
            state_fractions = state_counts / max(n_trips, 1)

            n_transitions = np.sum(np.diff(driver_states) != 0)
            total_km = (
                driver_df["distance_km"].sum()
                if "distance_km" in driver_df.columns
                else float(n_trips)
            )
            transition_rate = n_transitions / max(total_km, 0.01)
            state_entropy = float(scipy_entropy(state_fractions))

            row = {"driver_id": driver_id}
            for k in range(self.n_states):
                row[f"state_{k}_fraction"] = float(state_fractions[k])
            row["mean_transition_rate"] = float(transition_rate)
            row["state_entropy"] = float(state_entropy)
            driver_rows.append(row)

        return pl.DataFrame(driver_rows)

    # ------------------------------------------------------------------
    # Private EM implementation
    # ------------------------------------------------------------------

    def _init_generator(self, rng: np.random.Generator) -> np.ndarray:
        """Initialise a valid generator matrix Q with small random rates."""
        n = self.n_states
        Q = rng.uniform(0.01, 0.1, size=(n, n))
        np.fill_diagonal(Q, 0.0)
        np.fill_diagonal(Q, -Q.sum(axis=1))
        return Q

    def _init_emissions(
        self, X: np.ndarray, rng: np.random.Generator
    ) -> tuple[np.ndarray, np.ndarray]:
        """Initialise Gaussian emission parameters from spread-out quantiles."""
        _, d = X.shape
        # Use equally-spaced quantile points as initial means
        quantiles = np.linspace(0.1, 0.9, self.n_states)
        means = np.array(
            [np.quantile(X, q, axis=0) for q in quantiles], dtype=np.float64
        )
        # Perturb slightly to break symmetry
        means += rng.normal(0, 0.05, size=means.shape)
        covars = np.ones((self.n_states, d), dtype=np.float64)
        return means, covars

    def _transition_matrix(self, dt: float) -> np.ndarray:
        """P(dt) = expm(Q * dt), clamped to a valid stochastic matrix."""
        P = matrix_expm(self.Q_ * dt)
        # Numerical correction: clip to [0, 1] and renormalise rows
        P = np.clip(P.real, 0.0, 1.0)
        row_sums = P.sum(axis=1, keepdims=True)
        row_sums = np.where(row_sums < 1e-12, 1.0, row_sums)
        return P / row_sums

    def _emission_log_prob(self, X: np.ndarray) -> np.ndarray:
        """
        Diagonal Gaussian log-emission probability.

        Returns an array of shape (n_obs, n_states).
        """
        n_obs = X.shape[0]
        log_probs = np.zeros((n_obs, self.n_states))
        for k in range(self.n_states):
            diff = X - self.means_[k]
            var = self.covars_[k]
            log_probs[:, k] = -0.5 * np.sum(
                np.log(2 * np.pi * var) + diff ** 2 / var, axis=1
            )
        return log_probs

    def _e_step(
        self, X: np.ndarray, dts: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray, float]:
        """
        Forward-backward algorithm with time-varying transition matrices.

        Returns
        -------
        gammas : (n_obs, n_states)
        xis : (n_obs-1, n_states, n_states)
        log_likelihood : float
        """
        n_obs, n_states = X.shape[0], self.n_states
        log_emit = self._emission_log_prob(X)  # (n_obs, n_states)

        # Forward pass (log-scale for numerical stability)
        log_alpha = np.full((n_obs, n_states), -np.inf)
        log_alpha[0] = np.log(self.pi_ + 1e-300) + log_emit[0]

        for t in range(1, n_obs):
            P = self._transition_matrix(dts[t - 1])
            log_P = np.log(P + 1e-300)
            # log_alpha[t, j] = log_emit[t, j] + logsumexp_k(log_alpha[t-1, k] + log_P[k, j])
            for j in range(n_states):
                log_alpha[t, j] = log_emit[t, j] + _logsumexp(
                    log_alpha[t - 1] + log_P[:, j]
                )

        log_likelihood = _logsumexp(log_alpha[-1])

        # Backward pass
        log_beta = np.zeros((n_obs, n_states))  # log(1) = 0 at the final step
        for t in range(n_obs - 2, -1, -1):
            P = self._transition_matrix(dts[t])
            log_P = np.log(P + 1e-300)
            for i in range(n_states):
                log_beta[t, i] = _logsumexp(
                    log_P[i, :] + log_emit[t + 1] + log_beta[t + 1]
                )

        # Posterior state probabilities gamma: normalise each row
        # to get P(z_t | observations)
        log_gamma = log_alpha + log_beta
        for t in range(n_obs):
            log_gamma[t] -= _logsumexp(log_gamma[t])
        gammas = np.exp(log_gamma)
        gammas = np.clip(gammas, 0.0, None)
        gammas /= gammas.sum(axis=1, keepdims=True)  # numerical guard

        # Pairwise posteriors xi, normalised by the sequence likelihood
        # P(observations) = exp(log_likelihood). For a single chain each
        # time slice of alpha·P·b·beta sums to P(observations), so this is
        # equivalent to per-timestep normalisation in exact arithmetic.
        xis = np.zeros((n_obs - 1, n_states, n_states))
        for t in range(n_obs - 1):
            P = self._transition_matrix(dts[t])
            for i in range(n_states):
                for j in range(n_states):
                    xis[t, i, j] = np.exp(
                        log_alpha[t, i]
                        + np.log(P[i, j] + 1e-300)
                        + log_emit[t + 1, j]
                        + log_beta[t + 1, j]
                        - log_likelihood
                    )

        return gammas, xis, log_likelihood

    def _m_step(
        self,
        X: np.ndarray,
        gammas: np.ndarray,
        xis: np.ndarray,
        dts: np.ndarray,
    ) -> None:
        """Update parameters given posteriors."""
        n_obs, n_states = gammas.shape

        # Update the initial distribution
        self.pi_ = gammas[0] / gammas[0].sum()

        # Update emission parameters
        for k in range(n_states):
            w = gammas[:, k]  # (n_obs,)
            w_sum = w.sum() + 1e-300
            self.means_[k] = (w[:, None] * X).sum(axis=0) / w_sum
            diff = X - self.means_[k]
            self.covars_[k] = (w[:, None] * diff ** 2).sum(axis=0) / w_sum + 1e-6

        # Update the generator matrix Q
        # Expected transition counts: E[N_ij] = sum_t xi[t, i, j]
        expected_transitions = xis.sum(axis=0)  # (n_states, n_states)

        # Expected holding times: E[T_i] = sum_t gamma[t, i] * dt[t]
        holding_times = np.zeros(n_states)
        for t in range(n_obs - 1):
            holding_times += gammas[t] * dts[t]
        holding_times = np.clip(holding_times, 1e-6, None)

        # New Q: Q_ij = E[N_ij] / E[T_i] for i ≠ j
        Q_new = np.zeros((n_states, n_states))
        for i in range(n_states):
            for j in range(n_states):
                if i != j:
                    Q_new[i, j] = expected_transitions[i, j] / holding_times[i]
        np.fill_diagonal(Q_new, -Q_new.sum(axis=1))
        self.Q_ = Q_new

    def _viterbi(self, X: np.ndarray, dts: np.ndarray) -> np.ndarray:
        """Viterbi decoding returning the most likely state sequence."""
        n_obs, n_states = X.shape[0], self.n_states
        log_emit = self._emission_log_prob(X)

        log_delta = np.full((n_obs, n_states), -np.inf)
        psi = np.zeros((n_obs, n_states), dtype=int)
        log_delta[0] = np.log(self.pi_ + 1e-300) + log_emit[0]

        for t in range(1, n_obs):
            P = self._transition_matrix(dts[t - 1])
            log_P = np.log(P + 1e-300)
            for j in range(n_states):
                scores = log_delta[t - 1] + log_P[:, j]
                best = int(np.argmax(scores))
                log_delta[t, j] = scores[best] + log_emit[t, j]
                psi[t, j] = best

        # Backtrack
        states = np.zeros(n_obs, dtype=int)
        states[-1] = int(np.argmax(log_delta[-1]))
        for t in range(n_obs - 2, -1, -1):
            states[t] = psi[t + 1, states[t + 1]]

        return states

    def _to_matrix(self, trip_features: pl.DataFrame) -> np.ndarray:
        missing = [c for c in self.features if c not in trip_features.columns]
        if missing:
            raise ValueError(f"Feature columns missing: {missing}")
        return trip_features.select(self.features).to_numpy().astype(np.float64)

    def _standardise(self, X: np.ndarray, *, fit: bool) -> np.ndarray:
        if fit:
            self._mean = np.nanmean(X, axis=0)
            self._std = np.nanstd(X, axis=0) + 1e-8
        return (X - self._mean) / self._std

    def _check_fitted(self) -> None:
        if not self.is_fitted:
            raise RuntimeError("Call fit() before predict_states().")
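To make P(Δt) = expm(Q Δt) concrete, a short scipy check with an invented two-state generator in the documented rate range:

import numpy as np
from scipy.linalg import expm

# Hypothetical generator: leave state 0 at 0.20/min, state 1 at 0.05/min.
Q = np.array([[-0.20, 0.20],
              [0.05, -0.05]])

for dt in (1.0, 30.0, 1440.0):  # minutes between observations
    P = expm(Q * dt)
    # Rows sum to 1; as dt grows, every row approaches the stationary
    # distribution (0.2, 0.8) of this chain.
    print(dt, P[0].round(4))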
def aggregate_to_driver(
    trip_features: pl.DataFrame,
    *,
    credibility_threshold: int = 30,
) -> pl.DataFrame:
    """
    Aggregate trip-level features to one row per driver.

    Features are aggregated as distance-weighted means, then shrunk towards
    the portfolio mean using Bühlmann-Straub credibility weights, so drivers
    with few trips receive the heaviest shrinkage.

    Parameters
    ----------
    trip_features:
        Trip-level DataFrame from :func:`~insurance_telematics.extract_trip_features`.
        Must contain ``driver_id`` and ``distance_km``.
    credibility_threshold:
        Credibility parameter k in the Bühlmann-Straub weight n / (n + k),
        where n is the driver's trip count (default 30). The weight
        approaches 1.0 as n grows; a driver with exactly k trips gets
        weight 0.5.

    Returns
    -------
    pl.DataFrame
        One row per driver with columns:

        - All aggregated feature columns (distance-weighted means)
        - ``n_trips`` — number of trips
        - ``total_km`` — total distance driven
        - ``credibility_weight`` — Bühlmann-Straub weight ∈ (0, 1)
        - ``composite_risk_score`` — weighted composite score, scaled 0-100
        - ``driver_id``

    Raises
    ------
    ValueError
        If ``driver_id`` or ``distance_km`` are not in ``trip_features``.
    """
    _validate_input(trip_features)

    # Distance-weighted mean of each feature per driver
    available_mean_features = [
        c for c in _MEAN_FEATURES if c in trip_features.columns
    ]

    agg_exprs = [
        pl.len().alias("n_trips"),
        pl.col("distance_km").sum().alias("total_km"),
    ]

    for feat in available_mean_features:
        # Weighted mean: sum(feat * distance) / sum(distance)
        agg_exprs.append(
            (
                (pl.col(feat) * pl.col("distance_km")).sum()
                / pl.col("distance_km").sum().clip(0.01)
            ).alias(feat)
        )

    driver_df = trip_features.group_by("driver_id").agg(agg_exprs)

    # Bühlmann-Straub credibility weighting
    k = float(credibility_threshold)
    driver_df = driver_df.with_columns(
        (pl.col("n_trips") / (pl.col("n_trips") + k)).alias("credibility_weight")
    )

    # Portfolio means (shrinkage targets)
    portfolio_means = {}
    for feat in available_mean_features:
        portfolio_means[feat] = float(driver_df[feat].mean())

    # Apply credibility shrinkage toward the portfolio mean
    shrink_exprs = []
    for feat in available_mean_features:
        pm = portfolio_means[feat]
        shrink_exprs.append(
            (
                pl.col("credibility_weight") * pl.col(feat)
                + (1.0 - pl.col("credibility_weight")) * pm
            ).alias(feat)
        )
    driver_df = driver_df.with_columns(shrink_exprs)

    # Composite risk score
    driver_df = _compute_composite_score(driver_df, available_mean_features)

    return driver_df.sort("driver_id")
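A worked example of the weight n / (n + k) and the shrinkage it implies, with the default k = 30 and invented numbers:

k = 30.0
portfolio_mean = 0.40   # hypothetical portfolio harsh_braking_rate
driver_value = 1.20     # hypothetical driver's own estimate

for n in (5, 30, 120):  # trip counts
    z = n / (n + k)
    shrunk = z * driver_value + (1 - z) * portfolio_mean
    print(n, round(z, 3), round(shrunk, 3))
# n=5   -> z=0.143, shrunk=0.514  (mostly portfolio mean)
# n=30  -> z=0.5,   shrunk=0.8
# n=120 -> z=0.8,   shrunk=1.04   (mostly the driver's own data)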
class TelematicsScoringPipeline:
    """
    End-to-end pipeline from raw trip data to claim frequency predictions.

    Stages:

    1. ``clean_trips`` — GPS cleaning and kinematics derivation
    2. ``extract_trip_features`` — trip-level scalar features
    3. ``DrivingStateHMM.fit`` — HMM state classification
    4. ``aggregate_to_driver`` — driver-level risk aggregation
    5. Poisson GLM — maps telematics features to claim frequency

    Parameters
    ----------
    n_hmm_states:
        Number of HMM latent states (default 3).
    credibility_threshold:
        Bühlmann-Straub credibility constant k, in number of trips (default 30).
    hmm_features:
        Feature columns to pass to the HMM. Defaults to the standard four
        (mean speed, speed variation, harsh braking rate, harsh accel rate).
    glm_feature_subset:
        Subset of aggregated driver features to include in the GLM. If None,
        uses all available aggregated features.
    random_state:
        Seed for reproducibility.

    Examples
    --------
    >>> sim = TripSimulator(seed=42)
    >>> trips_df, claims_df = sim.simulate(n_drivers=50, trips_per_driver=40)
    >>> pipe = TelematicsScoringPipeline()
    >>> pipe.fit(trips_df, claims_df)
    >>> predictions = pipe.predict(trips_df)
    """

    def __init__(
        self,
        n_hmm_states: int = 3,
        credibility_threshold: int = 30,
        hmm_features: list[str] | None = None,
        glm_feature_subset: list[str] | None = None,
        random_state: int = 42,
    ) -> None:
        self.n_hmm_states = n_hmm_states
        self.credibility_threshold = credibility_threshold
        self.hmm_features = hmm_features
        self.glm_feature_subset = glm_feature_subset
        self.random_state = random_state

        self._hmm: Optional[DrivingStateHMM] = None
        self._glm_result = None
        self._glm_feature_names: list[str] = []
        self.is_fitted: bool = False

    def fit(
        self,
        trips_df: pl.DataFrame,
        claims_df: pl.DataFrame,
    ) -> "TelematicsScoringPipeline":
        """
        Fit the full pipeline.

        Parameters
        ----------
        trips_df:
            Raw trip observations as returned by :func:`~insurance_telematics.load_trips`
            or :class:`~insurance_telematics.TripSimulator`.
        claims_df:
            Driver-level claims data. Required columns:
            ``driver_id``, ``n_claims``, ``exposure_years``.

        Returns
        -------
        self
        """
        if not _STATSMODELS_AVAILABLE:
            raise ImportError(
                "statsmodels is required for the GLM step. "
                "Install it with: pip install statsmodels"
            )

        # Stages 1-2: clean and extract trip-level features
        trip_features = self._extract_features(trips_df)

        # Stage 3: fit HMM and derive per-driver state features
        self._hmm = DrivingStateHMM(
            n_states=self.n_hmm_states,
            features=self.hmm_features,
            random_state=self.random_state,
        )
        self._hmm.fit(trip_features)
        states = self._hmm.predict_states(trip_features)
        hmm_driver_features = self._hmm.driver_state_features(trip_features, states)

        # Stage 4: aggregate trip features to driver level
        driver_risk = aggregate_to_driver(
            trip_features,
            credibility_threshold=self.credibility_threshold,
        )

        # Merge HMM state features into driver risk
        driver_risk = driver_risk.join(hmm_driver_features, on="driver_id", how="left")

        # Merge with claims data
        model_df = driver_risk.join(claims_df, on="driver_id", how="inner")

        # Stage 5: Poisson GLM
        glm_feature_cols = self._select_glm_features(model_df)

        X = model_df.select(glm_feature_cols).to_pandas()

        # Drop zero-variance and constant columns before GLM to prevent NaN deviance
        X = X.fillna(X.mean())
        zero_var = [c for c in X.columns if X[c].std() < 1e-10]
        if zero_var:
            X = X.drop(columns=zero_var)
        if X.shape[1] == 0:
            # No useful features: intercept-only model
            X = pd.DataFrame(index=X.index)

        X = sm.add_constant(X, has_constant="add")

        # Store the final feature names (after dropping zero-variance cols)
        self._glm_feature_names = [c for c in X.columns if c != "const"]

        y = model_df["n_claims"].to_numpy().astype(float)
        exposure = model_df["exposure_years"].to_numpy()
        exposure = np.clip(exposure, 1e-6, None)
        offset = np.log(exposure)

        poisson_model = sm.GLM(
            y,
            X,
            family=sm.families.Poisson(link=sm.families.links.Log()),
            offset=offset,
        )
        # Provide zero start_params to avoid NaN deviance when exposure is small.
        # The intercept-at-zero start is safe for Poisson with log link.
        start = np.zeros(X.shape[1])
        self._glm_result = poisson_model.fit(
            start_params=start, maxiter=200, disp=False
        )
        self.is_fitted = True
        return self

    def predict(self, trips_df: pl.DataFrame) -> pl.DataFrame:
        """
        Predict annual claim frequency for each driver.

        Parameters
        ----------
        trips_df:
            Raw trip observations. Must contain ``driver_id``.

        Returns
        -------
        pl.DataFrame
            One row per driver with columns ``driver_id`` and
            ``predicted_claim_frequency`` (claims per year).
        """
        self._check_fitted()
        glm_df = self.glm_features(trips_df)
        return self._predict_from_features(glm_df)

    def glm_features(self, trips_df: pl.DataFrame) -> pl.DataFrame:
        """
        Produce a driver-level DataFrame of GLM-ready features without
        fitting or running the GLM.

        Parameters
        ----------
        trips_df:
            Raw trip data.

        Returns
        -------
        pl.DataFrame
            One row per driver with telematics-derived GLM covariates.
            Column names follow the controlled vocabulary described in the
            library documentation and are suitable for regulatory filings.
        """
        trip_features = self._extract_features(trips_df)

        if self._hmm is not None and self._hmm.is_fitted:
            states = self._hmm.predict_states(trip_features)
            hmm_df = self._hmm.driver_state_features(trip_features, states)
        else:
            hmm_df = None

        driver_risk = aggregate_to_driver(
            trip_features,
            credibility_threshold=self.credibility_threshold,
        )

        if hmm_df is not None:
            driver_risk = driver_risk.join(hmm_df, on="driver_id", how="left")

        return driver_risk

    def _extract_features(self, trips_df: pl.DataFrame) -> pl.DataFrame:
        """Run clean_trips + extract_trip_features."""
        cleaned = clean_trips(trips_df)
        return extract_trip_features(cleaned)

    def _select_glm_features(self, df: pl.DataFrame) -> list[str]:
        """Choose GLM feature columns from available numeric columns."""
        if self.glm_feature_subset is not None:
            return [c for c in self.glm_feature_subset if c in df.columns]

        # Default: all numeric columns except identifiers and metadata
        exclude = {
            "driver_id",
            "n_claims",
            "exposure_years",
            "n_trips",
            "total_km",
            "credibility_weight",
            "composite_risk_score",
            "aggressive_fraction",
            "normal_fraction",
            "cautious_fraction",
            "annual_km",
        }
        candidates = [
            c for c in df.columns
            if c not in exclude
            and df[c].dtype in (pl.Float64, pl.Float32, pl.Int64, pl.Int32)
        ]
        return candidates

    def _predict_from_features(self, glm_df: pl.DataFrame) -> pl.DataFrame:
        """Apply the fitted GLM to a feature DataFrame."""
        import statsmodels.api as sm

        feature_cols = [c for c in self._glm_feature_names if c in glm_df.columns]
        X = glm_df.select(feature_cols).to_pandas()
        X = sm.add_constant(X, has_constant="add")

        # Fill any missing columns with 0
        for col in self._glm_result.model.exog_names:
            if col not in X.columns:
                X[col] = 0.0
        X = X[self._glm_result.model.exog_names]

        predictions = self._glm_result.predict(X)
        return pl.DataFrame(
            {
                "driver_id": glm_df["driver_id"].to_list(),
                "predicted_claim_frequency": predictions.tolist(),
            }
        )

    def _check_fitted(self) -> None:
        if not self.is_fitted:
            raise RuntimeError("Call fit() before predict().")
def score_trips(trips_df: pl.DataFrame, model: TelematicsScoringPipeline) -> pl.DataFrame:
    """
    Convenience function: apply a fitted pipeline to raw trip data.

    Parameters
    ----------
    trips_df:
        Raw trip DataFrame.
    model:
        A fitted :class:`TelematicsScoringPipeline` instance.

    Returns
    -------
    pl.DataFrame
        Driver-level predictions with ``driver_id`` and
        ``predicted_claim_frequency``.

    Examples
    --------
    >>> predictions = score_trips(new_trips_df, fitted_pipeline)
    """
    return model.predict(trips_df)
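A typical train-then-score workflow tying the pieces together, as a sketch: the simulator stands in for real trip and claims data, and the seeds and sizes shown are arbitrary choices.

from insurance_telematics import (
    TelematicsScoringPipeline,
    TripSimulator,
    score_trips,
)

# Simulated portfolio as a stand-in for real telematics and claims data
sim = TripSimulator(seed=7)
train_trips, train_claims = sim.simulate(n_drivers=50, trips_per_driver=40)

pipe = TelematicsScoringPipeline(n_hmm_states=3, credibility_threshold=30)
pipe.fit(train_trips, train_claims)

# Score a fresh batch of trips with the fitted pipeline
new_trips, _ = TripSimulator(seed=8).simulate(n_drivers=10, trips_per_driver=20)
predictions = score_trips(new_trips, pipe)
print(predictions.head())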
@dataclass
class TripSimulator:
    """
    Generate synthetic 1Hz telematics trip data for a fleet of drivers.

    Each driver is assigned a latent regime mixture (fraction of time spent
    in cautious, normal, and aggressive driving states). Within a trip, the
    regime at each second is drawn from the driver's mixture, and speed
    evolves under an Ornstein-Uhlenbeck process with regime-specific
    parameters. A synthetic claim count is generated per driver from a
    Poisson rate equal to the mixture-weighted average of the regime base
    claim rates.

    Parameters
    ----------
    seed:
        Random seed for reproducibility.

    Examples
    --------
    >>> sim = TripSimulator(seed=42)
    >>> trips_df, claims_df = sim.simulate(n_drivers=20, trips_per_driver=30)
    >>> trips_df.shape[0]  # ~540,000 rows for 20 drivers × 30 trips × ~15 min avg
    """

    seed: int = 42

    def simulate(
        self,
        n_drivers: int = 100,
        trips_per_driver: int = 50,
        *,
        min_trip_duration_s: int = 300,
        max_trip_duration_s: int = 3600,
    ) -> tuple[pl.DataFrame, pl.DataFrame]:
        """
        Simulate 1Hz trip observations for a fleet of drivers.

        Parameters
        ----------
        n_drivers:
            Number of distinct drivers to simulate.
        trips_per_driver:
            Number of trips per driver.
        min_trip_duration_s:
            Minimum trip length in seconds (default 5 minutes).
        max_trip_duration_s:
            Maximum trip length in seconds (default 60 minutes).

        Returns
        -------
        trips_df:
            Polars DataFrame with one row per 1Hz observation. Columns:
            driver_id, trip_id, timestamp, latitude, longitude,
            speed_kmh, acceleration_ms2, heading_deg.
        claims_df:
            Polars DataFrame with one row per driver. Columns:
            driver_id, n_claims, exposure_years, aggressive_fraction,
            normal_fraction, cautious_fraction, annual_km.
        """
        rng = np.random.default_rng(self.seed)

        # Assign latent regime mixtures to drivers using a Dirichlet draw.
        # Concentration steered towards cautious to mimic real portfolio skew.
        concentration = np.array([3.0, 2.0, 0.5])
        driver_mixtures = rng.dirichlet(concentration, size=n_drivers)

        all_trip_rows: list[dict] = []
        driver_summary: list[dict] = []

        base_lat, base_lon = 51.5, -0.1  # London area

        # Trip IDs are globally unique across drivers
        global_trip_id = 0

        # Reference epoch for reproducible timestamps
        epoch = datetime(2024, 1, 1, 7, 0, 0, tzinfo=timezone.utc)

        for driver_idx in range(n_drivers):
            driver_id = f"DRV{driver_idx:04d}"
            mixture = driver_mixtures[driver_idx]
            cautious_frac, normal_frac, aggressive_frac = mixture

            driver_km = 0.0
            driver_total_seconds = 0

            for trip_num in range(trips_per_driver):
                trip_id = f"TRP{global_trip_id:07d}"
                global_trip_id += 1

                # Trip duration: uniform between min and max
                duration_s = int(
                    rng.integers(min_trip_duration_s, max_trip_duration_s)
                )
                driver_total_seconds += duration_s

                # Stagger trip start times so trips do not all begin at once
                days_offset = (driver_idx * trips_per_driver + trip_num) * 3.5 / 365
                hour_offset = rng.uniform(6.0, 22.0)
                trip_start = epoch + timedelta(
                    days=float(days_offset), hours=float(hour_offset)
                )

                rows = self._simulate_trip(
                    rng=rng,
                    driver_id=driver_id,
                    trip_id=trip_id,
                    mixture=mixture,
                    duration_s=duration_s,
                    trip_start=trip_start,
                    base_lat=base_lat,
                    base_lon=base_lon,
                )
                all_trip_rows.extend(rows)

                # Accumulate distance: at 1Hz, each row contributes
                # speed_kmh / 3600 kilometres.
                for row in rows:
                    driver_km += row["speed_kmh"] / 3600.0

            # Exposure: total in-trip driving time expressed in years
            # (trip seconds / seconds per year), used as the Poisson
            # exposure. The driver's annual claim rate is the
            # mixture-weighted average of the regime base rates.
            exposure_years = driver_total_seconds / 365.25 / 86400
            annual_rate = (
                cautious_frac * _REGIMES["cautious"].base_claim_rate
                + normal_frac * _REGIMES["normal"].base_claim_rate
                + aggressive_frac * _REGIMES["aggressive"].base_claim_rate
            )
            lambda_claims = annual_rate * max(exposure_years, 0.01)
            n_claims = int(rng.poisson(lambda_claims))

            driver_summary.append(
                {
                    "driver_id": driver_id,
                    "n_claims": n_claims,
                    "exposure_years": round(exposure_years, 4),
                    "aggressive_fraction": round(float(aggressive_frac), 4),
                    "normal_fraction": round(float(normal_frac), 4),
                    "cautious_fraction": round(float(cautious_frac), 4),
                    "annual_km": round(driver_km, 1),
                }
            )

        trips_df = pl.DataFrame(all_trip_rows).with_columns(
            pl.col("timestamp").cast(pl.Datetime("us", "UTC"))
        )
        claims_df = pl.DataFrame(driver_summary)

        return trips_df, claims_df

    def _simulate_trip(
        self,
        rng: np.random.Generator,
        driver_id: str,
        trip_id: str,
        mixture: np.ndarray,
        duration_s: int,
        trip_start: datetime,
        base_lat: float,
        base_lon: float,
    ) -> list[dict]:
        """Simulate a single trip at 1Hz. Returns a list of row dicts."""
        rows = []

        # Draw a regime for each second from the driver's mixture
        regime_names = ["cautious", "normal", "aggressive"]
        regimes_seq = rng.choice(regime_names, size=duration_s, p=mixture)

        speed_kmh = float(rng.uniform(0, 30))  # starting speed
        lat = base_lat + rng.uniform(-0.5, 0.5)
        lon = base_lon + rng.uniform(-0.5, 0.5)
        heading = float(rng.uniform(0, 360))

        for t in range(duration_s):
            regime = _REGIMES[regimes_seq[t]]
            dt = 1.0  # 1Hz → 1 second

            # Ornstein-Uhlenbeck speed update
            mean_rev = regime.mean_speed_kmh
            theta = regime.speed_reversion
            sigma = regime.speed_vol
            dW = float(rng.normal(0, math.sqrt(dt)))
            speed_kmh = (
                speed_kmh
                + theta * (mean_rev - speed_kmh) * dt
                + sigma * dW
            )
            speed_kmh = max(0.0, speed_kmh)

            # Acceleration: finite difference of speed (km/h per second,
            # converted to m/s² via the factor 3.6), plus process noise
            if t == 0:
                prev_speed = speed_kmh
            else:
                prev_speed = rows[-1]["speed_kmh"]

            accel_from_speed = (speed_kmh - prev_speed) / dt / 3.6
            accel_noise = float(rng.normal(0, regime.accel_noise_std * 0.3))
            accel_ms2 = accel_from_speed + accel_noise

            # Update position using heading and speed
            speed_ms = speed_kmh / 3.6
            heading_rad = math.radians(heading)
            # Approximate lat/lon displacement (~111,320 m per degree)
            dlat = speed_ms * math.cos(heading_rad) * dt / 111_320
            dlon = (
                speed_ms
                * math.sin(heading_rad)
                * dt
                / (111_320 * math.cos(math.radians(lat)))
            )
            lat += dlat
            lon += dlon

            # Slow random heading drift
            heading = (heading + float(rng.normal(0, 2.0))) % 360

            timestamp = trip_start + timedelta(seconds=t)

            rows.append(
                {
                    "driver_id": driver_id,
                    "trip_id": trip_id,
                    "timestamp": timestamp,
                    "latitude": round(lat, 6),
                    "longitude": round(lon, 6),
                    "speed_kmh": round(speed_kmh, 2),
                    "acceleration_ms2": round(accel_ms2, 4),
                    "heading_deg": round(heading, 1),
                }
            )

        return rows
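The speed dynamics in _simulate_trip are an Euler-Maruyama discretisation of an Ornstein-Uhlenbeck process. Here is that update isolated as a worked example; the parameter values below are illustrative, not the library's _REGIMES settings.

import math

import numpy as np

rng = np.random.default_rng(42)

# Illustrative OU parameters for a single "normal" regime
mean_speed_kmh = 50.0   # long-run mean the speed reverts to
theta = 0.15            # mean-reversion rate per second
sigma = 2.0             # volatility of the speed increments
dt = 1.0                # 1Hz sampling

speed = 20.0
trace = []
for _ in range(60):
    dW = rng.normal(0.0, math.sqrt(dt))
    speed += theta * (mean_speed_kmh - speed) * dt + sigma * dW
    speed = max(0.0, speed)  # speeds cannot go negative
    trace.append(speed)

# After ~1/theta ≈ 7 seconds the process has largely forgotten its start
# and fluctuates around 50 km/h with stationary s.d. sigma/sqrt(2·theta) ≈ 3.7.
print(round(trace[-1], 1))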
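To make the exposure arithmetic in simulate() concrete (numbers chosen purely for illustration): a driver's simulated driving time is tiny relative to a calendar year, which is why the exposure is floored before drawing claims, and why the pipeline's GLM fit guards against near-zero exposures.

SECONDS_PER_YEAR = 365.25 * 86400  # 31,557,600

# Illustrative driver: 50 trips averaging 30 minutes each
driver_total_seconds = 50 * 30 * 60                        # 90,000 s of driving
exposure_years = driver_total_seconds / SECONDS_PER_YEAR   # ≈ 0.00285

# simulate() floors exposure at 0.01 before drawing claims, so a
# mixture-weighted annual rate of e.g. 0.3 gives lambda ≈ 0.3 · 0.01 = 0.003.
annual_rate = 0.3
lambda_claims = annual_rate * max(exposure_years, 0.01)
print(round(exposure_years, 5), lambda_claims)  # 0.00285 0.003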