Coverage for suppy\perturbations\_base.py: 90%
115 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
1"""Base class for perturbations applied to feasibility seeking algorithms."""
2from abc import ABC, abstractmethod
3from typing import Callable, List
4import numpy as np
5import numpy.typing as npt
6from suppy.utils import FuncWrapper
8try:
9 import cupy as cp
11 NO_GPU = False
12except ImportError:
13 NO_GPU = True
14 cp = np
class Perturbation(ABC):
    """
    Interface shared by all perturbations that are applied to feasibility
    seeking algorithms.

    Concrete subclasses have to provide :meth:`perturbation_step`.
    """

    @abstractmethod
    def perturbation_step(self, x: npt.NDArray) -> np.ndarray:
        """
        Apply one perturbation to the given iterate.

        Parameters
        ----------
        x : npt.NDArray
            Array that should be perturbed.

        Returns
        -------
        npt.NDArray
            The array after the perturbation.
        """
class ObjectivePerturbation(Perturbation, ABC):
    """
    Common base for perturbations that work by reducing an objective
    function.

    Parameters
    ----------
    func : Callable
        Objective function that the perturbation tries to reduce.
    func_args : List
        Arguments handed to the objective function.
    n_red : int, optional
        How many reduction steps make up a single perturbation (default is 1).

    Attributes
    ----------
    func : FuncWrapper
        The objective function wrapped together with its arguments.
    n_red : int
        Number of reduction steps per perturbation.
    _k : int
        Counter of the perturbations performed so far.
    """

    def __init__(self, func: Callable, func_args: List, n_red: int = 1):
        self.func = FuncWrapper(func, func_args)
        self.n_red = n_red
        self._k = 0

    def perturbation_step(self, x: npt.NDArray) -> np.ndarray:
        """
        Run ``n_red`` consecutive reduction steps on the input array.

        Each call counts as one perturbation and increments ``_k`` once,
        independent of ``n_red``.

        Parameters
        ----------
        x : npt.NDArray
            Array that should be perturbed.

        Returns
        -------
        npt.NDArray
            The array obtained after all reduction steps.
        """
        self._k += 1
        remaining = self.n_red
        while remaining > 0:
            x = self._function_reduction_step(x)
            remaining -= 1
        return x

    @abstractmethod
    def _function_reduction_step(self, x: npt.NDArray) -> np.ndarray:
        """
        Perform a single function reduction step on the input array.

        Has to be implemented by subclasses.

        Parameters
        ----------
        x : npt.NDArray
            Array the reduction step is applied to.

        Returns
        -------
        npt.NDArray
            The array after the reduction step.
        """

    def pre_step(self, x: npt.NDArray, *args, **kwargs) -> None:
        """
        Hook that runs before the perturbation of each iteration.

        The base implementation does nothing; subclasses override it when
        they need work done ahead of the perturbation.

        Parameters
        ----------
        x : npt.NDArray
            Current iterate.
        """

    def post_step(self, x: npt.NDArray, *args, **kwargs) -> None:
        """
        Hook that runs after the perturbation of each iteration.

        The base implementation does nothing; subclasses override it when
        they need work done following the perturbation.

        Parameters
        ----------
        x : npt.NDArray
            Current iterate.
        """

    def reset(self) -> None:
        """Put the perturbation back into its initial state (zero the counter)."""
        self._k = 0
class DummyPerturbation(ObjectivePerturbation):
    """No-op perturbation that hands the input back unchanged."""

    def __init__(self):
        def _dummy_func(x):
            """Always returns 0."""
            return 0

        # The base initializer wraps the objective (no extra arguments here),
        # stores n_red and zeroes the perturbation counter _k.
        super().__init__(_dummy_func, [], n_red=1)

    def _function_reduction_step(self, x: npt.NDArray) -> np.ndarray:
        """Return ``x`` as it is; there is nothing to reduce."""
        return x
class GradientPerturbation(ObjectivePerturbation, ABC):
    """
    Base for perturbations that reduce an objective function with the help
    of its gradient.

    Parameters
    ----------
    func : Callable
        Objective function that the perturbation tries to reduce.
    grad : Callable
        Gradient of the objective function.
    func_args : List
        Arguments handed to the objective function.
    grad_args : List
        Arguments handed to the gradient function.
    n_red : int, optional
        Number of reduction steps per perturbation, by default 1.

    Attributes
    ----------
    func : FuncWrapper
        The objective function wrapped together with its arguments.
    grad : FuncWrapper
        The gradient function wrapped together with its arguments.
    n_red : int
        Number of reduction steps per perturbation.
    _k : int
        Counter of the perturbations performed so far.
    """

    def __init__(
        self,
        func: Callable,
        grad: Callable,
        func_args: List,
        grad_args: List,
        n_red: int = 1,
    ):
        super().__init__(func=func, func_args=func_args, n_red=n_red)
        self.grad = FuncWrapper(grad, grad_args)
class AdaptiveStepGradientPerturbation(GradientPerturbation):
    """
    Objective function perturbation using gradient descent with adaptive
    step size reduction.

    Parameters
    ----------
    func : Callable
        The function to be reduced.
    grad : Callable
        The gradient of the function to be reduced.
    func_args : List, optional
        Additional arguments to be passed to the function, by default [].
    grad_args : List, optional
        Additional arguments to be passed to the gradient function, by default [].
    func_level : float, optional
        Value above which to perform a gradient step using a subgradient
        projection like step, by default 0.5.
    max_func_level : float, optional
        Upper bound for the func_level; once reached the update of
        func_level in :meth:`post_step` is skipped, by default np.inf.
    epsilon : float, optional
        Minimum amount func_level is raised by in each update, by default 1e-6.
    noisy : bool, optional
        Flips the sign of the desirability term used when updating
        func_level, by default False.

    Attributes
    ----------
    func_level : float
        Current level the objective value is compared against.
    func_levels : list
        History of func_level, starting with the initial value and extended
        by one entry per call to :meth:`post_step`.
    update_direction : int
        -1 if ``noisy`` was set, 1 otherwise.

    References
    ----------
    - [1] https://doi.org/10.1007/s11075-021-01229-z
    """

    def __init__(
        self,
        func: Callable,
        grad: Callable,
        func_args: List | None = None,
        grad_args: List | None = None,
        func_level: float = 0.5,
        max_func_level: float = np.inf,
        epsilon: float = 1e-6,
        noisy: bool = False,
    ):
        if func_args is None:
            func_args = []
        if grad_args is None:
            grad_args = []
        super().__init__(func, grad, func_args, grad_args, n_red=1)
        # remembered so that reset() can restore the starting level
        self._initial_func_level = func_level
        self.func_level = func_level
        self.max_func_level = max_func_level
        self.epsilon = epsilon
        self.update_direction = -1 if noisy else 1
        self.func_levels = [func_level]

    def _function_reduction_step(self, x: npt.NDArray) -> np.ndarray:
        """
        Perform a function reduction step using gradient descent with a step
        size based on a subgradient projection.

        Parameters
        ----------
        x : npt.NDArray
            The current point in the algorithm.

        Returns
        -------
        npt.NDArray
            The updated point after performing the reduction step; ``x``
            itself if the gradient vanishes or the objective value does not
            exceed func_level.
        """
        grad_eval = self.grad(x)
        func_eval = self.func(x)
        grad_norm_sq = grad_eval @ grad_eval
        if grad_norm_sq <= 0 or func_eval <= self.func_level:
            # nothing to do: zero gradient or objective already at/below the level
            return x
        return x - (func_eval - self.func_level) / grad_norm_sq * grad_eval

    def post_step(self, x: npt.NDArray, *args, **kwargs) -> None:
        """
        Update func_level after each step.

        Parameters
        ----------
        x : npt.NDArray
            Current iterate (not used by the update itself).
        **kwargs
            Must contain ``last_proximity_function_reduction`` and
            ``last_proximity_basic``; only the first entry of each is read.
            NOTE(review): presumably the proximities measured after the
            function reduction and after the basic algorithmic step - confirm
            against the caller.

        Raises
        ------
        TypeError
            If one of the required keyword arguments is missing.
        """
        for required in ("last_proximity_function_reduction", "last_proximity_basic"):
            if kwargs.get(required) is None:
                raise TypeError(f"post_step() requires the keyword argument '{required}'")
        last_proximity_function_reduction = kwargs["last_proximity_function_reduction"][0]
        last_proximity_basic = kwargs["last_proximity_basic"][0]

        if self.func_level >= self.max_func_level:
            # upper bound reached: leave the level untouched, but keep one
            # history entry per call
            self.func_levels.append(self.func_level)
            return

        if last_proximity_basic < 1e-10 or last_proximity_function_reduction < 1e-10:
            # avoid dividing by a (numerically) vanishing proximity
            desirability = 0
        else:
            # relative difference of the square roots of both proximities
            desirability = (
                np.sqrt(last_proximity_function_reduction) - np.sqrt(last_proximity_basic)
            ) / np.sqrt(last_proximity_basic)

        # the level is raised by at least epsilon in every update
        self.func_level = self.func_level + max(
            self.epsilon, self.update_direction * self.func_level * desirability
        )
        self.func_levels.append(self.func_level)

    def reset(self) -> None:
        """
        Reset the perturbation to its initial state.

        Restores the perturbation counter, the initial func_level and the
        func_level history.
        """
        super().reset()
        self.func_level = self._initial_func_level
        self.func_levels = [self._initial_func_level]
class PowerSeriesGradientPerturbation(GradientPerturbation):
    """
    Objective function perturbation using gradient descent with step size
    reduction according to a power series.
    Has the option to restart the power series after a certain number of
    steps.

    Parameters
    ----------
    func : Callable
        The function to be reduced.
    grad : Callable
        The gradient of the function to be reduced.
    func_args : List, optional
        Additional arguments to be passed to the function, by default [].
    grad_args : List, optional
        Additional arguments to be passed to the gradient function, by default [].
    n_red : int, optional
        The number of reductions, by default 1.
    step_size : float, optional
        Base of the step size power series, by default 0.5.
        Note that the search for a decreasing step only shrinks the step
        if this value is below 1.
    step_size_modifier : float, optional
        Scaling factor for the step size power series, by default 1.0.
    n_restart : int, optional
        The number of steps after which to restart the power series, by default -1 (no restart).
    disable_gradient_scaling : bool, optional
        If true, skip the normalization of the gradient, by default False.
    iterative_scaling : bool, optional
        If true, the power series is scaled by the iteration k without checking
        whether this actually decreases the objective, by default False.
    bypass_objective_decrease : bool, optional
        If true, the check whether the objective function value actually
        decreases is bypassed, by default False.
    """

    # steps whose length (times the gradient norm) falls to or below this value are
    # treated as zero
    _MIN_STEP_LENGTH = 1e-14

    def __init__(
        self,
        func: Callable,
        grad: Callable,
        func_args: List | None = None,
        grad_args: List | None = None,
        n_red: int = 1,
        step_size: float = 0.5,
        step_size_modifier: float = 1.0,
        n_restart: int = -1,
        disable_gradient_scaling: bool = False,
        iterative_scaling: bool = False,
        bypass_objective_decrease: bool = False,
    ):
        if func_args is None:
            func_args = []
        if grad_args is None:
            grad_args = []
        super().__init__(func, grad, func_args, grad_args, n_red)
        self.step_size = step_size
        self.step_size_modifier = step_size_modifier
        # exponent of the power series; incremented before its first use
        self._l = -1
        # np.inf never divides a positive counter evenly, i.e. no restart
        self.n_restart = np.inf if n_restart == -1 else n_restart
        self.disable_gradient_scaling = disable_gradient_scaling
        self.iterative_scaling = iterative_scaling
        self.bypass_objective_decrease = bypass_objective_decrease
        self._last_grad_norm = 1

    def _function_reduction_step(self, x: npt.NDArray) -> np.ndarray:
        """
        Perform a function reduction step using gradient descent.

        Parameters
        ----------
        x : npt.NDArray
            The current point in the algorithm.

        Returns
        -------
        npt.NDArray
            The updated point after performing the reduction step; ``x``
            itself if the gradient vanishes or the step has become
            negligibly small.
        """
        # quick check if the step size is already zero, if so skip the rest of the calculations
        if (
            self.step_size_modifier * self.step_size**self._l * self._last_grad_norm
            <= self._MIN_STEP_LENGTH
        ):
            return x

        xp = cp if isinstance(x, cp.ndarray) else np
        grad_eval = self.grad(x)
        func_eval = self.func(x)

        if grad_eval @ grad_eval <= 0:
            return x

        grad_norm = 1 if self.disable_gradient_scaling else xp.linalg.norm(grad_eval)
        self._last_grad_norm = grad_norm

        if self.iterative_scaling:
            # step length is tied to the perturbation counter, no decrease check
            return (
                x - self.step_size_modifier * self.step_size ** (self._k) * grad_eval / grad_norm
            )

        while True:
            self._l += 1
            step = self.step_size_modifier * self.step_size**self._l
            x_ln = x - step * grad_eval / grad_norm
            if self.bypass_objective_decrease or self.func(x_ln) <= func_eval:
                return x_ln
            if step * grad_norm <= self._MIN_STEP_LENGTH:
                # No decrease found down to a negligible step (e.g. the
                # objective evaluates to NaN): stop searching instead of
                # looping forever and leave the iterate unchanged.
                return x

    def pre_step(self, x: npt.NDArray, *args, **kwargs) -> None:
        """
        Restart the power series every ``n_restart`` perturbations.

        On a restart the exponent is set to ``_k // n_restart``.

        Parameters
        ----------
        x : npt.NDArray
            Current iterate.
        """
        if self._k <= 0:
            return
        if self._k % self.n_restart == 0:
            self._l = self._k // self.n_restart

    def reset(self) -> None:
        """
        Reset the perturbation to its initial state.

        Restores the perturbation counter, the power series exponent and the
        cached gradient norm used by the zero-step shortcut.
        """
        super().reset()
        self._l = -1
        self._last_grad_norm = 1