Coverage for suppy\feasibility\_bands\_ams_algorithms.py: 88%
177 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
1import warnings
2from abc import ABC
3from typing import List
4import numpy as np
5import numpy.typing as npt
7try:
8 import cupy as cp
10 NO_GPU = False
12except ImportError:
13 NO_GPU = True
14 cp = np
16from suppy.feasibility._linear_algorithms import HyperslabFeasibility
17from suppy.utils import LinearMapping
20class HyperslabAlgorithm(HyperslabFeasibility, ABC):
21 """
22 The HyperslabAlgorithm class is used to find a feasible solution to a
23 set of linear inequalities.
25 Parameters
26 ----------
27 A : npt.NDArray or sparse.sparray
28 Matrix for linear systems
29 lb : npt.NDArray
30 The lower bounds for the hyperslab.
31 ub : npt.NDArray
32 The upper bounds for the hyperslab.
33 algorithmic_relaxation : npt.NDArray or float, optional
34 The relaxation parameter used by the algorithm, by default 1.0.
35 relaxation : float, optional
36 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
37 proximity_flag : bool, optional
38 A flag indicating whether to use proximity in the algorithm, by default True.
39 """
41 def __init__(
42 self,
43 A: npt.NDArray,
44 lb: npt.NDArray,
45 ub: npt.NDArray,
46 algorithmic_relaxation: npt.NDArray | float = 1.0,
47 relaxation: float = 1.0,
48 proximity_flag: bool = True,
49 ):
50 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag)
53class SequentialAMSHyperslab(HyperslabAlgorithm):
54 """
55 SequentialAMSHyperslab class for sequentially applying the AMS algorithm
56 on hyperslabs.
58 Parameters
59 ----------
60 A : npt.NDArray or sparse.sparray
61 Matrix for linear systems
62 lb : npt.NDArray
63 The lower bounds for the hyperslab.
64 ub : npt.NDArray
65 The upper bounds for the hyperslab.
66 algorithmic_relaxation : npt.NDArray or float, optional
67 The relaxation parameter used by the algorithm, by default 1.0.
68 relaxation : float, optional
69 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
70 cs : None or List[int], optional
71 The list of indices for the constraints, by default None.
72 proximity_flag : bool, optional
73 Flag to indicate if proximity should be considered, by default True.
74 """
76 def __init__(
77 self,
78 A: npt.NDArray,
79 lb: npt.NDArray,
80 ub: npt.NDArray,
81 algorithmic_relaxation: npt.NDArray | float = 1.0,
82 relaxation: float = 1.0,
83 cs: None | List[int] = None,
84 proximity_flag: bool = True,
85 ):
87 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag)
88 xp = cp if self._use_gpu else np
89 if cs is None:
90 self.cs = xp.arange(self.A.shape[0])
91 else:
92 self.cs = cs
94 def _project(self, x: npt.NDArray) -> np.ndarray:
95 """
96 Projects the input array `x` onto the feasible region defined by the
97 constraints.
99 Parameters
100 ----------
101 x : npt.NDArray
102 The input array to be projected.
104 Returns
105 -------
106 npt.NDArray
107 The projected array.
108 """
110 for i in self.cs:
111 p_i = self.single_map(x, i)
112 (res_li, res_ui) = self.bounds.single_residual(p_i, i) # returns floats
114 if res_ui < 0:
115 self.A.update_step(
116 x, self.algorithmic_relaxation * self.inverse_row_norm[i] * res_ui, i
117 )
118 elif res_li < 0:
119 self.A.update_step(
120 x, -1 * self.algorithmic_relaxation * self.inverse_row_norm[i] * res_li, i
121 )
122 return x
125class SequentialWeightedAMSHyperslab(SequentialAMSHyperslab):
126 """
127 Parameters
128 ----------
129 A : npt.NDArray or sparse.sparray
130 Matrix for linear systems
131 lb : npt.NDArray
132 The lower bounds of the hyperslab.
133 ub : npt.NDArray
134 The upper bounds of the hyperslab.
135 weights : None, list of float, or npt.NDArray, optional
136 The weights assigned to each constraint. If None, default weights are
137 used.
138 algorithmic_relaxation : npt.NDArray or float, optional
139 The relaxation parameter used by the algorithm, by default 1.0.
140 relaxation : float, optional
141 Outer relaxation parameter, applied to the entire solution of the
142 iterate by default 1.0.
143 weight_decay : float, optional
144 Parameter that determines the rate at which the weights are reduced
145 after each phase (weights * weight_decay). Default is 1.0.
146 cs : None or list of int, optional
147 The indices of the constraints to be considered. Default is None.
148 proximity_flag : bool, optional
149 Flag to indicate if proximity should be considered. Default is True.
151 Attributes
152 ----------
153 weights : npt.NDArray
154 The weights assigned to each constraint.
155 weight_decay : float
156 Decay rate for the weights.
157 temp_weight_decay : float
158 Initial value for weight decay.
159 """
161 def __init__(
162 self,
163 A: npt.NDArray,
164 lb: npt.NDArray,
165 ub: npt.NDArray,
166 weights: None | List[float] | npt.NDArray = None,
167 algorithmic_relaxation: npt.NDArray | float = 1.0,
168 relaxation: float = 1.0,
169 weight_decay: float = 1.0,
170 cs: None | List[int] = None,
171 proximity_flag: bool = True,
172 ):
174 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, cs, proximity_flag)
175 xp = cp if self._use_gpu else np
176 self.weight_decay = weight_decay
177 self.temp_weight_decay = 1.0
179 if weights is None:
180 self.weights = xp.ones(self.A.shape[0])
181 else:
182 self.weights = weights
184 def _project(self, x: npt.NDArray) -> np.ndarray:
185 """
186 Projects the input array `x` onto a feasible region defined by the
187 constraints.
189 Parameters
190 ----------
191 x : npt.NDArray
192 The input array to be projected.
194 Returns
195 -------
196 npt.NDArray
197 The projected array.
199 Notes
200 -----
201 This method iteratively adjusts the input array `x` based on the constraints
202 defined in `self.cs`. For each constraint, it computes the projection and
203 checks if the constraints are violated. If a constraint is violated, it updates
204 the array `x` using a weighted relaxation factor. The weight decay is applied
205 to the temporary weight decay after each iteration.
206 """
208 weighted_relaxation = self.algorithmic_relaxation * self.temp_weight_decay
210 for i in self.cs:
212 p_i = self.single_map(x, i)
214 (res_li, res_ui) = self.bounds.single_residual(p_i, i) # returns floats
216 if res_ui < 0:
217 self.A.update_step(
218 x, weighted_relaxation * self.weights[i] * self.inverse_row_norm[i] * res_ui, i
219 )
220 elif res_li < 0:
221 self.A.update_step(
222 x,
223 -1 * weighted_relaxation * self.weights[i] * self.inverse_row_norm[i] * res_li,
224 i,
225 )
227 self.temp_weight_decay *= self.weight_decay
228 return x
230 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
231 p = self.map(x)
232 # residuals are positive if constraints are met
233 (res_l, res_u) = self.bounds.residual(p)
234 res_u[res_u > 0] = 0
235 res_l[res_l > 0] = 0
236 res = -res_u - res_l
237 measures = []
238 for measure in proximity_measures:
239 if isinstance(measure, tuple):
240 if measure[0] == "p_norm":
241 measures.append((self.weights @ (res ** measure[1])) / (self.weights.sum()))
242 else:
243 raise ValueError("Invalid proximity measure")
244 elif isinstance(measure, str) and measure == "max_norm":
245 measures.append(res.max())
246 else:
247 raise ValueError("Invalid proximity measure")
248 return measures
251class SimultaneousAMSHyperslab(HyperslabAlgorithm):
252 """
253 SimultaneousAMSHyperslab class for simultaneous application of the AMS
254 algorithm on hyperslabs.
256 Parameters
257 ----------
258 A : npt.NDArray or sparse.sparray
259 Matrix for linear systems
260 lb : npt.NDArray
261 The lower bounds for the hyperslab.
262 ub : npt.NDArray
263 The upper bounds for the hyperslab.
264 algorithmic_relaxation : npt.NDArray or float, optional
265 The relaxation parameter used by the algorithm, by default 1.0.
266 relaxation : float, optional
267 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
268 weights : None or List[float], optional
269 The weights for the constraints, by default None.
270 proximity_flag : bool, optional
271 Flag to indicate if proximity calculations should be performed, by default True.
272 """
274 def __init__(
275 self,
276 A: npt.NDArray,
277 lb: npt.NDArray,
278 ub: npt.NDArray,
279 algorithmic_relaxation: npt.NDArray | float = 1.0,
280 relaxation: float = 1.0,
281 weights: None | List[float] = None,
282 proximity_flag: bool = True,
283 ):
285 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag)
287 xp = cp if self._use_gpu else np
289 if weights is None:
290 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0]
291 elif xp.abs((weights.sum() - 1)) > 1e-10:
292 warnings.warn("Weights do not add up to 1! Renormalizing to 1...", stacklevel=2)
293 self.weights = weights / weights.sum()
294 else:
295 self.weights = weights
297 def _project(self, x: npt.NDArray) -> np.ndarray:
298 # simultaneous projection
299 p = self.map(x)
300 (res_l, res_u) = self.bounds.residual(p)
301 d_idx = res_u < 0
302 c_idx = res_l < 0
303 x += self.algorithmic_relaxation * (
304 (self.weights * self.inverse_row_norm)[d_idx] * res_u[d_idx] @ self.A[d_idx, :]
305 - (self.weights * self.inverse_row_norm)[c_idx] * res_l[c_idx] @ self.A[c_idx, :]
306 )
308 return x
310 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
311 p = self.map(x)
312 # residuals are positive if constraints are met
313 (res_l, res_u) = self.bounds.residual(p)
314 res_u[res_u > 0] = 0
315 res_l[res_l > 0] = 0
316 res = -res_u - res_l
317 measures = []
318 for measure in proximity_measures:
319 if isinstance(measure, tuple):
320 if measure[0] == "p_norm":
321 measures.append(self.weights @ (res ** measure[1]))
322 else:
323 raise ValueError("Invalid proximity measure")
324 elif isinstance(measure, str) and measure == "max_norm":
325 measures.append(res.max())
326 else:
327 raise ValueError("Invalid proximity measure")
328 return measures
331class SARTHyperslab(SimultaneousAMSHyperslab):
332 """
333 SART Hyperslab class for simultaneous application of the SART
334 algorithm on hyperslabs.
336 Parameters
337 ----------
338 A : npt.NDArray or sparse.sparray
339 Matrix for linear systems
340 lb : npt.NDArray
341 The lower bounds for the hyperslab.
342 ub : npt.NDArray
343 The upper bounds for the hyperslab.
344 algorithmic_relaxation : npt.NDArray or float, optional
345 The relaxation parameter used by the algorithm, by default 1.0.
346 relaxation : float, optional
347 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
348 weights : None or List[float], optional
349 The weights for the constraints, by default None.
350 proximity_flag : bool, optional
351 Flag to indicate if proximity calculations should be performed, by default True.
352 """
354 def __init__(
355 self,
356 A: npt.NDArray,
357 lb: npt.NDArray,
358 ub: npt.NDArray,
359 algorithmic_relaxation: npt.NDArray | float = 1.0,
360 relaxation: float = 1.0,
361 weights: None | List[float] = None,
362 proximity_flag: bool = True,
363 ):
365 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, weights, proximity_flag)
366 self.inverse_linear_row_norm = 1 / self.A.row_norm(1, 1)
367 self.inverse_linear_column_norm = 1 / self.A.column_norm(1, 1)
369 def _project(self, x: npt.NDArray) -> np.ndarray:
370 # SART projection
371 p = self.map(x)
372 (res_l, res_u) = self.bounds.residual(p)
373 d_idx = res_u < 0
374 c_idx = res_l < 0
375 x += (
376 self.algorithmic_relaxation
377 * self.inverse_linear_column_norm
378 * (
379 (self.weights * self.inverse_linear_row_norm)[d_idx]
380 * res_u[d_idx]
381 @ self.A[d_idx, :]
382 - (self.weights * self.inverse_linear_row_norm)[c_idx]
383 * res_l[c_idx]
384 @ self.A[c_idx, :]
385 )
386 )
388 return x
391class BlockIterativeAMSHyperslab(HyperslabAlgorithm):
392 """
393 Block Iterative AMS Algorithm for hyperslabs.
395 Parameters
396 ----------
397 A : npt.NDArray or sparse.sparray
398 Matrix for linear systems
399 lb : npt.NDArray
400 The lower bounds for the hyperslab.
401 ub : npt.NDArray
402 The upper bounds for the hyperslab.
403 weights : List[List[float]] or List[npt.NDArray]
404 A list of lists or arrays representing the weights for each block. Each list/array should sum to 1.
405 algorithmic_relaxation : npt.NDArray or float, optional
406 The relaxation parameter used by the algorithm, by default 1.0.
407 relaxation : float, optional
408 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
409 proximity_flag : bool, optional
410 A flag indicating whether to use proximity measures, by default True.
412 Raises
413 ------
414 ValueError
415 If any of the weight lists do not sum to 1.
416 """
418 def __init__(
419 self,
420 A: npt.NDArray,
421 lb: npt.NDArray,
422 ub: npt.NDArray,
423 weights: List[List[float]] | List[npt.NDArray],
424 algorithmic_relaxation: npt.NDArray | float = 1.0,
425 relaxation: float = 1.0,
426 proximity_flag: bool = True,
427 ):
429 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag)
431 xp = cp if self._use_gpu else np
433 # check that weights is a list of lists that add up to 1 each
434 for el in weights:
435 if xp.abs((xp.sum(el) - 1)) > 1e-10:
436 raise ValueError("Weights do not add up to 1!")
438 self.weights = []
439 self.block_idxs = [
440 xp.where(xp.array(el) > 0)[0] for el in weights
441 ] # get idxs that meet requirements
443 # assemble a list of general weights
444 self.total_weights = xp.zeros_like(weights[0])
445 for el in weights:
446 el = xp.asarray(el)
447 self.weights.append(el[xp.array(el) > 0]) # remove non zero weights
448 self.total_weights += el / len(weights)
450 def _project(self, x: npt.NDArray) -> np.ndarray:
451 # simultaneous projection
452 for el, block_idx in zip(self.weights, self.block_idxs): # get mask and associated weights
453 p = self.indexed_map(x, block_idx)
454 (res_l, res_u) = self.bounds.indexed_residual(p, block_idx)
455 d_idx = res_u < 0
456 c_idx = res_l < 0
457 full_d_idx = block_idx[d_idx]
458 full_c_idx = block_idx[c_idx]
460 x += self.algorithmic_relaxation * (
461 self.inverse_row_norm[full_d_idx]
462 * el[d_idx]
463 * res_u[d_idx]
464 @ self.A[full_d_idx, :]
465 - self.inverse_row_norm[full_c_idx]
466 * el[c_idx]
467 * res_l[c_idx]
468 @ self.A[full_c_idx, :]
469 )
471 return x
473 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
474 p = self.map(x)
475 # residuals are positive if constraints are met
476 (res_l, res_u) = self.bounds.residual(p)
477 res_u[res_u > 0] = 0
478 res_l[res_l > 0] = 0
479 res = -res_u - res_l
480 measures = []
481 for measure in proximity_measures:
482 if isinstance(measure, tuple):
483 if measure[0] == "p_norm":
484 measures.append(self.total_weights @ (res ** measure[1]))
485 else:
486 raise ValueError("Invalid proximity measure")
487 elif isinstance(measure, str) and measure == "max_norm":
488 measures.append(res.max())
489 else:
490 raise ValueError("Invalid proximity measure")
491 return measures
494class StringAveragedAMSHyperslab(HyperslabAlgorithm):
495 """
496 StringAveragedAMSHyperslab is a string averaged implementation of the
497 AMS algorithm.
499 Parameters
500 ----------
501 A : npt.NDArray or sparse.sparray
502 Matrix for linear systems
503 lb : npt.NDArray
504 The lower bounds for the hyperslab.
505 ub : npt.NDArray
506 The upper bounds for the hyperslab.
507 strings : List[List[int]]
508 A list of lists, where each inner list represents a string of indices.
509 algorithmic_relaxation : npt.NDArray or float, optional
510 The relaxation parameter used by the algorithm, by default 1.0.
511 relaxation : float, optional
512 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
513 weights : None or List[float], optional
514 The weights for each string, by default None. If None, equal weights are assigned.
515 proximity_flag : bool, optional
516 A flag indicating whether to use proximity, by default True.
517 """
519 def __init__(
520 self,
521 A: npt.NDArray,
522 lb: npt.NDArray,
523 ub: npt.NDArray,
524 strings: List[List[int]],
525 algorithmic_relaxation: npt.NDArray | float = 1.0,
526 relaxation: float = 1.0,
527 weights: None | List[float] = None,
528 proximity_flag: bool = True,
529 ):
531 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag)
532 xp = cp if self._use_gpu else np
533 self.strings = strings
534 if weights is None:
535 self.weights = xp.ones(len(strings)) / len(strings)
536 else:
537 if len(weights) != len(self.strings):
538 raise ValueError("The number of weights must be equal to the number of strings.")
539 self.weights = weights
541 def _project(self, x):
542 # string averaged projection
543 x_c = x.copy() # create a general copy of x
544 x -= x # reset x is this viable?
545 for string, weight in zip(self.strings, self.weights):
546 x_s = x_c.copy() # generate a copy for individual strings
547 for i in string:
548 p_i = self.single_map(x_s, i)
549 (res_li, res_ui) = self.bounds.single_residual(p_i, i)
550 if res_ui < 0:
551 self.A.update_step(
552 x_s, self.algorithmic_relaxation * self.inverse_row_norm[i] * res_ui, i
553 )
554 elif res_li < 0:
555 self.A.update_step(
556 x_s,
557 -1 * self.algorithmic_relaxation * self.inverse_row_norm[i] * res_li,
558 i,
559 )
561 x += weight * x_s
562 return x