Coverage for suppy\feasibility\_linear_algorithms.py: 83%
151 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
1"""Base classes for linear feasibility problems."""
2from abc import ABC
3from typing import List, Callable
4import numpy as np
5import numpy.typing as npt
7from scipy import sparse
9from suppy.utils import Bounds
10from suppy.utils import LinearMapping
11from suppy.utils import ensure_float_array
12from suppy.projections._projections import Projection
14try:
15 import cupy as cp
17 NO_GPU = False
19except ImportError:
20 NO_GPU = True
21 cp = np
24class Feasibility(Projection, ABC):
25 """
26 Parameters
27 ----------
28 algorithmic_relaxation : npt.NDArray or float, optional
29 The relaxation parameter used by the algorithm, by default 1.0.
30 relaxation : float, optional
31 Outer relaxation parameter, applied to the entire solution of the
32 iterate by default 1.0.
33 proximity_flag : bool, optional
34 A flag indicating whether to use this object for proximity
35 calculations, by default True.
37 Attributes
38 ----------
39 algorithmic_relaxation : npt.NDArray or float, optional
40 The relaxation parameter used by the algorithm, by default 1.0.
41 relaxation : float, optional
42 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
43 proximity_flag : bool, optional
44 Flag to indicate whether to calculate proximity, by default True.
45 _use_gpu : bool, optional
46 Flag to indicate whether to use GPU for computations, by default False.
47 """
49 def __init__(
50 self,
51 algorithmic_relaxation: npt.NDArray | float = 1.0,
52 relaxation: float = 1.0,
53 proximity_flag: bool = True,
54 _use_gpu: bool = False,
55 ):
56 super().__init__(relaxation, proximity_flag, _use_gpu)
57 self.algorithmic_relaxation = algorithmic_relaxation
58 self.all_x = None
59 self.proximities = None
60 # self._n_tol = 0 # counter for stopping criterion
62 @ensure_float_array
63 def solve(
64 self,
65 x: npt.NDArray,
66 max_iter: int = 500,
67 prox_tol: float = 1e-6,
68 del_prox_tol: float = 1e-8,
69 del_prox_n: int = 5,
70 storage: bool = False,
71 storage_iters: List[int] | int | None = None,
72 proximity_measures: List | None = None,
73 alternative_stopping_criterion: Callable | None = None,
74 alternative_stopping_criterion_initial_call: Callable | None = None,
75 ) -> np.ndarray:
76 """
77 Solves the optimization problem using an iterative approach.
79 Parameters
80 ----------
81 x : npt.NDArray
82 Starting point for the algorithm.
83 max_iter : int, optional
84 Maximum number of iterations to perform, by default 500.
85 prox_tol : float, optional
86 The tolerance for the proximity on the constraints, by default 1e-6.
87 del_prox_tol : float, optional
88 The tolerance for the change in proximity over the last del_prox_n iterations, by default 1e-8.
89 del_prox_n : int, optional
90 The number of iterations that del_prox_tol needs to be met in a row, by default 5.
91 proximity_measures : List, optional
92 The proximity measures to calculate, by default a l2 norm measure is used. Right now only the first in the list is used to check the feasibility.
93 storage : bool, optional
94 Flag indicating whether to store intermediate solutions, by default False.
95 storage_iters : List[int] or int, optional
96 Controls which iterations are stored (when storage=True). If None, all iterations are stored.
97 If a list of ints, only those iteration indices are stored (0 = initial point).
98 If an int, storage occurs every that many iterations.
99 alternative_stopping_criterion : callable, optional
100 Alternative stopping criterion
101 alternative_stopping_criterion_initial_call : callable, optional
102 Initial call for an alternative stopping criterion
104 Returns
105 -------
106 npt.NDArray
107 The solution after the iterative process.
108 """
109 xp = cp if isinstance(x, cp.ndarray) else np
110 self._n_tol = 0
112 if proximity_measures is None:
113 proximity_measures = [("p_norm", 2)]
115 self.proximities = [self.proximity(x, proximity_measures)]
116 i = 0
118 def _should_store(idx):
119 if storage_iters is None:
120 return True
121 if isinstance(storage_iters, int):
122 return idx % storage_iters == 0
123 return idx in storage_iters
125 to_host = (
126 (lambda v: np.array(v.copy())) if isinstance(x, np.ndarray) else (lambda v: v.get())
127 )
129 if storage is True:
130 self.all_x = []
131 if _should_store(0):
132 self.all_x.append(to_host(x))
134 if alternative_stopping_criterion_initial_call is not None:
135 stop = alternative_stopping_criterion_initial_call(x, self)
136 else:
137 stop = False # criterion for stopping the algorithm
139 while i < max_iter and not stop:
140 x = self.project(x)
141 if storage is True and _should_store(i + 1):
142 self.all_x.append(to_host(x))
143 self.proximities.append(self.proximity(x, proximity_measures))
145 if alternative_stopping_criterion is not None:
146 stop = alternative_stopping_criterion(x, self)
147 else:
148 stop = self._stopping_criterion(prox_tol, del_prox_tol, del_prox_n)
150 i += 1
152 if self.all_x is not None:
153 self.all_x = np.array(self.all_x)
155 self.proximities = xp.array(self.proximities)
157 return x
159 def _stopping_criterion(self, prox_tol: float, del_prox_tol: float, del_prox_n: float):
160 """"""
161 if self.proximities[-1][0] < prox_tol: # proximity below goal/tolerance
162 return True
163 else: # check that last n proximity changes are below a threshold
164 if self.proximities[-2][0] - self.proximities[-1][0] < del_prox_tol:
165 self._n_tol += 1
166 else:
167 self._n_tol = 0
168 if self._n_tol >= del_prox_n: # n proximity changes below threshold
169 return True
170 return False
173class LinearFeasibility(Feasibility, ABC):
174 """
175 LinearFeasibility class for handling linear feasibility problems.
177 Parameters
178 ----------
179 A : npt.NDArray or sparse.sparray
180 Matrix for linear systems
181 algorithmic_relaxation : npt.NDArray or float, optional
182 The relaxation parameter used by the algorithm, by default 1.0.
183 relaxation : float, optional
184 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
185 proximity_flag : bool, optional
186 Flag indicating whether to consider this object for proximity calculations. If False proximity calculations will always return 0, by default True.
188 Attributes
189 ----------
190 A : LinearMapping
191 Matrix for linear system, represented through internal object.
192 algorithmic_relaxation : npt.NDArray or float, optional
193 The relaxation parameter used by the algorithm, by default 1.0.
194 relaxation : float, optional
195 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
196 proximity_flag : bool, optional
197 Flag indicating whether to consider this object for proximity calculations. If False proximity calculations will always return 0, by default True.
198 _use_gpu : bool, optional
199 Flag to indicate whether to use GPU for computations, by default False.
200 """
202 def __init__(
203 self,
204 A: npt.NDArray | sparse.sparray,
205 algorithmic_relaxation: npt.NDArray | float = 1.0,
206 relaxation: float = 1.0,
207 proximity_flag: bool = True,
208 ):
209 _, _use_gpu = LinearMapping.get_flags(A)
210 super().__init__(algorithmic_relaxation, relaxation, proximity_flag, _use_gpu)
211 self.A = LinearMapping(A)
212 self.inverse_row_norm = 1 / self.A.row_norm(2, 2)
214 def map(self, x: npt.NDArray) -> np.ndarray:
215 """
216 Applies the linear mapping to the input array x.
218 Parameters
219 ----------
220 x : npt.NDArray
221 The input array to which the linear mapping is applied.
223 Returns
224 -------
225 npt.NDArray
226 The result of applying the linear mapping to the input array.
227 """
228 return self.A @ x
230 def single_map(self, x: npt.NDArray, i: int) -> np.ndarray:
231 """
232 Applies the linear mapping to the input array x at a specific index
233 i.
235 Parameters
236 ----------
237 x : npt.NDArray
238 The input array to which the linear mapping is applied.
239 i : int
240 The specific index at which the linear mapping is applied.
242 Returns
243 -------
244 npt.NDArray
245 The result of applying the linear mapping to the input array at the specified index.
246 """
247 return self.A.single_map(x, i)
249 def indexed_map(self, x: npt.NDArray, idx: List[int] | npt.NDArray) -> np.ndarray:
250 """
251 Applies the linear mapping to the input array x at multiple
252 specified indices.
254 Parameters
255 ----------
256 x : npt.NDArray
257 The input array to which the linear mapping is applied.
258 idx : List[int] or npt.NDArray
259 The indices at which the linear mapping is applied.
261 Returns
262 -------
263 npt.NDArray
264 The result of applying the linear mapping to the input array at the specified indices.
265 """
266 return self.A.index_map(x, idx)
269class HyperplaneFeasibility(LinearFeasibility, ABC):
270 """
271 HyperplaneFeasibility class for solving halfspace feasibility problems.
273 Parameters
274 ----------
275 A : npt.NDArray or sparse.sparray
276 Matrix for linear systems
277 b : npt.NDArray
278 Bound for linear systems
279 algorithmic_relaxation : npt.NDArray or float, optional
280 The relaxation parameter used by the algorithm, by default 1.0.
281 relaxation : float, optional
282 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
283 proximity_flag : bool, optional
284 Flag indicating whether to use proximity, by default True.
286 Attributes
287 ----------
288 A : LinearMapping
289 Matrix for linear system (stored in internal LinearMapping object).
290 b : npt.NDArray
291 Bound for linear systems
292 algorithmic_relaxation : npt.NDArray or float, optional
293 The relaxation parameter used by the algorithm, by default 1.0.
294 relaxation : float, optional
295 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
296 proximity_flag : bool, optional
297 Flag to indicate whether to calculate proximity, by default True.
298 _use_gpu : bool, optional
299 Flag to indicate whether to use GPU for computations, by default False.
300 """
302 def __init__(
303 self,
304 A: npt.NDArray | sparse.sparray,
305 b: npt.NDArray,
306 algorithmic_relaxation: npt.NDArray | float = 1.0,
307 relaxation: float = 1.0,
308 proximity_flag: bool = True,
309 ):
310 super().__init__(A, algorithmic_relaxation, relaxation, proximity_flag)
311 try:
312 len(b)
313 if A.shape[0] != len(b):
314 raise ValueError("Matrix A and vector b must have the same number of rows.")
315 except TypeError:
316 # create an array for b if it is a scalar
317 if not self.A.gpu:
318 b = np.ones(A.shape[0]) * b
319 else:
320 b = cp.ones(A.shape[0]) * b
321 self.b = b
323 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
324 p = self.map(x)
325 res = abs(self.b - p)
326 measures = []
327 for measure in proximity_measures:
328 if isinstance(measure, tuple):
329 if measure[0] == "p_norm":
330 measures.append(1 / len(res) * (res ** measure[1]).sum())
331 else:
332 raise ValueError("Invalid proximity measure")
333 elif isinstance(measure, str) and measure == "max_norm":
334 measures.append(res.max())
335 else:
336 raise ValueError("Invalid proximity measure")
337 return measures
340class HalfspaceFeasibility(LinearFeasibility, ABC):
341 """
342 HalfspaceFeasibility class for solving halfspace feasibility problems.
344 Parameters
345 ----------
346 A : npt.NDArray or sparse.sparray
347 Matrix for linear systems
348 b : npt.NDArray
349 Bound for linear systems
350 algorithmic_relaxation : npt.NDArray or float, optional
351 The relaxation parameter used by the algorithm, by default 1.0.
352 relaxation : float, optional
353 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
354 proximity_flag : bool, optional
355 Flag indicating whether to use proximity, by default True.
357 Attributes
358 ----------
359 A : LinearMapping
360 Matrix for linear system (stored in internal LinearMapping object).
361 b : npt.NDArray
362 Bound for linear systems
363 algorithmic_relaxation : npt.NDArray or float, optional
364 The relaxation parameter used by the algorithm, by default 1.0.
365 relaxation : float, optional
366 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
367 proximity_flag : bool, optional
368 Flag to indicate whether to calculate proximity, by default True.
369 _use_gpu : bool, optional
370 Flag to indicate whether to use GPU for computations, by default False.
371 """
373 def __init__(
374 self,
375 A: npt.NDArray | sparse.sparray,
376 b: npt.NDArray,
377 algorithmic_relaxation: npt.NDArray | float = 1.0,
378 relaxation: float = 1.0,
379 proximity_flag: bool = True,
380 ):
381 super().__init__(A, algorithmic_relaxation, relaxation, proximity_flag)
382 try:
383 len(b)
384 if A.shape[0] != len(b):
385 raise ValueError("Matrix A and vector b must have the same number of rows.")
386 except TypeError:
387 # create an array for b if it is a scalar
388 if not self.A.gpu:
389 b = np.ones(A.shape[0]) * b
390 else:
391 b = cp.ones(A.shape[0]) * b
392 self.b = b
394 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
395 p = self.map(x)
396 res = self.b - p
397 res[res > 0] = 0
398 res = -res
400 measures = []
401 for measure in proximity_measures:
402 if isinstance(measure, tuple):
403 if measure[0] == "p_norm":
404 measures.append(1 / len(res) * (res ** measure[1]).sum())
405 else:
406 raise ValueError("Invalid proximity measure")
407 elif isinstance(measure, str) and measure == "max_norm":
408 measures.append(res.max())
409 else:
410 raise ValueError("Invalid proximity measure")
411 return measures
414class HyperslabFeasibility(LinearFeasibility, ABC):
415 """
416 A class used for solving feasibility problems for hyperslabs.
418 Parameters
419 ----------
420 A : npt.NDArray or sparse.sparray
421 Matrix for linear systems
422 lb : npt.NDArray
423 The lower bounds for the hyperslab.
424 ub : npt.NDArray
425 The upper bounds for the hyperslab.
426 algorithmic_relaxation : npt.NDArray or float, optional
427 The relaxation parameter used by the algorithm, by default 1.0.
428 relaxation : float, optional
429 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
430 proximity_flag : bool, optional
431 A flag indicating whether to use proximity, by default True.
433 Attributes
434 ----------
435 bounds : Bounds
436 Objective for handling the upper and lower bounds of the hyperslab.
437 A : LinearMapping
438 Matrix for linear system (stored in internal LinearMapping object).
439 algorithmic_relaxation : npt.NDArray or float, optional
440 The relaxation parameter used by the algorithm, by default 1.0.
441 relaxation : float, optional
442 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
443 proximity_flag : bool, optional
444 Flag to indicate whether to calculate proximity, by default True.
445 _use_gpu : bool, optional
446 Flag to indicate whether to use GPU for computations, by default False.
447 """
449 def __init__(
450 self,
451 A: npt.NDArray,
452 lb: npt.NDArray,
453 ub: npt.NDArray,
454 algorithmic_relaxation: npt.NDArray | float = 1.0,
455 relaxation: float = 1.0,
456 proximity_flag: bool = True,
457 ):
458 super().__init__(A, algorithmic_relaxation, relaxation, proximity_flag)
459 self.bounds = Bounds(lb, ub)
460 if self.A.shape[0] != len(self.bounds.l):
461 raise ValueError("Matrix A and bound vector must have the same number of rows.")
463 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
464 p = self.map(x)
465 (res_l, res_u) = self.bounds.residual(p)
466 res_l[res_l > 0] = 0
467 res_u[res_u > 0] = 0
468 res = -res_l - res_u
470 measures = []
471 for measure in proximity_measures:
472 if isinstance(measure, tuple):
473 if measure[0] == "p_norm":
474 measures.append(1 / len(res) * (res ** measure[1]).sum())
475 else:
476 raise ValueError("Invalid proximity measure")
477 elif isinstance(measure, str) and measure == "max_norm":
478 measures.append(res.max())
479 else:
480 raise ValueError("Invalid proximity measure")
481 return measures