Metadata-Version: 2.4
Name: f9columnar
Version: 0.4.1
Summary: Columnar analysis utils for high-energy physics data analysis.
Project-URL: Repository, https://gitlab.cern.ch/ijs-f9-ljubljana/f9columnar
Author-email: Jan Gavranovic <jan.gavranovic@cern.ch>
License: MIT
License-File: LICENSE
Keywords: analysis,columnar,data-analysis,hep,physics
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering :: Physics
Requires-Python: <3.13,>=3.11
Requires-Dist: act-client
Requires-Dist: aiohttp<4,>=3.13.3
Requires-Dist: awkward<3,>=2.6.7
Requires-Dist: corner<3,>=2.2.2
Requires-Dist: dill<1,>=0.4.1
Requires-Dist: flask<4,>=3.1.2
Requires-Dist: h5py<4,>=3.11.0
Requires-Dist: hist[plot]<3,>=2.7.3
Requires-Dist: imbalanced-learn<1,>=0.13.0
Requires-Dist: matplotlib<4,>=3.9.2
Requires-Dist: mplhep<1,>=0.3.50
Requires-Dist: networkx<4,>=3.3
Requires-Dist: numba<1,>=0.60.0
Requires-Dist: numpy<3,>=2.0.2
Requires-Dist: pandas<3,>=2.2.2
Requires-Dist: plothist<2,>=1.2.6
Requires-Dist: psutil<7,>=6.1.0
Requires-Dist: pydot<4,>=3.0.1
Requires-Dist: requests<3,>=2.28.1
Requires-Dist: rich<14,>=13.7.0
Requires-Dist: scikit-learn<2,>=1.6.1
Requires-Dist: scipy<2,>=1.14.1
Requires-Dist: seaborn<1,>=0.13.2
Requires-Dist: sympy<2,>=1.13.1
Requires-Dist: tables<4,>=3.10.1
Requires-Dist: tqdm<5,>=4.65.0
Requires-Dist: uncertainties<4,>=3.2.2
Requires-Dist: uproot<6,>=5.3.10
Requires-Dist: vector<2,>=1.4.1
Provides-Extra: torch-cpu
Requires-Dist: torch<2.10,>=2.9.0; extra == 'torch-cpu'
Provides-Extra: torch-cuda
Requires-Dist: torch<2.10,>=2.9.0; extra == 'torch-cuda'
Description-Content-Type: text/markdown

# F9 Columnar

<div align="center">

[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
[![python](https://img.shields.io/badge/-Python_3.12-3776AB?logo=python&logoColor=white)](https://www.python.org/)
[![pytorch](https://img.shields.io/badge/-PyTorch_2.9-EE4C2C?logo=pytorch&logoColor=white)](https://pytorch.org/)
[![License](https://img.shields.io/badge/license-MIT-green)](./LICENSE)
[![pypi](https://img.shields.io/pypi/v/f9columnar)](https://pypi.org/project/f9columnar/)

</div>

A PyTorch-based library for processing ROOT and HDF5 event data in high-energy physics.

### Project description

F9Columnar is a Python library that streamlines the processing of physics datasets from ROOT and HDF5 files for machine learning applications. Built on top of PyTorch, Uproot, and Awkward Array, it provides a modular framework for efficient data loading, transformation, and analysis in high-energy physics research.

The library bridges the gap between traditional high-energy physics data formats (ROOT) and modern machine learning workflows. It enables physicists and data scientists to:

- Process ROOT and HDF5 datasets using PyTorch DataLoaders
- Apply event selections, calculate variables, and create histograms
- Convert ROOT data to HDF5 format for ML training

Check out the [PyHEP 2025 talk](https://indico.cern.ch/event/1566263/timetable/?view=standard#14-from-root-to-pytorch-seamle) for a good overview of what the library can do.

## Setup

By default, the library is installed without any PyTorch dependency, allowing flexible installations. You can add PyTorch with or without GPU support depending on your needs.

### Install with PyTorch for GPU

```shell
pip install f9columnar
pip install torch
```

### Install with PyTorch for CPU

```shell
pip install f9columnar
pip install torch --index-url https://download.pytorch.org/whl/cpu
```
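
The package also declares `torch-cpu` and `torch-cuda` extras (see the package metadata) that pin a compatible `torch` version range. Installing through an extra is a possible alternative; note that for the CPU build you still need to point pip at the PyTorch CPU index:

```shell
pip install "f9columnar[torch-cuda]"
# or, for the CPU build:
pip install "f9columnar[torch-cpu]" --extra-index-url https://download.pytorch.org/whl/cpu
```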

## Library overview

- [Aim of the library](#aim-of-the-library)
- [Getting started](#getting-started)
- [Basic data loading](#basic-data-loading)
- [Using processors and histogramming](#using-processors-and-histogramming)
- [Converting ROOT to HDF5](#converting-root-to-hdf5)
- [Feature scaling](#feature-scaling)
- [HDF5 ML DataLoader](#hdf5-ml-dataloader)

### Aim of the library

The main goal of this library is to provide a simple and efficient way to process large ROOT and HDF5 datasets using PyTorch DataLoaders. The library is designed to be modular and extensible, allowing users to easily add new processors and functionality as needed. A common workflow is illustrated in the diagram below:

<div align="center">

![F9Columnar](https://gitlab.cern.ch/ijs-f9-ljubljana/F9Columnar/-/raw/main/docs/f9columnar_flow.svg)

</div>

### Getting started

To illustrate the basic usage of the library, we need some ROOT files. We can create artificial ROOT files using the `AwkwardEventGenerator` utility, which lets us define flat and jagged branches with different distributions.
Let's create artificial ROOT files with 100,000 events, containing electrons and muons with transverse momentum (pt), pseudorapidity (eta), and azimuthal angle (phi) distributions, plus a flat (per-event) weight branch drawn from a normal distribution.

```python
from f9columnar.utils.ak_events import AwkwardEventGenerator

gen = AwkwardEventGenerator(n_events=100000, min_particles=0, max_particles=5)

flat_spec = {"weight": {"dist_type": "normal", "params": {"mean": 1.0, "stddev": 0.1}}}

jagged_spec = {
    "el": {
        "pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 3.0e5, "n": 5.0}},
        "eta": {"dist_type": "eta", "params": {}},
        "phi": {"dist_type": "phi", "params": {}},
    },
    "mu": {
        "pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 5.0e5, "n": 5.0}},
        "eta": {"dist_type": "eta", "params": {}},
        "phi": {"dist_type": "phi", "params": {}},
    },
}

arrays = gen.from_spec_to_dict(flat_spec=flat_spec, jagged_spec=jagged_spec)
gen.to_root(arrays, file_path="data/simpleNTuple.root", tree_name="physics", n_splits=5)
```

This creates 5 ROOT files named `simpleNTuple_part<idx>.root` in the `data/` directory, each containing 20,000 events of artificial event data. We can now use the library to load and process this test data.
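
To sanity-check the generated files, you can open one of them directly with Uproot (assuming the parts are indexed from 0):

```python
import uproot

# open one part and inspect the tree written by the generator
with uproot.open("data/simpleNTuple_part0.root") as f:
    tree = f["physics"]
    print(tree.num_entries)  # 20000
    print(tree.keys())  # branch names, e.g. weight, el_pt, el_eta, ...
```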

### Basic data loading

The most basic usage of the library is to load data from ROOT files using the `get_root_dataloader` function. It returns a PyTorch DataLoader that yields batches of events as Awkward arrays (together with additional return values, unpacked below).

```python
import glob

from f9columnar.root_dataloader import get_root_dataloader

def filter_branch(branch: str) -> bool:
    # select only these two branches
    return branch == "el_pt" or branch == "mu_pt"

# root_dataloader is an instance of a torch DataLoader that uses an IterableDataset
root_dataloader, total, _ = get_root_dataloader(
    name="awkwardEvents",  # name identifier
    files=glob.glob("data/simpleNTuple_part*.root"),  # root files
    key="physics",  # root file tree name
    step_size=1024,  # number of events per array batch to read into memory
    num_workers=0,  # number of workers for parallel processing (single threaded if 0)
    processors=None,  # arbitrary calculations on arrays
    filter_name=filter_branch,  # filter branches
)

# loop over batches of events from .root file(s), each batch is an awkward array
for events in root_dataloader:
    arrays, reports = events  # reports includes useful information about the batch
    el_pt, mu_pt = arrays["el_pt"], arrays["mu_pt"]  # awkward arrays of electron and muon pT
    # ... do something with the arrays
```

### Using processors and histogramming

The following example demonstrates how to define variables, apply a cut, and create a histogram using processors.

Calculations on arrays within worker processes can be performed using a `Processor`. Multiple processors can be linked together in a `ProcessorsGraph`, forming a directed acyclic graph (DAG). These processors are applied to arrays in the sequence determined by the DAG's topological order.

Each worker executes the same processor graph on batches of event data and returns the results to the event loop once processing is complete.

Let's make a simple analysis that applies a pT cut on leptons, selects the leading lepton (highest pT), and creates a histogram of the leading lepton pT.

First, we define the processors:

```python
import awkward as ak

from f9columnar.processors import Processor
from f9columnar.processors_collection import decorate_branches

@decorate_branches("el_pt", "el_eta", "el_phi", "mu_pt", "mu_eta", "mu_phi", "weight")
class CutLeptons(Processor):
    def __init__(self, name: str, pt_cut: float) -> None:
        super().__init__(name)
        self.pt_cut = pt_cut

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # apply pt cut particle mask on electrons and muons
        mask_el, mask_mu = arrays["el_pt"] > self.pt_cut, arrays["mu_pt"] > self.pt_cut

        # mask arrays to keep only leptons passing the pt cut
        for field in ["el_pt", "el_eta", "el_phi"]:
            arrays[field] = arrays[field][mask_el]

        for field in ["mu_pt", "mu_eta", "mu_phi"]:
            arrays[field] = arrays[field][mask_mu]

        # return must be a dictionary with key name for the argument of the next processor
        return {"arrays": arrays}

class LeadingLeptons(Processor):
    def __init__(self, name: str) -> None:
        super().__init__(name)

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # get leading electron (highest pt)
        el_pt_argmax = ak.argmax(arrays["el_pt"], axis=1, keepdims=True)
        for field in ["el_pt", "el_eta", "el_phi"]:
            arrays[field] = arrays[field][el_pt_argmax]
            arrays[field] = ak.drop_none(arrays[field])

        # get leading muon (highest pt)
        mu_pt_argmax = ak.argmax(arrays["mu_pt"], axis=1, keepdims=True)
        for field in ["mu_pt", "mu_eta", "mu_phi"]:
            arrays[field] = arrays[field][mu_pt_argmax]
            arrays[field] = ak.drop_none(arrays[field])

        return {"arrays": arrays}
```

The `decorate_branches` decorator is used to specify the branches that the processor depends on. This allows the dataloader to only load the necessary branches from the ROOT files.

Making a histogram processor is also straightforward:

```python
from f9columnar.histograms import HistogramProcessor

class LeadingLeptonHistogram(HistogramProcessor):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        self.make_hist1d("leading_el_pt", 50, 20.0, 300.0)  # num. bins, min, max
        self.make_hist1d("leading_mu_pt", 50, 20.0, 500.0)  # num. bins, min, max

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # fill histogram with leading lepton pT in GeV
        leading_el_pt, leading_mu_pt = arrays["el_pt"] / 1e3, arrays["mu_pt"] / 1e3

        non_empty_el = ak.num(leading_el_pt) > 0
        leading_el_pt = ak.firsts(leading_el_pt[non_empty_el])
        el_weight = arrays["weight"][non_empty_el]

        # fill electron histogram with pt and weights
        self.fill_hist1d("leading_el_pt", leading_el_pt, el_weight)

        non_empty_mu = ak.num(leading_mu_pt) > 0
        leading_mu_pt = ak.firsts(leading_mu_pt[non_empty_mu])
        mu_weight = arrays["weight"][non_empty_mu]

        # fill muon histogram with pt and weights
        self.fill_hist1d("leading_mu_pt", leading_mu_pt, mu_weight)

        return {"arrays": arrays}
```

We can now build the processor graph from the defined processors:

```python
from f9columnar.processors import CheckpointProcessor, ProcessorsGraph
from f9columnar.processors_collection import ProcessorsCollection

# define a collection of processors for bookkeeping
analysis_collection = ProcessorsCollection("leptonCollection")

# add processors to the collection
analysis_collection += CutLeptons(name="cutLeptons", pt_cut=25.0)  # pt cut at 25 GeV
analysis_collection += LeadingLeptons(name="leadingLeptons")
analysis_collection += LeadingLeptonHistogram(name="leadingLeptonHistogram")

# define the processor graph
analysis_graph = ProcessorsGraph()

# build a linear chain of processors
analysis_graph.add(
    CheckpointProcessor("input"),
    *analysis_collection.as_list(),
    CheckpointProcessor("output"),
)

# if the graph is a linear chain we can use chain() to connect the nodes
# for more complex graphs use connect() method that defines edges explicitly
analysis_graph.chain()

# draw the graph with pygraphviz
analysis_graph.draw("analysis_graph.pdf")
```
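
For non-linear graphs, `connect()` takes the edges explicitly. The edge-list format below is an assumption for illustration; check the library source for the exact signature:

```python
# hypothetical branching graph: one input feeding two independent cut processors
branched_graph = ProcessorsGraph()
branched_graph.add(
    CheckpointProcessor("input"),
    CutLeptons(name="looseCut", pt_cut=15.0),
    CutLeptons(name="tightCut", pt_cut=40.0),
)
# NOTE: assumed argument format, a list of (parent, child) node name pairs
branched_graph.connect([("input", "looseCut"), ("input", "tightCut")])
```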

We now have a processor graph that can be passed to the DataLoader to process events. The graph is executed in each worker process for each batch of events. Instead of calling the simple `get_root_dataloader` function directly, we can create a `RootPhysicsDataset` and initialize its dataloader with the processor graph:

```python
from f9columnar.dataset_builder import RootPhysicsDataset

# make one or more root datasets as a list of RootPhysicsDataset objects
root_datasets = [
    RootPhysicsDataset(
        name="awkwardEvents",
        root_files=glob.glob("data/simpleNTuple_part*.root"),
        is_data=False,
    )
]

# analysis collection provides the branch name filter automatically
branch_name_filter = analysis_collection.get_branch_name_filter()

# setup dataloader configuration
dataloader_config = {
    "step_size": 1024,
    "key": "physics",
    "num_workers": 0,
    "filter_name": branch_name_filter,
}

# setup and initialize the dataloader for the root dataset
root_datasets[0].setup_dataloader(**dataloader_config)
# runs get_root_dataloader() internally
root_datasets[0].init_dataloader(analysis_graph)
```

We have built a processor graph (the actual calculations) and a dataloader that uses the graph to process events. We can now run the event loop over batches of events:

```python
from f9columnar.run import ColumnarEventLoop

# define the event loop over the root datasets
event_loop = ColumnarEventLoop(
    mc_datasets=root_datasets,
    data_datasets=None,
    cut_flow=False,
    save_path="results/",
)
# run it!
event_loop.run(mc_only=True)
```

That's it! After running the event loop, the histograms will be saved in the `results/` directory as `awkwardEvents_mc_0_hists.p`.
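
The saved histograms can be loaded back for inspection; a minimal sketch, assuming the `.p` file is a standard pickle:

```python
import pickle

with open("results/awkwardEvents_mc_0_hists.p", "rb") as f:
    hists = pickle.load(f)  # if this fails, try dill.load (dill is a dependency)

print(hists)  # e.g. the filled leading_el_pt and leading_mu_pt histograms
```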

### Converting ROOT to HDF5

The library also includes a utility to convert ROOT files to HDF5 format.

Event shuffling is supported to randomize the order of events in the output HDF5 file. This is particularly useful for machine learning applications, where data order can impact training. Shuffling uses a 2-pass algorithm that is memory-efficient and works well with large datasets:

```
-- writing step --
create empty datasets called piles p[0], p[1], ..., p[n - 1]
for each step of events x read from a root file i do
    for each chunk c of events x do
        j ← random integer in [0, n - 1]
        append c to p[j]

-- reading step --
for j in [n1, n2] where 0 ≤ n1 < n2 ≤ n do
    read all events from p[j] into memory
    shuffle p[j] in memory
    for each batch of events x in p[j] do
        yield x to DataLoader
```
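
The idea is easy to prototype outside the library; below is a minimal, self-contained NumPy sketch of the two-pass shuffle, with in-memory piles standing in for the HDF5 datasets:

```python
import numpy as np

rng = np.random.default_rng(seed=0)
events = np.arange(100)  # stand-in for event data read from a root file
n_piles, chunk_size = 4, 10

# writing step: append fixed-size chunks to randomly chosen piles
piles: list[list[np.ndarray]] = [[] for _ in range(n_piles)]
for chunk in np.split(events, len(events) // chunk_size):
    piles[rng.integers(n_piles)].append(chunk)

# reading step: load one pile at a time, shuffle it in memory, yield its events
def iter_shuffled():
    for pile in piles:
        if pile:
            pile_events = np.concatenate(pile)
            rng.shuffle(pile_events)
            yield from pile_events

shuffled = list(iter_shuffled())  # all 100 events in randomized order
```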

Below is an example of how to use the [writing utility](https://gitlab.cern.ch/ijs-f9-ljubljana/F9Columnar/-/blob/main/f9columnar/ml/hdf5_writer.py?ref_type=heads#L134). In the example, the dataloader loop is abstracted away using the `ColumnarEventLoop` class.

```python
from f9columnar.ml.hdf5_writer import Hdf5WriterPostprocessor

# CheckpointProcessor, ProcessorsGraph, RootPhysicsDataset and ColumnarEventLoop are
# imported as in the previous examples; PostprocessorsGraph and CheckpointPostprocessor
# are their postprocessor counterparts from the library

# we will make a linear chain of processors
analysis_graph = ProcessorsGraph()
analysis_graph.add(
    CheckpointProcessor("input"),
    *analysis_collection.as_list(), # add a collection of processors that do some calculations
    CheckpointProcessor("output", save_arrays=True), # save arrays at the end of the chain for the hdf5 writer
)
analysis_graph.chain()
analysis_graph.draw("hdf_analysis_graph.pdf")

# create the hdf5 writer postprocessor
# a postprocessor is a special processor that runs after the main processor graph and is executed in the main process
# ProcessorsGraph can be thought of as a map step and PostprocessorsGraph as a reduce step
output_file = "results/events.h5"  # output hdf5 file path (any location works)
hdf5_writer = Hdf5WriterPostprocessor(
    output_file,
    flat_column_names=["tau_vis_mass"], # flat columns to save
    jagged_column_names=None, # jagged columns to save, pads extra values with a pad value if needed
    chunk_shape=512, # shape of chunks to write to hdf5 file
    n_piles=1024, # number of piles to use for shuffling
    pile_assignment="random", # how to assign events to piles, random or deque
    merge_piles=False, # merge piles into a single hdf5 file at the end
    enforce_dtypes=None, # enforce specific dtypes for columns
    save_node="output", # name of the node in the processor graph to save arrays from
)

# create a linear chain of postprocessors
postprocessors_graph = PostprocessorsGraph()
postprocessors_graph.add(
    CheckpointPostprocessor("input"),
    hdf5_writer,
)
postprocessors_graph.chain()
postprocessors_graph.draw("hdf_post_analysis_graph.pdf")

# make root datasets for the MC and data file lists (data_files and mc_files are lists of paths)
data_dataset = RootPhysicsDataset("Data", [str(f) for f in data_files], is_data=True)
mc_dataset = RootPhysicsDataset("MC", [str(f) for f in mc_files], is_data=False)

# get the branch filter from the analysis collection so only the needed branches are loaded
branch_filter = analysis_collection.get_branch_name_filter()
dataloader_config["filter_name"] = branch_filter

# setup the data dataloader
data_dataset.setup_dataloader(**dataloader_config)
data_dataset.init_dataloader(processors=analysis_graph)

# setup the mc dataloader
mc_dataset.setup_dataloader(**dataloader_config)
mc_dataset.init_dataloader(processors=analysis_graph)

# run the DataLoader event loop over batches of events for both datasets
event_loop = ColumnarEventLoop(
    mc_datasets=[mc_dataset], # supports multiple datasets
    data_datasets=[data_dataset], # supports multiple datasets
    postprocessors_graph=postprocessors_graph,
    fit_postprocessors=True,
    cut_flow=False,
)
event_loop.run() # iterates over batches of events in both datasets

postprocessors_graph["hdf5WriterPostprocessor"].close() # close the file handles
```

Note that variables ending in `*_type` have a special meaning in the HDF5 writer. For example, a variable named `label_type` can be used to determine the type of an event (e.g., signal or background) and will be saved as a column in the HDF5 file, which is useful for classification tasks. Another special variable is `weights`, which stores the event weights in the HDF5 file.
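
For example, a small processor can attach such a column before the writer runs; a minimal sketch (the constant signal label here is hypothetical):

```python
import awkward as ak

from f9columnar.processors import Processor

class SignalLabel(Processor):
    def __init__(self, name: str = "signalLabel") -> None:
        super().__init__(name)

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # one label per event; ones_like on the flat weight branch gives the right shape
        arrays["label_type"] = ak.ones_like(arrays["weight"])  # 1 = signal (hypothetical)
        return {"arrays": arrays}
```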

### Feature scaling

The library also includes a utility to scale an entire HDF5 dataset using scikit-learn scalers. This is particularly useful for machine learning applications, where feature scaling can improve model performance.

```python
from f9columnar.ml.dataset_scaling import DatasetScaler

ds_scaler = DatasetScaler(
    files, # list of hdf5 files
    scaler_type, # minmax, maxabs, standard, robust, quantile, power, logit, standard_logit
    features, # names of the features to scale
    scaler_save_path=save_path,
    n_max=None, # maximum number of events to use for fitting the scaler if the scaler does not have partial fit
    extra_hash="", # extra string to add to the hash of the scaler
    scaler_kwargs=scaler_kwargs, # kwargs for the scaler
    dataloader_kwargs=dataloader_kwargs, # kwargs for the dataloader
)
ds_scaler.feature_scale()
```

The result is a pickle file containing the fitted scaler object, which can be passed to the HDF5 ML DataLoader as:

```python
feature_scaling_kwargs = {
    "scaler_type": scaler_type,
    "scaler_path": save_path, # where the scaler was saved
    "scalers_extra_hash": "",
}
dataset_kwargs = dataset_kwargs | feature_scaling_kwargs
```

Note that categorical features use a custom `LabelEncoder` that supports partial fitting and can be used in an online fashion.
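
For intuition, partial fitting means the encoder can extend its vocabulary batch by batch; a generic sketch of the idea (not the library's actual `LabelEncoder`):

```python
import numpy as np

class IncrementalLabelEncoder:
    """Toy label encoder that can be fitted in an online fashion."""

    def __init__(self) -> None:
        self.mapping: dict[str, int] = {}

    def partial_fit(self, labels: np.ndarray) -> "IncrementalLabelEncoder":
        # assign the next free integer to every label not seen before
        for label in np.unique(labels):
            self.mapping.setdefault(str(label), len(self.mapping))
        return self

    def transform(self, labels: np.ndarray) -> np.ndarray:
        return np.array([self.mapping[str(label)] for label in labels])

enc = IncrementalLabelEncoder()
enc.partial_fit(np.array(["el", "mu"])).partial_fit(np.array(["tau"]))
print(enc.transform(np.array(["mu", "tau"])))  # [1 2]
```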

### HDF5 ML DataLoader

We will use PyTorch Lightning to demonstrate how to use the [Hdf5IterableDataset](https://gitlab.cern.ch/ijs-f9-ljubljana/F9Columnar/-/blob/main/f9columnar/hdf5_dataloader.py?ref_type=heads#L375) in a training loop.

```python
from typing import Any, Callable

import lightning as L
from torch.utils.data import DataLoader

from f9columnar.ml.hdf5_ml_dataloader import (
    WeightedBatch,
    WeightedBatchType,
    WeightedDatasetBatch,  # used in the collate function below
    default_setup_func,
    get_ml_hdf5_dataloader,
)

class LightningHdf5DataModule(L.LightningDataModule):
    def __init__(
        self,
        name: str,
        files: str | list[str],
        column_names: list[str],
        stage_split_piles: dict[str, list[int] | int],
        shuffle: bool = False,
        collate_fn: Callable[[list[WeightedBatch]], WeightedBatchType] | None = None,
        dataset_kwargs: dict[str, Any] | None = None,
        dataloader_kwargs: dict[str, Any] | None = None,
    ) -> None:
        super().__init__()
        self.dl_name = name
        self.files = files
        self.column_names = column_names
        self.stage_split_piles = stage_split_piles
        self.shuffle = shuffle
        self.collate_fn = collate_fn
        self.dataset_kwargs = dataset_kwargs
        self.dl_kwargs = dataloader_kwargs

    def _get_dataloader(self, stage: str) -> DataLoader:
        # returns DataLoader object for the given stage, selection and number of events
        dl, _, _ = get_ml_hdf5_dataloader(
            f"{stage} - {self.dl_name}",
            self.files,
            self.column_names,
            stage_split_piles=self.stage_split_piles,
            stage=stage,
            shuffle=self.shuffle,
            collate_fn=self.collate_fn,
            dataset_kwargs=self.dataset_kwargs,
            dataloader_kwargs=self.dl_kwargs,
        )
        return dl

    def train_dataloader(self) -> DataLoader:
        return self._get_dataloader("train")

    def val_dataloader(self) -> DataLoader:
        return self._get_dataloader("val")

    def test_dataloader(self) -> DataLoader:
        return self._get_dataloader("test")

def events_collate_fn(batch: tuple[WeightedDatasetBatch, dict[str, Any]]) -> WeightedBatchType:
    ds, reports = batch[0]["events"], batch[1]
    # these are the return values from the DataLoader
    return ds.X, ds.y, ds.w, ds.y_aux, reports

# can additionally pass any kwargs to the dataset and it will be forwarded to the reports dictionary
dataset_kwargs = {
    "batch_size": 128, # number of events per batch
    "imbalanced_sampler": None, # supports oversampling and undersampling
    "drop_last": True, # drop last incomplete batch
}

# custom function to process the dataset inside the DataLoader workers
# the function should be: Callable[[StackedDatasets, MLHdf5Iterator], WeightedDatasetBatch | None]
dataset_kwargs["setup_func"] = default_setup_func

dataloader_kwargs = {
    "num_workers": -1, # use all available cores
    "prefetch_factor": 2, # number of batches to prefetch by each worker
}

dm = LightningHdf5DataModule(
    dm_name, # name of the datamodule
    files, # list of hdf5 files
    features, # names of the features to load (column names in the hdf5 file)
    stage_split_piles={"train": 512, "test": 256, "val": 256}, # number of piles for each stage
    shuffle=True, # shuffle events at each epoch
    collate_fn=events_collate_fn, # collate function to handle batches
    dataset_kwargs=dataset_kwargs, # kwargs for the dataset
    dataloader_kwargs=dataloader_kwargs, # kwargs for the dataloader
)
```
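
The datamodule then plugs into a standard Lightning training loop; a minimal sketch, where `MyModel` is any `LightningModule` you define yourself:

```python
model = MyModel()  # hypothetical LightningModule implementing training_step, etc.

trainer = L.Trainer(max_epochs=10)
trainer.fit(model, datamodule=dm)
trainer.test(model, datamodule=dm)
```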
