Skip to content

Pipeline API Reference

Version: 0.1.0 Last Updated: 2026-01-08

Overview

TraceKit's pipeline architecture provides a powerful framework for building composable, reusable analysis workflows. Inspired by scikit-learn's Pipeline design, it enables declarative signal processing through:

  • Pipeline - Chain multiple operations into sequential workflows
  • TraceTransformer - Base class for creating custom transformations
  • Functional composition - compose(), pipe(), and curry() for functional programming
  • Composable mixin - Enable >> operator for fluent pipelines

Quick Start

import tracekit as tk

# Load trace
trace = tk.load("signal.wfm")

# Create a simple pipeline
pipeline = tk.Pipeline([
    ('filter', tk.LowPassFilter(cutoff=1e6)),
    ('normalize', tk.Normalize(method='peak')),
    ('resample', tk.Resample(rate=1e9))
])

# Transform trace through pipeline
result = pipeline.transform(trace)

# Or use functional composition
from functools import partial
result = tk.pipe(
    trace,
    partial(tk.low_pass, cutoff=1e6),
    partial(tk.normalize, method='peak'),
    partial(tk.resample, rate=1e9)
)

Pipeline Class

Pipeline

Chain multiple trace transformers into a sequential processing pipeline.

class Pipeline(TraceTransformer):
    def __init__(self, steps: Sequence[tuple[str, TraceTransformer]])
    def fit(self, trace: WaveformTrace) -> Pipeline
    def transform(self, trace: WaveformTrace) -> WaveformTrace
    def fit_transform(self, trace: WaveformTrace) -> WaveformTrace
    def get_intermediate(self, step_name: str, key: str | None = None) -> Any

Parameters:

Parameter Type Description
steps Sequence[tuple[str, TraceTransformer]] List of (name, transformer) tuples defining stages

Attributes:

Attribute Type Description
steps list[tuple[str, TraceTransformer]] Pipeline stages as list of tuples
named_steps dict[str, TraceTransformer] Dictionary mapping names to transformers

Returns: Transformed WaveformTrace after passing through all stages.

Example:

import tracekit as tk

# Create analysis pipeline
pipeline = tk.Pipeline([
    ('filter', tk.LowPassFilter(cutoff=1e6)),
    ('normalize', tk.Normalize()),
    ('fft', tk.FFT(nfft=8192))
])

# Transform trace
result = pipeline.transform(trace)

# Access steps
filter_step = pipeline['filter']
filter_step = pipeline[0]  # By index
print(f"Pipeline has {len(pipeline)} steps")

Creating Custom Transformers

Build custom transformers by inheriting from TraceTransformer:

import tracekit as tk
import numpy as np

class AmplitudeScaler(tk.TraceTransformer):
    """Scale waveform amplitude by a fixed factor."""

    def __init__(self, scale_factor=1.0):
        self.scale_factor = scale_factor

    def transform(self, trace):
        scaled_data = trace.data * self.scale_factor
        return tk.WaveformTrace(
            data=scaled_data,
            metadata=trace.metadata
        )

# Use in pipeline
pipeline = tk.Pipeline([
    ('scale', AmplitudeScaler(scale_factor=2.0)),
    ('normalize', tk.Normalize())
])

result = pipeline.transform(trace)

Stateful Transformers

Create transformers that learn parameters using the fit/transform pattern:

import tracekit as tk
import numpy as np

class AdaptiveNormalizer(tk.TraceTransformer):
    """Normalize using statistics learned from reference trace."""

    def __init__(self):
        self.mean_ = None
        self.std_ = None

    def fit(self, trace):
        """Learn normalization parameters from reference."""
        self.mean_ = trace.data.mean()
        self.std_ = trace.data.std()
        return self

    def transform(self, trace):
        """Apply learned normalization."""
        if self.mean_ is None or self.std_ is None:
            raise ValueError("Must call fit() first")

        normalized = (trace.data - self.mean_) / self.std_
        return tk.WaveformTrace(
            data=normalized,
            metadata=trace.metadata
        )

# Fit on reference, transform test traces
normalizer = AdaptiveNormalizer()
normalizer.fit(reference_trace)

results = [normalizer.transform(t) for t in test_traces]

Pipeline Methods

fit()

Fit all transformers in the pipeline sequentially.

def fit(self, trace: WaveformTrace) -> Pipeline

Parameters:

Parameter Type Description
trace WaveformTrace Reference trace to fit to

Returns: Self for method chaining.

Example:

# Create pipeline with stateful transformers
pipeline = tk.Pipeline([
    ('adaptive_filter', AdaptiveFilter()),
    ('normalizer', AdaptiveNormalizer())
])

# Fit on reference trace
pipeline.fit(reference_trace)

# Transform multiple traces with fitted parameters
results = [pipeline.transform(t) for t in test_traces]

transform()

Transform a trace through all pipeline stages.

def transform(self, trace: WaveformTrace) -> WaveformTrace

Parameters:

Parameter Type Description
trace WaveformTrace Input trace

Returns: Transformed WaveformTrace.

fit_transform()

Fit pipeline to trace and immediately transform it.

def fit_transform(self, trace: WaveformTrace) -> WaveformTrace

Convenience method equivalent to pipeline.fit(trace).transform(trace).

get_intermediate()

Retrieve cached intermediate results from pipeline stages.

def get_intermediate(
    self,
    step_name: str,
    key: str | None = None
) -> Any

Parameters:

Parameter Type Description
step_name str Name of the pipeline step
key str \|None Optional key for transformer-internal results

Returns: WaveformTrace output from that stage (if key=None), or specific intermediate result.

Raises: KeyError if step not found or transform() not yet called.

Example:

# Create pipeline with multiple stages
pipeline = tk.Pipeline([
    ('filter', tk.LowPassFilter(cutoff=1e6)),
    ('fft', tk.FFT(nfft=8192)),
    ('normalize', tk.Normalize())
])

# Transform trace
result = pipeline.transform(trace)

# Get trace output from filter stage
filtered = pipeline.get_intermediate('filter')

# Get FFT-specific intermediates
spectrum = pipeline.get_intermediate('fft', 'spectrum')
frequencies = pipeline.get_intermediate('fft', 'frequencies')
power = pipeline.get_intermediate('fft', 'power')

has_intermediate()

Check if intermediate result is available.

def has_intermediate(
    self,
    step_name: str,
    key: str | None = None
) -> bool

Example:

if pipeline.has_intermediate('fft', 'spectrum'):
    spectrum = pipeline.get_intermediate('fft', 'spectrum')

list_intermediates()

List available intermediate results.

def list_intermediates(
    self,
    step_name: str | None = None
) -> list[str] | dict[str, list[str]]

Parameters:

Parameter Type Description
step_name str \|None If specified, list intermediates for that step only

Returns: List of keys for a step, or dict mapping all steps to their intermediates.

Example:

# List all intermediates
all_intermediates = pipeline.list_intermediates()
print(all_intermediates)
# {'filter': ['transfer_function', 'impulse_response'],
#  'fft': ['spectrum', 'frequencies', 'power', 'phase']}

# List intermediates for specific step
fft_intermediates = pipeline.list_intermediates('fft')
print(fft_intermediates)
# ['spectrum', 'frequencies', 'power', 'phase']

Parameter Management

Get and set pipeline parameters using sklearn-style syntax:

# Get all parameters
params = pipeline.get_params()
print(params['filter__cutoff'])  # Access nested parameter

# Set parameters
pipeline.set_params(
    filter__cutoff=2e6,
    normalize__method='zscore'
)

# Clone pipeline
pipeline_copy = pipeline.clone()

TraceTransformer Base Class

TraceTransformer

Abstract base class for all trace transformations.

from abc import ABC, abstractmethod

class TraceTransformer(ABC):
    @abstractmethod
    def transform(self, trace: WaveformTrace) -> WaveformTrace

    def fit(self, trace: WaveformTrace) -> TraceTransformer
    def fit_transform(self, trace: WaveformTrace) -> WaveformTrace
    def get_params(self, deep: bool = True) -> dict[str, Any]
    def set_params(self, **params: Any) -> TraceTransformer
    def clone(self) -> TraceTransformer

All custom transformers must inherit from this class and implement transform().

Required Methods:

  • transform(trace) - Transform the input trace (must override)

Optional Methods:

  • fit(trace) - Learn parameters from reference trace
  • fit_transform(trace) - Fit then transform
  • get_params() - Get transformer parameters
  • set_params(**params) - Set transformer parameters
  • clone() - Create a copy of the transformer

Example:

import tracekit as tk
import numpy as np

class NoiseReducer(tk.TraceTransformer):
    """Reduce noise using moving average."""

    def __init__(self, window_size=5):
        self.window_size = window_size

    def transform(self, trace):
        kernel = np.ones(self.window_size) / self.window_size
        smoothed = np.convolve(trace.data, kernel, mode='same')

        return tk.WaveformTrace(
            data=smoothed,
            metadata=trace.metadata
        )

# Use standalone
reducer = NoiseReducer(window_size=10)
result = reducer.transform(trace)

# Or in pipeline
pipeline = tk.Pipeline([
    ('noise_reduce', NoiseReducer(window_size=10)),
    ('filter', tk.LowPassFilter(cutoff=1e6))
])

Intermediate Results

Transformers can cache intermediate results for inspection:

class FFTTransformer(tk.TraceTransformer):
    """FFT transformer that caches intermediate results."""

    def transform(self, trace):
        # Clear previous intermediates
        self._clear_intermediates()

        # Compute FFT
        spectrum = np.fft.fft(trace.data)
        frequencies = np.fft.fftfreq(len(trace.data),
                                      1.0/trace.metadata.sample_rate)
        power = np.abs(spectrum) ** 2
        phase = np.angle(spectrum)

        # Cache intermediates
        self._cache_intermediate('spectrum', spectrum)
        self._cache_intermediate('frequencies', frequencies)
        self._cache_intermediate('power', power)
        self._cache_intermediate('phase', phase)

        # Return transformed trace
        magnitude = np.abs(spectrum)
        return tk.WaveformTrace(
            data=magnitude,
            metadata=trace.metadata
        )

# Use transformer
fft = FFTTransformer()
result = fft.transform(trace)

# Access intermediates
if fft.has_intermediate_result('power'):
    power = fft.get_intermediate_result('power')

# List available intermediates
print(fft.list_intermediate_results())
# ['spectrum', 'frequencies', 'power', 'phase']

Functional Composition

compose()

Compose functions right-to-left (mathematical composition).

def compose(*funcs: TraceFunc) -> TraceFunc

Creates a single function that applies functions in reverse order: compose(f, g, h)(x) equals f(g(h(x))).

Parameters:

Parameter Type Description
*funcs Callable[[WaveformTrace], WaveformTrace] Functions to compose in reverse order

Returns: Composite function.

Example:

import tracekit as tk
from functools import partial

# Create composed analysis function
analyze_signal = tk.compose(
    tk.extract_thd,                              # Apply last
    partial(tk.fft, nfft=8192, window='hann'),   # Apply second
    partial(tk.normalize, method='peak'),        # Apply first (innermost)
)

# Apply to trace: normalize -> fft -> extract_thd
thd = analyze_signal(trace)

# Equivalent to:
# thd = tk.extract_thd(
#     tk.fft(
#         tk.normalize(trace, method='peak'),
#         nfft=8192, window='hann'
#     )
# )

pipe()

Apply functions left-to-right (pipeline order).

def pipe(data: WaveformTrace, *funcs: TraceFunc) -> WaveformTrace

Applies functions sequentially: pipe(x, f, g, h) equals h(g(f(x))).

Parameters:

Parameter Type Description
data WaveformTrace Initial trace to process
*funcs Callable[[WaveformTrace], WaveformTrace] Functions to apply in order

Returns: Transformed WaveformTrace.

Example:

import tracekit as tk
from functools import partial

# Apply operations left-to-right (more intuitive)
result = tk.pipe(
    trace,
    partial(tk.low_pass, cutoff=1e6),        # Apply first
    partial(tk.resample, rate=1e9),          # Apply second
    partial(tk.normalize, method='zscore'),  # Apply third
    partial(tk.fft, nfft=8192)               # Apply last
)

# Equivalent to Pipeline
pipeline = tk.Pipeline([
    ('filter', tk.LowPassFilter(cutoff=1e6)),
    ('resample', tk.Resample(rate=1e9)),
    ('normalize', tk.Normalize(method='zscore')),
    ('fft', tk.FFT(nfft=8192))
])
result = pipeline.transform(trace)

curry()

Curry a function for easier composition.

def curry(func: Callable[..., WaveformTrace]) -> Callable[..., TraceFunc]

Transforms a multi-argument function into a series of single-argument functions.

Parameters:

Parameter Type Description
func Callable[..., WaveformTrace] Function to curry

Returns: Curried version of the function.

Example:

import tracekit as tk
import numpy as np

@tk.curry
def scale_and_offset(trace, scale, offset):
    """Scale and offset trace data."""
    return tk.WaveformTrace(
        data=trace.data * scale + offset,
        metadata=trace.metadata
    )

# Create specialized functions
double_and_shift = scale_and_offset(scale=2.0, offset=1.0)
result = double_and_shift(trace)

# Use in composition
result = tk.pipe(
    trace,
    scale_and_offset(scale=2.0, offset=0.0),
    scale_and_offset(scale=0.5, offset=1.0)
)

make_composable()

Decorator to make a function support partial application.

def make_composable(func: Callable[..., WaveformTrace]) -> Callable[..., TraceFunc]

Example:

import tracekit as tk
import numpy as np

@tk.make_composable
def moving_average(trace, window_size=5):
    """Apply moving average filter."""
    kernel = np.ones(window_size) / window_size
    smoothed = np.convolve(trace.data, kernel, mode='same')
    return tk.WaveformTrace(
        data=smoothed,
        metadata=trace.metadata
    )

# Use with partial application
smooth_5 = moving_average(window_size=5)
smooth_10 = moving_average(window_size=10)

# Use in pipeline
result = tk.pipe(
    trace,
    moving_average(window_size=5),
    moving_average(window_size=3)
)

Composable Mixin

Mixin class to enable >> operator for function composition.

class Composable:
    def __rshift__(self, func: Callable[[Any], Any]) -> Any

Example:

# If WaveformTrace inherits from Composable
result = (
    trace
    >> low_pass(cutoff=1e6)
    >> normalize()
    >> fft(nfft=8192)
)

# Equivalent to:
result = fft(normalize(low_pass(trace, cutoff=1e6)), nfft=8192)

Complete Examples

Building a Signal Analysis Pipeline

import tracekit as tk
import numpy as np

# Define custom transformers
class OutlierClipper(tk.TraceTransformer):
    """Clip outliers beyond N standard deviations."""

    def __init__(self, n_sigma=3.0):
        self.n_sigma = n_sigma

    def transform(self, trace):
        mean = trace.data.mean()
        std = trace.data.std()
        lower = mean - self.n_sigma * std
        upper = mean + self.n_sigma * std
        clipped = np.clip(trace.data, lower, upper)

        return tk.WaveformTrace(
            data=clipped,
            metadata=trace.metadata
        )

class TrendRemover(tk.TraceTransformer):
    """Remove linear trend from signal."""

    def __init__(self):
        self.slope_ = None
        self.intercept_ = None

    def fit(self, trace):
        x = np.arange(len(trace.data))
        coeffs = np.polyfit(x, trace.data, 1)
        self.slope_ = coeffs[0]
        self.intercept_ = coeffs[1]
        return self

    def transform(self, trace):
        if self.slope_ is None:
            trend = trace.data.mean()
        else:
            x = np.arange(len(trace.data))
            trend = self.slope_ * x + self.intercept_

        detrended = trace.data - trend
        return tk.WaveformTrace(
            data=detrended,
            metadata=trace.metadata
        )

# Build comprehensive analysis pipeline
pipeline = tk.Pipeline([
    ('clip_outliers', OutlierClipper(n_sigma=3.0)),
    ('remove_trend', TrendRemover()),
    ('filter', tk.LowPassFilter(cutoff=1e6)),
    ('normalize', tk.Normalize(method='zscore'))
])

# Fit and transform
pipeline.fit(reference_trace)
result = pipeline.transform(test_trace)

# Access intermediate results
after_clipping = pipeline.get_intermediate('clip_outliers')
after_detrend = pipeline.get_intermediate('remove_trend')
filtered = pipeline.get_intermediate('filter')

print(f"Original std: {test_trace.data.std():.3f}")
print(f"After clipping std: {after_clipping.data.std():.3f}")
print(f"After detrend std: {after_detrend.data.std():.3f}")
print(f"Final std: {result.data.std():.3f}")

Functional Programming Style

import tracekit as tk
from functools import partial

# Define transformations as functions
def remove_dc(trace):
    """Remove DC component."""
    centered = trace.data - trace.data.mean()
    return tk.WaveformTrace(data=centered, metadata=trace.metadata)

def scale_to_range(trace, min_val=-1.0, max_val=1.0):
    """Scale trace to specified range."""
    data_min = trace.data.min()
    data_max = trace.data.max()
    scaled = (trace.data - data_min) / (data_max - data_min)
    scaled = scaled * (max_val - min_val) + min_val
    return tk.WaveformTrace(data=scaled, metadata=trace.metadata)

# Compose operations
preprocess = tk.compose(
    partial(scale_to_range, min_val=-1.0, max_val=1.0),
    remove_dc,
    partial(tk.low_pass, cutoff=1e6)
)

# Apply to trace
result = preprocess(trace)

# Or use pipe for left-to-right flow
result = tk.pipe(
    trace,
    partial(tk.low_pass, cutoff=1e6),
    remove_dc,
    partial(scale_to_range, min_val=-1.0, max_val=1.0)
)

Creating Reusable Analysis Components

import tracekit as tk
from functools import partial

# Create reusable analysis pipeline factory
def create_digital_analysis_pipeline(
    threshold=0.5,
    filter_cutoff=1e6,
    sample_rate=1e9
):
    """Factory for digital signal analysis pipeline."""
    return tk.Pipeline([
        ('filter', tk.LowPassFilter(cutoff=filter_cutoff)),
        ('resample', tk.Resample(rate=sample_rate)),
        ('threshold', tk.Threshold(level=threshold)),
        ('edges', tk.EdgeDetector())
    ])

# Create reusable spectral analysis function
def spectral_analysis(nfft=8192, window='hann'):
    """Create spectral analysis function."""
    return tk.compose(
        tk.extract_harmonics,
        partial(tk.fft, nfft=nfft, window=window),
        partial(tk.normalize, method='peak')
    )

# Use reusable components
digital_pipeline = create_digital_analysis_pipeline(
    threshold=0.5,
    filter_cutoff=5e6
)
result = digital_pipeline.transform(trace)

# Apply spectral analysis
analyze = spectral_analysis(nfft=16384, window='hamming')
harmonics = analyze(trace)

Combining Pipeline and Functional Approaches

import tracekit as tk
from functools import partial

# Create base pipeline
base_pipeline = tk.Pipeline([
    ('filter', tk.BandPassFilter(low=1e5, high=1e7)),
    ('normalize', tk.Normalize(method='peak'))
])

# Define post-processing functions
def compute_envelope(trace):
    """Compute signal envelope."""
    analytic = scipy.signal.hilbert(trace.data)
    envelope = np.abs(analytic)
    return tk.WaveformTrace(data=envelope, metadata=trace.metadata)

def extract_peaks(trace, threshold=0.5):
    """Extract peak locations."""
    from scipy.signal import find_peaks
    peaks, _ = find_peaks(trace.data, height=threshold)
    return peaks

# Combine pipeline with functional composition
def analyze_signal(trace):
    """Complete signal analysis workflow."""
    # Apply base pipeline
    processed = base_pipeline.transform(trace)

    # Get filtered intermediate
    filtered = base_pipeline.get_intermediate('filter')

    # Apply post-processing
    envelope = compute_envelope(processed)
    peaks = extract_peaks(envelope, threshold=0.5)

    return {
        'processed': processed,
        'filtered': filtered,
        'envelope': envelope,
        'peaks': peaks,
        'num_peaks': len(peaks)
    }

# Run analysis
results = analyze_signal(trace)
print(f"Found {results['num_peaks']} peaks")

Pipeline Serialization

Save and load pipelines for reuse:

import joblib
import tracekit as tk

# Create and fit pipeline
pipeline = tk.Pipeline([
    ('filter', tk.LowPassFilter(cutoff=1e6)),
    ('normalize', tk.Normalize())
])
pipeline.fit(reference_trace)

# Save pipeline
joblib.dump(pipeline, 'analysis_pipeline.pkl')

# Load pipeline
loaded_pipeline = joblib.load('analysis_pipeline.pkl')
result = loaded_pipeline.transform(test_trace)

Best Practices

When to Use Pipeline vs Functional Composition

Use Pipeline when:

  • You need stateful transformers (fit/transform pattern)
  • You want to inspect intermediate results
  • You need parameter tuning (get_params/set_params)
  • You want to serialize the workflow
  • Building complex, multi-stage analysis workflows

Use functional composition when:

  • You have simple, stateless transformations
  • You want concise, readable code
  • You're working with pure functions
  • You need quick prototyping

Pipeline Design Tips

  1. Keep transformers focused: Each transformer should do one thing well
  2. Name steps clearly: Use descriptive names for pipeline steps
  3. Cache intermediates: Store useful intermediate results for inspection
  4. Document parameters: Include docstrings with parameter descriptions
  5. Handle edge cases: Validate inputs and handle NaN/inf values
  6. Make transformers stateless when possible: Easier to reason about and test

Performance Considerations

# Avoid: Creating pipeline in loop
for trace in traces:
    pipeline = tk.Pipeline([...])  # Recreated each time
    result = pipeline.transform(trace)

# Better: Reuse pipeline
pipeline = tk.Pipeline([...])
results = [pipeline.transform(t) for t in traces]

# Best: Use fit once for all traces
pipeline.fit(reference_trace)
results = [pipeline.transform(t) for t in traces]

See Also