Skip to content

API Reference

This page documents the public API of safefeat.

Example

1) Basic window features (count, sum, mean)

import pandas as pd
from safefeat import build_features, WindowAgg

# The spine lists the (entity, cutoff) pairs that features are built for;
# the result has one row per spine row.
spine = pd.DataFrame({
    "entity_id": ["u1"],
    "cutoff_time": ["2024-01-10"],
})

# Four raw events for u1. Per the expected output shown for this example,
# only the 2024-01-05 and 2024-01-06 rows are aggregated: the 2023-01-01 row
# is older than both windows and the 2024-01-20 row is after the cutoff.
events = pd.DataFrame({
    "entity_id": ["u1", "u1", "u1", "u1"],
    "event_time": ["2024-01-05", "2024-01-06", "2023-01-01", "2024-01-20"],
    "amount": [10.0, 20.0, 999.0, 999.0],
})

X = build_features(
    spine=spine,
    tables={"events": events},
    spec=[
        WindowAgg(
            table="events",  # key into the `tables` mapping above
            windows=["7D", "30D"],  # one set of features is produced per window
            metrics={
                "*": ["count"],  # wildcard: number of events (events__n_events__<window>)
                "amount": ["sum", "mean"],  # events__amount__<agg>__<window>
            },
        )
    ],
    # Must be provided: names the timestamp column of each events table.
    event_time_cols={"events": "event_time"},
)

print(X)

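Expected output (the 2023-01-01 event is older than both windows and the 2024-01-20 event occurs after the cutoff, so only the 2024-01-05 and 2024-01-06 events are aggregated):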


| entity_id | cutoff_time | events__n_events__7d | events__amount__sum__7d | events__amount__mean__7d | events__n_events__30d | events__amount__sum__30d | events__amount__mean__30d |
| --------- | ----------- | -------------------- | ----------------------- | ------------------------ | --------------------- | ------------------------ | ------------------------- |
| u1        | 2024-01-10  | 2                    | 30.0                    | 15.0                     | 2                     | 30.0                     | 15.0                      |


build_features

safefeat.core.build_features

build_features(
    spine,
    tables,
    spec,
    *,
    entity_col="entity_id",
    cutoff_col="cutoff_time",
    event_time_cols=None,
    allowed_lag="0s",
    return_report=False,
)

Build leakage-safe features from event tables.

Parameters:

Name Type Description Default
spine DataFrame

DataFrame containing entity identifiers and cutoff times.

required
tables dict[str, DataFrame]

Mapping of table name to event DataFrame.

required
spec FeatureSpec or list[WindowAgg]

Feature specification describing windows and aggregations.

required
entity_col str

Name of entity identifier column.

"entity_id"
cutoff_col str

Name of cutoff timestamp column.

"cutoff_time"
event_time_cols dict[str, str]

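Mapping of table name to event timestamp column. Although the default is None, this argument must be provided: build_features raises ValueError when it is None.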

None
allowed_lag str

Allowed tolerance for future timestamps (pandas timedelta string).

"0s"
return_report bool

If True, return a tuple (features_df, AuditReport) with audit information about dropped/kept event pairs.

False

Returns:

Type Description
DataFrame or (DataFrame, AuditReport)

Feature matrix aligned to the spine. If return_report is True a second return value contains the audit report.

Source code in src/safefeat/core.py
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def build_features(spine, tables, spec, *, entity_col="entity_id", cutoff_col="cutoff_time",
                   event_time_cols=None, allowed_lag="0s", return_report=False):
    """Build leakage-safe features from event tables.

    Parameters
    ----------
    spine : pandas.DataFrame
        DataFrame containing entity identifiers and cutoff times. It is not
        modified; the result is built on a copy.
    tables : dict[str, pandas.DataFrame]
        Mapping of table name to event DataFrame.
    spec : FeatureSpec or list[WindowAgg]
        Feature specification describing windows and aggregations. A plain
        list of blocks is wrapped in a ``FeatureSpec``.
    entity_col : str, default="entity_id"
        Name of entity identifier column.
    cutoff_col : str, default="cutoff_time"
        Name of cutoff timestamp column. The column is converted with
        ``pandas.to_datetime`` in the returned frame.
    event_time_cols : dict[str, str]
        Mapping of table name to event timestamp column. Must be provided
        even though the signature default is ``None``.
    allowed_lag : str, default="0s"
        Allowed tolerance for future timestamps (pandas timedelta string).
    return_report : bool, default=False
        If True, return a tuple ``(features_df, AuditReport)`` with audit
        information about dropped/kept event pairs.

    Returns
    -------
    pandas.DataFrame or (pandas.DataFrame, AuditReport)
        Feature matrix aligned to the spine. If ``return_report`` is True a
        second return value contains the audit report.

    Raises
    ------
    ValueError
        If ``event_time_cols`` is None, if ``entity_col`` or ``cutoff_col``
        is missing from ``spine``, if a metric column is not present in the
        windowed events, or if ``spec`` contains an unknown block type.

    Notes
    -----
    Window features are named ``{table}__n_events__{window}`` for the ``"*"``
    wildcard count and ``{table}__{column}__{agg}__{window}`` for column
    aggregations (``sum``, ``mean``, ``nunique``, ``count``), with the window
    string lower-cased. ``count`` on a named column is the number of non-null
    values of that column. Spine rows without a matching aggregate value are
    filled with 0.
    """

    if event_time_cols is None:
        raise ValueError("event_time_cols must be provided, e.g. {'events': 'event_time'}")

    # validate spine
    if entity_col not in spine.columns or cutoff_col not in spine.columns:
        raise ValueError(f"Required columns {entity_col} and/or {cutoff_col} not found in spine DataFrame")

    out = spine.copy()
    out[cutoff_col] = pd.to_datetime(out[cutoff_col], errors="raise")
    spine_subset = out[[entity_col, cutoff_col]]
    keys = [entity_col, cutoff_col]

    # Temporary column holding a per-group aggregate before it is merged back
    # onto the spine; the unusual name avoids clashing with user columns.
    value_col = "__safefeat_agg_value__"

    # Column aggregations in emission order, paired with whether the result is
    # cast to int. "count" comes last so the order of the pre-existing
    # sum/mean/nunique columns is unchanged.
    column_aggs = (
        ("sum", False),
        ("mean", False),
        ("nunique", True),
        ("count", True),
    )

    def _aligned(grouped, as_int):
        # Left-merge per-group values onto the spine so there is exactly one
        # value per spine row, in spine order; rows without a match become 0.
        merged = spine_subset.merge(grouped, on=keys, how="left")
        values = merged[value_col].fillna(0)
        if as_int:
            values = values.astype(int)
        # Positional assignment: ignore the merge's fresh index.
        return values.values

    report = AuditReport() if return_report else None

    if isinstance(spec, list):
        spec = FeatureSpec(blocks=spec)

    for block in spec.blocks:
        if isinstance(block, WindowAgg):
            events_df = tables[block.table]
            event_time_col = event_time_cols[block.table]

            # Audit data is captured from the first window only and reported
            # once per block (the original implementation treats it as
            # identical across windows of the same table).
            audit_data_for_table = None

            for w in block.windows:
                result = _events_in_window(
                    spine=spine_subset,
                    events=events_df,
                    time_window=w,
                    allowed_lag=allowed_lag,
                    entity_col=entity_col,
                    cutoff_col=cutoff_col,
                    event_time_col=event_time_col,
                    collect_audit=return_report,
                )

                if return_report:
                    in_window, audit_data = result
                    if audit_data_for_table is None:
                        audit_data_for_table = audit_data
                else:
                    in_window = result

                suffix = w.lower()

                for dim, aggs in block.metrics.items():
                    if dim == "*":
                        # wildcard: number of events per (entity, cutoff)
                        if "count" in aggs:
                            counts = (
                                in_window.groupby(keys, sort=False)
                                .size()
                                .reset_index(name=value_col)
                            )
                            out[f"{block.table}__n_events__{suffix}"] = _aligned(counts, as_int=True)
                        continue

                    if dim not in in_window.columns:
                        raise ValueError(f"Column '{dim}' not found in table '{block.table}'")

                    gb = in_window.groupby(keys, sort=False)
                    for agg_name, as_int in column_aggs:
                        if agg_name not in aggs:
                            continue
                        grouped = getattr(gb[dim], agg_name)().reset_index(name=value_col)
                        feature_name = f"{block.table}__{dim}__{agg_name}__{suffix}"
                        out[feature_name] = _aligned(grouped, as_int=as_int)

            # Add audit data for this table if collecting reports
            if return_report and audit_data_for_table is not None:
                table_audit = TableAudit(
                    table=block.table,
                    total_joined_pairs=audit_data_for_table["total_joined_pairs"],
                    kept_pairs=audit_data_for_table["kept_pairs"],
                    dropped_future_pairs=audit_data_for_table["dropped_future_pairs"],
                    max_future_delta=audit_data_for_table["max_future_delta"],
                )
                report.add_table(table_audit)

        elif isinstance(block, RecencyBlock):
            events_df = tables[block.table]
            event_time_col = event_time_cols[block.table]

            # Restrict to matching events when a filter is set. Boolean
            # indexing already yields a new frame, so an explicit copy is only
            # needed on the unfiltered path.
            if block.filter_col is not None:
                filtered_events = events_df[events_df[block.filter_col] == block.filter_value]
            else:
                filtered_events = events_df.copy()

            # Compute time since last event for each entity-cutoff pair
            recency_features = _compute_recency(
                spine=spine_subset,
                events=filtered_events,
                entity_col=entity_col,
                cutoff_col=cutoff_col,
                event_time_col=event_time_col,
                allowed_lag=allowed_lag,
            )

            feature_name = f"{block.table}__recency"
            if block.filter_col is not None:
                feature_name += f"__{block.filter_col}_{block.filter_value}"

            # No fill here: spine rows without a recency value stay missing.
            merged = spine_subset.merge(recency_features, on=keys, how="left")
            out[feature_name] = merged["recency_days"].values

        else:
            raise ValueError(f"Unknown block type: {type(block)}")

    if return_report:
        return out, report
    return out

Feature Specification

WindowAgg

safefeat.spec.WindowAgg dataclass

Specification for aggregating events within a time window.

Attributes:

Name Type Description
table str

Name of the events table to read (key in the tables mapping passed to :func:build_features).

windows List[str]

List of window lengths expressed in pandas timedelta strings (e.g. "30D", "7D"). For each window a set of features will be produced.

metrics Dict[str, List[str]]

Mapping from a column name to a list of aggregations to compute. Use "*" as a wildcard key to request event counts (only ["count"] is supported for the wildcard). Example: {"*": ["count"], "amount": ["sum", "mean"]}.

Source code in src/safefeat/spec.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@dataclass
class WindowAgg:
    """Specification for aggregating events within a time window.

    Attributes
    ----------
    table:
        Name of the events table to read (key in the `tables` mapping passed
        to :func:`build_features`).
    windows:
        List of window lengths expressed in pandas timedelta strings (e.g.
        ``"30D"``, ``"7D"``). For each window a set of features will be
        produced. Must be a list (or tuple) of strings; a bare string is
        rejected.
    metrics:
        Mapping from a column name to a list of aggregations to compute. Use
        ``"*"`` as a wildcard key to request event counts (only ``["count"]``
        is supported for the wildcard). For named columns the accepted
        aggregation names are ``"count"``, ``"sum"``, ``"mean"`` and
        ``"nunique"``. Example: ``{"*": ["count"],
        "amount": ["sum", "mean"]}``.

    Raises
    ------
    ValueError
        On construction, if ``windows`` is not a list/tuple of strings, if
        ``metrics`` is not a dict, if any aggregation list is not a list of
        strings, if the wildcard requests anything other than ``["count"]``,
        or if a named column requests an unsupported aggregation.
    """
    table: str
    windows: List[str]
    metrics: Dict[str, List[str]]

    def __post_init__(self):
        # A bare string such as "7D" is itself iterable, so any code looping
        # over `windows` would see one-character "windows"; reject it early.
        if not isinstance(self.windows, (list, tuple)) or not all(
            isinstance(w, str) for w in self.windows
        ):
            raise ValueError("windows must be a list of strings, e.g. ['7D', '30D']")

        # basic shape/type checks
        if not isinstance(self.metrics, dict):
            raise ValueError("metrics must be a dict")

        # aggregations accepted for named (non-wildcard) columns
        allowed_aggs = frozenset({"count", "sum", "mean", "nunique"})

        for dim, aggs in self.metrics.items():
            # each value should be a list of strings
            if not isinstance(aggs, list) or not all(isinstance(a, str) for a in aggs):
                raise ValueError(f"aggregations for '{dim}' must be a list of strings")

            if dim == "*":
                # wildcard only supports a single count
                if aggs != ["count"]:
                    raise ValueError("'*' dimension only supports ['count']")
                continue

            # report the first aggregation that is not in the allow list
            for a in aggs:
                if a not in allowed_aggs:
                    raise ValueError(f"unsupported aggregation '{a}' for dimension '{dim}'")