Coverage for src / tracekit / pipeline / composition.py: 100%

43 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Functional composition operators for trace transformations. 

2 

3This module implements compose() and pipe() functions for functional-style 

4trace processing, with support for operator overloading. 

5""" 

6 

7from __future__ import annotations 

8 

9from collections.abc import Callable 

10from functools import reduce, wraps 

11from typing import Any, TypeVar 

12 

13from ..core.types import WaveformTrace 

14 

# Type variables and aliases for generic composition

# Generic type variable. It is not referenced in the visible part of this
# module.
T = TypeVar("T")

# Signature expected of every pipeline stage: a callable that accepts one
# WaveformTrace and returns a WaveformTrace.
TraceFunc = Callable[[WaveformTrace], WaveformTrace]

18 

19 

def compose(*funcs: TraceFunc) -> TraceFunc:
    """Compose functions right-to-left: compose(f, g, h)(x) == f(g(h(x))).

    Creates a single function that applies the given functions in reverse
    order, following mathematical function composition notation.

    Any callable is accepted as a stage, including ``functools.partial``
    objects and callable instances that do not define ``__name__``.

    Args:
        *funcs: Variable number of functions to compose. Each function should
            take a WaveformTrace and return a WaveformTrace.

    Returns:
        Composite function that applies all functions in reverse order.
        When exactly one function is given, that function itself is returned
        unchanged (no wrapper is created).

    Raises:
        ValueError: If no functions provided.

    Example:
        >>> import tracekit as tk
        >>> from functools import partial
        >>> # Create composed analysis function
        >>> analyze_signal = tk.compose(
        ...     tk.extract_thd,
        ...     partial(tk.fft, nfft=8192, window='hann'),
        ...     partial(tk.normalize, method='peak'),
        ...     partial(tk.low_pass, cutoff=5e6)
        ... )
        >>> # Apply to trace: low_pass -> normalize -> fft -> extract_thd
        >>> thd = analyze_signal(trace)

    References:
        API-002: Function Composition Operators
        toolz.functoolz
        https://github.com/pytoolz/toolz
    """
    if not funcs:
        raise ValueError("compose() requires at least one function")

    if len(funcs) == 1:
        return funcs[0]

    def _display_name(stage: object) -> str:
        """Return a readable label for a stage, even if it lacks __name__."""
        name = getattr(stage, "__name__", None)
        if name is None:
            # functools.partial exposes the wrapped callable as ``.func``;
            # use its name so the label stays meaningful.
            name = getattr(getattr(stage, "func", None), "__name__", None)
        if name is None:
            # Last resort for arbitrary callable instances.
            name = type(stage).__name__
        return str(name)

    def composed(x: WaveformTrace) -> WaveformTrace:
        """Apply composed functions right-to-left."""
        # Fold from the last function to the first (right to left).
        return reduce(lambda val, func: func(val), reversed(funcs), x)

    # Give the composite descriptive metadata. Reading ``f.__name__`` directly
    # would raise AttributeError for partial objects, so go through the
    # fallback helper instead.
    composed.__name__ = "compose(" + ", ".join(_display_name(f) for f in funcs) + ")"
    composed.__doc__ = f"Composition of {len(funcs)} functions"

    return composed

70 

71 

def pipe(data: WaveformTrace, *funcs: TraceFunc) -> WaveformTrace:
    """Thread a value through functions left-to-right.

    ``pipe(x, f, g, h)`` is the same as ``h(g(f(x)))``: the first function
    receives ``data`` and every later function receives the previous
    function's result. With no functions, ``data`` is returned unchanged.

    Args:
        data: Initial WaveformTrace to process.
        *funcs: Functions to apply, in the order they should run.

    Returns:
        The value produced by the last function (or ``data`` itself when no
        functions are given).

    Example:
        >>> import tracekit as tk
        >>> # Stages run in the order they are written
        >>> result = tk.pipe(
        ...     trace,
        ...     tk.low_pass(cutoff=1e6),
        ...     tk.resample(rate=1e9),
        ...     tk.fft(nfft=8192)
        ... )
        >>> # Equivalent to: fft(resample(low_pass(trace)))

    Advanced Example:
        >>> # Stages built with partial application
        >>> from functools import partial
        >>> result = tk.pipe(
        ...     trace,
        ...     partial(tk.low_pass, cutoff=1e6),
        ...     partial(tk.normalize, method='zscore'),
        ...     partial(tk.fft, nfft=8192, window='hann')
        ... )

    References:
        API-002: Function Composition Operators
        toolz.pipe
    """
    current = data
    for stage in funcs:
        # Each stage consumes the output of the one before it.
        current = stage(current)
    return current

113 

114 

class Composable:
    """Mixin class to enable >> operator for function composition.

    This class provides the __rshift__ operator to enable pipe-style
    composition using the >> syntax. Intended to be mixed into WaveformTrace
    or used as a wrapper for transformer functions.

    Example:
        >>> # Enable >> operator on WaveformTrace
        >>> result = trace >> low_pass(1e6) >> normalize() >> fft()
        >>> # Equivalent to: fft(normalize(low_pass(trace)))

    References:
        API-002: Function Composition Operators
    """

    def __rshift__(self, func: Callable[[Any], Any]) -> Any:
        """Enable >> operator for function application.

        Args:
            func: Function to apply to self.

        Returns:
            Result of applying func to self, or ``NotImplemented`` when the
            right-hand operand is not callable. In that case Python tries the
            operand's ``__rrshift__`` and raises ``TypeError`` if that is
            unsupported too.

        Example:
            >>> result = trace >> low_pass(1e6)
        """
        if not callable(func):
            # Follow the binary-operator protocol instead of failing inside
            # the call: let the right operand's reflected method have a go.
            return NotImplemented
        return func(self)

144 

145 

def make_composable(func: Callable[..., WaveformTrace]) -> Callable[..., TraceFunc]:
    """Decorate a trace function so it can be configured first and applied later.

    The decorated function has two calling modes:

    * Called with a WaveformTrace as its first positional argument, it runs
      ``func`` immediately with the arguments exactly as given.
    * Called any other way, it stores the arguments and returns a
      one-argument function. Calling that function with a trace runs
      ``func(trace, *args, **kwargs)``, i.e. the trace is inserted in front
      of the stored positional arguments.

    The second mode yields callables suitable for compose() and pipe().

    Args:
        func: Function to wrap. Its first parameter is expected to be the
            trace.

    Returns:
        Wrapped function that returns a partially applied function when
        called without a trace argument.

    Example:
        >>> @make_composable
        ... def scale(trace, factor=1.0):
        ...     return WaveformTrace(
        ...         data=trace.data * factor,
        ...         metadata=trace.metadata
        ...     )
        >>> # Configure now, apply later
        >>> double = scale(factor=2.0)
        >>> result = double(trace)
        >>> # Or inline in a pipe
        >>> result = pipe(trace, scale(factor=2.0), scale(factor=0.5))

    References:
        API-002: Function Composition Operators
    """

    @wraps(func)
    def dispatch(*args: Any, **kwargs: Any) -> TraceFunc | WaveformTrace:
        trace_supplied = bool(args) and isinstance(args[0], WaveformTrace)

        if not trace_supplied:
            # No trace yet: remember the configuration and wait for one.
            def deferred(trace: WaveformTrace) -> WaveformTrace:
                return func(trace, *args, **kwargs)

            return deferred

        # A trace leads the positional arguments: run right away.
        return func(*args, **kwargs)

    return dispatch  # type: ignore[return-value]

189 

190 

def curry(func: Callable[..., WaveformTrace]) -> Callable[..., TraceFunc]:
    """Wrap a function so its arguments can be supplied in two steps.

    The wrapped function has two calling modes:

    * Called with a WaveformTrace as its first positional argument, it runs
      ``func`` immediately.
    * Called any other way, it returns a function that accepts the remaining
      arguments. That function calls ``func`` with the first call's
      positional arguments followed by its own, and with both sets of
      keyword arguments merged (later keywords override earlier ones).

    Only one level of deferral is provided: the returned function always
    calls ``func`` directly. Positional arguments from the first call come
    before those of the second, so passing configuration by keyword (as in
    the example) keeps the trace in first position.

    Args:
        func: Function to curry.

    Returns:
        Curried version of the function.

    Example:
        >>> @curry
        ... def scale_and_offset(trace, scale, offset):
        ...     return WaveformTrace(
        ...         data=trace.data * scale + offset,
        ...         metadata=trace.metadata
        ...     )
        >>> # Build a specialised transformation, then apply it
        >>> double_and_shift = scale_and_offset(scale=2.0, offset=1.0)
        >>> result = double_and_shift(trace)

    References:
        API-002: Function Composition Operators
        Functional programming currying
    """

    @wraps(func)
    def entry(*args: Any, **kwargs: Any) -> TraceFunc | WaveformTrace:
        has_trace = bool(args) and isinstance(args[0], WaveformTrace)
        if has_trace:
            # Trace already present: nothing to defer.
            return func(*args, **kwargs)

        def awaiting_rest(*late_args: Any, **late_kwargs: Any) -> WaveformTrace:
            # Later keywords win over the ones captured up front.
            merged_kwargs = dict(kwargs)
            merged_kwargs.update(late_kwargs)
            return func(*args, *late_args, **merged_kwargs)

        return awaiting_rest

    return entry  # type: ignore[return-value]

234 

235 

# Public API of this module: the names exported by a star-import.
__all__ = [
    "Composable",
    "compose",
    "curry",
    "make_composable",
    "pipe",
]