Pipeline API Reference
Version: 0.1.0 | Last Updated: 2026-01-08
Overview
TraceKit's pipeline architecture provides a powerful framework for building composable, reusable analysis workflows. Inspired by scikit-learn's Pipeline design, it enables declarative signal processing through:
- Pipeline - Chain multiple operations into sequential workflows
- TraceTransformer - Base class for creating custom transformations
- Functional composition - compose(), pipe(), and curry() for functional programming
- Composable mixin - Enable >> operator for fluent pipelines
Quick Start
import tracekit as tk
# Load trace
trace = tk.load("signal.wfm")
# Create a simple pipeline
pipeline = tk.Pipeline([
('filter', tk.LowPassFilter(cutoff=1e6)),
('normalize', tk.Normalize(method='peak')),
('resample', tk.Resample(rate=1e9))
])
# Transform trace through pipeline
result = pipeline.transform(trace)
# Or use functional composition
from functools import partial
result = tk.pipe(
trace,
partial(tk.low_pass, cutoff=1e6),
partial(tk.normalize, method='peak'),
partial(tk.resample, rate=1e9)
)
Pipeline Class
Pipeline
Chain multiple trace transformers into a sequential processing pipeline.
class Pipeline(TraceTransformer):
def __init__(self, steps: Sequence[tuple[str, TraceTransformer]])
def fit(self, trace: WaveformTrace) -> Pipeline
def transform(self, trace: WaveformTrace) -> WaveformTrace
def fit_transform(self, trace: WaveformTrace) -> WaveformTrace
def get_intermediate(self, step_name: str, key: str | None = None) -> Any
Parameters:
| Parameter | Type | Description |
|---|---|---|
| steps | Sequence[tuple[str, TraceTransformer]] | List of (name, transformer) tuples defining stages |
Attributes:
| Attribute | Type | Description |
|---|---|---|
| steps | list[tuple[str, TraceTransformer]] | Pipeline stages as list of tuples |
| named_steps | dict[str, TraceTransformer] | Dictionary mapping names to transformers |
Returns: Transformed WaveformTrace after passing through all stages.
Example:
import tracekit as tk
# Create analysis pipeline
pipeline = tk.Pipeline([
('filter', tk.LowPassFilter(cutoff=1e6)),
('normalize', tk.Normalize()),
('fft', tk.FFT(nfft=8192))
])
# Transform trace
result = pipeline.transform(trace)
# Access steps
filter_step = pipeline['filter']  # By name
filter_step = pipeline[0] # By index
print(f"Pipeline has {len(pipeline)} steps")
Creating Custom Transformers
Build custom transformers by inheriting from TraceTransformer:
import tracekit as tk
import numpy as np
class AmplitudeScaler(tk.TraceTransformer):
"""Scale waveform amplitude by a fixed factor."""
def __init__(self, scale_factor=1.0):
self.scale_factor = scale_factor
def transform(self, trace):
scaled_data = trace.data * self.scale_factor
return tk.WaveformTrace(
data=scaled_data,
metadata=trace.metadata
)
# Use in pipeline
pipeline = tk.Pipeline([
('scale', AmplitudeScaler(scale_factor=2.0)),
('normalize', tk.Normalize())
])
result = pipeline.transform(trace)
Stateful Transformers
Create transformers that learn parameters using the fit/transform pattern:
import tracekit as tk
import numpy as np
class AdaptiveNormalizer(tk.TraceTransformer):
"""Normalize using statistics learned from reference trace."""
def __init__(self):
self.mean_ = None
self.std_ = None
def fit(self, trace):
"""Learn normalization parameters from reference."""
self.mean_ = trace.data.mean()
self.std_ = trace.data.std()
return self
def transform(self, trace):
"""Apply learned normalization."""
if self.mean_ is None or self.std_ is None:
raise ValueError("Must call fit() first")
normalized = (trace.data - self.mean_) / self.std_
return tk.WaveformTrace(
data=normalized,
metadata=trace.metadata
)
# Fit on reference, transform test traces
normalizer = AdaptiveNormalizer()
normalizer.fit(reference_trace)
results = [normalizer.transform(t) for t in test_traces]
Pipeline Methods
fit()
Fit all transformers in the pipeline sequentially.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| trace | WaveformTrace | Reference trace to fit to |
Returns: Self for method chaining.
Example:
# Create pipeline with stateful transformers
pipeline = tk.Pipeline([
('adaptive_filter', AdaptiveFilter()),
('normalizer', AdaptiveNormalizer())
])
# Fit on reference trace
pipeline.fit(reference_trace)
# Transform multiple traces with fitted parameters
results = [pipeline.transform(t) for t in test_traces]
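TraceKit's exact fitting order isn't spelled out here, but assuming scikit-learn-style semantics, each stage is fitted on the output of the stages before it. A rough sketch of that assumed behavior:
current = reference_trace
for name, transformer in pipeline.steps:
    transformer.fit(current)                  # fit this stage on upstream output
    current = transformer.transform(current)  # feed the result downstream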
transform()
Transform a trace through all pipeline stages.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| trace | WaveformTrace | Input trace |
Returns: Transformed WaveformTrace.
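Conceptually, transform() threads the trace through every stage in order. A minimal sketch of the equivalent loop (illustrative, not the library's source):
result = trace
for name, transformer in pipeline.steps:
    result = transformer.transform(result)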
fit_transform()
Fit pipeline to trace and immediately transform it.
Convenience method equivalent to pipeline.fit(trace).transform(trace).
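Example (reusing the pipeline and reference_trace from the fit() example above):
# One call fits the pipeline and returns the transformed trace
baseline = pipeline.fit_transform(reference_trace)
# Equivalent to:
# pipeline.fit(reference_trace)
# baseline = pipeline.transform(reference_trace)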
get_intermediate()
Retrieve cached intermediate results from pipeline stages.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| step_name | str | Name of the pipeline step |
| key | str \| None | Optional key for transformer-internal results |
Returns: WaveformTrace output from that stage (if key=None), or specific intermediate result.
Raises: KeyError if step not found or transform() not yet called.
Example:
# Create pipeline with multiple stages
pipeline = tk.Pipeline([
('filter', tk.LowPassFilter(cutoff=1e6)),
('fft', tk.FFT(nfft=8192)),
('normalize', tk.Normalize())
])
# Transform trace
result = pipeline.transform(trace)
# Get trace output from filter stage
filtered = pipeline.get_intermediate('filter')
# Get FFT-specific intermediates
spectrum = pipeline.get_intermediate('fft', 'spectrum')
frequencies = pipeline.get_intermediate('fft', 'frequencies')
power = pipeline.get_intermediate('fft', 'power')
has_intermediate()
Check if intermediate result is available.
Example:
if pipeline.has_intermediate('fft', 'spectrum'):
spectrum = pipeline.get_intermediate('fft', 'spectrum')
list_intermediates()
List available intermediate results.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| step_name | str \| None | If specified, list intermediates for that step only |
Returns: List of keys for a step, or dict mapping all steps to their intermediates.
Example:
# List all intermediates
all_intermediates = pipeline.list_intermediates()
print(all_intermediates)
# {'filter': ['transfer_function', 'impulse_response'],
# 'fft': ['spectrum', 'frequencies', 'power', 'phase']}
# List intermediates for specific step
fft_intermediates = pipeline.list_intermediates('fft')
print(fft_intermediates)
# ['spectrum', 'frequencies', 'power', 'phase']
Parameter Management
Get and set pipeline parameters using sklearn-style syntax:
# Get all parameters
params = pipeline.get_params()
print(params['filter__cutoff']) # Access nested parameter
# Set parameters
pipeline.set_params(
filter__cutoff=2e6,
normalize__method='zscore'
)
# Clone pipeline
pipeline_copy = pipeline.clone()
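The step__param syntax presumably routes nested parameters the same way scikit-learn does: the key is split at the first double underscore and the remainder is set on the named step. A hypothetical sketch of that routing (set_params_sketch is illustrative, not a TraceKit function):
def set_params_sketch(pipeline, **params):
    """Split 'step__param' keys and set the attribute on the named step."""
    for key, value in params.items():
        step_name, _, param_name = key.partition('__')
        setattr(pipeline.named_steps[step_name], param_name, value)

# set_params_sketch(pipeline, filter__cutoff=2e6) would set
# pipeline.named_steps['filter'].cutoff = 2e6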
TraceTransformer Base Class
TraceTransformer
Abstract base class for all trace transformations.
from abc import ABC, abstractmethod
class TraceTransformer(ABC):
@abstractmethod
def transform(self, trace: WaveformTrace) -> WaveformTrace
def fit(self, trace: WaveformTrace) -> TraceTransformer
def fit_transform(self, trace: WaveformTrace) -> WaveformTrace
def get_params(self, deep: bool = True) -> dict[str, Any]
def set_params(self, **params: Any) -> TraceTransformer
def clone(self) -> TraceTransformer
All custom transformers must inherit from this class and implement transform().
Required Methods:
- transform(trace) - Transform the input trace (must override)
Optional Methods:
- fit(trace) - Learn parameters from reference trace
- fit_transform(trace) - Fit then transform
- get_params() - Get transformer parameters
- set_params(**params) - Set transformer parameters
- clone() - Create a copy of the transformer
Example:
import tracekit as tk
import numpy as np
class NoiseReducer(tk.TraceTransformer):
"""Reduce noise using moving average."""
def __init__(self, window_size=5):
self.window_size = window_size
def transform(self, trace):
kernel = np.ones(self.window_size) / self.window_size
smoothed = np.convolve(trace.data, kernel, mode='same')
return tk.WaveformTrace(
data=smoothed,
metadata=trace.metadata
)
# Use standalone
reducer = NoiseReducer(window_size=10)
result = reducer.transform(trace)
# Or in pipeline
pipeline = tk.Pipeline([
('noise_reduce', NoiseReducer(window_size=10)),
('filter', tk.LowPassFilter(cutoff=1e6))
])
Intermediate Results
Transformers can cache intermediate results for inspection:
class FFTTransformer(tk.TraceTransformer):
"""FFT transformer that caches intermediate results."""
def transform(self, trace):
# Clear previous intermediates
self._clear_intermediates()
# Compute FFT
spectrum = np.fft.fft(trace.data)
frequencies = np.fft.fftfreq(len(trace.data),
1.0/trace.metadata.sample_rate)
power = np.abs(spectrum) ** 2
phase = np.angle(spectrum)
# Cache intermediates
self._cache_intermediate('spectrum', spectrum)
self._cache_intermediate('frequencies', frequencies)
self._cache_intermediate('power', power)
self._cache_intermediate('phase', phase)
# Return transformed trace
magnitude = np.abs(spectrum)
return tk.WaveformTrace(
data=magnitude,
metadata=trace.metadata
)
# Use transformer
fft = FFTTransformer()
result = fft.transform(trace)
# Access intermediates
if fft.has_intermediate_result('power'):
power = fft.get_intermediate_result('power')
# List available intermediates
print(fft.list_intermediate_results())
# ['spectrum', 'frequencies', 'power', 'phase']
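The caching helpers above are presumably backed by a per-instance dict; a minimal sketch of that pattern (the mixin name is hypothetical, the method names match the example):
class IntermediateCacheMixin:
    """Hypothetical sketch of the caching protocol used above."""
    def _clear_intermediates(self):
        self._intermediates = {}

    def _cache_intermediate(self, key, value):
        self._intermediates[key] = value

    def get_intermediate_result(self, key):
        return self._intermediates[key]

    def has_intermediate_result(self, key):
        return key in getattr(self, '_intermediates', {})

    def list_intermediate_results(self):
        return list(getattr(self, '_intermediates', {}))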
Functional Composition
compose()
Compose functions right-to-left (mathematical composition).
Creates a single function that applies functions in reverse order: compose(f, g, h)(x) equals f(g(h(x))).
Parameters:
| Parameter | Type | Description |
|---|---|---|
| *funcs | Callable[[WaveformTrace], WaveformTrace] | Functions to compose in reverse order |
Returns: Composite function.
Example:
import tracekit as tk
from functools import partial
# Create composed analysis function
analyze_signal = tk.compose(
tk.extract_thd, # Apply last
partial(tk.fft, nfft=8192, window='hann'), # Apply second
partial(tk.normalize, method='peak'), # Apply first (innermost)
)
# Apply to trace: normalize -> fft -> extract_thd
thd = analyze_signal(trace)
# Equivalent to:
# thd = tk.extract_thd(
# tk.fft(
# tk.normalize(trace, method='peak'),
# nfft=8192, window='hann'
# )
# )
pipe()
Apply functions left-to-right (pipeline order).
Applies functions sequentially: pipe(x, f, g, h) equals h(g(f(x))).
Parameters:
| Parameter | Type | Description |
|---|---|---|
| data | WaveformTrace | Initial trace to process |
| *funcs | Callable[[WaveformTrace], WaveformTrace] | Functions to apply in order |
Returns: Transformed WaveformTrace.
Example:
import tracekit as tk
from functools import partial
# Apply operations left-to-right (more intuitive)
result = tk.pipe(
trace,
partial(tk.low_pass, cutoff=1e6), # Apply first
partial(tk.resample, rate=1e9), # Apply second
partial(tk.normalize, method='zscore'), # Apply third
partial(tk.fft, nfft=8192) # Apply last
)
# Equivalent to Pipeline
pipeline = tk.Pipeline([
('filter', tk.LowPassFilter(cutoff=1e6)),
('resample', tk.Resample(rate=1e9)),
('normalize', tk.Normalize(method='zscore')),
('fft', tk.FFT(nfft=8192))
])
result = pipeline.transform(trace)
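Both helpers boil down to a fold over the function list. A sketch of the assumed semantics (compose_sketch and pipe_sketch are illustrative stand-ins, not TraceKit's source):
from functools import reduce

def compose_sketch(*funcs):
    """compose(f, g, h)(x) == f(g(h(x))): apply right-to-left."""
    return lambda x: reduce(lambda acc, f: f(acc), reversed(funcs), x)

def pipe_sketch(data, *funcs):
    """pipe(x, f, g, h) == h(g(f(x))): apply left-to-right."""
    return reduce(lambda acc, f: f(acc), funcs, data)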
curry()
Curry a function for easier composition.
Transforms a multi-argument function into a series of single-argument functions.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| func | Callable[..., WaveformTrace] | Function to curry |
Returns: Curried version of the function.
Example:
import tracekit as tk
import numpy as np
@tk.curry
def scale_and_offset(trace, scale, offset):
"""Scale and offset trace data."""
return tk.WaveformTrace(
data=trace.data * scale + offset,
metadata=trace.metadata
)
# Create specialized functions
double_and_shift = scale_and_offset(scale=2.0, offset=1.0)
result = double_and_shift(trace)
# Use in composition
result = tk.pipe(
trace,
scale_and_offset(scale=2.0, offset=0.0),
scale_and_offset(scale=0.5, offset=1.0)
)
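One way such a decorator can behave (an illustrative sketch of the assumed semantics, not TraceKit's implementation): calling without a positional trace returns a partial, and calling with one executes immediately.
import functools

def curry_sketch(func):
    """Defer via partial until a positional trace argument arrives."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if args:  # trace supplied positionally: run now
            return func(*args, **kwargs)
        return functools.partial(func, **kwargs)  # no trace yet: defer
    return wrapper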
make_composable()
Decorator to make a function support partial application.
Example:
import tracekit as tk
import numpy as np
@tk.make_composable
def moving_average(trace, window_size=5):
"""Apply moving average filter."""
kernel = np.ones(window_size) / window_size
smoothed = np.convolve(trace.data, kernel, mode='same')
return tk.WaveformTrace(
data=smoothed,
metadata=trace.metadata
)
# Use with partial application
smooth_5 = moving_average(window_size=5)
smooth_10 = moving_average(window_size=10)
# Use in pipeline
result = tk.pipe(
trace,
moving_average(window_size=5),
moving_average(window_size=3)
)
Composable Mixin
Mixin class to enable >> operator for function composition.
Example:
# If WaveformTrace inherits from Composable
result = (
trace
>> low_pass(cutoff=1e6)
>> normalize()
>> fft(nfft=8192)
)
# Equivalent to:
result = fft(normalize(low_pass(trace, cutoff=1e6)), nfft=8192)
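A minimal sketch of how such a mixin could be implemented (illustrative; note that for the chain above to work, low_pass(cutoff=1e6) and friends must return one-argument callables, e.g. via curry or make_composable):
class ComposableSketch:
    """obj >> func evaluates func(obj), enabling fluent left-to-right chains."""
    def __rshift__(self, func):
        return func(self)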
Complete Examples
Building a Signal Analysis Pipeline
import tracekit as tk
import numpy as np
# Define custom transformers
class OutlierClipper(tk.TraceTransformer):
"""Clip outliers beyond N standard deviations."""
def __init__(self, n_sigma=3.0):
self.n_sigma = n_sigma
def transform(self, trace):
mean = trace.data.mean()
std = trace.data.std()
lower = mean - self.n_sigma * std
upper = mean + self.n_sigma * std
clipped = np.clip(trace.data, lower, upper)
return tk.WaveformTrace(
data=clipped,
metadata=trace.metadata
)
class TrendRemover(tk.TraceTransformer):
"""Remove linear trend from signal."""
def __init__(self):
self.slope_ = None
self.intercept_ = None
def fit(self, trace):
x = np.arange(len(trace.data))
coeffs = np.polyfit(x, trace.data, 1)
self.slope_ = coeffs[0]
self.intercept_ = coeffs[1]
return self
def transform(self, trace):
if self.slope_ is None:
trend = trace.data.mean()
else:
x = np.arange(len(trace.data))
trend = self.slope_ * x + self.intercept_
detrended = trace.data - trend
return tk.WaveformTrace(
data=detrended,
metadata=trace.metadata
)
# Build comprehensive analysis pipeline
pipeline = tk.Pipeline([
('clip_outliers', OutlierClipper(n_sigma=3.0)),
('remove_trend', TrendRemover()),
('filter', tk.LowPassFilter(cutoff=1e6)),
('normalize', tk.Normalize(method='zscore'))
])
# Fit and transform
pipeline.fit(reference_trace)
result = pipeline.transform(test_trace)
# Access intermediate results
after_clipping = pipeline.get_intermediate('clip_outliers')
after_detrend = pipeline.get_intermediate('remove_trend')
filtered = pipeline.get_intermediate('filter')
print(f"Original std: {test_trace.data.std():.3f}")
print(f"After clipping std: {after_clipping.data.std():.3f}")
print(f"After detrend std: {after_detrend.data.std():.3f}")
print(f"Final std: {result.data.std():.3f}")
Functional Programming Style
import tracekit as tk
from functools import partial
# Define transformations as functions
def remove_dc(trace):
"""Remove DC component."""
centered = trace.data - trace.data.mean()
return tk.WaveformTrace(data=centered, metadata=trace.metadata)
def scale_to_range(trace, min_val=-1.0, max_val=1.0):
"""Scale trace to specified range."""
data_min = trace.data.min()
data_max = trace.data.max()
scaled = (trace.data - data_min) / (data_max - data_min)
scaled = scaled * (max_val - min_val) + min_val
return tk.WaveformTrace(data=scaled, metadata=trace.metadata)
# Compose operations
preprocess = tk.compose(
partial(scale_to_range, min_val=-1.0, max_val=1.0),
remove_dc,
partial(tk.low_pass, cutoff=1e6)
)
# Apply to trace
result = preprocess(trace)
# Or use pipe for left-to-right flow
result = tk.pipe(
trace,
partial(tk.low_pass, cutoff=1e6),
remove_dc,
partial(scale_to_range, min_val=-1.0, max_val=1.0)
)
Creating Reusable Analysis Components
import tracekit as tk
from functools import partial
# Create reusable analysis pipeline factory
def create_digital_analysis_pipeline(
threshold=0.5,
filter_cutoff=1e6,
sample_rate=1e9
):
"""Factory for digital signal analysis pipeline."""
return tk.Pipeline([
('filter', tk.LowPassFilter(cutoff=filter_cutoff)),
('resample', tk.Resample(rate=sample_rate)),
('threshold', tk.Threshold(level=threshold)),
('edges', tk.EdgeDetector())
])
# Create reusable spectral analysis function
def spectral_analysis(nfft=8192, window='hann'):
"""Create spectral analysis function."""
return tk.compose(
tk.extract_harmonics,
partial(tk.fft, nfft=nfft, window=window),
partial(tk.normalize, method='peak')
)
# Use reusable components
digital_pipeline = create_digital_analysis_pipeline(
threshold=0.5,
filter_cutoff=5e6
)
result = digital_pipeline.transform(trace)
# Apply spectral analysis
analyze = spectral_analysis(nfft=16384, window='hamming')
harmonics = analyze(trace)
Combining Pipeline and Functional Approaches
import tracekit as tk
import numpy as np
import scipy.signal
from functools import partial
# Create base pipeline
base_pipeline = tk.Pipeline([
('filter', tk.BandPassFilter(low=1e5, high=1e7)),
('normalize', tk.Normalize(method='peak'))
])
# Define post-processing functions
def compute_envelope(trace):
"""Compute signal envelope."""
analytic = scipy.signal.hilbert(trace.data)
envelope = np.abs(analytic)
return tk.WaveformTrace(data=envelope, metadata=trace.metadata)
def extract_peaks(trace, threshold=0.5):
"""Extract peak locations."""
from scipy.signal import find_peaks
peaks, _ = find_peaks(trace.data, height=threshold)
return peaks
# Combine pipeline with functional composition
def analyze_signal(trace):
"""Complete signal analysis workflow."""
# Apply base pipeline
processed = base_pipeline.transform(trace)
# Get filtered intermediate
filtered = base_pipeline.get_intermediate('filter')
# Apply post-processing
envelope = compute_envelope(processed)
peaks = extract_peaks(envelope, threshold=0.5)
return {
'processed': processed,
'filtered': filtered,
'envelope': envelope,
'peaks': peaks,
'num_peaks': len(peaks)
}
# Run analysis
results = analyze_signal(trace)
print(f"Found {results['num_peaks']} peaks")
Pipeline Serialization
Save and load pipelines for reuse:
import joblib
import tracekit as tk
# Create and fit pipeline
pipeline = tk.Pipeline([
('filter', tk.LowPassFilter(cutoff=1e6)),
('normalize', tk.Normalize())
])
pipeline.fit(reference_trace)
# Save pipeline
joblib.dump(pipeline, 'analysis_pipeline.pkl')
# Load pipeline
loaded_pipeline = joblib.load('analysis_pipeline.pkl')
result = loaded_pipeline.transform(test_trace)
Best Practices
When to Use Pipeline vs Functional Composition
Use Pipeline when:
- You need stateful transformers (fit/transform pattern)
- You want to inspect intermediate results
- You need parameter tuning (get_params/set_params)
- You want to serialize the workflow
- Building complex, multi-stage analysis workflows
Use functional composition when:
- You have simple, stateless transformations
- You want concise, readable code
- You're working with pure functions
- You need quick prototyping
Pipeline Design Tips
- Keep transformers focused: Each transformer should do one thing well
- Name steps clearly: Use descriptive names for pipeline steps
- Cache intermediates: Store useful intermediate results for inspection
- Document parameters: Include docstrings with parameter descriptions
- Handle edge cases: Validate inputs and handle NaN/inf values (see the sketch below)
- Make transformers stateless when possible: Easier to reason about and test
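To illustrate the edge-case tip, a hypothetical transformer that validates its input before processing (SafeScaler is an example name, not a TraceKit class):
import numpy as np
import tracekit as tk

class SafeScaler(tk.TraceTransformer):
    """Hypothetical example: reject empty input, sanitize NaN/inf."""
    def __init__(self, scale=1.0):
        self.scale = scale

    def transform(self, trace):
        if trace.data.size == 0:
            raise ValueError("trace.data is empty")
        clean = np.nan_to_num(trace.data)  # NaN -> 0, +/-inf -> large finite
        return tk.WaveformTrace(data=clean * self.scale,
                                metadata=trace.metadata)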
Performance Considerations
# Avoid: Creating pipeline in loop
for trace in traces:
pipeline = tk.Pipeline([...]) # Recreated each time
result = pipeline.transform(trace)
# Better: Reuse pipeline
pipeline = tk.Pipeline([...])
results = [pipeline.transform(t) for t in traces]
# Best: Use fit once for all traces
pipeline.fit(reference_trace)
results = [pipeline.transform(t) for t in traces]
See Also
- Analysis API - Built-in analysis functions
- Loader API - Data loading
- Export API - Save results
- Advanced Examples - Complete workflow demonstrations