Coverage for suppy\superiorization\_standard_sup.py: 85%
150 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
1"""Normal superiorization algorithm."""
2from typing import List, Callable
3import numpy as np
4import time
5import numpy.typing as npt
6from suppy.utils import ensure_float_array
7from suppy.perturbations import Perturbation
8from ._sup import FeasibilityPerturbation
9from suppy.projections import Projection
11try:
12 import cupy as cp
14 NO_GPU = False
15except ImportError:
16 cp = np
17 NO_GPU = True
20class Superiorization(FeasibilityPerturbation):
21 """
22 Superiorization algorithm for constrained optimization problems.
24 Parameters
25 ----------
26 basic : Callable
27 The underlying feasibility seeking algorithm.
28 perturbation_scheme : Perturbation
29 The perturbation scheme to be used for superiorization.
31 Attributes
32 ----------
33 basic : Callable
34 The underlying feasibility seeking algorithm.
35 perturbation_scheme : Perturbation
36 The perturbation scheme to be used for superiorization.
37 objective_tol : float
38 Tolerance for the objective function value change to determine stopping criteria.
39 prox_tol : float
40 Tolerance for the constraint proximity value change to determine stopping criteria.
41 _k : int
42 The current iteration number.
43 all_x : list | None
44 List of all points achieved during the optimization process, only stored if requested by the user.
45 all_function_values : list | None
46 List of all objective function values achieved during the optimization process, only stored if requested by the user.
47 all_x_function_reduction : list | None
48 List of all points achieved via the function reduction step, only stored if requested by the user.
49 all_function_values_function_reduction : list | None
50 List of all objective function values achieved via the function reduction step, only stored if requested by the user.
51 all_x_basic : list | None
52 List of all points achieved via the basic feasibility seeking algorithm, only stored if requested by the user.
53 all_function_values_basic : list | None
54 List of all objective function values achieved via the basic feasibility seeking algorithm, only stored if requested by the user.
55 """
57 def __init__(
58 self,
59 basic: Projection,
60 perturbation_scheme: Perturbation,
61 ):
63 super().__init__(basic)
64 self.perturbation_scheme = perturbation_scheme
66 # initialize some variables for the algorithms
67 self._k = 0
68 self.t = []
69 self.t_it = []
71 # initialize some arrays for storing of results
72 self.all_x = []
73 self.all_function_values = [] # array storing all objective function values
74 self.proximities = [] # array storing all proximity function values
76 self.all_x_function_reduction = []
77 self.all_function_values_function_reduction = []
78 self.proximities_function_reduction = []
80 self.all_x_basic = []
81 self.all_function_values_basic = []
82 self.proximities_basic = []
84 @ensure_float_array
85 def solve(
86 self,
87 x_0: npt.NDArray,
88 max_iter: int = 10,
89 prox_tol: float = 1e-6,
90 del_prox_tol: float = 1e-8,
91 del_prox_n: int = 5,
92 del_objective_tol: float = 1e-6,
93 del_objective_n: int = 5,
94 proximity_measures: List | None = None,
95 storage: bool = False,
96 storage_iters: List[int] | int | None = None,
97 alternative_stopping_criterion: Callable | None = None,
98 alternative_stopping_criterion_initial_call: Callable | None = None,
99 ) -> np.ndarray:
100 """
101 Solve the optimization problem using the superiorization method.
103 Parameters
104 ----------
105 x_0 : npt.NDArray
106 Initial guess for the solution.
107 max_iter : int, optional
108 Maximum number of iterations to perform (default is 10).
109 prox_tol : float, optional
110 Tolerance for the constraint function value to determine stopping criteria, by default 1e-6.
111 del_prox_tol : float, optional
112 Tolerance for the proximity function value change to determine stopping criteria, by default 1e-8.
113 del_prox_n : int, optional
114 Number of iterations with proximity changes below the threshold to determine stopping criteria, by default 5.
115 proximity_measures : List, optional
116 The proximity measures to calculate, by default None. Right now only the first in the list is used to check the feasibility.
117 del_objective_tol : float, optional
118 Tolernace for the objective function value to determine stopping criteria, by default 1e-6.
119 del_objective_n : int, optional
120 Number of iterations with objective function changes below the threshold to determine stopping criteria, by default 5.
121 storage : bool, optional
122 If True, store intermediate results (default is False).
123 storage_iters : List[int] or int, optional
124 Controls which iterations are stored (when storage=True). If None, all iterations are stored.
125 If a list of ints, only those iteration indices are stored (0 = initial point).
126 If an int, storage occurs every that many iterations.
127 alternative_stopping_criterion : callable, optional
128 Alternative stopping criterion
129 alternative_stopping_criterion_initial_call : callable, optional
130 Initial call for an alternative stopping criterion
132 Returns
133 -------
134 npt.NDArray
135 The superiorized solution.
136 """
138 def _should_store(idx):
139 if storage_iters is None:
140 return True
141 if isinstance(storage_iters, int):
142 return idx % storage_iters == 0
143 return idx in storage_iters
145 if proximity_measures is None:
146 proximity_measures = [("p_norm", 2)]
147 else:
148 # TODO: check that proximity measures are valid
149 _ = None
150 # initialization of variables
152 self.perturbation_scheme.reset() # reset the perturbation scheme
154 self._n_tol_objective = 0
155 self._n_tol_prox = 0
157 x = x_0
159 # initial function and proximity values
161 self._initial_storage(
162 x,
163 storage and _should_store(0),
164 self.perturbation_scheme.func(x_0),
165 self.basic.proximity(x_0, proximity_measures),
166 )
168 self._k = 0 # reset counter if necessary
170 if alternative_stopping_criterion_initial_call is not None:
171 stop = alternative_stopping_criterion_initial_call(x, self)
172 else:
173 stop = False # criterion for stopping the algorithm
175 self.t.append(0)
176 t_current = time.time()
177 t_init = t_current
179 while self._k < max_iter and not stop:
180 self.perturbation_scheme.pre_step(
181 x,
182 last_proximity=self.proximities[-1],
183 last_proximity_basic=self.proximities_basic[-1],
184 last_proximity_function_reduction=self.proximities_function_reduction[-1],
185 last_function_value=self.all_function_values[-1],
186 last_function_value_basic=self.all_function_values_basic[-1],
187 ) # perform any pre-step actions of the perturbation scheme
189 # perform the perturbation schemes update step
190 x = self.perturbation_scheme.perturbation_step(x)
191 self.storage(
192 x,
193 kind="function_reduction",
194 storage=storage and _should_store(self._k + 1),
195 f=self.perturbation_scheme.func(x),
196 p=self.basic.proximity(x, proximity_measures),
197 )
199 self.perturbation_scheme.post_step(
200 x,
201 last_proximity=self.proximities[-1],
202 last_proximity_basic=self.proximities_basic[-1],
203 last_proximity_function_reduction=self.proximities_function_reduction[-1],
204 last_function_value=self.all_function_values[-1],
205 last_function_value_basic=self.all_function_values_basic[-1],
206 ) # perform any post-step actions of the perturbation scheme
208 # perform basic step
209 x = self.basic.step(x)
211 # check current function and proximity values
213 self.storage(
214 x,
215 kind="basic",
216 storage=storage and _should_store(self._k + 1),
217 f=self.perturbation_scheme.func(x),
218 p=self.basic.proximity(x, proximity_measures),
219 )
221 self._k += 1
223 # enable different stopping criteria for different superiorization algorithms
224 if alternative_stopping_criterion is not None:
225 stop = alternative_stopping_criterion(x, self)
226 else:
227 stop = self._stopping_criterion(
228 del_objective_tol, del_objective_n, prox_tol, del_prox_tol, del_prox_n
229 )
231 self._additional_action(x)
232 self.t.append(time.time() - t_init)
233 self.t_it.append(time.time() - t_current)
234 t_current = time.time()
235 self._post_step(x)
237 return x
239 def _stopping_criterion(
240 self,
241 del_objective_tol: float,
242 del_objective_n: int,
243 prox_tol: float,
244 del_prox_tol: float,
245 del_prox_n: int,
246 ) -> bool:
247 """
248 Determine if the stopping criterion for the optimization process are
249 met.
251 Parameters
252 ----------
253 del_objective_tol : float, optional
254 Tolernace for the objective function value to determine stopping criteria, by default 1e-6.
255 del_objective_n : int, optional
256 Number of iterations with objective function changes below the threshold to determine stopping criteria, by default 5.
257 prox_tol : float, optional
258 Tolerance for the constraint function value to determine stopping criteria, by default 1e-6.
259 del_prox_tol : float, optional
260 Tolerance for the proximity function value change to determine stopping criteria, by default 1e-8.
261 del_prox_n : int, optional
262 Number of iterations with proximity changes below the threshold to determine stopping criteria, by default 5.
264 Returns
265 -------
266 bool
267 True if the stopping criteria are met, False otherwise.
268 """
270 stop_objective = False # variable to check if the objective function criteria is met
271 # check objective function criteria f_(k+1)-f_k<= (delta f)* is met in last n_tol_objective iterations
272 if (
273 abs(self.all_function_values[-3] - self.all_function_values[-1])
274 / max(1, self.all_function_values[-3])
275 < del_objective_tol
276 ):
277 self._n_tol_objective += 1
278 else:
279 self._n_tol_objective = 0
280 if (
281 self._n_tol_objective >= del_objective_n
282 ): # n objective function changes below threshold
283 stop_objective = True
285 stop_prox = False # variable to check if the proximity criteria is met
286 # check if proximity values are below the threshold
287 if self.proximities[-1][0] < prox_tol: # proximity below goal/tolerance
288 stop_prox = True
290 # check if the proximity changes are below tolerance level
291 if (
292 abs(self.proximities[-3][0] - self.proximities[-1][0])
293 / max(1, self.proximities[-3][0])
294 < del_prox_tol
295 ):
296 self._n_tol_prox += 1
297 else:
298 self._n_tol_prox = 0
299 if self._n_tol_prox >= del_prox_n: # n proximity changes below threshold
300 stop_prox = True
302 # check if both criteria are met
303 return stop_objective and stop_prox
305 def _additional_action(self, x: npt.NDArray):
306 """
307 Perform an additional action on the input, in case it is needed.
309 Parameters
310 ----------
311 x : npt.NDArray
312 The current iterate
314 Returns
315 -------
316 None
317 """
319 def _initial_storage(self, x, storage, f, p):
320 """
321 Initializes storage for objective values and appends initial values.
323 Parameters
324 ----------
325 x : array-like
326 Initial values of the variables.
327 f : array-like
328 Initial values of the objective function.
329 p : array-like
330 Proximity function value
331 """
332 # reset objective values
333 self.all_x = []
334 self.all_function_values = [] # array storing all objective function values
335 self.proximities = [] # array storing all proximity function values
337 self.all_x_function_reduction = []
338 self.all_function_values_function_reduction = []
339 self.proximities_function_reduction = []
341 self.all_x_basic = []
342 self.all_function_values_basic = []
343 self.proximities_basic = []
345 # append initial values
346 self.all_function_values.append(f)
347 self.all_function_values_basic.append(f)
348 self.all_function_values_function_reduction.append(f)
350 self.proximities.append(p)
351 self.proximities_basic.append(p)
352 self.proximities_function_reduction.append(p)
354 if storage and isinstance(x, np.ndarray):
355 self.all_x.append(x.copy())
356 self.all_x_basic.append(x.copy())
357 self.all_x_function_reduction.append(x.copy())
359 elif storage:
360 self.all_x.append(x.get())
361 self.all_x_basic.append(x.get())
362 self.all_x_function_reduction.append(x.get())
364 def storage(
365 self,
366 x: npt.NDArray,
367 kind: str,
368 storage: bool = True,
369 f: float | None = None,
370 p: list | None = None,
371 ):
372 """
373 Stores the given values of x and f into the corresponding lists.
375 Parameters
376 ----------
377 x : npt.NDArray
378 The current value of the variable x to be stored.
379 kind : str
380 The type of storage to be used, either "function_reduction" or "basic".
381 storage : bool, optional
382 If True, store the values of x
383 """
385 # always store all function and proximity values
386 self.all_function_values.append(f)
387 self.proximities.append(p)
388 if storage and isinstance(x, np.ndarray):
389 self.all_x.append(x.copy())
390 elif storage:
391 self.all_x.append((x.get()))
393 if kind == "function_reduction":
394 self.all_function_values_function_reduction.append(f)
395 self.proximities_function_reduction.append(p)
397 if storage and isinstance(x, np.ndarray):
398 self.all_x_function_reduction.append(x.copy())
399 elif storage:
400 self.all_x_function_reduction.append((x.get()))
402 elif kind == "basic":
403 self.all_function_values_basic.append(f)
404 self.proximities_basic.append(p)
406 if storage and isinstance(x, np.ndarray):
407 self.all_x_basic.append(x.copy())
408 elif storage:
409 self.all_x_basic.append((x.get()))
411 else:
412 raise ValueError("Invalid storage type. Use 'function_reduction' or 'basic'.")
414 def _post_step(self, x: npt.NDArray):
415 """
416 Perform an action after the optimization process has finished.
418 Parameters
419 ----------
420 x : array-like
421 The current value of the variable x.
423 Returns
424 -------
425 None
426 """
428 self.all_x = np.array(self.all_x)
429 self.all_x_function_reduction = np.array(self.all_x_function_reduction)
430 self.all_x_basic = np.array(self.all_x_basic)
432 if isinstance(x, np.ndarray):
433 self.all_function_values = np.array(self.all_function_values)
434 self.all_function_values_function_reduction = np.array(
435 self.all_function_values_function_reduction
436 )
437 self.all_function_values_basic = np.array(self.all_function_values_basic)
438 self.proximities = np.array(self.proximities)
439 self.proximities_function_reduction = np.array(self.proximities_function_reduction)
440 self.proximities_basic = np.array(self.proximities_basic)
441 else:
442 self.all_function_values = np.array([el.get() for el in self.all_function_values])
443 self.all_function_values_function_reduction = np.array(
444 [el.get() for el in self.all_function_values_function_reduction]
445 )
446 self.all_function_values_basic = np.array(
447 [el.get() for el in self.all_function_values_basic]
448 )
449 self.proximities = np.array([el.get() for el in self.proximities])
450 self.proximities_function_reduction = np.array(
451 [el.get() for el in self.proximities_function_reduction]
452 )
453 self.proximities_basic = np.array([el.get() for el in self.proximities_basic])