Edit on GitHub

insurance_telematics

insurance-telematics

End-to-end pipeline from raw telematics trip data to GLM-compatible risk scores for UK motor insurance pricing.

Pipeline stages:
  1. Load — trip_loader.load_trips()
  2. Clean — preprocessor.clean_trips()
  3. Score — feature_extractor.extract_trip_features()
  4. Model — hmm_model.DrivingStateHMM
  5. Aggregate — risk_aggregator.aggregate_to_driver()
  6. Price — scoring_pipeline.TelematicsScoringPipeline
Synthetic data for testing and prototyping:

trip_simulator.TripSimulator

Academic basis:

Jiang & Shi (2024), NAAJ 28(4), pp. 822-839; Wüthrich (2017), European Actuarial Journal 7, pp. 89-108

 1"""
 2insurance-telematics
 3====================
 4
 5End-to-end pipeline from raw telematics trip data to GLM-compatible risk scores
 6for UK motor insurance pricing.
 7
 8Pipeline stages:
 9    1. Load  — trip_loader.load_trips()
10    2. Clean — preprocessor.clean_trips()
11    3. Score — feature_extractor.extract_trip_features()
12    4. Model — hmm_model.DrivingStateHMM
13    5. Aggregate — risk_aggregator.aggregate_to_driver()
14    6. Price — scoring_pipeline.TelematicsScoringPipeline
15
16Synthetic data for testing and prototyping:
17    trip_simulator.TripSimulator
18
19Academic basis:
20    Jiang & Shi (2024), NAAJ 28(4), pp.822-839
21    Wüthrich (2017), European Actuarial Journal 7, pp.89-108
22"""
23
24from importlib.metadata import version, PackageNotFoundError
25
26try:
27    __version__ = version("insurance-telematics")
28except PackageNotFoundError:
29    __version__ = "0.1.0"
30
31from .trip_loader import load_trips
32from .preprocessor import clean_trips
33from .feature_extractor import extract_trip_features
34from .hmm_model import DrivingStateHMM, ContinuousTimeHMM
35from .risk_aggregator import aggregate_to_driver
36from .scoring_pipeline import TelematicsScoringPipeline, score_trips
37from .trip_simulator import TripSimulator
38
39__all__ = [
40    "load_trips",
41    "clean_trips",
42    "extract_trip_features",
43    "DrivingStateHMM",
44    "ContinuousTimeHMM",
45    "aggregate_to_driver",
46    "TelematicsScoringPipeline",
47    "score_trips",
48    "TripSimulator",
49]
def load_trips(
    path: str | os.PathLike,
    *,
    schema: dict[str, pl.DataType] | None = None,
) -> pl.DataFrame:
    """
    Load telematics trip data from CSV/TSV or Parquet, validate schema, and
    return a normalised Polars DataFrame.

    Parameters
    ----------
    path:
        Path to a CSV, TSV, or Parquet file, or a directory containing
        Parquet files (all will be read and concatenated).
    schema:
        Optional column name overrides for non-standard source files.
        Maps source column names to the standard column names. For example:
        ``{"gps_speed": "speed_kmh", "accel_x": "acceleration_ms2"}``.

    Returns
    -------
    pl.DataFrame
        Normalised DataFrame with standard column names. The ``timestamp``
        column is cast to ``Datetime[us, UTC]``. Missing optional columns
        are added with null values. Rows are sorted by ``trip_id`` then
        ``timestamp``.

    Raises
    ------
    ValueError
        If any required column is missing after applying the schema mapping,
        if the file type is unsupported, or if a directory contains no
        Parquet files.
    FileNotFoundError
        If the path does not exist.

    Examples
    --------
    >>> df = load_trips("trips.csv")
    >>> df = load_trips("trips.parquet", schema={"gps_speed": "speed_kmh"})
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Path not found: {path}")

    suffix = path.suffix.lower()
    if path.is_dir():
        # Sort for deterministic concatenation order across filesystems.
        parquet_files = sorted(path.glob("*.parquet"))
        if not parquet_files:
            raise ValueError(f"No Parquet files found in directory: {path}")
        df = pl.concat([pl.read_parquet(f) for f in parquet_files])
    elif suffix in {".parquet", ".pq"}:
        df = pl.read_parquet(path)
    elif suffix in {".csv", ".tsv"}:
        # Fix: .tsv files were previously read with the default comma
        # separator, silently mis-parsing tab-separated data into one column.
        df = pl.read_csv(
            path,
            separator="\t" if suffix == ".tsv" else ",",
            try_parse_dates=True,
        )
    else:
        raise ValueError(
            f"Unsupported file type '{path.suffix}'. Use CSV or Parquet."
        )

    # Apply column renames from schema mapping (sources absent from the
    # file are ignored rather than raising).
    if schema:
        df = df.rename({src: dst for src, dst in schema.items() if src in df.columns})

    # Validate required columns after renaming.
    missing = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing:
        raise ValueError(
            f"Missing required columns: {missing}. "
            f"Use the 'schema' parameter to map source column names."
        )

    # Parse string timestamps leniently; unparseable values become null.
    if df["timestamp"].dtype == pl.Utf8 or df["timestamp"].dtype == pl.String:
        df = df.with_columns(
            pl.col("timestamp").str.to_datetime(format=None, strict=False)
        )

    # Normalise to microsecond-precision UTC datetimes.
    if df["timestamp"].dtype not in (pl.Datetime("us", "UTC"), pl.Datetime("ns", "UTC")):
        if hasattr(df["timestamp"].dtype, "time_zone") and df["timestamp"].dtype.time_zone:
            # Timezone-aware in some other zone: convert to UTC.
            df = df.with_columns(
                pl.col("timestamp").dt.convert_time_zone("UTC").dt.cast_time_unit("us")
            )
        else:
            # Naive timestamps: assume they already represent UTC.
            df = df.with_columns(
                pl.col("timestamp")
                .dt.replace_time_zone("UTC")
                .dt.cast_time_unit("us")
            )

    # Add missing optional columns as nulls ("unknown" for driver_id, since
    # downstream aggregation groups on it).
    for col in OPTIONAL_COLUMNS:
        if col not in df.columns:
            if col == "driver_id":
                df = df.with_columns(pl.lit("unknown").alias("driver_id"))
            else:
                df = df.with_columns(pl.lit(None).cast(pl.Float64).alias(col))

    # Enforce numeric types for key columns.
    numeric_cols = ["latitude", "longitude", "speed_kmh", "acceleration_ms2", "heading_deg"]
    cast_exprs = [pl.col(c).cast(pl.Float64) for c in numeric_cols if c in df.columns]
    if cast_exprs:
        df = df.with_columns(cast_exprs)

    # Sort for deterministic downstream processing.
    return df.sort(["trip_id", "timestamp"])

Load telematics trip data from CSV or Parquet, validate schema, and return a normalised Polars DataFrame.

Parameters

path: Path to a CSV or Parquet file, or a directory containing Parquet files (all will be read and concatenated). schema: Optional column name overrides for non-standard source files. Maps source column names to the standard column names. For example: {"gps_speed": "speed_kmh", "accel_x": "acceleration_ms2"}.

Returns

pl.DataFrame Normalised DataFrame with standard column names. The timestamp column is cast to Datetime[us, UTC]. Missing optional columns are added with null values. Rows are sorted by trip_id then timestamp.

Raises

ValueError If any required column is missing after applying the schema mapping. FileNotFoundError If the path does not exist.

Examples

>>> df = load_trips("trips.csv")
>>> df = load_trips("trips.parquet", schema={"gps_speed": "speed_kmh"})
def clean_trips(df: pl.DataFrame) -> pl.DataFrame:
    """
    Clean and enrich a raw telematics DataFrame.

    The following steps are applied per trip, in order:

    1. Remove GPS jump rows (speed > 250 km/h).
    2. Interpolate speed for small gaps (< 5 s within a trip).
    3. Derive acceleration from speed differences if the
       ``acceleration_ms2`` column is null or absent.
    4. Derive jerk (m/s³) as the first difference of acceleration.
    5. Classify each observation into a road type
       (``urban`` / ``rural`` / ``motorway``) from speed.

    Parameters
    ----------
    df:
        Raw trip DataFrame as returned by :func:`~insurance_telematics.load_trips`.
        Must contain ``trip_id``, ``timestamp``, and ``speed_kmh``.

    Returns
    -------
    pl.DataFrame
        Cleaned DataFrame with additional columns:
        ``acceleration_ms2`` (if derived), ``jerk_ms3``, ``road_type``.

    Notes
    -----
    The road type heuristic (urban < 50 km/h, rural 50-100, motorway > 100)
    is a proxy. It works well in aggregate but misclassifies individual
    observations — a momentary slow-down on a motorway reads as urban.
    Trip-level road type statistics (fraction of time in each band) are
    more robust than per-row labels.
    """
    # Step order matters: acceleration must exist before jerk can be
    # derived, and road type is classified on the interpolated speeds.
    pipeline = (
        _remove_gps_jumps,
        _interpolate_speed_gaps,
        _derive_acceleration,
        _derive_jerk,
        _classify_road_type,
    )
    for step in pipeline:
        df = step(df)
    return df

Clean and enrich a raw telematics DataFrame.

Applies the following steps per trip, in order:

  1. Remove GPS jump rows (speed > 250 km/h).
  2. Interpolate speed for small gaps (< 5 s within a trip).
  3. Derive acceleration from speed differences if the acceleration_ms2 column is null or absent.
  4. Derive jerk (m/s³) as the first difference of acceleration.
  5. Classify each observation into a road type (urban / rural / motorway) from speed.

Parameters

df: Raw trip DataFrame as returned by insurance_telematics.load_trips(). Must contain trip_id, timestamp, and speed_kmh.

Returns

pl.DataFrame Cleaned DataFrame with additional columns: acceleration_ms2 (if derived), jerk_ms3, road_type.

Notes

The road type heuristic (urban < 50 km/h, rural 50-100, motorway > 100) is a proxy. It works well in aggregate but misclassifies individual observations — a momentary slow-down on a motorway reads as urban. Trip-level road type statistics (fraction of time in each band) are more robust than per-row labels.

def extract_trip_features(df: pl.DataFrame) -> pl.DataFrame:
    """
    Compute one summary row per trip from cleaned 1Hz observations.

    Parameters
    ----------
    df:
        Cleaned trip DataFrame as returned by
        :func:`~insurance_telematics.clean_trips`. Must contain:
        ``trip_id``, ``timestamp``, ``speed_kmh``, ``acceleration_ms2``,
        ``road_type``.

    Returns
    -------
    pl.DataFrame
        One row per ``trip_id`` with columns:

        ``trip_id``, ``distance_km``, ``duration_min``, ``mean_speed_kmh``,
        ``p95_speed_kmh``, ``speed_variation_coeff``, ``harsh_braking_rate``,
        ``harsh_accel_rate``, ``harsh_cornering_rate``, ``speeding_fraction``,
        ``night_driving_fraction``, ``urban_fraction``, ``driver_id``
        (if present in input).

    Notes
    -----
    Distance is estimated as the integral of speed over time at 1Hz:
    sum(speed_kmh) / 3600. This is a reasonable proxy when GPS coordinates
    are noisy or unavailable for Haversine distance computation.
    """
    _check_required(df)

    has_driver_id = "driver_id" in df.columns
    # Heading is usable only when the column exists and is not entirely null.
    has_heading = "heading_deg" in df.columns and df["heading_deg"].null_count() < len(df)

    # Compute per-row flags in a single with_columns call for efficiency.
    flags = [
        # Harsh braking flag
        (pl.col("acceleration_ms2") < _HARSH_BRAKE_THRESHOLD)
        .cast(pl.Int32)
        .alias("_harsh_brake"),
        # Harsh acceleration flag
        (pl.col("acceleration_ms2") > _HARSH_ACCEL_THRESHOLD)
        .cast(pl.Int32)
        .alias("_harsh_accel"),
        # Night flag
        pl.col("timestamp")
        .dt.hour()
        .is_in(list(_NIGHT_HOURS))
        .cast(pl.Int32)
        .alias("_night"),
        # Urban flag
        (pl.col("road_type") == "urban")
        .cast(pl.Int32)
        .alias("_urban"),
        # Speeding flag — road-type-specific threshold
        pl.when(pl.col("road_type") == "urban")
        .then(pl.col("speed_kmh") > _SPEED_LIMIT_URBAN)
        .when(pl.col("road_type") == "rural")
        .then(pl.col("speed_kmh") > _SPEED_LIMIT_RURAL)
        .otherwise(pl.col("speed_kmh") > _SPEED_LIMIT_MOTORWAY)
        .cast(pl.Int32)
        .alias("_speeding"),
        # Distance increment: speed_kmh / 3600 = km per second at 1Hz
        (pl.col("speed_kmh") / 3600.0).alias("_dist_km_increment"),
    ]

    if has_heading:
        # Cornering proxy: heading change rate (deg/s) × speed (km/h)
        # Proportional to lateral acceleration; calibrated empirically.
        flags.append(
            (
                pl.col("heading_deg")
                .diff()
                .over("trip_id")
                .abs()
                .fill_null(0.0)
                * pl.col("speed_kmh")
                * _HARSH_CORNER_DEG_PER_S_KMPH
                > 1.0
            )
            .cast(pl.Int32)
            .alias("_harsh_corner")
        )
    else:
        # No usable heading data: cornering events cannot be detected.
        flags.append(pl.lit(0).cast(pl.Int32).alias("_harsh_corner"))

    df = df.with_columns(flags)

    # Carry driver_id through — take first value (it's constant per trip).
    agg_extra = (
        [pl.col("driver_id").first().alias("driver_id")] if has_driver_id else []
    )

    # Aggregate per trip.
    agg = df.group_by("trip_id").agg(
        [
            # Basic trip stats
            pl.len().alias("n_obs"),
            pl.col("_dist_km_increment").sum().alias("distance_km"),
            # Observations arrive at 1Hz, so row count == seconds.
            (pl.len() / 60.0).alias("duration_min"),
            pl.col("speed_kmh").mean().alias("mean_speed_kmh"),
            pl.col("speed_kmh").quantile(0.95).alias("p95_speed_kmh"),
            pl.col("speed_kmh").std().alias("_speed_std"),
            # Event counts (divided by distance below)
            pl.col("_harsh_brake").sum().alias("_n_harsh_brake"),
            pl.col("_harsh_accel").sum().alias("_n_harsh_accel"),
            pl.col("_harsh_corner").sum().alias("_n_harsh_corner"),
            # Fraction-based features
            pl.col("_speeding").mean().alias("speeding_fraction"),
            pl.col("_night").mean().alias("night_driving_fraction"),
            pl.col("_urban").mean().alias("urban_fraction"),
        ]
        + agg_extra
    )

    # Clamp distance to 0.01 km minimum so per-km rates never divide by zero
    # (hoisted once instead of repeating the clip in each rate expression).
    safe_km = pl.col("distance_km").clip(0.01)

    # Derive final rate features.
    agg = agg.with_columns(
        [
            # Speed coefficient of variation — std / mean, guarded against div/0
            (pl.col("_speed_std") / (pl.col("mean_speed_kmh") + 1e-6)).alias(
                "speed_variation_coeff"
            ),
            (pl.col("_n_harsh_brake") / safe_km).alias("harsh_braking_rate"),
            (pl.col("_n_harsh_accel") / safe_km).alias("harsh_accel_rate"),
            (pl.col("_n_harsh_corner") / safe_km).alias("harsh_cornering_rate"),
        ]
    )

    # Select and order final output columns.
    output_cols = [
        "trip_id",
        "distance_km",
        "duration_min",
        "mean_speed_kmh",
        "p95_speed_kmh",
        "speed_variation_coeff",
        "harsh_braking_rate",
        "harsh_accel_rate",
        "harsh_cornering_rate",
        "speeding_fraction",
        "night_driving_fraction",
        "urban_fraction",
    ]
    if has_driver_id:
        output_cols.append("driver_id")

    return agg.select(output_cols).sort("trip_id")

Compute one summary row per trip from cleaned 1Hz observations.

Parameters

df: Cleaned trip DataFrame as returned by insurance_telematics.clean_trips(). Must contain: trip_id, timestamp, speed_kmh, acceleration_ms2, road_type.

Returns

pl.DataFrame One row per trip_id with columns:

trip_id, distance_km, duration_min, mean_speed_kmh, p95_speed_kmh, speed_variation_coeff, harsh_braking_rate, harsh_accel_rate, harsh_cornering_rate, speeding_fraction, night_driving_fraction, urban_fraction, driver_id (if present in input).

Notes

Distance is estimated as the integral of speed over time at 1Hz: sum(speed_kmh) / 3600. This is a reasonable proxy when GPS coordinates are noisy or unavailable for Haversine distance computation.

class DrivingStateHMM:
    """
    Discrete-time Gaussian HMM for classifying driving trip sequences into
    latent driving states (e.g. cautious / normal / aggressive).

    Wraps ``hmmlearn.GaussianHMM`` with state-ordering normalisation and
    a driver-level feature extraction step.

    Parameters
    ----------
    n_states:
        Number of latent states. Default 3. With three states the literature
        consistently produces a cautious / normal / aggressive partition.
    features:
        Trip-level feature columns to use as HMM observations. Defaults to
        mean speed, speed variation, harsh braking rate, harsh acceleration.
    n_iter:
        Maximum EM iterations for hmmlearn (default 200).
    covariance_type:
        Covariance type passed to GaussianHMM (default "diag").
    random_state:
        Random seed for reproducibility.

    Examples
    --------
    >>> model = DrivingStateHMM(n_states=3)
    >>> model.fit(trip_features_df)
    >>> states = model.predict_states(trip_features_df)
    >>> driver_features = model.driver_state_features(trip_features_df, states)
    """

    def __init__(
        self,
        n_states: int = 3,
        features: list[str] | None = None,
        n_iter: int = 200,
        covariance_type: str = "diag",
        random_state: int = 42,
    ) -> None:
        # hmmlearn is an optional dependency; fail fast with install advice.
        if not _HMMLEARN_AVAILABLE:
            raise ImportError(
                "hmmlearn is required for DrivingStateHMM. "
                "Install it with: pip install hmmlearn"
            )

        self.n_states = n_states
        self.features = features or _DEFAULT_HMM_FEATURES
        self.n_iter = n_iter
        self.covariance_type = covariance_type
        self.random_state = random_state

        # Fitted model and state-ordering bookkeeping, set by fit().
        self._model: Optional[hmmlearn_hmm.GaussianHMM] = None
        self._state_order: Optional[np.ndarray] = None  # indices sorted by mean speed
        self.is_fitted: bool = False

    def fit(self, trip_features: pl.DataFrame) -> "DrivingStateHMM":
        """
        Fit the HMM to a DataFrame of trip-level scalar features.

        Each row is treated as one observation. For a per-driver sequence,
        pass only that driver's trips (ordered by timestamp). For a
        portfolio-level fit, pass all trips — hmmlearn handles the full
        sequence as one long chain, which is appropriate for population-level
        state estimation.

        Parameters
        ----------
        trip_features:
            DataFrame from :func:`~insurance_telematics.extract_trip_features`.
            Must contain the columns listed in ``self.features``.

        Returns
        -------
        self
        """
        X = self._to_matrix(trip_features)
        X = self._standardise(X, fit=True)

        model = hmmlearn_hmm.GaussianHMM(
            n_components=self.n_states,
            covariance_type=self.covariance_type,
            n_iter=self.n_iter,
            random_state=self.random_state,
            verbose=False,
        )
        # hmmlearn emits convergence warnings that are noise here.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            model.fit(X)

        self._model = model

        # Determine state ordering by the mean speed dimension (feature index 0)
        # so that state 0 is always the lowest-speed (most cautious) state.
        speed_means = model.means_[:, 0]
        self._state_order = np.argsort(speed_means)
        # _state_rank maps a raw hmmlearn label to its ordered rank.
        self._state_rank = np.empty_like(self._state_order)
        self._state_rank[self._state_order] = np.arange(self.n_states)

        self.is_fitted = True
        return self

    def predict_states(self, trip_features: pl.DataFrame) -> np.ndarray:
        """
        Decode the most likely state sequence for a set of trips.

        Parameters
        ----------
        trip_features:
            Same format as used for ``fit()``.

        Returns
        -------
        np.ndarray of shape (n_trips,)
            Integer state labels 0..n_states-1, where 0 = most cautious
            and n_states-1 = most aggressive.
        """
        self._check_fitted()
        X = self._to_matrix(trip_features)
        X = self._standardise(X, fit=False)
        raw_states = self._model.predict(X)
        # Remap raw hmmlearn labels to speed-ordered states.
        return self._state_rank[raw_states]

    def predict_state_probs(self, trip_features: pl.DataFrame) -> np.ndarray:
        """
        Compute posterior state probabilities for each trip observation.

        Parameters
        ----------
        trip_features:
            Same format as used for ``fit()``.

        Returns
        -------
        np.ndarray of shape (n_trips, n_states)
            Posterior probabilities, summing to 1.0 across states per row.
            Column ordering matches the ordered states (0 = most cautious).
        """
        self._check_fitted()
        X = self._to_matrix(trip_features)
        X = self._standardise(X, fit=False)
        _, posteriors = self._model.score_samples(X)
        # Reorder columns so column k is the k-th slowest (ordered) state.
        return posteriors[:, self._state_order]

    def driver_state_features(
        self,
        trip_features: pl.DataFrame,
        states: np.ndarray,
    ) -> pl.DataFrame:
        """
        Compute per-driver HMM-derived features for use as GLM covariates.

        Parameters
        ----------
        trip_features:
            Trip-level features DataFrame. Must contain ``driver_id``.
        states:
            State sequence as returned by :meth:`predict_states`. Must have
            one entry per row of ``trip_features``.

        Returns
        -------
        pl.DataFrame
            One row per driver with columns:

            - ``state_{k}_fraction`` for k in 0..n_states-1
            - ``mean_transition_rate`` — average state transitions per km
            - ``state_entropy`` — Shannon entropy of state distribution
            - ``driver_id``

        Raises
        ------
        ValueError
            If ``driver_id`` is missing, or ``states`` does not align
            one-to-one with the rows of ``trip_features``.

        Notes
        -----
        State fractions are the primary actuarial risk features. Following
        Jiang & Shi (2024), the fraction of time spent in the highest-risk
        state (state ``n_states - 1``) is the most predictive of claim
        frequency.
        """
        if "driver_id" not in trip_features.columns:
            raise ValueError(
                "trip_features must contain 'driver_id' for driver-level aggregation."
            )
        # Guard against silent row/state misalignment (e.g. states decoded
        # from a different DataFrame).
        if len(states) != len(trip_features):
            raise ValueError(
                "states must have one entry per row of trip_features "
                f"(got {len(states)} states for {len(trip_features)} rows)."
            )

        df = trip_features.with_columns(pl.Series("_state", states.tolist()))

        # Count state occurrences per driver (sorted for deterministic output).
        driver_rows = []
        driver_ids = df["driver_id"].unique().sort().to_list()

        for driver_id in driver_ids:
            mask = df["driver_id"] == driver_id
            driver_df = df.filter(mask)
            driver_states = driver_df["_state"].to_numpy()
            n_trips = len(driver_states)

            # State fractions (max-guard avoids div/0 for empty groups).
            state_counts = np.bincount(driver_states, minlength=self.n_states)
            state_fractions = state_counts / max(n_trips, 1)

            # Transition rate: number of state changes per km, with the
            # trip count as a fallback denominator if distance is absent.
            n_transitions = np.sum(np.diff(driver_states) != 0)
            total_km = driver_df["distance_km"].sum() if "distance_km" in driver_df.columns else n_trips
            transition_rate = n_transitions / max(total_km, 0.01)

            # State entropy (Shannon)
            state_entropy = float(scipy_entropy(state_fractions))

            row = {"driver_id": driver_id}
            for k in range(self.n_states):
                row[f"state_{k}_fraction"] = float(state_fractions[k])
            row["mean_transition_rate"] = float(transition_rate)
            row["state_entropy"] = float(state_entropy)

            driver_rows.append(row)

        return pl.DataFrame(driver_rows)

    def _to_matrix(self, trip_features: pl.DataFrame) -> np.ndarray:
        """Extract feature columns as float64 numpy array."""
        missing = [c for c in self.features if c not in trip_features.columns]
        if missing:
            raise ValueError(f"Feature columns missing from DataFrame: {missing}")
        return trip_features.select(self.features).to_numpy().astype(np.float64)

    def _standardise(self, X: np.ndarray, *, fit: bool) -> np.ndarray:
        """Z-score standardise features. Fit mean/std on first call."""
        if fit:
            # nan-aware stats; epsilon keeps constant columns finite.
            self._mean = np.nanmean(X, axis=0)
            self._std = np.nanstd(X, axis=0) + 1e-8
        return (X - self._mean) / self._std

    def _check_fitted(self) -> None:
        # Guards every predict/score entry point, not only predict_states —
        # the previous message named predict_states specifically, which was
        # misleading when raised from predict_state_probs.
        if not self.is_fitted:
            raise RuntimeError("Model is not fitted. Call fit() first.")

Discrete-time Gaussian HMM for classifying driving trip sequences into latent driving states (e.g. cautious / normal / aggressive).

Wraps hmmlearn.GaussianHMM with state-ordering normalisation and a driver-level feature extraction step.

Parameters

n_states: Number of latent states. Default 3. With three states the literature consistently produces a cautious / normal / aggressive partition. features: Trip-level feature columns to use as HMM observations. Defaults to mean speed, speed variation, harsh braking rate, harsh acceleration. n_iter: Maximum EM iterations for hmmlearn (default 200). covariance_type: Covariance type passed to GaussianHMM (default "diag"). random_state: Random seed for reproducibility.

Examples

>>> model = DrivingStateHMM(n_states=3)
>>> model.fit(trip_features_df)
>>> states = model.predict_states(trip_features_df)
>>> driver_features = model.driver_state_features(trip_features_df, states)
DrivingStateHMM( n_states: int = 3, features: list[str] | None = None, n_iter: int = 200, covariance_type: str = 'diag', random_state: int = 42)
 88    def __init__(
 89        self,
 90        n_states: int = 3,
 91        features: list[str] | None = None,
 92        n_iter: int = 200,
 93        covariance_type: str = "diag",
 94        random_state: int = 42,
 95    ) -> None:
 96        if not _HMMLEARN_AVAILABLE:
 97            raise ImportError(
 98                "hmmlearn is required for DrivingStateHMM. "
 99                "Install it with: pip install hmmlearn"
100            )
101
102        self.n_states = n_states
103        self.features = features or _DEFAULT_HMM_FEATURES
104        self.n_iter = n_iter
105        self.covariance_type = covariance_type
106        self.random_state = random_state
107
108        self._model: Optional[hmmlearn_hmm.GaussianHMM] = None
109        self._state_order: Optional[np.ndarray] = None  # indices sorted by mean speed
110        self.is_fitted: bool = False
n_states
features
n_iter
covariance_type
random_state
is_fitted: bool
def fit( self, trip_features: polars.dataframe.frame.DataFrame) -> DrivingStateHMM:
112    def fit(self, trip_features: pl.DataFrame) -> "DrivingStateHMM":
113        """
114        Fit the HMM to a DataFrame of trip-level scalar features.
115
116        Each row is treated as one observation. For a per-driver sequence,
117        pass only that driver's trips (ordered by timestamp). For a
118        portfolio-level fit, pass all trips — hmmlearn handles the full
119        sequence as one long chain, which is appropriate for population-level
120        state estimation.
121
122        Parameters
123        ----------
124        trip_features:
125            DataFrame from :func:`~insurance_telematics.extract_trip_features`.
126            Must contain the columns listed in ``self.features``.
127
128        Returns
129        -------
130        self
131        """
132        X = self._to_matrix(trip_features)
133        X = self._standardise(X, fit=True)
134
135        model = hmmlearn_hmm.GaussianHMM(
136            n_components=self.n_states,
137            covariance_type=self.covariance_type,
138            n_iter=self.n_iter,
139            random_state=self.random_state,
140            verbose=False,
141        )
142        with warnings.catch_warnings():
143            warnings.simplefilter("ignore")
144            model.fit(X)
145
146        self._model = model
147
148        # Determine state ordering by the mean speed dimension (feature index 0)
149        # so that state 0 is always the lowest-speed (most cautious) state
150        speed_means = model.means_[:, 0]
151        self._state_order = np.argsort(speed_means)
152        self._state_rank = np.empty_like(self._state_order)
153        self._state_rank[self._state_order] = np.arange(self.n_states)
154
155        self.is_fitted = True
156        return self

Fit the HMM to a DataFrame of trip-level scalar features.

Each row is treated as one observation. For a per-driver sequence, pass only that driver's trips (ordered by timestamp). For a portfolio-level fit, pass all trips — hmmlearn handles the full sequence as one long chain, which is appropriate for population-level state estimation.

Parameters

trip_features: DataFrame from insurance_telematics.extract_trip_features(). Must contain the columns listed in self.features.

Returns

self

def predict_states(self, trip_features: polars.dataframe.frame.DataFrame) -> numpy.ndarray:
158    def predict_states(self, trip_features: pl.DataFrame) -> np.ndarray:
159        """
160        Decode the most likely state sequence for a set of trips.
161
162        Parameters
163        ----------
164        trip_features:
165            Same format as used for ``fit()``.
166
167        Returns
168        -------
169        np.ndarray of shape (n_trips,)
170            Integer state labels 0..n_states-1, where 0 = most cautious
171            and n_states-1 = most aggressive.
172        """
173        self._check_fitted()
174        X = self._to_matrix(trip_features)
175        X = self._standardise(X, fit=False)
176        raw_states = self._model.predict(X)
177        # Remap to ordered states
178        return self._state_rank[raw_states]

Decode the most likely state sequence for a set of trips.

Parameters

trip_features: Same format as used for fit().

Returns

np.ndarray of shape (n_trips,) Integer state labels 0..n_states-1, where 0 = most cautious and n_states-1 = most aggressive.

def predict_state_probs(self, trip_features: polars.dataframe.frame.DataFrame) -> numpy.ndarray:
180    def predict_state_probs(self, trip_features: pl.DataFrame) -> np.ndarray:
181        """
182        Compute posterior state probabilities for each trip observation.
183
184        Parameters
185        ----------
186        trip_features:
187            Same format as used for ``fit()``.
188
189        Returns
190        -------
191        np.ndarray of shape (n_trips, n_states)
192            Posterior probabilities, summing to 1.0 across states per row.
193            Column ordering matches the ordered states (0 = most cautious).
194        """
195        self._check_fitted()
196        X = self._to_matrix(trip_features)
197        X = self._standardise(X, fit=False)
198        _, posteriors = self._model.score_samples(X)
199        # Reorder columns to match state ordering
200        return posteriors[:, self._state_order]

Compute posterior state probabilities for each trip observation.

Parameters

trip_features: Same format as used for fit().

Returns

np.ndarray of shape (n_trips, n_states) Posterior probabilities, summing to 1.0 across states per row. Column ordering matches the ordered states (0 = most cautious).

def driver_state_features( self, trip_features: polars.dataframe.frame.DataFrame, states: numpy.ndarray) -> polars.dataframe.frame.DataFrame:
202    def driver_state_features(
203        self,
204        trip_features: pl.DataFrame,
205        states: np.ndarray,
206    ) -> pl.DataFrame:
207        """
208        Compute per-driver HMM-derived features for use as GLM covariates.
209
210        Parameters
211        ----------
212        trip_features:
213            Trip-level features DataFrame. Must contain ``driver_id``.
214        states:
215            State sequence as returned by :meth:`predict_states`.
216
217        Returns
218        -------
219        pl.DataFrame
220            One row per driver with columns:
221
222            - ``state_{k}_fraction`` for k in 0..n_states-1
223            - ``mean_transition_rate`` — average state transitions per km
224            - ``state_entropy`` — Shannon entropy of state distribution
225            - ``driver_id``
226
227        Notes
228        -----
229        State fractions are the primary actuarial risk features. Following
230        Jiang & Shi (2024), the fraction of time spent in the highest-risk
231        state (state ``n_states - 1``) is the most predictive of claim
232        frequency.
233        """
234        if "driver_id" not in trip_features.columns:
235            raise ValueError(
236                "trip_features must contain 'driver_id' for driver-level aggregation."
237            )
238
239        df = trip_features.with_columns(pl.Series("_state", states.tolist()))
240
241        # Count state occurrences per driver
242        driver_rows = []
243        driver_ids = df["driver_id"].unique().sort().to_list()
244
245        for driver_id in driver_ids:
246            mask = df["driver_id"] == driver_id
247            driver_df = df.filter(mask)
248            driver_states = driver_df["_state"].to_numpy()
249            n_trips = len(driver_states)
250
251            # State fractions
252            state_counts = np.bincount(driver_states, minlength=self.n_states)
253            state_fractions = state_counts / max(n_trips, 1)
254
255            # Transition rate: number of state changes per km
256            n_transitions = np.sum(np.diff(driver_states) != 0)
257            total_km = driver_df["distance_km"].sum() if "distance_km" in driver_df.columns else n_trips
258            transition_rate = n_transitions / max(total_km, 0.01)
259
260            # State entropy (Shannon)
261            state_entropy = float(scipy_entropy(state_fractions))
262
263            row = {"driver_id": driver_id}
264            for k in range(self.n_states):
265                row[f"state_{k}_fraction"] = float(state_fractions[k])
266            row["mean_transition_rate"] = float(transition_rate)
267            row["state_entropy"] = float(state_entropy)
268
269            driver_rows.append(row)
270
271        return pl.DataFrame(driver_rows)

Compute per-driver HMM-derived features for use as GLM covariates.

Parameters

trip_features: Trip-level features DataFrame. Must contain driver_id.

states: State sequence as returned by predict_states().

Returns

pl.DataFrame One row per driver with columns:

- ``state_{k}_fraction`` for k in 0..n_states-1
- ``mean_transition_rate`` — average state transitions per km
- ``state_entropy`` — Shannon entropy of state distribution
- ``driver_id``

Notes

State fractions are the primary actuarial risk features. Following Jiang & Shi (2024), the fraction of exposure in the highest-risk state (state n_states - 1) is the most predictive of claim frequency; in this implementation it is computed as the share of a driver's trips assigned to that state.

class ContinuousTimeHMM:
292class ContinuousTimeHMM:
293    """
294    Continuous-time HMM for irregularly-sampled telematics observations.
295
296    Uses a generator matrix Q where the transition probability over an
297    interval Δt is P(Δt) = expm(Q × Δt). This handles variable trip
298    lengths and inter-observation time gaps without resampling.
299
300    Implementation uses an EM algorithm:
301    - E-step: forward-backward with time-varying transition matrices P(Δt_i)
302    - M-step: update Q via expected transition counts and holding times
303
304    Parameters
305    ----------
306    n_states:
307        Number of latent states (default 3).
308    features:
309        Feature columns to use as Gaussian emissions per state.
310    n_iter:
311        Number of EM iterations (default 100).
312    tol:
313        Convergence tolerance on log-likelihood (default 1e-4).
314    random_state:
315        Seed for parameter initialisation.
316
317    Notes
318    -----
319    The M-step update for Q follows the standard uniformisation approach:
320    Q_ij (i≠j) ∝ E[N_ij] / E[T_i], where N_ij is the expected number of
321    i→j transitions and T_i is the expected total holding time in state i.
322
323    Numerical stability: scipy.linalg.expm is used for matrix exponential.
324    For typical generator matrices from driving data (rates on the order of
325    0.01-1.0 transitions/minute), expm is stable and accurate.
326
327    Examples
328    --------
329    >>> model = ContinuousTimeHMM(n_states=3)
330    >>> model.fit(trip_features_df, time_deltas=delta_array)
331    >>> states = model.predict_states(trip_features_df, time_deltas=delta_array)
332    """
333
334    def __init__(
335        self,
336        n_states: int = 3,
337        features: list[str] | None = None,
338        n_iter: int = 100,
339        tol: float = 1e-4,
340        random_state: int = 42,
341    ) -> None:
342        if not _SCIPY_AVAILABLE:
343            raise ImportError(
344                "scipy is required for ContinuousTimeHMM. "
345                "Install it with: pip install scipy"
346            )
347
348        self.n_states = n_states
349        self.features = features or _DEFAULT_HMM_FEATURES
350        self.n_iter = n_iter
351        self.tol = tol
352        self.random_state = random_state
353
354        self.Q_: Optional[np.ndarray] = None  # Generator matrix (n_states × n_states)
355        self.means_: Optional[np.ndarray] = None    # Emission means (n_states × n_features)
356        self.covars_: Optional[np.ndarray] = None   # Emission variances (diagonal)
357        self.pi_: Optional[np.ndarray] = None       # Initial state distribution
358        self.is_fitted: bool = False
359        self._state_order: Optional[np.ndarray] = None
360
361    def fit(
362        self,
363        trip_features: pl.DataFrame,
364        time_deltas: np.ndarray | None = None,
365    ) -> "ContinuousTimeHMM":
366        """
367        Fit the CTHMM using the EM algorithm.
368
369        Parameters
370        ----------
371        trip_features:
372            Trip-level features DataFrame (one row per trip, not per second).
373        time_deltas:
374            Array of shape (n_trips,) giving the time interval in minutes
375            between consecutive trips for the same driver. Use a large value
376            (e.g. 1440.0 = 24 hours) between trips from different sessions.
377            If None, all intervals are set to 1.0 (equivalent to discrete HMM
378            with unit time steps).
379
380        Returns
381        -------
382        self
383        """
384        X = self._to_matrix(trip_features)
385        X = self._standardise(X, fit=True)
386        n_obs = X.shape[0]
387
388        if time_deltas is None:
389            time_deltas = np.ones(n_obs, dtype=np.float64)
390        else:
391            time_deltas = np.asarray(time_deltas, dtype=np.float64)
392            if len(time_deltas) != n_obs:
393                raise ValueError(
394                    f"time_deltas length {len(time_deltas)} != n_obs {n_obs}"
395                )
396        # Clamp to small positive to avoid expm(Q * 0)
397        time_deltas = np.clip(time_deltas, 1e-6, None)
398
399        rng = np.random.default_rng(self.random_state)
400
401        # Initialise parameters
402        self.pi_ = np.ones(self.n_states) / self.n_states
403        self.Q_ = self._init_generator(rng)
404        self.means_, self.covars_ = self._init_emissions(X, rng)
405
406        log_likelihood = -np.inf
407        for iteration in range(self.n_iter):
408            # E-step
409            gammas, xis, log_lik_new = self._e_step(X, time_deltas)
410
411            # M-step
412            self._m_step(X, gammas, xis, time_deltas)
413
414            # Convergence check
415            if abs(log_lik_new - log_likelihood) < self.tol:
416                break
417            log_likelihood = log_lik_new
418
419        # Order states by mean of the first feature (speed proxy)
420        self._state_order = np.argsort(self.means_[:, 0])
421        self._state_rank = np.empty_like(self._state_order)
422        self._state_rank[self._state_order] = np.arange(self.n_states)
423
424        self.is_fitted = True
425        return self
426
427    def predict_states(
428        self,
429        trip_features: pl.DataFrame,
430        time_deltas: np.ndarray | None = None,
431    ) -> np.ndarray:
432        """
433        Viterbi decoding of the most likely state sequence.
434
435        Parameters
436        ----------
437        trip_features:
438            Same format as used in ``fit()``.
439        time_deltas:
440            Inter-observation time intervals, same semantics as in ``fit()``.
441
442        Returns
443        -------
444        np.ndarray of shape (n_trips,)
445            State labels 0..n_states-1 with state 0 = most cautious.
446        """
447        self._check_fitted()
448        X = self._to_matrix(trip_features)
449        X = self._standardise(X, fit=False)
450        n_obs = X.shape[0]
451
452        if time_deltas is None:
453            time_deltas = np.ones(n_obs)
454        time_deltas = np.clip(np.asarray(time_deltas, dtype=np.float64), 1e-6, None)
455
456        raw_states = self._viterbi(X, time_deltas)
457        return self._state_rank[raw_states]
458
459    def driver_state_features(
460        self,
461        trip_features: pl.DataFrame,
462        states: np.ndarray,
463    ) -> pl.DataFrame:
464        """
465        Compute driver-level HMM features. Same interface as
466        :meth:`DrivingStateHMM.driver_state_features`.
467        """
468        # Delegate to the same logic — identical to discrete case
469        if "driver_id" not in trip_features.columns:
470            raise ValueError("trip_features must contain 'driver_id'.")
471
472        df = trip_features.with_columns(pl.Series("_state", states.tolist()))
473        driver_rows = []
474        driver_ids = df["driver_id"].unique().sort().to_list()
475
476        for driver_id in driver_ids:
477            driver_df = df.filter(pl.col("driver_id") == driver_id)
478            driver_states = driver_df["_state"].to_numpy()
479            n_trips = len(driver_states)
480
481            state_counts = np.bincount(driver_states, minlength=self.n_states)
482            state_fractions = state_counts / max(n_trips, 1)
483
484            n_transitions = np.sum(np.diff(driver_states) != 0)
485            total_km = (
486                driver_df["distance_km"].sum()
487                if "distance_km" in driver_df.columns
488                else float(n_trips)
489            )
490            transition_rate = n_transitions / max(total_km, 0.01)
491            state_entropy = float(scipy_entropy(state_fractions))
492
493            row = {"driver_id": driver_id}
494            for k in range(self.n_states):
495                row[f"state_{k}_fraction"] = float(state_fractions[k])
496            row["mean_transition_rate"] = float(transition_rate)
497            row["state_entropy"] = float(state_entropy)
498            driver_rows.append(row)
499
500        return pl.DataFrame(driver_rows)
501
502    # ------------------------------------------------------------------
503    # Private EM implementation
504    # ------------------------------------------------------------------
505
506    def _init_generator(self, rng: np.random.Generator) -> np.ndarray:
507        """Initialise a valid generator matrix Q with small random rates."""
508        n = self.n_states
509        Q = rng.uniform(0.01, 0.1, size=(n, n))
510        np.fill_diagonal(Q, 0.0)
511        np.fill_diagonal(Q, -Q.sum(axis=1))
512        return Q
513
514    def _init_emissions(
515        self, X: np.ndarray, rng: np.random.Generator
516    ) -> tuple[np.ndarray, np.ndarray]:
517        """Initialise Gaussian emission parameters by k-means-like split."""
518        n, d = X.shape
519        # Use equally-spaced quantile points as initial means
520        quantiles = np.linspace(0.1, 0.9, self.n_states)
521        means = np.array(
522            [np.quantile(X, q, axis=0) for q in quantiles], dtype=np.float64
523        )
524        # Perturb slightly to break symmetry
525        means += rng.normal(0, 0.05, size=means.shape)
526        covars = np.ones((self.n_states, d), dtype=np.float64)
527        return means, covars
528
529    def _transition_matrix(self, dt: float) -> np.ndarray:
530        """P(dt) = expm(Q * dt), clamped to be a valid stochastic matrix."""
531        P = matrix_expm(self.Q_ * dt)
532        # Numerical correction: clip to [0,1] and renormalise rows
533        P = np.clip(P.real, 0.0, 1.0)
534        row_sums = P.sum(axis=1, keepdims=True)
535        row_sums = np.where(row_sums < 1e-12, 1.0, row_sums)
536        return P / row_sums
537
538    def _emission_log_prob(self, X: np.ndarray) -> np.ndarray:
539        """
540        Diagonal Gaussian log-emission probability.
541
542        Returns array of shape (n_obs, n_states).
543        """
544        n_obs = X.shape[0]
545        log_probs = np.zeros((n_obs, self.n_states))
546        for k in range(self.n_states):
547            diff = X - self.means_[k]
548            var = self.covars_[k]
549            log_probs[:, k] = -0.5 * np.sum(
550                np.log(2 * np.pi * var) + diff ** 2 / var, axis=1
551            )
552        return log_probs
553
554    def _e_step(
555        self, X: np.ndarray, dts: np.ndarray
556    ) -> tuple[np.ndarray, np.ndarray, float]:
557        """
558        Forward-backward algorithm with time-varying transition matrices.
559
560        Returns
561        -------
562        gammas : (n_obs, n_states)
563        xis    : (n_obs-1, n_states, n_states)
564        log_likelihood : float
565        """
566        n_obs, n_states = X.shape[0], self.n_states
567        log_emit = self._emission_log_prob(X)  # (n_obs, n_states)
568
569        # Forward pass (log-scale for numerical stability)
570        log_alpha = np.full((n_obs, n_states), -np.inf)
571        log_alpha[0] = np.log(self.pi_ + 1e-300) + log_emit[0]
572
573        for t in range(1, n_obs):
574            P = self._transition_matrix(dts[t - 1])
575            log_P = np.log(P + 1e-300)
576            # log_alpha[t, j] = log_emit[t, j] + logsumexp_k(log_alpha[t-1, k] + log_P[k, j])
577            for j in range(n_states):
578                log_alpha[t, j] = log_emit[t, j] + _logsumexp(
579                    log_alpha[t - 1] + log_P[:, j]
580                )
581
582        log_likelihood = _logsumexp(log_alpha[-1])
583
584        # Backward pass
585        log_beta = np.zeros((n_obs, n_states))  # log(1) = 0 at final step
586        for t in range(n_obs - 2, -1, -1):
587            P = self._transition_matrix(dts[t])
588            log_P = np.log(P + 1e-300)
589            for i in range(n_states):
590                log_beta[t, i] = _logsumexp(
591                    log_P[i, :] + log_emit[t + 1] + log_beta[t + 1]
592                )
593
594        # Posterior state probabilities gamma
595        # Normalise each row by log P(observations) to get P(z_t | x).
596        log_gamma = log_alpha + log_beta
597        for t in range(n_obs):
598            log_gamma[t] -= _logsumexp(log_gamma[t])
599        gammas = np.exp(log_gamma)
600        gammas = np.clip(gammas, 0.0, None)
601        gammas /= gammas.sum(axis=1, keepdims=True)  # numerical guard
602
603        # Pairwise posteriors xi
604        # Normalise by P(observations) = exp(log_likelihood), not per-timestep.
605        # Per-timestep normalisation is incorrect: it makes each t contribute
606        # equally regardless of probability mass, causing EM to converge to a
607        # spurious fixed point.
608        xis = np.zeros((n_obs - 1, n_states, n_states))
609        for t in range(n_obs - 1):
610            P = self._transition_matrix(dts[t])
611            for i in range(n_states):
612                for j in range(n_states):
613                    xis[t, i, j] = np.exp(
614                        log_alpha[t, i]
615                        + np.log(P[i, j] + 1e-300)
616                        + log_emit[t + 1, j]
617                        + log_beta[t + 1, j]
618                        - log_likelihood
619                    )
620
621        return gammas, xis, log_likelihood
622
623    def _m_step(
624        self,
625        X: np.ndarray,
626        gammas: np.ndarray,
627        xis: np.ndarray,
628        dts: np.ndarray,
629    ) -> None:
630        """Update parameters given posteriors."""
631        n_obs, n_states = gammas.shape
632        n_features = X.shape[1]
633
634        # Update initial distribution
635        self.pi_ = gammas[0] / gammas[0].sum()
636
637        # Update emission parameters
638        gamma_sum = gammas.sum(axis=0)  # (n_states,)
639        for k in range(n_states):
640            w = gammas[:, k]  # (n_obs,)
641            w_sum = w.sum() + 1e-300
642            self.means_[k] = (w[:, None] * X).sum(axis=0) / w_sum
643            diff = X - self.means_[k]
644            self.covars_[k] = (w[:, None] * diff ** 2).sum(axis=0) / w_sum + 1e-6
645
646        # Update generator matrix Q
647        # Expected transition counts: E[N_ij] = sum_t xi[t, i, j]
648        expected_transitions = xis.sum(axis=0)  # (n_states, n_states)
649
650        # Expected holding times: E[T_i] = sum_t gamma[t, i] * dt[t]
651        holding_times = np.zeros(n_states)
652        for t in range(n_obs - 1):
653            holding_times += gammas[t] * dts[t]
654        holding_times = np.clip(holding_times, 1e-6, None)
655
656        # New Q: Q_ij = E[N_ij] / E[T_i] for i≠j
657        Q_new = np.zeros((n_states, n_states))
658        for i in range(n_states):
659            for j in range(n_states):
660                if i != j:
661                    Q_new[i, j] = expected_transitions[i, j] / holding_times[i]
662        np.fill_diagonal(Q_new, -Q_new.sum(axis=1))
663        self.Q_ = Q_new
664
665    def _viterbi(self, X: np.ndarray, dts: np.ndarray) -> np.ndarray:
666        """Viterbi decoding returning most likely state sequence."""
667        n_obs, n_states = X.shape[0], self.n_states
668        log_emit = self._emission_log_prob(X)
669
670        log_delta = np.full((n_obs, n_states), -np.inf)
671        psi = np.zeros((n_obs, n_states), dtype=int)
672        log_delta[0] = np.log(self.pi_ + 1e-300) + log_emit[0]
673
674        for t in range(1, n_obs):
675            P = self._transition_matrix(dts[t - 1])
676            log_P = np.log(P + 1e-300)
677            for j in range(n_states):
678                scores = log_delta[t - 1] + log_P[:, j]
679                best = int(np.argmax(scores))
680                log_delta[t, j] = scores[best] + log_emit[t, j]
681                psi[t, j] = best
682
683        # Backtrack
684        states = np.zeros(n_obs, dtype=int)
685        states[-1] = int(np.argmax(log_delta[-1]))
686        for t in range(n_obs - 2, -1, -1):
687            states[t] = psi[t + 1, states[t + 1]]
688
689        return states
690
691    def _to_matrix(self, trip_features: pl.DataFrame) -> np.ndarray:
692        missing = [c for c in self.features if c not in trip_features.columns]
693        if missing:
694            raise ValueError(f"Feature columns missing: {missing}")
695        return trip_features.select(self.features).to_numpy().astype(np.float64)
696
697    def _standardise(self, X: np.ndarray, *, fit: bool) -> np.ndarray:
698        if fit:
699            self._mean = np.nanmean(X, axis=0)
700            self._std = np.nanstd(X, axis=0) + 1e-8
701        return (X - self._mean) / self._std
702
703    def _check_fitted(self) -> None:
704        if not self.is_fitted:
705            raise RuntimeError("Call fit() before predict_states().")

Continuous-time HMM for irregularly-sampled telematics observations.

Uses a generator matrix Q where the transition probability over an interval Δt is P(Δt) = expm(Q × Δt). This handles variable trip lengths and inter-observation time gaps without resampling.

Implementation uses an EM algorithm:

  • E-step: forward-backward with time-varying transition matrices P(Δt_i)
  • M-step: update Q via expected transition counts and holding times

Parameters

n_states: Number of latent states (default 3).

features: Feature columns to use as Gaussian emissions per state.

n_iter: Number of EM iterations (default 100).

tol: Convergence tolerance on log-likelihood (default 1e-4).

random_state: Seed for parameter initialisation.

Notes

The M-step update for Q uses expected transition counts and expected holding times: Q_ij (i≠j) ∝ E[N_ij] / E[T_i], where N_ij is the expected number of i→j transitions and T_i is the expected total holding time in state i.

Numerical stability: scipy.linalg.expm is used for matrix exponential. For typical generator matrices from driving data (rates on the order of 0.01-1.0 transitions/minute), expm is stable and accurate.

Examples

>>> model = ContinuousTimeHMM(n_states=3)
>>> model.fit(trip_features_df, time_deltas=delta_array)
>>> states = model.predict_states(trip_features_df, time_deltas=delta_array)
ContinuousTimeHMM( n_states: int = 3, features: list[str] | None = None, n_iter: int = 100, tol: float = 0.0001, random_state: int = 42)
334    def __init__(
335        self,
336        n_states: int = 3,
337        features: list[str] | None = None,
338        n_iter: int = 100,
339        tol: float = 1e-4,
340        random_state: int = 42,
341    ) -> None:
342        if not _SCIPY_AVAILABLE:
343            raise ImportError(
344                "scipy is required for ContinuousTimeHMM. "
345                "Install it with: pip install scipy"
346            )
347
348        self.n_states = n_states
349        self.features = features or _DEFAULT_HMM_FEATURES
350        self.n_iter = n_iter
351        self.tol = tol
352        self.random_state = random_state
353
354        self.Q_: Optional[np.ndarray] = None  # Generator matrix (n_states × n_states)
355        self.means_: Optional[np.ndarray] = None    # Emission means (n_states × n_features)
356        self.covars_: Optional[np.ndarray] = None   # Emission variances (diagonal)
357        self.pi_: Optional[np.ndarray] = None       # Initial state distribution
358        self.is_fitted: bool = False
359        self._state_order: Optional[np.ndarray] = None
n_states
features
n_iter
tol
random_state
Q_: Optional[numpy.ndarray]
means_: Optional[numpy.ndarray]
covars_: Optional[numpy.ndarray]
pi_: Optional[numpy.ndarray]
is_fitted: bool
def fit( self, trip_features: polars.dataframe.frame.DataFrame, time_deltas: numpy.ndarray | None = None) -> ContinuousTimeHMM:
361    def fit(
362        self,
363        trip_features: pl.DataFrame,
364        time_deltas: np.ndarray | None = None,
365    ) -> "ContinuousTimeHMM":
366        """
367        Fit the CTHMM using the EM algorithm.
368
369        Parameters
370        ----------
371        trip_features:
372            Trip-level features DataFrame (one row per trip, not per second).
373        time_deltas:
374            Array of shape (n_trips,) giving the time interval in minutes
375            between consecutive trips for the same driver. Use a large value
376            (e.g. 1440.0 = 24 hours) between trips from different sessions.
377            If None, all intervals are set to 1.0 (equivalent to discrete HMM
378            with unit time steps).
379
380        Returns
381        -------
382        self
383        """
384        X = self._to_matrix(trip_features)
385        X = self._standardise(X, fit=True)
386        n_obs = X.shape[0]
387
388        if time_deltas is None:
389            time_deltas = np.ones(n_obs, dtype=np.float64)
390        else:
391            time_deltas = np.asarray(time_deltas, dtype=np.float64)
392            if len(time_deltas) != n_obs:
393                raise ValueError(
394                    f"time_deltas length {len(time_deltas)} != n_obs {n_obs}"
395                )
396        # Clamp to small positive to avoid expm(Q * 0)
397        time_deltas = np.clip(time_deltas, 1e-6, None)
398
399        rng = np.random.default_rng(self.random_state)
400
401        # Initialise parameters
402        self.pi_ = np.ones(self.n_states) / self.n_states
403        self.Q_ = self._init_generator(rng)
404        self.means_, self.covars_ = self._init_emissions(X, rng)
405
406        log_likelihood = -np.inf
407        for iteration in range(self.n_iter):
408            # E-step
409            gammas, xis, log_lik_new = self._e_step(X, time_deltas)
410
411            # M-step
412            self._m_step(X, gammas, xis, time_deltas)
413
414            # Convergence check
415            if abs(log_lik_new - log_likelihood) < self.tol:
416                break
417            log_likelihood = log_lik_new
418
419        # Order states by mean of the first feature (speed proxy)
420        self._state_order = np.argsort(self.means_[:, 0])
421        self._state_rank = np.empty_like(self._state_order)
422        self._state_rank[self._state_order] = np.arange(self.n_states)
423
424        self.is_fitted = True
425        return self

Fit the CTHMM using the EM algorithm.

Parameters

trip_features: Trip-level features DataFrame (one row per trip, not per second).

time_deltas: Array of shape (n_trips,) giving the time interval in minutes between consecutive trips for the same driver. Use a large value (e.g. 1440.0 = 24 hours) between trips from different sessions. If None, all intervals are set to 1.0 (equivalent to discrete HMM with unit time steps).

Returns

self

def predict_states( self, trip_features: polars.dataframe.frame.DataFrame, time_deltas: numpy.ndarray | None = None) -> numpy.ndarray:
427    def predict_states(
428        self,
429        trip_features: pl.DataFrame,
430        time_deltas: np.ndarray | None = None,
431    ) -> np.ndarray:
432        """
433        Viterbi decoding of the most likely state sequence.
434
435        Parameters
436        ----------
437        trip_features:
438            Same format as used in ``fit()``.
439        time_deltas:
440            Inter-observation time intervals, same semantics as in ``fit()``.
441
442        Returns
443        -------
444        np.ndarray of shape (n_trips,)
445            State labels 0..n_states-1 with state 0 = most cautious.
446        """
447        self._check_fitted()
448        X = self._to_matrix(trip_features)
449        X = self._standardise(X, fit=False)
450        n_obs = X.shape[0]
451
452        if time_deltas is None:
453            time_deltas = np.ones(n_obs)
454        time_deltas = np.clip(np.asarray(time_deltas, dtype=np.float64), 1e-6, None)
455
456        raw_states = self._viterbi(X, time_deltas)
457        return self._state_rank[raw_states]

Viterbi decoding of the most likely state sequence.

Parameters

trip_features: Same format as used in fit().

time_deltas: Inter-observation time intervals, same semantics as in fit().

Returns

np.ndarray of shape (n_trips,) State labels 0..n_states-1 with state 0 = most cautious.

def driver_state_features( self, trip_features: polars.dataframe.frame.DataFrame, states: numpy.ndarray) -> polars.dataframe.frame.DataFrame:
459    def driver_state_features(
460        self,
461        trip_features: pl.DataFrame,
462        states: np.ndarray,
463    ) -> pl.DataFrame:
464        """
465        Compute driver-level HMM features. Same interface as
466        :meth:`DrivingStateHMM.driver_state_features`.
467        """
468        # Delegate to the same logic — identical to discrete case
469        if "driver_id" not in trip_features.columns:
470            raise ValueError("trip_features must contain 'driver_id'.")
471
472        df = trip_features.with_columns(pl.Series("_state", states.tolist()))
473        driver_rows = []
474        driver_ids = df["driver_id"].unique().sort().to_list()
475
476        for driver_id in driver_ids:
477            driver_df = df.filter(pl.col("driver_id") == driver_id)
478            driver_states = driver_df["_state"].to_numpy()
479            n_trips = len(driver_states)
480
481            state_counts = np.bincount(driver_states, minlength=self.n_states)
482            state_fractions = state_counts / max(n_trips, 1)
483
484            n_transitions = np.sum(np.diff(driver_states) != 0)
485            total_km = (
486                driver_df["distance_km"].sum()
487                if "distance_km" in driver_df.columns
488                else float(n_trips)
489            )
490            transition_rate = n_transitions / max(total_km, 0.01)
491            state_entropy = float(scipy_entropy(state_fractions))
492
493            row = {"driver_id": driver_id}
494            for k in range(self.n_states):
495                row[f"state_{k}_fraction"] = float(state_fractions[k])
496            row["mean_transition_rate"] = float(transition_rate)
497            row["state_entropy"] = float(state_entropy)
498            driver_rows.append(row)
499
500        return pl.DataFrame(driver_rows)

Compute driver-level HMM features. Same interface as DrivingStateHMM.driver_state_features().

def aggregate_to_driver(
    trip_features: pl.DataFrame,
    *,
    credibility_threshold: int = 30,
) -> pl.DataFrame:
    """
    Aggregate trip-level features to one row per driver.

    Features are aggregated as distance-weighted means. Each driver's
    aggregated features are then shrunk towards the portfolio mean using
    Bühlmann-Straub weights n / (n + k), so low-trip drivers receive
    proportionally more shrinkage.

    Parameters
    ----------
    trip_features:
        Trip-level DataFrame from :func:`~insurance_telematics.extract_trip_features`.
        Must contain ``driver_id`` and ``distance_km``.
    credibility_threshold:
        Bühlmann-Straub credibility parameter ``k`` (default 30). A driver
        with n trips receives weight n / (n + k), which approaches — but
        never reaches — full credibility (1.0) as n grows well past ``k``.

    Returns
    -------
    pl.DataFrame
        One row per driver with columns:

        - All aggregated feature columns (distance-weighted means)
        - ``n_trips`` — number of trips
        - ``total_km`` — total distance driven
        - ``credibility_weight`` — Bühlmann-Straub weight ∈ (0, 1)
        - ``composite_risk_score`` — weighted composite score, scaled 0-100
        - ``driver_id``

    Raises
    ------
    ValueError
        If ``driver_id`` or ``distance_km`` are not in ``trip_features``.
    """
    _validate_input(trip_features)

    # Only aggregate the features actually present in the input.
    available_mean_features = [
        c for c in _MEAN_FEATURES if c in trip_features.columns
    ]

    agg_exprs = [
        pl.len().alias("n_trips"),
        pl.col("distance_km").sum().alias("total_km"),
    ]

    for feat in available_mean_features:
        # Distance-weighted mean: sum(feat * distance) / sum(distance).
        # The denominator is floored at 0.01 km so drivers whose recorded
        # distance is (near-)zero do not trigger a division by zero.
        agg_exprs.append(
            (
                (pl.col(feat) * pl.col("distance_km")).sum()
                / pl.col("distance_km").sum().clip(lower_bound=0.01)
            ).alias(feat)
        )

    driver_df = trip_features.group_by("driver_id").agg(agg_exprs)

    # Bühlmann-Straub credibility weighting: w = n / (n + k).
    k = float(credibility_threshold)
    driver_df = driver_df.with_columns(
        (pl.col("n_trips") / (pl.col("n_trips") + k)).alias("credibility_weight")
    )

    # Portfolio means provide the shrinkage target. Guard against an
    # all-null column, where Polars' mean() returns None.
    portfolio_means: dict[str, float] = {}
    for feat in available_mean_features:
        mean_val = driver_df[feat].mean()
        portfolio_means[feat] = float(mean_val) if mean_val is not None else 0.0

    # Credibility shrinkage: w * driver_mean + (1 - w) * portfolio_mean.
    shrink_exprs = []
    for feat in available_mean_features:
        pm = portfolio_means[feat]
        shrink_exprs.append(
            (
                pl.col("credibility_weight") * pl.col(feat)
                + (1.0 - pl.col("credibility_weight")) * pm
            ).alias(feat)
        )
    driver_df = driver_df.with_columns(shrink_exprs)

    # Composite risk score (0-100 scale).
    driver_df = _compute_composite_score(driver_df, available_mean_features)

    return driver_df.sort("driver_id")

Aggregate trip-level features to one row per driver.

Features are aggregated as distance-weighted means. Drivers below the credibility threshold have their scores shrunk towards the portfolio mean using Bühlmann-Straub weights.

Parameters

trip_features: Trip-level DataFrame from ~insurance_telematics.extract_trip_features(). Must contain driver_id and distance_km. credibility_threshold: Bühlmann-Straub credibility parameter k (default 30). A driver with n trips receives weight n / (n + k), which approaches — but never reaches — full credibility (1.0) as n grows well past k.

Returns

pl.DataFrame One row per driver with columns:

- All aggregated feature columns (distance-weighted means)
- ``n_trips`` — number of trips
- ``total_km`` — total distance driven
- ``credibility_weight`` — Bühlmann-Straub weight ∈ (0, 1)
- ``composite_risk_score`` — weighted composite score, scaled 0-100
- ``driver_id``

Raises

ValueError If driver_id or distance_km are not in trip_features.

class TelematicsScoringPipeline:
    """
    End-to-end pipeline from raw trip data to claim frequency predictions.

    Stages:

    1. ``clean_trips`` — GPS cleaning and kinematics derivation
    2. ``extract_trip_features`` — trip-level scalar features
    3. ``DrivingStateHMM.fit`` — HMM state classification
    4. ``aggregate_to_driver`` — driver-level risk aggregation
    5. Poisson GLM — maps telematics features to claim frequency

    Parameters
    ----------
    n_hmm_states:
        Number of HMM latent states (default 3).
    credibility_threshold:
        Bühlmann-Straub credibility threshold in number of trips (default 30).
    hmm_features:
        Feature columns to pass to the HMM. Defaults to the standard four
        (mean speed, speed variation, harsh braking rate, harsh accel rate).
    glm_feature_subset:
        Subset of aggregated driver features to include in the GLM. If None,
        uses all available aggregated features.
    random_state:
        Seed for reproducibility.

    Examples
    --------
    >>> sim = TripSimulator(seed=42)
    >>> trips_df, claims_df = sim.simulate(n_drivers=50, trips_per_driver=40)
    >>> pipe = TelematicsScoringPipeline()
    >>> pipe.fit(trips_df, claims_df)
    >>> predictions = pipe.predict(trips_df)
    """

    def __init__(
        self,
        n_hmm_states: int = 3,
        credibility_threshold: int = 30,
        hmm_features: list[str] | None = None,
        glm_feature_subset: list[str] | None = None,
        random_state: int = 42,
    ) -> None:
        self.n_hmm_states = n_hmm_states
        self.credibility_threshold = credibility_threshold
        self.hmm_features = hmm_features
        self.glm_feature_subset = glm_feature_subset
        self.random_state = random_state

        # Fitted-state attributes, populated by fit().
        self._hmm: Optional[DrivingStateHMM] = None
        self._glm_result = None
        self._glm_feature_names: list[str] = []
        self.is_fitted: bool = False

    def fit(
        self,
        trips_df: pl.DataFrame,
        claims_df: pl.DataFrame,
    ) -> "TelematicsScoringPipeline":
        """
        Fit the full pipeline.

        Parameters
        ----------
        trips_df:
            Raw trip observations as returned by :func:`~insurance_telematics.load_trips`
            or :class:`~insurance_telematics.TripSimulator`.
        claims_df:
            Driver-level claims data. Required columns:
            ``driver_id``, ``n_claims``, ``exposure_years``.

        Returns
        -------
        self

        Raises
        ------
        ImportError
            If statsmodels is not installed.
        """
        if not _STATSMODELS_AVAILABLE:
            raise ImportError(
                "statsmodels is required for the GLM step. "
                "Install it with: pip install statsmodels"
            )

        # Stage 1-2: clean and extract trip-level features.
        driver_features = self._extract_features(trips_df)

        # Stage 3: fit HMM and derive per-driver state features.
        self._hmm = DrivingStateHMM(
            n_states=self.n_hmm_states,
            features=self.hmm_features,
            random_state=self.random_state,
        )
        self._hmm.fit(driver_features)
        states = self._hmm.predict_states(driver_features)
        hmm_driver_features = self._hmm.driver_state_features(driver_features, states)

        # Stage 4: aggregate trip features to driver level.
        driver_risk = aggregate_to_driver(
            driver_features,
            credibility_threshold=self.credibility_threshold,
        )

        # Merge HMM state features into driver risk.
        driver_risk = driver_risk.join(hmm_driver_features, on="driver_id", how="left")

        # Inner join: only drivers present in both the telematics and the
        # claims data contribute to the GLM fit.
        model_df = driver_risk.join(claims_df, on="driver_id", how="inner")

        # Stage 5: Poisson GLM with log(exposure) offset.
        glm_feature_cols = self._select_glm_features(model_df)

        X = model_df.select(glm_feature_cols).to_pandas()

        # Drop zero-variance and constant columns before GLM to prevent NaN deviance.
        X = X.fillna(X.mean())
        zero_var = [c for c in X.columns if X[c].std() < 1e-10]
        if zero_var:
            X = X.drop(columns=zero_var)
        if X.shape[1] == 0:
            # No useful features: fall back to an intercept-only model.
            X = pd.DataFrame(index=X.index)

        X = sm.add_constant(X, has_constant="add")

        # Store the final feature names (after dropping zero-variance cols)
        # so predict() builds the design matrix in the same column order.
        self._glm_feature_names = [c for c in X.columns if c != "const"]

        y = model_df["n_claims"].to_numpy().astype(float)
        exposure = model_df["exposure_years"].to_numpy()
        # Floor exposure so the log-offset is finite for ~zero-exposure drivers.
        exposure = np.clip(exposure, 1e-6, None)
        offset = np.log(exposure)

        poisson_model = sm.GLM(
            y,
            X,
            family=sm.families.Poisson(link=sm.families.links.Log()),
            offset=offset,
        )
        # Provide zero start_params to avoid NaN deviance when exposure is small.
        # The intercept-at-zero start is safe for Poisson with log link.
        start = np.zeros(X.shape[1])
        self._glm_result = poisson_model.fit(
            start_params=start, maxiter=200, disp=False
        )
        self.is_fitted = True
        return self

    def predict(self, trips_df: pl.DataFrame) -> pl.DataFrame:
        """
        Predict annual claim frequency for each driver.

        Parameters
        ----------
        trips_df:
            Raw trip observations. Must contain ``driver_id``.

        Returns
        -------
        pl.DataFrame
            One row per driver with columns ``driver_id`` and
            ``predicted_claim_frequency`` (claims per year).

        Raises
        ------
        RuntimeError
            If called before :meth:`fit`.
        """
        self._check_fitted()
        glm_df = self.glm_features(trips_df)
        return self._predict_from_features(glm_df)

    def glm_features(self, trips_df: pl.DataFrame) -> pl.DataFrame:
        """
        Produce a driver-level DataFrame of GLM-ready features without
        fitting or running the GLM.

        Parameters
        ----------
        trips_df:
            Raw trip data.

        Returns
        -------
        pl.DataFrame
            One row per driver with telematics-derived GLM covariates.
            Column names follow the controlled vocabulary described in the
            library documentation and suitable for regulatory filings.
        """
        driver_features = self._extract_features(trips_df)

        # HMM state features are only available once the HMM has been fitted
        # (i.e. after fit()); before that, only aggregated features are returned.
        if self._hmm is not None and self._hmm.is_fitted:
            states = self._hmm.predict_states(driver_features)
            hmm_df = self._hmm.driver_state_features(driver_features, states)
        else:
            hmm_df = None

        driver_risk = aggregate_to_driver(
            driver_features,
            credibility_threshold=self.credibility_threshold,
        )

        if hmm_df is not None:
            driver_risk = driver_risk.join(hmm_df, on="driver_id", how="left")

        return driver_risk

    def _extract_features(self, trips_df: pl.DataFrame) -> pl.DataFrame:
        """Run clean + extract_trip_features."""
        cleaned = clean_trips(trips_df)
        return extract_trip_features(cleaned)

    def _select_glm_features(self, df: pl.DataFrame) -> list[str]:
        """Choose GLM feature columns from available numeric columns."""
        if self.glm_feature_subset is not None:
            # Honour the explicit subset, silently skipping absent columns.
            return [c for c in self.glm_feature_subset if c in df.columns]

        # Default: all numeric columns except identifiers, targets and
        # metadata (which would leak or double-count information).
        exclude = {
            "driver_id",
            "n_claims",
            "exposure_years",
            "n_trips",
            "total_km",
            "credibility_weight",
            "composite_risk_score",
            "aggressive_fraction",
            "normal_fraction",
            "cautious_fraction",
            "annual_km",
        }
        candidates = [
            c for c in df.columns
            if c not in exclude
            and df[c].dtype in (pl.Float64, pl.Float32, pl.Int64, pl.Int32)
        ]
        return candidates

    def _predict_from_features(self, glm_df: pl.DataFrame) -> pl.DataFrame:
        """Apply the fitted GLM to a driver-level feature DataFrame."""
        import statsmodels.api as sm
        feature_cols = [c for c in self._glm_feature_names if c in glm_df.columns]
        X = glm_df.select(feature_cols).to_pandas()
        X = sm.add_constant(X, has_constant="add")

        # Fill any missing training-time columns with 0, then reorder to
        # exactly match the fitted model's design matrix.
        for col in self._glm_result.model.exog_names:
            if col not in X.columns:
                X[col] = 0.0
        X = X[self._glm_result.model.exog_names]

        predictions = self._glm_result.predict(X)
        return pl.DataFrame(
            {
                "driver_id": glm_df["driver_id"].to_list(),
                "predicted_claim_frequency": predictions.tolist(),
            }
        )

    def _check_fitted(self) -> None:
        # Guard used by all public inference entry points.
        if not self.is_fitted:
            raise RuntimeError("Call fit() before predict().")

End-to-end pipeline from raw trip data to claim frequency predictions.

Stages:

  1. clean_trips — GPS cleaning and kinematics derivation
  2. extract_trip_features — trip-level scalar features
  3. DrivingStateHMM.fit — HMM state classification
  4. aggregate_to_driver — driver-level risk aggregation
  5. Poisson GLM — maps telematics features to claim frequency

Parameters

n_hmm_states: Number of HMM latent states (default 3). credibility_threshold: Bühlmann-Straub credibility threshold in number of trips (default 30). hmm_features: Feature columns to pass to the HMM. Defaults to the standard four (mean speed, speed variation, harsh braking rate, harsh accel rate). glm_feature_subset: Subset of aggregated driver features to include in the GLM. If None, uses all available aggregated features. random_state: Seed for reproducibility.

Examples

>>> sim = TripSimulator(seed=42)
>>> trips_df, claims_df = sim.simulate(n_drivers=50, trips_per_driver=40)
>>> pipe = TelematicsScoringPipeline()
>>> pipe.fit(trips_df, claims_df)
>>> predictions = pipe.predict(trips_df)
TelematicsScoringPipeline( n_hmm_states: int = 3, credibility_threshold: int = 30, hmm_features: list[str] | None = None, glm_feature_subset: list[str] | None = None, random_state: int = 42)
 87    def __init__(
 88        self,
 89        n_hmm_states: int = 3,
 90        credibility_threshold: int = 30,
 91        hmm_features: list[str] | None = None,
 92        glm_feature_subset: list[str] | None = None,
 93        random_state: int = 42,
 94    ) -> None:
 95        self.n_hmm_states = n_hmm_states
 96        self.credibility_threshold = credibility_threshold
 97        self.hmm_features = hmm_features
 98        self.glm_feature_subset = glm_feature_subset
 99        self.random_state = random_state
100
101        self._hmm: Optional[DrivingStateHMM] = None
102        self._glm_result = None
103        self._glm_feature_names: list[str] = []
104        self.is_fitted: bool = False
n_hmm_states
credibility_threshold
hmm_features
glm_feature_subset
random_state
is_fitted: bool
def fit( self, trips_df: polars.dataframe.frame.DataFrame, claims_df: polars.dataframe.frame.DataFrame) -> TelematicsScoringPipeline:
106    def fit(
107        self,
108        trips_df: pl.DataFrame,
109        claims_df: pl.DataFrame,
110    ) -> "TelematicsScoringPipeline":
111        """
112        Fit the full pipeline.
113
114        Parameters
115        ----------
116        trips_df:
117            Raw trip observations as returned by :func:`~insurance_telematics.load_trips`
118            or :class:`~insurance_telematics.TripSimulator`.
119        claims_df:
120            Driver-level claims data. Required columns:
121            ``driver_id``, ``n_claims``, ``exposure_years``.
122
123        Returns
124        -------
125        self
126        """
127        if not _STATSMODELS_AVAILABLE:
128            raise ImportError(
129                "statsmodels is required for the GLM step. "
130                "Install it with: pip install statsmodels"
131            )
132
133        # Stage 1-2: clean and extract features
134        driver_features = self._extract_features(trips_df)
135
136        # Stage 3: fit HMM and add state features
137        self._hmm = DrivingStateHMM(
138            n_states=self.n_hmm_states,
139            features=self.hmm_features,
140            random_state=self.random_state,
141        )
142        self._hmm.fit(driver_features)
143        states = self._hmm.predict_states(driver_features)
144        hmm_driver_features = self._hmm.driver_state_features(driver_features, states)
145
146        # Stage 4: aggregate trip features to driver level
147        driver_risk = aggregate_to_driver(
148            driver_features,
149            credibility_threshold=self.credibility_threshold,
150        )
151
152        # Merge HMM state features into driver risk
153        driver_risk = driver_risk.join(hmm_driver_features, on="driver_id", how="left")
154
155        # Merge with claims data
156        model_df = driver_risk.join(claims_df, on="driver_id", how="inner")
157
158        # Stage 5: Poisson GLM
159        glm_feature_cols = self._select_glm_features(model_df)
160
161        X = model_df.select(glm_feature_cols).to_pandas()
162
163        # Drop zero-variance and constant columns before GLM to prevent NaN deviance
164        X = X.fillna(X.mean())
165        zero_var = [c for c in X.columns if X[c].std() < 1e-10]
166        if zero_var:
167            X = X.drop(columns=zero_var)
168        if X.shape[1] == 0:
169            # No useful features: intercept-only model
170            X = pd.DataFrame(index=X.index)
171
172        X = sm.add_constant(X, has_constant="add")
173
174        # Store the final feature names (after dropping zero-variance cols)
175        self._glm_feature_names = [c for c in X.columns if c != "const"]
176
177        y = model_df["n_claims"].to_numpy().astype(float)
178        exposure = model_df["exposure_years"].to_numpy()
179        exposure = np.clip(exposure, 1e-6, None)
180        offset = np.log(exposure)
181
182        poisson_model = sm.GLM(
183            y,
184            X,
185            family=sm.families.Poisson(link=sm.families.links.Log()),
186            offset=offset,
187        )
188        # Provide zero start_params to avoid NaN deviance when exposure is small.
189        # The intercept-at-zero start is safe for Poisson with log link.
190        start = np.zeros(X.shape[1])
191        self._glm_result = poisson_model.fit(
192            start_params=start, maxiter=200, disp=False
193        )
194        self.is_fitted = True
195        return self

Fit the full pipeline.

Parameters

trips_df: Raw trip observations as returned by ~insurance_telematics.load_trips() or ~insurance_telematics.TripSimulator. claims_df: Driver-level claims data. Required columns: driver_id, n_claims, exposure_years.

Returns

self

def predict( self, trips_df: polars.dataframe.frame.DataFrame) -> polars.dataframe.frame.DataFrame:
197    def predict(self, trips_df: pl.DataFrame) -> pl.DataFrame:
198        """
199        Predict annual claim frequency for each driver.
200
201        Parameters
202        ----------
203        trips_df:
204            Raw trip observations. Must contain ``driver_id``.
205
206        Returns
207        -------
208        pl.DataFrame
209            One row per driver with columns ``driver_id`` and
210            ``predicted_claim_frequency`` (claims per year).
211        """
212        self._check_fitted()
213        glm_df = self.glm_features(trips_df)
214        return self._predict_from_features(glm_df)

Predict annual claim frequency for each driver.

Parameters

trips_df: Raw trip observations. Must contain driver_id.

Returns

pl.DataFrame One row per driver with columns driver_id and predicted_claim_frequency (claims per year).

def glm_features( self, trips_df: polars.dataframe.frame.DataFrame) -> polars.dataframe.frame.DataFrame:
216    def glm_features(self, trips_df: pl.DataFrame) -> pl.DataFrame:
217        """
218        Produce a driver-level DataFrame of GLM-ready features without
219        fitting or running the GLM.
220
221        Parameters
222        ----------
223        trips_df:
224            Raw trip data.
225
226        Returns
227        -------
228        pl.DataFrame
229            One row per driver with telematics-derived GLM covariates.
230            Column names follow the controlled vocabulary described in the
231            library documentation and suitable for regulatory filings.
232        """
233        driver_features = self._extract_features(trips_df)
234
235        if self._hmm is not None and self._hmm.is_fitted:
236            states = self._hmm.predict_states(driver_features)
237            hmm_df = self._hmm.driver_state_features(driver_features, states)
238        else:
239            hmm_df = None
240
241        driver_risk = aggregate_to_driver(
242            driver_features,
243            credibility_threshold=self.credibility_threshold,
244        )
245
246        if hmm_df is not None:
247            driver_risk = driver_risk.join(hmm_df, on="driver_id", how="left")
248
249        return driver_risk

Produce a driver-level DataFrame of GLM-ready features without fitting or running the GLM.

Parameters

trips_df: Raw trip data.

Returns

pl.DataFrame One row per driver with telematics-derived GLM covariates. Column names follow the controlled vocabulary described in the library documentation and suitable for regulatory filings.

def score_trips(trips_df: pl.DataFrame, model: TelematicsScoringPipeline) -> pl.DataFrame:
    """
    Convenience function: apply a fitted pipeline to raw trip data.

    Parameters
    ----------
    trips_df:
        Raw trip DataFrame.
    model:
        A fitted :class:`TelematicsScoringPipeline` instance.

    Returns
    -------
    pl.DataFrame
        Driver-level predictions with ``driver_id`` and
        ``predicted_claim_frequency``.

    Examples
    --------
    >>> predictions = score_trips(new_trips_df, fitted_pipeline)
    """
    # Thin delegation: cleaning, feature extraction and GLM scoring all
    # happen inside the fitted pipeline's predict().
    predictions = model.predict(trips_df)
    return predictions

Convenience function: apply a fitted pipeline to raw trip data.

Parameters

trips_df: Raw trip DataFrame. model: A fitted TelematicsScoringPipeline instance.

Returns

pl.DataFrame Driver-level predictions with driver_id and predicted_claim_frequency.

Examples

>>> predictions = score_trips(new_trips_df, fitted_pipeline)
@dataclass
class TripSimulator:
    """
    Generate synthetic 1Hz telematics trip data for a fleet of drivers.

    Each driver is assigned a latent regime mixture (fraction of time spent
    in cautious, normal, and aggressive driving states). Within a trip,
    per-second regimes are drawn from the mixture and speed follows an
    Ornstein-Uhlenbeck process. A synthetic claim count is generated per
    driver using a Poisson rate that increases with the aggressive fraction.

    Parameters
    ----------
    seed:
        Random seed for reproducibility.

    Examples
    --------
    >>> sim = TripSimulator(seed=42)
    >>> trips_df, claims_df = sim.simulate(n_drivers=20, trips_per_driver=30)
    >>> trips_df.shape[0]  # ~1.2M rows: 20 drivers × 30 trips × ~32.5 min avg
    """

    # Seed for the single Generator used throughout one simulate() call.
    seed: int = 42

    def simulate(
        self,
        n_drivers: int = 100,
        trips_per_driver: int = 50,
        *,
        min_trip_duration_s: int = 300,
        max_trip_duration_s: int = 3600,
    ) -> tuple[pl.DataFrame, pl.DataFrame]:
        """
        Simulate 1Hz trip observations for a fleet of drivers.

        Parameters
        ----------
        n_drivers:
            Number of distinct drivers to simulate.
        trips_per_driver:
            Number of trips per driver.
        min_trip_duration_s:
            Minimum trip length in seconds (default 5 minutes).
        max_trip_duration_s:
            Maximum trip length in seconds (default 60 minutes).

        Returns
        -------
        trips_df:
            Polars DataFrame with one row per 1Hz observation. Columns:
            driver_id, trip_id, timestamp, latitude, longitude,
            speed_kmh, acceleration_ms2, heading_deg.
        claims_df:
            Polars DataFrame with one row per driver. Columns:
            driver_id, n_claims, exposure_years, aggressive_fraction,
            normal_fraction, cautious_fraction, annual_km.
        """
        rng = np.random.default_rng(self.seed)

        # Assign latent regime mixtures to drivers using a Dirichlet draw.
        # Concentration steered towards cautious to mimic real portfolio skew.
        concentration = np.array([3.0, 2.0, 0.5])
        driver_mixtures = rng.dirichlet(concentration, size=n_drivers)

        all_trip_rows: list[dict] = []
        driver_summary: list[dict] = []

        base_lat, base_lon = 51.5, -0.1  # London area

        # Trip IDs are globally unique across drivers
        global_trip_id = 0

        # Reference epoch for reproducible timestamps
        epoch = datetime(2024, 1, 1, 7, 0, 0, tzinfo=timezone.utc)

        for driver_idx in range(n_drivers):
            driver_id = f"DRV{driver_idx:04d}"
            mixture = driver_mixtures[driver_idx]
            cautious_frac, normal_frac, aggressive_frac = mixture

            driver_km = 0.0
            driver_total_seconds = 0

            for trip_num in range(trips_per_driver):
                trip_id = f"TRP{global_trip_id:07d}"
                global_trip_id += 1

                # Trip duration: uniform between min and max
                duration_s = int(
                    rng.integers(min_trip_duration_s, max_trip_duration_s)
                )
                driver_total_seconds += duration_s

                # Stagger trip timestamps to spread across a year
                days_offset = (driver_idx * trips_per_driver + trip_num) * 3.5 / 365
                hour_offset = rng.uniform(6.0, 22.0)
                trip_start = epoch + timedelta(
                    days=float(days_offset), hours=float(hour_offset)
                )

                rows = self._simulate_trip(
                    rng=rng,
                    driver_id=driver_id,
                    trip_id=trip_id,
                    mixture=mixture,
                    duration_s=duration_s,
                    trip_start=trip_start,
                    base_lat=base_lat,
                    base_lon=base_lon,
                )
                all_trip_rows.extend(rows)

                # Accumulate distance: speed_kmh over one second = km/3600.
                for row in rows:
                    driver_km += row["speed_kmh"] / 3600.0  # km per second

            # Exposure is the observed driving time expressed in years
            # (total trip seconds / seconds per year). It is floored at
            # 0.01 when forming the Poisson rate so every driver has a
            # non-zero claim intensity.
            exposure_years = driver_total_seconds / 365.25 / 86400
            # Mixture-weighted annual claim rate across the three regimes.
            annual_rate = (
                cautious_frac * _REGIMES["cautious"].base_claim_rate
                + normal_frac * _REGIMES["normal"].base_claim_rate
                + aggressive_frac * _REGIMES["aggressive"].base_claim_rate
            )
            lambda_claims = annual_rate * max(exposure_years, 0.01)
            n_claims = int(rng.poisson(lambda_claims))

            # NOTE(review): 'annual_km' records the total observed km, not
            # an annualised figure — confirm the naming with consumers.
            driver_summary.append(
                {
                    "driver_id": driver_id,
                    "n_claims": n_claims,
                    "exposure_years": round(exposure_years, 4),
                    "aggressive_fraction": round(float(aggressive_frac), 4),
                    "normal_fraction": round(float(normal_frac), 4),
                    "cautious_fraction": round(float(cautious_frac), 4),
                    "annual_km": round(driver_km, 1),
                }
            )

        trips_df = pl.DataFrame(all_trip_rows).with_columns(
            pl.col("timestamp").cast(pl.Datetime("us", "UTC"))
        )
        claims_df = pl.DataFrame(driver_summary)

        return trips_df, claims_df

    def _simulate_trip(
        self,
        rng: np.random.Generator,
        driver_id: str,
        trip_id: str,
        mixture: np.ndarray,
        duration_s: int,
        trip_start: datetime,
        base_lat: float,
        base_lon: float,
    ) -> list[dict]:
        """Simulate a single trip at 1Hz. Returns list of row dicts."""
        rows = []

        # Choose regime for each second via regime mixture (multinomial draw)
        regime_names = ["cautious", "normal", "aggressive"]
        regimes_seq = rng.choice(regime_names, size=duration_s, p=mixture)

        speed_kmh = float(rng.uniform(0, 30))  # starting speed
        lat = base_lat + rng.uniform(-0.5, 0.5)
        lon = base_lon + rng.uniform(-0.5, 0.5)
        heading = float(rng.uniform(0, 360))

        for t in range(duration_s):
            regime = _REGIMES[regimes_seq[t]]
            dt = 1.0  # 1Hz → 1 second

            # Ornstein-Uhlenbeck speed update: mean-reverting with noise,
            # clamped at zero (no negative speeds).
            mean_rev = regime.mean_speed_kmh
            theta = regime.speed_reversion
            sigma = regime.speed_vol
            dW = float(rng.normal(0, math.sqrt(dt)))
            speed_kmh = (
                speed_kmh
                + theta * (mean_rev - speed_kmh) * dt
                + sigma * dW
            )
            speed_kmh = max(0.0, speed_kmh)

            # Acceleration: finite difference of speed plus process noise.
            if t == 0:
                prev_speed = speed_kmh
            else:
                prev_speed = rows[-1]["speed_kmh"]

            accel_from_speed = (speed_kmh - prev_speed) / dt / 3.6  # km/h per s → m/s²
            accel_noise = float(rng.normal(0, regime.accel_noise_std * 0.3))
            accel_ms2 = accel_from_speed + accel_noise

            # Update position using heading and speed. The equirectangular
            # approximation (1° lat ≈ 111,320 m) is adequate at city scale.
            speed_ms = speed_kmh / 3.6
            heading_rad = math.radians(heading)
            dlat = speed_ms * math.cos(heading_rad) * dt / 111_320
            dlon = (
                speed_ms
                * math.sin(heading_rad)
                * dt
                / (111_320 * math.cos(math.radians(lat)))
            )
            lat += dlat
            lon += dlon

            # Slow random heading drift
            heading = (heading + float(rng.normal(0, 2.0))) % 360

            timestamp = trip_start + timedelta(seconds=t)

            rows.append(
                {
                    "driver_id": driver_id,
                    "trip_id": trip_id,
                    "timestamp": timestamp,
                    "latitude": round(lat, 6),
                    "longitude": round(lon, 6),
                    "speed_kmh": round(speed_kmh, 2),
                    "acceleration_ms2": round(accel_ms2, 4),
                    "heading_deg": round(heading, 1),
                }
            )

        return rows

Generate synthetic 1Hz telematics trip data for a fleet of drivers.

Each driver is assigned a latent regime mixture (fraction of time spent in cautious, normal, and aggressive driving states). Within a trip, segments of continuous regime are simulated using an Ornstein-Uhlenbeck speed process. A synthetic claim count is generated per driver using a Poisson rate proportional to the aggressive fraction.

Parameters

seed: Random seed for reproducibility.

Examples

>>> sim = TripSimulator(seed=42)
>>> trips_df, claims_df = sim.simulate(n_drivers=20, trips_per_driver=30)
>>> trips_df.shape[0]  # ~1.2M rows: 20 drivers × 30 trips × ~32.5 min avg (uniform 5-60 min)
TripSimulator(seed: int = 42)
seed: int = 42
def simulate( self, n_drivers: int = 100, trips_per_driver: int = 50, *, min_trip_duration_s: int = 300, max_trip_duration_s: int = 3600) -> tuple[polars.dataframe.frame.DataFrame, polars.dataframe.frame.DataFrame]:
 90    def simulate(
 91        self,
 92        n_drivers: int = 100,
 93        trips_per_driver: int = 50,
 94        *,
 95        min_trip_duration_s: int = 300,
 96        max_trip_duration_s: int = 3600,
 97    ) -> tuple[pl.DataFrame, pl.DataFrame]:
 98        """
 99        Simulate 1Hz trip observations for a fleet of drivers.
100
101        Parameters
102        ----------
103        n_drivers:
104            Number of distinct drivers to simulate.
105        trips_per_driver:
106            Number of trips per driver.
107        min_trip_duration_s:
108            Minimum trip length in seconds (default 5 minutes).
109        max_trip_duration_s:
110            Maximum trip length in seconds (default 60 minutes).
111
112        Returns
113        -------
114        trips_df:
115            Polars DataFrame with one row per 1Hz observation. Columns:
116            driver_id, trip_id, timestamp, latitude, longitude,
117            speed_kmh, acceleration_ms2, heading_deg.
118        claims_df:
119            Polars DataFrame with one row per driver. Columns:
120            driver_id, n_claims, exposure_years, aggressive_fraction,
121            annual_km.
122        """
123        rng = np.random.default_rng(self.seed)
124
125        # Assign latent regime mixtures to drivers using a Dirichlet draw.
126        # Concentration steered towards cautious to mimic real portfolio skew.
127        concentration = np.array([3.0, 2.0, 0.5])
128        driver_mixtures = rng.dirichlet(concentration, size=n_drivers)
129
130        all_trip_rows: list[dict] = []
131        driver_summary: list[dict] = []
132
133        base_lat, base_lon = 51.5, -0.1  # London area
134
135        # Trip IDs are globally unique across drivers
136        global_trip_id = 0
137
138        # Reference epoch for reproducible timestamps
139        epoch = datetime(2024, 1, 1, 7, 0, 0, tzinfo=timezone.utc)
140
141        for driver_idx in range(n_drivers):
142            driver_id = f"DRV{driver_idx:04d}"
143            mixture = driver_mixtures[driver_idx]
144            cautious_frac, normal_frac, aggressive_frac = mixture
145
146            driver_km = 0.0
147            driver_total_seconds = 0
148
149            for trip_num in range(trips_per_driver):
150                trip_id = f"TRP{global_trip_id:07d}"
151                global_trip_id += 1
152
153                # Trip duration: random between min and max
154                duration_s = int(
155                    rng.integers(min_trip_duration_s, max_trip_duration_s)
156                )
157                driver_total_seconds += duration_s
158
159                # Stagger trip timestamps to spread across a year
160                days_offset = (driver_idx * trips_per_driver + trip_num) * 3.5 / 365
161                hour_offset = rng.uniform(6.0, 22.0)
162                trip_start = epoch + timedelta(
163                    days=float(days_offset), hours=float(hour_offset)
164                )
165
166                rows = self._simulate_trip(
167                    rng=rng,
168                    driver_id=driver_id,
169                    trip_id=trip_id,
170                    mixture=mixture,
171                    duration_s=duration_s,
172                    trip_start=trip_start,
173                    base_lat=base_lat,
174                    base_lon=base_lon,
175                )
176                all_trip_rows.extend(rows)
177
178                # Accumulate distance
179                for row in rows:
180                    driver_km += row["speed_kmh"] / 3600.0  # km per second
181
182            # Exposure: actual driving time scaled to years.
183            # Assume each driver drives at this rate year-round, prorated by
184            # average UK annual driving time (~200 hours/year).
185            # We normalise to observation period: total trip seconds / seconds_per_year.
186            # This gives a realistic exposure for a Poisson frequency model.
187            exposure_years = driver_total_seconds / 365.25 / 86400
188            annual_rate = (
189                cautious_frac * _REGIMES["cautious"].base_claim_rate
190                + normal_frac * _REGIMES["normal"].base_claim_rate
191                + aggressive_frac * _REGIMES["aggressive"].base_claim_rate
192            )
193            lambda_claims = annual_rate * max(exposure_years, 0.01)
194            n_claims = int(rng.poisson(lambda_claims))
195
196            driver_summary.append(
197                {
198                    "driver_id": driver_id,
199                    "n_claims": n_claims,
200                    "exposure_years": round(exposure_years, 4),
201                    "aggressive_fraction": round(float(aggressive_frac), 4),
202                    "normal_fraction": round(float(normal_frac), 4),
203                    "cautious_fraction": round(float(cautious_frac), 4),
204                    "annual_km": round(driver_km, 1),
205                }
206            )
207
208        trips_df = pl.DataFrame(all_trip_rows).with_columns(
209            pl.col("timestamp").cast(pl.Datetime("us", "UTC"))
210        )
211        claims_df = pl.DataFrame(driver_summary)
212
213        return trips_df, claims_df

Simulate 1Hz trip observations for a fleet of drivers.

Parameters

n_drivers: Number of distinct drivers to simulate. trips_per_driver: Number of trips per driver. min_trip_duration_s: Minimum trip length in seconds (default 5 minutes). max_trip_duration_s: Maximum trip length in seconds (default 60 minutes).

Returns

trips_df: Polars DataFrame with one row per 1Hz observation. Columns: driver_id, trip_id, timestamp, latitude, longitude, speed_kmh, acceleration_ms2, heading_deg. claims_df: Polars DataFrame with one row per driver. Columns: driver_id, n_claims, exposure_years, aggressive_fraction, normal_fraction, cautious_fraction, annual_km.