Coverage for suppy\feasibility\_bands\_arm_algorithms.py: 68%
123 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
1import warnings
2from abc import ABC
3from typing import List
4import numpy as np
5import numpy.typing as npt
6from suppy.feasibility._linear_algorithms import HyperslabFeasibility
8try:
9 import cupy as cp
11 NO_GPU = False
13except ImportError:
14 NO_GPU = True
15 cp = np
18class ARMAlgorithm(HyperslabFeasibility, ABC):
19 """
20 ARMAlgorithm class for handling feasibility problems with additional
21 algorithmic relaxation.
23 Parameters
24 ----------
25 A : npt.NDArray or sparse.sparray
26 Matrix for linear systems
27 lb : npt.NDArray
28 The lower bounds for the hyperslab.
29 ub : npt.NDArray
30 The upper bounds for the hyperslab.
31 algorithmic_relaxation : npt.NDArray or float, optional
32 The relaxation parameter used by the algorithm, by default 1.0.
33 relaxation : float, optional
34 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
35 proximity_flag : bool, optional
36 Flag to indicate if proximity constraints should be considered, by default True.
37 """
39 def __init__(
40 self,
41 A: npt.NDArray,
42 lb: npt.NDArray,
43 ub: npt.NDArray,
44 algorithmic_relaxation: npt.NDArray | float = 1.0,
45 relaxation: float = 1.0,
46 proximity_flag: bool = True,
47 ):
48 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag)
51class SequentialARM(ARMAlgorithm):
52 """
53 SequentialARM is a class that implements a sequential algorithm for
54 Adaptive Relaxation Method (ARM).
56 Parameters
57 ----------
58 A : npt.NDArray or sparse.sparray
59 Matrix for linear systems
60 lb : npt.NDArray
61 The lower bounds for the hyperslab.
62 ub : npt.NDArray
63 The upper bounds for the hyperslab.
64 algorithmic_relaxation : npt.NDArray or float, optional
65 The relaxation parameter used by the algorithm, by default 1.0.
66 relaxation : float, optional
67 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
68 cs : None or List[int], optional
69 The list of indices for the constraints, by default None.
70 proximity_flag : bool, optional
71 Flag to indicate if proximity should be considered, by default True.
72 """
74 def __init__(
75 self,
76 A: npt.NDArray,
77 lb: npt.NDArray,
78 ub: npt.NDArray,
79 algorithmic_relaxation: npt.NDArray | float = 1.0,
80 relaxation: float = 1.0,
81 cs: None | List[int] = None,
82 proximity_flag: bool = True,
83 ):
84 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag)
85 xp = cp if self._use_gpu else np
86 if cs is None:
87 self.cs = xp.arange(self.A.shape[0])
88 else:
89 self.cs = cs
91 def _project(self, x: npt.NDArray) -> np.ndarray:
92 xp = cp if self._use_gpu else np
94 for i in self.cs:
95 p_i = self.single_map(x, i)
96 d = p_i - self.bounds.center[i]
97 psi = (self.bounds.u[i] - self.bounds.l[i]) / 2
98 if xp.abs(d) > psi:
99 self.A.update_step(
100 x,
101 -1
102 * self.algorithmic_relaxation
103 / 2
104 * self.inverse_row_norm[i]
105 * ((d**2 - psi**2) / d),
106 i,
107 )
108 return x
111class SimultaneousARM(ARMAlgorithm):
112 """
113 SimultaneousARM is a class that implements an ARM (Adaptive Relaxation
114 Method) algorithm
115 for solving feasibility problems.
117 Parameters
118 ----------
119 A : npt.NDArray or sparse.sparray
120 Matrix for linear systems
121 lb : npt.NDArray
122 The lower bounds for the hyperslab.
123 ub : npt.NDArray
124 The upper bounds for the hyperslab.
125 algorithmic_relaxation : npt.NDArray or float, optional
126 The relaxation parameter used by the algorithm, by default 1.0.
127 relaxation : float, optional
128 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
129 weights : None, List[float], or npt.NDArray, optional
130 The weights for the constraints. If None, weights are set to be uniform. Default is None.
131 proximity_flag : bool, optional
132 Flag to indicate whether to use proximity in the algorithm. Default is True.
134 Methods
135 -------
136 _project(x)
137 Performs the simultaneous projection of the input vector x.
138 _proximity(x)
139 Computes the proximity measure of the input vector x.
140 """
142 def __init__(
143 self,
144 A: npt.NDArray,
145 lb: npt.NDArray,
146 ub: npt.NDArray,
147 algorithmic_relaxation: npt.NDArray | float = 1.0,
148 relaxation: float = 1.0,
149 weights: None | List[float] | npt.NDArray = None,
150 proximity_flag: bool = True,
151 ):
152 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag)
153 xp = cp if self._use_gpu else np
155 if weights is None:
156 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0]
157 elif xp.abs((weights.sum() - 1)) > 1e-10:
158 warnings.warn("Weights do not add up to 1! Renormalizing to 1...", stacklevel=2)
159 self.weights = weights / weights.sum()
160 else:
161 self.weights = weights
163 def _project(self, x):
164 xp = cp if self._use_gpu else np
165 # simultaneous projection
166 p = self.map(x)
167 d = p - self.bounds.center
168 psi = self.bounds.half_distance
169 d_idx = xp.abs(d) > psi
170 x -= (
171 self.algorithmic_relaxation
172 / 2
173 * (
174 self.weights[d_idx]
175 * self.inverse_row_norm[d_idx]
176 * (d[d_idx] - (psi[d_idx] ** 2) / d[d_idx])
177 )
178 @ self.A[d_idx, :]
179 )
181 return x
183 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
184 p = self.map(x)
185 (res_l, res_u) = self.bounds.residual(p)
186 res_u[res_u > 0] = 0
187 res_l[res_l > 0] = 0
188 res = -res_u - res_l
189 measures = []
190 for measure in proximity_measures:
191 if isinstance(measure, tuple):
192 if measure[0] == "p_norm":
193 measures.append(self.weights @ (res ** measure[1]))
194 else:
195 raise ValueError("Invalid proximity measure")
196 elif isinstance(measure, str) and measure == "max_norm":
197 measures.append(res.max())
198 else:
199 raise ValueError("Invalid proximity measure")
200 return measures
203class BIPARM(ARMAlgorithm):
204 """
205 BIPARM Algorithm for feasibility problems.
207 Parameters
208 ----------
209 A : npt.NDArray or sparse.sparray
210 Matrix for linear systems
211 lb : npt.NDArray
212 The lower bounds for the hyperslab.
213 ub : npt.NDArray
214 The upper bounds for the hyperslab.
215 algorithmic_relaxation : npt.NDArray or float, optional
216 The relaxation parameter used by the algorithm, by default 1.0.
217 relaxation : float, optional
218 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
219 weights : None, List[float], or npt.NDArray, optional
220 The weights for the constraints, by default None.
221 proximity_flag : bool, optional
222 Flag to indicate if proximity should be considered, by default True.
224 Methods
225 -------
226 _project(x)
227 Perform the simultaneous projection of x.
228 _proximity(x)
229 Calculate the proximity measure for x.
230 """
232 def __init__(
233 self,
234 A: npt.NDArray,
235 lb: npt.NDArray,
236 ub: npt.NDArray,
237 algorithmic_relaxation: npt.NDArray | float = 1.0,
238 relaxation: float = 1.0,
239 weights: None | List[float] | npt.NDArray = None,
240 proximity_flag: bool = True,
241 ):
242 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag)
243 xp = cp if self._use_gpu else np
244 if weights is None:
245 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0]
246 elif xp.abs((weights.sum() - 1)) > 1e-10:
247 warnings.warn("Weights do not add up to 1! Renormalizing to 1...", stacklevel=2)
248 self.weights = weights / weights.sum()
249 else:
250 self.weights = weights
252 def _project(self, x):
253 xp = cp if self._use_gpu else np
254 p = self.map(x)
255 d = p - self.bounds.center
256 psi = self.bounds.half_distance
257 d_idx = xp.abs(d) > psi
258 x -= (
259 self.algorithmic_relaxation
260 / 2
261 * (
262 self.weights[d_idx]
263 * self.inverse_row_norm[d_idx]
264 * (d[d_idx] - (psi[d_idx] ** 2) / d[d_idx])
265 )
266 @ self.A[d_idx, :]
267 )
269 return x
271 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
272 p = self.map(x)
273 (res_l, res_u) = self.bounds.residual(p)
274 res_u[res_u > 0] = 0
275 res_l[res_l > 0] = 0
276 res = -res_u - res_l
277 measures = []
278 for measure in proximity_measures:
279 if isinstance(measure, tuple):
280 if measure[0] == "p_norm":
281 measures.append(self.weights @ (res ** measure[1]))
282 else:
283 raise ValueError("Invalid proximity measure")
284 elif isinstance(measure, str) and measure == "max_norm":
285 measures.append(res.max())
286 else:
287 raise ValueError("Invalid proximity measure")
288 return measures
291class StringAveragedARM(ARMAlgorithm):
292 """
293 String Averaged ARM Algorithm.
294 This class implements the String Averaged ARM (Adaptive Relaxation Method)
295 algorithm,
296 which is used for feasibility problems involving strings of indices.
298 Parameters
299 ----------
300 A : npt.NDArray or sparse.sparray
301 Matrix for linear systems
302 lb : npt.NDArray
303 The lower bounds for the hyperslab.
304 ub : npt.NDArray
305 The upper bounds for the hyperslab.
306 strings : List[List[int]]
307 A list of lists, where each inner list represents a string of indices.
308 algorithmic_relaxation : npt.NDArray or float, optional
309 The relaxation parameter used by the algorithm, by default 1.0.
310 relaxation : float, optional
311 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
312 weights : None or List[float], optional
313 The weights for each string, by default None. If None, equal weights are assigned.
314 proximity_flag : bool, optional
315 A flag indicating whether to use proximity, by default True.
317 Methods
318 -------
319 _project(x)
320 Projects the input vector x using the string averaged projection method.
322 Raises
323 ------
324 ValueError
325 If the number of weights does not match the number of strings.
326 """
328 def __init__(
329 self,
330 A: npt.NDArray,
331 lb: npt.NDArray,
332 ub: npt.NDArray,
333 strings: List[List[int]],
334 algorithmic_relaxation: npt.NDArray | float = 1.0,
335 relaxation: float = 1.0,
336 weights: None | List[float] = None,
337 proximity_flag: bool = True,
338 ):
339 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag)
340 xp = cp if self._use_gpu else np
341 self.strings = strings
343 if weights is None:
344 self.weights = xp.ones(len(strings)) / len(strings)
345 else:
346 if len(weights) != len(self.strings):
347 raise ValueError("The number of weights must be equal to the number of strings.")
348 self.weights = weights
350 def _project(self, x):
351 xp = cp if self._use_gpu else np
352 # string averaged projection
353 x_c = x.copy() # create a general copy of x
354 x -= x # reset x is this viable?
355 for string, weight in zip(self.strings, self.weights):
356 x_s = x_c.copy() # generate a copy for individual strings
357 for i in string:
358 p_i = self.single_map(x_s, i)
359 d = p_i - self.bounds.center[i]
360 psi = (self.bounds.u[i] - self.bounds.l[i]) / 2
361 if xp.abs(d) > psi:
362 self.A.update_step(
363 x_s,
364 -1
365 * self.algorithmic_relaxation
366 / 2
367 * ((d**2 - psi**2) / d)
368 * self.inverse_row_norm[i],
369 i,
370 )
371 x += weight * x_s
372 return x