Metadata-Version: 2.4
Name: f9columnar
Version: 0.5.0
Summary: Columnar analysis utils for high-energy physics data analysis.
Project-URL: Repository, https://gitlab.cern.ch/ijs-f9-ljubljana/f9columnar
Author-email: Jan Gavranovic <jan.gavranovic@cern.ch>
License: MIT
License-File: LICENSE
Keywords: analysis,columnar,data-analysis,hep,physics
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Scientific/Engineering :: Physics
Requires-Python: <3.15,>=3.11
Requires-Dist: act-client
Requires-Dist: aiohttp<4,>=3.13.3
Requires-Dist: awkward<3,>=2.6.7
Requires-Dist: corner<3,>=2.2.2
Requires-Dist: dill<1,>=0.4.1
Requires-Dist: flask<4,>=3.1.2
Requires-Dist: h5py<4,>=3.11.0
Requires-Dist: hdf5plugin<7,>=6.0.0
Requires-Dist: hist[plot]<3,>=2.7.3
Requires-Dist: imbalanced-learn<1,>=0.13.0
Requires-Dist: lightning<3,>=2.5.3
Requires-Dist: matplotlib<4,>=3.9.2
Requires-Dist: mplhep<1,>=0.3.50
Requires-Dist: networkx<4,>=3.3
Requires-Dist: numba<1,>=0.60.0
Requires-Dist: numpy<3,>=2.0.2
Requires-Dist: pandas<3,>=2.2.2
Requires-Dist: plothist<2,>=1.2.6
Requires-Dist: psutil<7,>=6.1.0
Requires-Dist: pydot<4,>=3.0.1
Requires-Dist: requests<3,>=2.28.1
Requires-Dist: rich<14,>=13.7.0
Requires-Dist: scikit-learn<2,>=1.6.1
Requires-Dist: scipy<2,>=1.14.1
Requires-Dist: seaborn<1,>=0.13.2
Requires-Dist: sympy<2,>=1.13.1
Requires-Dist: tables<4,>=3.10.1
Requires-Dist: tqdm<5,>=4.65.0
Requires-Dist: uncertainties<4,>=3.2.2
Requires-Dist: uproot<6,>=5.3.10
Requires-Dist: vector<2,>=1.4.1
Provides-Extra: torch-cpu
Requires-Dist: torch<2.12,>=2.9.0; extra == 'torch-cpu'
Provides-Extra: torch-cuda
Requires-Dist: torch<2.12,>=2.9.0; extra == 'torch-cuda'
Description-Content-Type: text/markdown

# F9 Columnar

<div align="center">

[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
[![python](https://img.shields.io/badge/-Python_3.11+-3776AB?logo=python&logoColor=white)](https://www.python.org/)
[![pytorch](https://img.shields.io/badge/-PyTorch_2.9+-EE4C2C?logo=pytorch&logoColor=white)](https://pytorch.org/)
[![License](https://img.shields.io/badge/license-MIT-green)](./LICENSE)
[![pytorch](https://img.shields.io/pypi/v/f9columnar)](https://pypi.org/project/f9columnar/)
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.19312298.svg)](https://doi.org/10.5281/zenodo.19312298)

</div>

A PyTorch-based library for processing ROOT and HDF5 event data in high energy physics.

### Project description

F9Columnar is a Python library designed to streamline the processing of physics datasets from ROOT and HDF5 files for machine learning applications. Built on top of PyTorch, Uproot and Awkward Array, it provides a modular framework for efficient data loading, transformation, and analysis in high-energy physics research.

The library bridges the gap between traditional high-energy physics data formats (ROOT) and modern machine learning workflows. It enables physicists and data scientists to:

- Process ROOT and HDF5 datasets using PyTorch DataLoaders
- Apply event selections, calculate variables, and create histograms
- Convert ROOT data to HDF5 format for ML training

Check out the [PyHEP 2025 talk](https://indico.cern.ch/event/1566263/timetable/?view=standard#14-from-root-to-pytorch-seamle) for a good overview of what the library can do.

##  Setup

The library comes without any PyTorch dependencies by default to allow flexible installations. You can install it with or without PyTorch GPU support depending on your needs.

### Install with PyTorch for GPU

```shell
pip install f9columnar
pip install torch
```

### Install with PyTorch for CPU

```shell
pip install f9columnar
pip install torch --index-url https://download.pytorch.org/whl/cpu
```

## Library overview

- [Aim of the library](#aim-of-the-library)
- [Getting started](#getting-started)
- [Basic data loading](#basic-data-loading)
- [Using processors and histogramming](#using-processors-and-histogramming)
- [Converting ROOT to HDF5](#converting-root-to-hdf5)
- [Feature scaling](#feature-scaling)
- [ML DataLoaders](#ml-dataloaders)
  - [Collate configuration](#collate-configuration)
  - [HDF5 DataModule](#hdf5-datamodule)
  - [Direct ROOT DataModule](#direct-root-datamodule)
  - [Multi-source training (Combined Sources)](#multi-source-training-combined-sources)
- [Data augmentation](#data-augmentation)
- [Inference server](#inference-server)

### Aim of the library

The main goal of this library is to provide a simple and efficient way to process large ROOT and HDF5 datasets using PyTorch DataLoaders. The library is designed to be modular and extensible, allowing users to easily add new processors and functionality as needed. A common workflow is illustrated in the diagram below:

<div align="center">

![F9Columnar](https://gitlab.cern.ch/ijs-f9-ljubljana/F9Columnar/-/raw/main/docs/f9columnar_flow.svg)

</div>

### Getting started

To illustrate the basic usage of the library, we need some ROOT files. We can create artificial ROOT files using the `AwkwardEventGenerator` utility. This utility allows us to define flat and jagged branches with different distributions.
Let's create some artificial ROOT files with 100,000 events, containing electrons and muons with transverse momentum (pt), pseudorapidity (eta), and azimuthal angle (phi) distributions, and a flat weight distribution.

```python
from f9columnar.utils.ak_events import AwkwardEventGenerator

gen = AwkwardEventGenerator(n_events=100000, min_particles=0, max_particles=5)

flat_spec = {"weight": {"dist_type": "normal", "params": {"mean": 1.0, "stddev": 0.1}}}

jagged_spec = {
    "el": {
        "pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 3.0e5, "n": 5.0}},
        "eta": {"dist_type": "eta", "params": {}},
        "phi": {"dist_type": "phi", "params": {}},
    },
    "mu": {
        "pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 5.0e5, "n": 5.0}},
        "eta": {"dist_type": "eta", "params": {}},
        "phi": {"dist_type": "phi", "params": {}},
    },
}

arrays = gen.from_spec_to_dict(flat_spec=flat_spec, jagged_spec=jagged_spec)
gen.to_root(arrays, file_path="data/simpleNTuple.root", tree_name="physics", n_splits=5)
```

This will create 5 ROOT files named `simpleNTuple_part<idx>.root` each having 20,000 events in the `data/` directory with artificial event data. We can now use the library to load and process this test data.

### Basic data loading

The most basic usage of the library is to load data from ROOT files using the `get_root_dataloader` function. This function returns a PyTorch DataLoader that yields batches of events as Awkward arrays.

```python
import glob

from f9columnar.root_dataloader import get_root_dataloader

def filter_branch(branch: str) -> bool:
    # select only these two branches
    return branch == "el_pt" or branch == "mu_pt"

# root_dataloader is an instance of a torch DataLoader that uses an IterableDataset
root_dataloader, total, _ = get_root_dataloader(
    name="awkwardEvents",  # name identifier
    files=glob.glob("data/simpleNTuple_part*.root"),  # root files
    key="physics",  # root file tree name
    step_size=1024,  # number of events per array batch to read into memory
    num_workers=0,  # number of workers for parallel processing (single threaded if 0)
    processors=None,  # arbitrary calculations on arrays
    filter_name=filter_branch,  # filter branches
)

# loop over batches of events from .root file(s), each batch is an awkward array
for events in root_dataloader:
    arrays, reports = events  # reports includes useful information about the batch
    el_pt, mu_pt = arrays["el_pt"], arrays["mu_pt"]  # awkward arrays of electron and muon pT
    # ... do something with the arrays
```

### Using processors and histogramming

The following example demonstrates how to define variables, apply a cut, and create a histogram using processors.

Calculations on arrays within worker processes can be performed using a `Processor`. Multiple processors can be linked together in a `ProcessorsGraph`, forming a directed acyclic graph (DAG). These processors are applied to arrays in the sequence determined by the DAG's topological order.

Each worker executes the same processor graph on batches of event data and returns the results to the event loop once processing is complete.

Lets make a simple analysis that performs a pT cut on leptons, selects a leading lepton (highest pT), and creates a histogram of the leading lepton pT.

First, we define the processors:

```python
import awkward as ak

from f9columnar.processors import CheckpointProcessor, Processor, ProcessorsGraph
from f9columnar.processors_collection import decorate_branches

@decorate_branches("el_pt", "el_eta", "el_phi", "mu_pt", "mu_eta", "mu_phi", "weight")
class CutLeptons(Processor):
    def __init__(self, name: str, pt_cut: float) -> None:
        super().__init__(name)
        self.pt_cut = pt_cut

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # apply pt cut particle mask on electrons and muons
        mask_el, mask_mu = arrays["el_pt"] > self.pt_cut, arrays["mu_pt"] > self.pt_cut

        # mask arrays to keep only leptons passing the pt cut
        for field in ["el_pt", "el_eta", "el_phi"]:
            arrays[field] = arrays[field][mask_el]

        for field in ["mu_pt", "mu_eta", "mu_phi"]:
            arrays[field] = arrays[field][mask_mu]

        # return must be a dictionary with key name for the argument of the next processor
        return {"arrays": arrays}

class LeadingLeptons(Processor):
    def __init__(self, name: str) -> None:
        super().__init__(name)

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # get leading electron (highest pt)
        el_pt_argmax = ak.argmax(arrays["el_pt"], axis=1, keepdims=True)
        for field in ["el_pt", "el_eta", "el_phi"]:
            arrays[field] = arrays[field][el_pt_argmax]
            arrays[field] = ak.drop_none(arrays[field])

        # get leading muon (highest pt)
        mu_pt_argmax = ak.argmax(arrays["mu_pt"], axis=1, keepdims=True)
        for field in ["mu_pt", "mu_eta", "mu_phi"]:
            arrays[field] = arrays[field][mu_pt_argmax]
            arrays[field] = ak.drop_none(arrays[field])

        return {"arrays": arrays}
```

The `decorate_branches` decorator is used to specify the branches that the processor depends on. This allows the dataloader to only load the necessary branches from the ROOT files.

Making a histogram processor is also straightforward:

```python
from f9columnar.histograms import HistogramProcessor

class LeadingLeptonHistogram(HistogramProcessor):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        self.make_hist1d("leading_el_pt", 50, 20.0, 300.0)  # num. bins, min, max
        self.make_hist1d("leading_mu_pt", 50, 20.0, 500.0)  # num. bins, min, max

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # fill histogram with leading lepton pT in GeV
        leading_el_pt, leading_mu_pt = arrays["el_pt"] / 1e3, arrays["mu_pt"] / 1e3

        non_empty_el = ak.num(leading_el_pt) > 0
        leading_el_pt = ak.firsts(leading_el_pt[non_empty_el])
        el_weight = arrays["weight"][non_empty_el]

        # fill electron histogram with pt and weights
        self.fill_hist1d("leading_el_pt", leading_el_pt, el_weight)

        non_empty_mu = ak.num(leading_mu_pt) > 0
        leading_mu_pt = ak.firsts(leading_mu_pt[non_empty_mu])
        mu_weight = arrays["weight"][non_empty_mu]

        # fill muon histogram with pt and weights
        self.fill_hist1d("leading_mu_pt", leading_mu_pt, mu_weight)

        return {"arrays": arrays}
```

We can now build the processor graph from the defined processors:

```python
from f9columnar.processors import CheckpointProcessor, ProcessorsGraph
from f9columnar.processors_collection import ProcessorsCollection

# define a collection of processors for bookkeeping
analysis_collection = ProcessorsCollection("leptonCollection")

# add processors to the collection
analysis_collection += CutLeptons(name="cutLeptons", pt_cut=25.0)  # pt cut at 25 GeV
analysis_collection += LeadingLeptons(name="leadingLeptons")
analysis_collection += LeadingLeptonHistogram(name="leadingLeptonHistogram")

# define the processor graph
analysis_graph = ProcessorsGraph()

# build a linear chain of processors
analysis_graph.add(
    CheckpointProcessor("input"),
    *analysis_collection.as_list(),
    CheckpointProcessor("output"),
)

# if the graph is a linear chain we can use chain() to connect the nodes
# for more complex graphs use connect() method that defines edges explicitly
analysis_graph.chain()

# draw the graph with pygraphviz
analysis_graph.draw("analysis_graph.pdf")
```

We now have a processor graph that can be passed into the `DataLoader` to process events. The graph will be executed in each worker process for each batch of events. Instead of using a simple `get_root_dataloader` function, we can create a `RootPhysicsDataset` and initialize the dataloader with the processor graph:

```python
from f9columnar.dataset_builder import RootPhysicsDataset

# make one or more root datasets as a list of RootPhysicsDataset objects
root_datasets = [
    RootPhysicsDataset(
        name="awkwardEvents",
        root_files=glob.glob("data/simpleNTuple_part*.root"),
        is_data=False,
    )
]

# analysis collection provides the branch name filter automatically
branch_name_filter = analysis_collection.get_branch_name_filter()

# setup dataloader configuration
dataloader_config = {
    "step_size": 1024,
    "key": "physics",
    "num_workers": 0,
    "filter_name": branch_name_filter,
}

# setup and initialize the dataloader for the root dataset
root_datasets[0].setup_dataloader(**dataloader_config)
# runs get_root_dataloader() internally
root_datasets[0].init_dataloader(analysis_graph)
```

We have build a processor graph (the actual calculations) and a dataloader that uses the graph to process events. We can now run the event loop over batches of events:

```python
from f9columnar.run import ColumnarEventLoop

# define the event loop over the root datasets
event_loop = ColumnarEventLoop(
    mc_datasets=root_datasets,
    data_datasets=None,
    cut_flow=False,
    save_path="results/",
)
# run it!
event_loop.run(mc_only=True)
```

That's it! After running the event loop, the histograms will be saved in the `results/` directory as `awkwardEvents_mc_0_hists.p`.

### Converting ROOT to HDF5

The library includes `Hdf5WriterProcessor` - a `Processor` that converts ROOT ntuples into HDF5 files suitable for ML training. It supports both **flat** (event-level) and **jagged** (variable-length, e.g. per-jet) arrays, with two storage modes for jagged data.

#### Storage modes

**Padded mode** (`varlen=False`): jagged arrays are stored as fixed-size 2D arrays `(n_events, max_length)`, with short sequences filled using a configurable `pad_value`.

**Variable-length mode** (`varlen=True`): jagged arrays are flattened into 1D packed arrays, with boundaries tracked by a cumulative-lengths dataset (`culens`). For example, an object group `jets` with `culens = [0, 3, 5]` means event 0 owns jets at indices 0-2 and event 1 owns jets at indices 3-4. This avoids wasting space when sequence lengths vary significantly.

#### Pile-based shuffling

Output is split across multiple HDF5 "pile" files (`p0.hdf5`, `p1.hdf5`, ..., `p{n-1}.hdf5`). During writing, each chunk of events is assigned to a pile either randomly (`pile_assignment="random"`) or in round-robin order (`pile_assignment="deque"`). During reading, piles are loaded independently - the ML dataloader shuffles within each pile, giving efficient pseudo-random access without loading the full dataset into memory.

```
-- writing step --
for batch of events from ROOT file do
    for chunk of size chunk_shape in batch do
        j <- pile index (random or round-robin)
        append chunk to pile p[j]

-- reading step --
for pile p[j] in selected range do
    load p[j], shuffle in memory
    yield batches to DataLoader
```

#### Valid masks

The writer can produce a boolean `valid` column inside each jagged object struct. When `valid=True`, slots holding real data are marked `True` and padding slots are `False`. Passing a dict (e.g. `valid={"jets": ("jet_type", [2])}`) additionally filters by a type field - only slots where `jet_type == 2` are marked valid.

#### Example usage

`Hdf5WriterProcessor` is a regular `Processor` that runs inside the processor graph (alongside any custom transformation processors) rather than as a separate postprocessor:

```python
from f9columnar.dataset_builder import RootPhysicsDataset
from f9columnar.hdf5_writer import Hdf5WriterProcessor
from f9columnar.processors import CheckpointProcessor, ProcessorsGraph
from f9columnar.run import ColumnarEventLoop

# build the processor graph -- the writer sits in the chain like any other processor
analysis_graph = ProcessorsGraph()
analysis_graph.add(
    CheckpointProcessor("input"),
    *analysis_collection.as_list(),  # custom processors (weights, labels, scaling, ...)
    Hdf5WriterProcessor(
        output_files_path="output/hdf5/",
        flat_column_names=["ptl1", "ptl2", "mll", "weights", "label_type"],
        jagged_column_names={
            "jets": ["jet_e", "jet_pt", "jet_eta", "jet_phi"],
        },
        default_chunk_shape=32,
        max_lengths={"jets": 15},
        pad_values={"jets": 999.0},
        sort_by_variable={"jets": "jet_pt"},     # descending sort before truncation
        n_piles=256,
        pile_assignment="random",
        enforce_dtypes={"nbjets": "int64"},
        varlen=True,                              # variable-length storage
        enforce_varlen_max_length=False,
        valid={"jets": ("jet_type", [2])},        # per-slot valid mask filtered by jet_type
        extra_metadata={"scale": {"jet_e": 1.0e-3}},
        append=False,
        compression="lz4",                       # forwarded to h5py.create_dataset
    ),
    CheckpointProcessor("output", save_arrays=True),
)
analysis_graph.chain()

# create ROOT datasets
mc_dataset = RootPhysicsDataset("MC", [str(f) for f in mc_files], is_data=False)
mc_dataset.setup_dataloader(**dataloader_config)
mc_dataset.init_dataloader(processors=analysis_graph)

# run the event loop
event_loop = ColumnarEventLoop(
    mc_datasets=[mc_dataset],
    data_datasets=None,
    cut_flow=False,
)
event_loop.run(mc_only=True)
```

#### Output HDF5 structure

Each pile file (`p0.hdf5`, ...) contains:

```
/events                  # flat event-level structured array (n_events,)
    fields: ptl1, ptl2, mll, weights, label_type, ...

/jets                    # varlen: packed 1D (total_jets,) | padded: 2D (n_events, max_length)
    fields: jet_e, jet_pt, jet_eta, jet_phi, valid

/jets_culens             # varlen only: (n_events+1,) cumulative offsets, dtype int64
                         # jets[culens[i]:culens[i+1]] = jets for event i

/metadata                # JSON string - pad_values, max_lengths, sort_by_variable,
                         # varlen, enforce_varlen_max_length, extra_metadata, ...
```

### Feature scaling

The library fits sklearn-based scalers over HDF5 (or ROOT) datasets so that scaling works on data that doesn't fit in memory.

#### Numerical scalers

The following numerical scaler types are supported (passed as `numer_scaler_type`):

| Type | sklearn class | Supports partial fit | NN scaler |
|---|---|---|---|
| `"minmax"` | `MinMaxScaler` | yes | `NNMinMaxScaler` |
| `"maxabs"` | `MaxAbsScaler` | yes | - |
| `"standard"` | `StandardScaler` | yes | `NNStandardScaler` |
| `"robust"` | `RobustScaler` | no | - |
| `"quantile"` | `QuantileTransformer` | no | - |
| `"power"` | `PowerTransformer` | no | - |
| `"logit"` | `MinMaxScaler` -> `LogitTransform` | yes | `NNLogitScaler` |
| `"standard_logit"` | `MinMaxScaler` -> `LogitTransform` -> `StandardScaler` | yes | `NNStandardLogitScaler` |

When a scaler does not support partial fit, all events (up to `n_max`) are accumulated before fitting. Scalers that support partial fit are updated incrementally per batch.

#### Categorical scaler

Setting `categ_scaler_type="categorical"` enables a custom `CategoricalFeatureScaler` (a `LabelEncoder` variant that supports partial fitting). It maps each unique category value to a consecutive integer per column, and can be updated incrementally as new batches arrive.

Either scaler can be disabled by passing `None` for its type. At least one must be enabled.

#### Single-loader usage

```python
from f9columnar.ak_collate_fn import CollateConfig
from f9columnar.hdf5_dataloader import get_hdf5_dataloader
from f9columnar.ml.dataset_scaling import DatasetScaler

collate_config = CollateConfig(
    modalities={
        "events": ["mll", "met", "metphi"],
        "jets":   ["jet_pt", "jet_eta", "jet_phi"],
    },
    modalities_metadata={"events": ["eventNumber", "weights"]},
    layout="varlen",
)

loader, _ = get_hdf5_dataloader(
    files=hdf5_pile_files,
    collate_config=collate_config,
    step_size=100_000,
    num_workers=4,
    batch_size=4096,
)

ds_scaler = DatasetScaler(
    dataloader=loader,
    modalities=collate_config.modalities,
    modalities_metadata=collate_config.modalities_metadata,
    numer_scaler_type="standard",        # numerical scaler (or None to disable)
    categ_scaler_type="categorical",     # categorical scaler (or None to disable)
    scaler_save_path="scalers/",         # output directory
    n_max=1_000_000,                     # cap for non-partial-fit scalers (None = all)
    extra_hash="v2",                     # optional suffix on scaler filenames
    scaler_kwargs={"partial_fit": True, "copy": True},
)
ds_scaler.feature_scale()
```

The driver writes one pickle per (modality, scaler kind). For varlen/padded modalities the padding mask is applied automatically - padded slots are not included in the fit.

#### Multi-source usage

When you train with several HDF5/ROOT sources (`LightningCombinedSourcesDataModule`), use `MultiSourceDatasetScaler` to fit one scaler set per source from the same YAML config used at training time:

```python
from f9columnar.ml.dataset_scaling import MultiSourceDatasetScaler

ms_scaler = MultiSourceDatasetScaler(
    config=sources_yaml,                 # path/dict/DictConfig - same schema as the DataModule
    extra_hash="v2",
    extra_processors=[CustomProcessor],  # ROOT-source processor classes (optional)
)
ms_scaler.feature_scale()
```

Each source's `feature_scaling` block specifies the scaler types and save path:

```yaml
sources:
  - name: 2L_Run2
    feature_scaling:
      numer_scaler_type: standard
      categ_scaler_type: null
      save_path: scalers/Run2/
      n_max: null
      scaler_params: { partial_fit: true, copy: true }
```

`MultiSourceDatasetScaler` internally runs a deterministic, augmentation-free pass over each source.

#### NN scalers (torch nn.Module wrappers)

The fitted sklearn scalers are saved as pickle files, but at inference time (and during training) you typically want pure PyTorch operations that live inside the model graph. `nn_scalers.py` provides `nn.Module` wrappers that extract the fitted statistics from sklearn scalers and register them as non-trainable buffers, so they move with the model across devices and are included in `state_dict` / ONNX exports.

Each scaler exposes both `forward()` (scale) and `inverse()` (unscale).

The composite `NNDatasetScaler` wraps all per-modality scalers into a single module. It takes a fitted scaler utility plus a `ColumnSelection` describing which columns are numerical vs categorical and applies the correct scaler to each feature column:

```python
from f9columnar.ml.nn_scalers import NNDatasetScaler

nn_scaler = NNDatasetScaler(scaler_util, selection)

# forward: raw features -> scaled features
X_events_scaled, Xs_scaled = nn_scaler(X_events, [X_jets, X_electrons])

# inverse: scaled features -> raw features
X_events_raw, Xs_raw = nn_scaler.inverse(X_events_scaled, Xs_scaled)
```

Padded rows are preserved - the scaler computes a padding mask before applying transforms and restores the pad value afterwards.

### ML DataLoaders

The library exposes three PyTorch Lightning `DataModule`s built on top of the same awkward-array to tensor collate pipeline:

| DataModule | Source | Use case |
|---|---|---|
| `LightningHdf5DataModule` | one HDF5 dataset (pile files) | single-source training on preconverted data |
| `LightningRootDataModule` | one ROOT dataset | training **directly** from ROOT files, no HDF5 conversion |
| `LightningCombinedSourcesDataModule` | many HDF5 / ROOT sources mixed | multi-source training (regions, runs, MC campaigns) from a single YAML |

All three share a common collation model (`CollateConfig`) and produce per-modality `ModalityBatch` tensors organised inside a `SourceBatch`.

#### Collate configuration

`CollateConfig` describes how an awkward batch is converted into model-ready tensors. It is the single object that drives field selection, layout, sorting, padding, augmentation, and (for ROOT) automatic branch filtering.

```python
from f9columnar.ak_collate_fn import CollateConfig

collate_config = CollateConfig(
    # per-modality field selection - modality names must match the HDF5
    # object groups (or are arbitrary group names when loading from ROOT)
    modalities={
        "events":    ["mll", "met", "metphi", "nbjets"],
        "jets":      ["jet_pt", "jet_eta", "jet_phi", "jet_e"],
        "electrons": ["el_pt", "el_eta", "el_phi"],
    },
    # extra fields that travel with each batch but are NOT fed to the model
    # (labels, weights, eventNumber for stage splitting, ...), available via
    # `ModalityBatch.metadata`
    modalities_metadata={"events": ["eventWeight", "eventNumber", "y", "w"]},
    layout="varlen",                      # "varlen" (packed + cu_seqlens) or "padding" (2D + valid)
    max_lengths={"jets": 12},             # only used when layout="padding"
    pad_values={"jets": 0.0},             # only used when layout="padding"
    sort_by_variable={"jets": "jet_pt"},  # optional descending sort per modality
    combine_varlen_objects=True,          # attach cross-modality reorder perm in metadata
    augmentations={...},                  # see "Data augmentation" below
)
```

**Layouts.** With `layout="varlen"` each jagged modality returns a packed 1D tensor of shape `(total_objects,)` plus a `cu_seqlens` index `(B+1,)`, flat fields are `(B,)` tensors. With `layout="padding"` jagged modalities return `(B, max_length)` tensors plus a boolean `valid` mask of the same shape.

The same `CollateConfig` is used by every DataModule below - `LightningRootDataModule` additionally uses `config.make_filter_fn()` to skip ROOT branches that the model never asks for.

#### HDF5 DataModule

`LightningHdf5DataModule` consumes pile files produced by `Hdf5WriterProcessor`. Stages are split by pile index - each stage gets a **disjoint** subset of piles, so the same event is never seen across train/val/test.

```python
import lightning as L
from f9columnar.ak_collate_fn import CollateConfig
from f9columnar.ml.lightning_data_module import LightningHdf5DataModule

dm = LightningHdf5DataModule(
    name="ttHcc_2L",
    files="output/hdf5/p*.hdf5",                   # glob or explicit list
    collate_config=collate_config,
    stage_split_piles={"train": 512, "val": 128, "test": 128},  # counts or [indices]
    step_size=100_000,                              # entries loaded into memory per step
    batch_size=4096,
    num_workers=-1,                                 # -1 = all available cores
    shuffle=True,                                   # train-only; non-train stages force False
    shuffle_files=True,
    shuffle_batch=True,
    drop_last=True,
    dataloader_kwargs={
        "val_batch_size": 8192,                     # per-stage batch_size override
        "test_batch_size": 8192,
        "pin_memory": True,
        "prefetch_factor": 2,
    },
)

trainer = L.Trainer(...)
trainer.fit(model, datamodule=dm)
```

`stage_split_piles` accepts either explicit pile indices (`{"train": [0, 1, 2, 3], "val": [4, 5]}`) or counts assigned in order (`{"train": 512, "val": 128}`). Shuffling is automatically disabled for non-training stages.

Per-source class re-balancing can be enabled by passing an `ImbalancedSampler` (`RandomUnderSampler`, `RandomOverSampler`, or `RandomResampleToTarget`).

#### Direct ROOT DataModule

`LightningRootDataModule` trains **directly** from ROOT files via `uproot`, with no HDF5 conversion step. Stage splitting is performed inside the per-stage processor graphs (typically via a `StageSplitProcessor` that routes events by `eventNumber % divisor`), and per-class balancing is provided by `PartitionedRootConfig` - partitions are matched by regex over filenames and combined according to `partition_mode` (`min_size`, `max_size`, or `max_size_cycle`).

```python
from f9columnar.ak_collate_fn import CollateConfig
from f9columnar.ml.lightning_data_module import LightningRootDataModule
from f9columnar.root_dataloader import PartitionedRootConfig

# one processor graph per stage -- usually only `stage` differs between them
def build_graph(stage: str):
    graph = ProcessorsGraph()
    graph.add(
        CheckpointProcessor("input"),
        StageSplitProcessor(branch="eventNumber", divisor=10,
                            splits={"train": [2,3,4,5,6,7,8,9], "val": [0], "test": [1]},
                            stage=stage),
        LabelProcessor(labels=CLASSES),
        WeightsProcessor(weights_field="eventWeight"),
        ScaleBranchProcessor(scale_dct={"jet_pt": 1e-3, "jet_e": 1e-3}),
    )
    graph.chain()
    return graph

dm = LightningRootDataModule(
    name="phys_root",
    files=glob.glob("data/*.Nominal.root"),
    key="NOSYS/physics",                            # TTree path (REQUIRED for ROOT)
    step_size=512,                                  # uproot.iterate chunk size
    num_workers=4,
    shuffle=True,
    stage_processors={                              # one graph per stage
        "train": build_graph("train"),
        "val":   build_graph("val"),
        "test":  build_graph("test"),
    },
    partitioned_config=PartitionedRootConfig(
        # regex per partition -- one partition per class label
        partitions={c: [f"^{c}\\.Nominal\\.root$"] for c in CLASSES},
        partition_mode="max_size_cycle",            # cycle short partitions until longest done
        shuffle_partitions=True,
        shuffle_batch_partitions=True,
        partition_batch_size=64,                    # sub-batch size after concatenating partitions
        partition_drop_last_batch=True,
    ),
    collate_config=collate_config,                  # same CollateConfig as HDF5
    dataloader_kwargs={"val_batch_size": 128, "test_batch_size": 128},
)
```

For non-training stages the module **forces** `partition_mode="max_size"`, `shuffle_partitions=False`, and `shuffle_batch_partitions=False` so every event in val/test is seen exactly once.

When the partitioned config is not needed, you can also build a plain DataLoader with `get_root_dataloader(name, files, key, step_size, num_workers, processors=..., collate_config=..., partitioned_config=...)` directly - `LightningRootDataModule` is a thin wrapper around it.

#### Multi-source training (Combined Sources)

`LightningCombinedSourcesDataModule` mixes any number of HDF5 and/or ROOT sources, each with its own collate, scaler, augmentation, and per-source processors. The whole configuration is driven by a single YAML schema (also documented in [sources.py](f9columnar/ml/sources.py)):

```yaml
defaults: &source_defaults
  shuffle: true
  drop_last: true
  num_workers: -1
  step_size: 8192

sources:
  - name: 2L_Run2
    id: 0
    data:
      <<: *source_defaults
      reader: hdf5
      paths: [/path/to/2L_Run2/*.h5]
      pin_memory: true
    collate:
      modalities:
        events: [run_number, mc_event_weight, label_type, weights]
        jets:   [jet_pt, jet_eta, jet_phi]
      layout: varlen
    sampling: { weight: 1.0, batch_size: 64 }
    imbalanced_sampler:
      sampler_name: RandomResampleToTarget
      sampler_kwargs:
        sampling_strategy: { ttH_cc: 4096, ttH_bb: 2048 }
      class_labels: { ttH_cc: 0, ttH_bb: 1 }
      label_field: y

  - name: 2L_Run3
    id: 1
    data:
      <<: *source_defaults
      reader: root
      paths: [/path/to/2L_Run3/*.root]
      key: NOSYS/physics
    partitioned:
      partitions:
        mc:   ['^mc_.*.root$']
        data: ['^data_.*.root$']
      partition_mode: max_size_cycle
      partition_batch_size: 64
    collate:
      modalities:
        events: [run_number, mc_event_weight]
        jets:   [jet_pt, jet_eta]
      layout: varlen
    processors:                # ROOT-only: per-source processor graph
      start_graph:
        chain: [CheckpointProcessor, JetSelection]
        JetSelection: { pt_cut: 20.0, sel_jet: true }
    sampling: { weight: 1.5, batch_size: 64 }

mixing:
  combined_mode: max_size_cycle   # min_size | max_size | max_size_cycle | sequential
  total_batch_size: 256
  weight_strategy: proportional_to_weight
```

Then in Python:

```python
from f9columnar.ml.lightning_data_module import LightningCombinedSourcesDataModule

dm = LightningCombinedSourcesDataModule(
    name="ttHcc_combined",
    config="config/sources.yaml",          # path, dict, DictConfig, or SourcesConfig
    extra_processors=[CustomProcessor],    # Processor classes referenced by name in YAML
)
```

Per-source batch sizes are derived from `mixing.weight_strategy`:

- `user_specified` - each source's `sampling.batch_size` is used as-is (`total_batch_size` ignored).
- `uniform` - `total_batch_size` is split equally across sources.
- `proportional_to_weight` - split in proportion to `sampling.weight`.
- `proportional_to_size` - split in proportion to each source's event count.

The DataModule wraps the per-source loaders in a Lightning `CombinedLoader`, so each training step yields a dict `{source_name: SourceBatch}`. `combined_mode` controls when an epoch ends (shortest source, longest, or cycle the others). Non-train stages disable shuffles and augmentations automatically.

### Data augmentation

`f9columnar.ml.augmentation` provides physics-aware awkward-array augmentations that run **inside the collate step**, before tensor conversion. Available transforms:

| Spec | Effect |
|---|---|
| `ConstituentDropoutSpec(p)` | drop jagged constituents independently with probability `p` |
| `PhiRotationSpec(fields=...)` | global per-event phi rotation (synchronised across all modalities to preserve angular correlations) |
| `PtSmearingSpec(fields=..., a, b)` | multiplicative pT smearing |
| `AngularSmearingSpec(fields=..., sigma)` | additive Gaussian noise on eta and phi |
| `FlipSignSpec(fields=..., prob)` | per-event sign flip with probability `prob` |

Augmentations are declared per modality (under `CollateConfig.augmentations` or the YAML `collate.augmentations`) and combined by `ModalityAugmentationSpec`:

```python
augmentations = {
    "jets": {
        "constituent_dropout": 0.1,
        "phi_rotation":   {"fields": ["jet_phi"]},
        "pt_smearing":    {"fields": ["jet_pt"], "a": 0.05, "b": 0.01},
        "angular_smearing": {"fields": ["jet_eta", "jet_phi"], "sigma": 0.005},
    },
}
collate_config = CollateConfig(..., augmentations=augmentations, augmentation_strength=1.0)
```

### Inference server

`f9columnar.ml.inference_server.InferenceServer` is a dedicated GPU process that hosts one or more trained models and serves inference requests sent from PyTorch DataLoader workers via multiprocessing queues. This sidesteps the usual CUDA / fork-safety issues when workers try to run CUDA themselves and allows `MLScoreProcessor`-style processors to call a model inside the columnar pipeline.

```python
import torch
from f9columnar.ml.inference_server import InferenceServer

model = torch.load("checkpoints/model.pt").eval()

server = InferenceServer(
    models={"classifier": model},
    device="cuda:0",
    num_workers=4,                    # must match DataLoader num_workers
    multiprocessing_context="forkserver",
    to_numpy=True,                    # convert outputs to numpy on return
)
server.start()

# inside a processor that runs in a DataLoader worker:
scores = self.inference_server.infer(X, *Xs, model_key="classifier")

# tear down at the end of the run
server.shutdown()
```

Inputs may be tensors or arbitrarily nested lists / tuples / dicts of tensors. The server batches them, runs `model.forward` (optionally under `torch.autocast` for bf16/fp16), and returns the result to the originating worker.

## Projects using F9Columnar

- [SeeSawML](https://gitlab.cern.ch/ijs-f9-ljubljana/SeeSawML) - multi-source, multi-modal and multi-task ML training framework, built directly on the ROOT/HDF5 dataloaders, scalers, augmentations, and inference server described above.

## Citation

If you found F9Columnar useful, please consider citing its Zenodo record [10.5281/zenodo.19312298](https://zenodo.org/records/19312298).

You can find an example bibtex entry below.
```
@software{F9Columnar,
  author       = {Gavranovic, Jan},
  title        = {F9Columnar: A PyTorch-Based Library for High Energy Physics Data Processing and Analysis},
  publisher    = {Zenodo},
  doi          = {10.5281/zenodo.19312298},
  url          = {https://doi.org/10.5281/zenodo.19312298},
}
```
