Coverage for src / tracekit / pipeline / composition.py: 100%
43 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Functional composition operators for trace transformations.
3This module implements compose() and pipe() functions for functional-style
4trace processing, with support for operator overloading.
5"""
7from __future__ import annotations
9from collections.abc import Callable
10from functools import reduce, wraps
11from typing import Any, TypeVar
13from ..core.types import WaveformTrace
15# Type variables for generic composition
16T = TypeVar("T")
17TraceFunc = Callable[[WaveformTrace], WaveformTrace]
20def compose(*funcs: TraceFunc) -> TraceFunc:
21 """Compose functions right-to-left: compose(f, g, h)(x) == f(g(h(x))).
23 Creates a single function that applies the given functions in reverse order.
24 This follows mathematical function composition notation.
26 Args:
27 *funcs: Variable number of functions to compose. Each function should
28 take a WaveformTrace and return a WaveformTrace.
30 Returns:
31 Composite function that applies all functions in reverse order.
33 Raises:
34 ValueError: If no functions provided.
36 Example:
37 >>> import tracekit as tk
38 >>> from functools import partial
39 >>> # Create composed analysis function
40 >>> analyze_signal = tk.compose(
41 ... tk.extract_thd,
42 ... partial(tk.fft, nfft=8192, window='hann'),
43 ... partial(tk.normalize, method='peak'),
44 ... partial(tk.low_pass, cutoff=5e6)
45 ... )
46 >>> # Apply to trace: low_pass -> normalize -> fft -> extract_thd
47 >>> thd = analyze_signal(trace)
49 References:
50 API-002: Function Composition Operators
51 toolz.functoolz
52 https://github.com/pytoolz/toolz
53 """
54 if not funcs:
55 raise ValueError("compose() requires at least one function")
57 if len(funcs) == 1:
58 return funcs[0]
60 def composed(x: WaveformTrace) -> WaveformTrace:
61 """Apply composed functions right-to-left."""
62 # Apply functions in reverse order (right to left)
63 return reduce(lambda val, func: func(val), reversed(funcs), x)
65 # Preserve function metadata
66 composed.__name__ = "compose(" + ", ".join(f.__name__ for f in funcs) + ")"
67 composed.__doc__ = f"Composition of {len(funcs)} functions"
69 return composed
72def pipe(data: WaveformTrace, *funcs: TraceFunc) -> WaveformTrace:
73 """Apply functions left-to-right: pipe(x, f, g, h) == h(g(f(x))).
75 Applies the given functions sequentially to the data, passing the output
76 of each function to the next. This is more intuitive for sequential
77 processing pipelines.
79 Args:
80 data: Initial WaveformTrace to process.
81 *funcs: Variable number of functions to apply sequentially.
83 Returns:
84 Transformed WaveformTrace after applying all functions.
86 Example:
87 >>> import tracekit as tk
88 >>> # Apply operations left-to-right
89 >>> result = tk.pipe(
90 ... trace,
91 ... tk.low_pass(cutoff=1e6),
92 ... tk.resample(rate=1e9),
93 ... tk.fft(nfft=8192)
94 ... )
95 >>> # Equivalent to: fft(resample(low_pass(trace)))
97 Advanced Example:
98 >>> # Use with partial application
99 >>> from functools import partial
100 >>> result = tk.pipe(
101 ... trace,
102 ... partial(tk.low_pass, cutoff=1e6),
103 ... partial(tk.normalize, method='zscore'),
104 ... partial(tk.fft, nfft=8192, window='hann')
105 ... )
107 References:
108 API-002: Function Composition Operators
109 toolz.pipe
110 """
111 # Apply functions left-to-right
112 return reduce(lambda val, func: func(val), funcs, data)
115class Composable:
116 """Mixin class to enable >> operator for function composition.
118 This class provides the __rshift__ operator to enable pipe-style
119 composition using the >> syntax. Intended to be mixed into WaveformTrace
120 or used as a wrapper for transformer functions.
122 Example:
123 >>> # Enable >> operator on WaveformTrace
124 >>> result = trace >> low_pass(1e6) >> normalize() >> fft()
125 >>> # Equivalent to: fft(normalize(low_pass(trace)))
127 References:
128 API-002: Function Composition Operators
129 """
131 def __rshift__(self, func: Callable[[Any], Any]) -> Any:
132 """Enable >> operator for function application.
134 Args:
135 func: Function to apply to self.
137 Returns:
138 Result of applying func to self.
140 Example:
141 >>> result = trace >> low_pass(1e6)
142 """
143 return func(self)
146def make_composable(func: Callable[..., WaveformTrace]) -> Callable[..., TraceFunc]:
147 """Decorator to make a function support partial application and composition.
149 Wraps a function so it can be used in compose() and pipe() with
150 partial argument application.
152 Args:
153 func: Function to wrap.
155 Returns:
156 Wrapped function that returns a partially applied function when
157 called without a trace argument.
159 Example:
160 >>> @make_composable
161 ... def scale(trace, factor=1.0):
162 ... return WaveformTrace(
163 ... data=trace.data * factor,
164 ... metadata=trace.metadata
165 ... )
166 >>> # Use with partial application
167 >>> double = scale(factor=2.0)
168 >>> result = double(trace)
169 >>> # Or in pipe
170 >>> result = pipe(trace, scale(factor=2.0), scale(factor=0.5))
172 References:
173 API-002: Function Composition Operators
174 """
176 @wraps(func)
177 def wrapper(*args: Any, **kwargs: Any) -> TraceFunc | WaveformTrace:
178 # If first arg is a WaveformTrace, apply function immediately
179 if args and isinstance(args[0], WaveformTrace):
180 return func(*args, **kwargs)
182 # Otherwise, return a partially applied function
183 def partial_func(trace: WaveformTrace) -> WaveformTrace:
184 return func(trace, *args, **kwargs)
186 return partial_func
188 return wrapper # type: ignore[return-value]
191def curry(func: Callable[..., WaveformTrace]) -> Callable[..., TraceFunc]:
192 """Curry a function for easier composition.
194 Transforms a multi-argument function into a series of single-argument
195 functions. Useful for creating reusable transformation functions.
197 Args:
198 func: Function to curry.
200 Returns:
201 Curried version of the function.
203 Example:
204 >>> @curry
205 ... def scale_and_offset(trace, scale, offset):
206 ... return WaveformTrace(
207 ... data=trace.data * scale + offset,
208 ... metadata=trace.metadata
209 ... )
210 >>> # Create specialized functions
211 >>> double_and_shift = scale_and_offset(scale=2.0, offset=1.0)
212 >>> result = double_and_shift(trace)
214 References:
215 API-002: Function Composition Operators
216 Functional programming currying
217 """
219 @wraps(func)
220 def curried(*args: Any, **kwargs: Any) -> TraceFunc | WaveformTrace:
221 # If we have a WaveformTrace as first arg, apply immediately
222 if args and isinstance(args[0], WaveformTrace):
223 return func(*args, **kwargs)
225 # Return a function that waits for the trace
226 def partial(*more_args: Any, **more_kwargs: Any) -> WaveformTrace:
227 all_args = args + more_args
228 all_kwargs = {**kwargs, **more_kwargs}
229 return func(*all_args, **all_kwargs)
231 return partial
233 return curried # type: ignore[return-value]
236__all__ = [
237 "Composable",
238 "compose",
239 "curry",
240 "make_composable",
241 "pipe",
242]