Coverage for suppy\feasibility\_bands\_art3_algorithms.py: 57%
119 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
1from abc import ABC
2from typing import List
3import numpy as np
4import numpy.typing as npt
6try:
7 import cupy as cp
9 NO_GPU = False
11except ImportError:
12 NO_GPU = True
13 cp = np
15from suppy.utils import ensure_float_array
16from suppy.feasibility._linear_algorithms import HyperslabFeasibility
19class ART3plusAlgorithm(HyperslabFeasibility, ABC):
20 """
21 ART3plusAlgorithm class for implementing the ART3+ algorithm.
23 Parameters
24 ----------
25 A : npt.NDArray or sparse.sparray
26 Matrix for linear systems
27 lb : npt.NDArray
28 The lower bounds for the hyperslab.
29 ub : npt.NDArray
30 The upper bounds for the hyperslab.
31 algorithmic_relaxation : npt.NDArray or float, optional
32 The relaxation parameter used by the algorithm, by default 1.0.
33 relaxation : float, optional
34 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
35 proximity_flag : bool, optional
36 Flag to indicate whether to use proximity in the algorithm, by default True.
37 """
39 def __init__(
40 self,
41 A: npt.NDArray,
42 lb: npt.NDArray,
43 ub: npt.NDArray,
44 algorithmic_relaxation: npt.NDArray | float = 1,
45 relaxation: float = 1,
46 proximity_flag=True,
47 ):
48 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag)
51class SequentialART3plus(ART3plusAlgorithm):
52 """
53 SequentialART3plus is an implementation of the ART3plus algorithm for
54 solving feasibility problems.
56 Parameters
57 ----------
58 A : npt.NDArray or sparse.sparray
59 Matrix for linear systems
60 lb : npt.NDArray
61 The lower bounds for the hyperslab.
62 ub : npt.NDArray
63 The upper bounds for the hyperslab.
64 cs : None or List[int], optional
65 The control sequence for the algorithm. If None, it will be initialized to the range of the number of rows in A.
66 proximity_flag : bool, optional
67 A flag indicating whether to use proximity in the algorithm. Default is True.
69 Attributes
70 ----------
71 initial_cs : List[int]
72 The initial control sequence.
73 cs : List[int]
74 The current control sequence.
75 _feasible : bool
76 A flag indicating whether the current solution is feasible.
78 Methods
79 -------
80 _project(x)
81 Projects the point x onto the feasible region defined by the constraints.
82 solve(x, max_iter)
83 Solves the feasibility problem using the ART3plus algorithm.
84 """
86 def __init__(
87 self,
88 A: npt.NDArray,
89 lb: npt.NDArray,
90 ub: npt.NDArray,
91 cs: None | List[int] = None,
92 proximity_flag=True,
93 ):
95 super().__init__(A, lb, ub, 1, 1, proximity_flag)
96 xp = cp if self.A.gpu else np
97 if cs is None:
98 self.initial_cs = xp.arange(self.A.shape[0])
99 else:
100 self.initial_cs = cs
102 self.cs = self.initial_cs.copy()
104 def _project(self, x: npt.NDArray) -> np.ndarray:
105 to_remove = []
106 for i in self.cs:
107 # TODO: add a boolean variable that skips this if the projection did not move the point?
108 p_i = self.single_map(x, i)
109 # should be precomputed
110 if (
111 3 / 2 * self.bounds.l[i] - 1 / 2 * self.bounds.u[i] <= p_i < self.bounds.l[i]
112 ): # lowe bound reflection
113 self.A.update_step(
114 x, 2 * self.inverse_row_norm[i] * (self.bounds.l[i] - p_i), i
115 ) # reflection
117 elif (
118 self.bounds.u[i] < p_i <= 3 / 2 * self.bounds.u[i] - 1 / 2 * self.bounds.l[i]
119 ): # upper bound reflection
120 self.A.update_step(
121 x, 2 * self.inverse_row_norm[i] * (self.bounds.u[i] - p_i), i
122 ) # reflection
124 elif self.bounds.u[i] - self.bounds.l[i] < abs(
125 p_i - (self.bounds.l[i] + self.bounds.u[i]) / 2
126 ):
127 self.A.update_step(
128 x,
129 self.inverse_row_norm[i] * ((self.bounds.l[i] + self.bounds.u[i]) / 2 - p_i),
130 i,
131 ) # projection onto center of hyperslab
133 else: # constraint is already met
134 to_remove.append(i)
136 # after loop remove constraints that are already met
137 self.cs = [i for i in self.cs if i not in to_remove] # is this fast?
138 return x
140 @ensure_float_array
141 def solve(
142 self,
143 x: npt.NDArray,
144 max_iter: int = 500,
145 prox_tol: float = 1e-6,
146 del_prox_tol: float = 1e-8,
147 del_prox_n: int = 5,
148 storage: bool = False,
149 storage_iters: List[int] | int | None = None,
150 proximity_measures: List | None = None,
151 ) -> np.ndarray:
152 """
153 Solves the optimization problem using an iterative approach.
155 Parameters
156 ----------
157 x : npt.NDArray
158 Starting point for the algorithm.
159 max_iter : int, optional
160 Maximum number of iterations to perform.
161 prox_tol : float, optional
162 The tolerance for the proximity on the constraints, by default 1e-6.
163 del_prox_tol : float, optional
164 The tolerance for the change in proximity over the last del_prox_n iterations, by default 1e-8.
165 del_prox_n : int, optional
166 The number of iterations that del_prox_tol needs to be met in a row, by default 5.
167 proximity_measures : List, optional
168 The proximity measures to calculate, by default a l2 norm measure is used. Right now only the first in the list is used to check the feasibility.
169 storage : bool, optional
170 Flag indicating whether to store intermediate solutions, by default False.
171 storage_iters : List[int] or int, optional
172 Controls which iterations are stored (when storage=True). If None, all iterations are stored.
173 If a list of ints, only those iteration indices are stored (0 = initial point).
174 If an int, storage occurs every that many iterations.
176 Returns
177 -------
178 npt.NDArray
179 The solution after the iterative process.
180 """
181 self.cs = self.initial_cs.copy()
182 xp = cp if isinstance(x, cp.ndarray) else np
184 if proximity_measures is None:
185 proximity_measures = [("p_norm", 2)]
186 else:
187 # TODO: Check if the proximity measures are valid
188 _ = None
190 self.proximities = [self.proximity(x, proximity_measures)]
191 i = 0
193 def _should_store(idx):
194 if storage_iters is None:
195 return True
196 if isinstance(storage_iters, int):
197 return idx % storage_iters == 0
198 return idx in storage_iters
200 if storage is True:
201 self.all_x = []
202 if _should_store(0):
203 if isinstance(x, np.ndarray):
204 self.all_x.append(np.array(x.copy()))
205 else:
206 self.all_x.append((x.get()))
208 stop = False # criterion for stopping the algorithm
209 self._n_tol = 0
211 while i < max_iter and not stop:
213 if len(self.cs) == 0:
214 self.cs = self.initial_cs.copy()
216 x = self.project(x)
217 if storage is True and _should_store(i + 1):
218 if isinstance(x, np.ndarray): # convert to np array if cp
219 self.all_x.append(np.array(x.copy()))
220 else:
221 self.all_x.append((x.get()))
222 self.proximities.append(self.proximity(x, proximity_measures))
224 # TODO: If proximity changes x some potential issues!
225 stop = self._stopping_criterion(prox_tol, del_prox_tol, del_prox_n)
227 i += 1
229 if self.all_x is not None:
230 self.all_x = np.array(self.all_x)
232 self.proximities = xp.array(self.proximities)
234 return x
237class SimultaneousART3plus(ART3plusAlgorithm):
238 """
239 SimultaneousART3plus is an implementation of the ART3plus algorithm for
240 solving feasibility problems.
242 Parameters
243 ----------
244 A : npt.NDArray or sparse.sparray
245 Matrix for linear systems
246 lb : npt.NDArray
247 The lower bounds for the hyperslab.
248 ub : npt.NDArray
249 The upper bounds for the hyperslab.
250 weights : None | List[float] | npt.NDArray, optional
251 The weights for the constraints. If None, default weights are used. Default is None.
252 proximity_flag : bool, optional
253 Flag to indicate whether to use proximity measure. Default is True.
255 Attributes
256 ----------
257 weights : npt.NDArray
258 The weights for the constraints.
259 """
261 def __init__(
262 self,
263 A: npt.NDArray,
264 lb: npt.NDArray,
265 ub: npt.NDArray,
266 weights: None | List[float] | npt.NDArray = None,
267 proximity_flag=True,
268 ):
270 super().__init__(A, lb, ub, 1, 1, proximity_flag)
271 xp = cp if self.A.gpu else np
272 if weights is None:
273 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0]
274 elif abs((weights.sum() - 1)) > 1e-10:
275 print("Weights do not add up to 1! Choosing default weight vector...")
276 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0]
277 else:
278 self.weights = weights
280 self._not_met = xp.arange(self.A.shape[0])
282 self._not_met_init = self._not_met.copy()
284 def _project(self, x: npt.NDArray) -> np.ndarray:
285 """
286 Perform one step of the ART3plus algorithm.
288 Args:
289 x (npt.NDArray): The point to be projected.
291 Returns:
292 npt.NDArray: The projected point.
293 """
294 p = self.map(x)
295 p = p[self._not_met]
296 l_redux = self.bounds.l[self._not_met]
297 u_redux = self.bounds.u[self._not_met]
299 # following calculations are performed on subarrays
300 # assign different subsets
301 idx_1 = p < l_redux
302 idx_2 = p > u_redux
303 idx_3 = p < l_redux - (u_redux - l_redux) / 2
304 idx_4 = p > u_redux + (u_redux - l_redux) / 2
306 # sets on subarrays
307 set_1 = idx_1 & (not idx_3) # idxs for lower bound reflection
308 set_2 = idx_2 & (not idx_4) # idxs for upper bound reflection
309 set_3 = idx_3 | idx_4 # idxs for projections
310 # there should be no overlap between the different regions here!
311 x += (
312 self.weights[self._not_met][set_1]
313 * self.inverse_row_norm[self._not_met][set_1]
314 * (2 * (l_redux - p))[set_1]
315 @ self.A[self._not_met][set_1, :]
316 )
317 x += (
318 self.weights[self._not_met][set_2]
319 * self.inverse_row_norm[self._not_met][set_2]
320 * (2 * (u_redux - p))[set_2]
321 @ self.A[self._not_met][set_2, :]
322 )
323 x += (
324 self.weights[self._not_met][set_3]
325 * self.inverse_row_norm[self._not_met][set_3]
326 * ((l_redux + u_redux) / 2 - p)[set_3]
327 @ self.A[self._not_met][set_3, :]
328 )
330 # remove constraints that were already met before
331 self._not_met = self._not_met[(idx_1 | idx_2)]
333 return x
335 def _proximity(self, x: npt.NDArray, proximity_measures: List[str]) -> float:
336 p = self.map(x)
337 # residuals are positive if constraints are met
338 (res_l, res_u) = self.bounds.residual(p)
339 res_u[res_u > 0] = 0
340 res_l[res_l > 0] = 0
341 res = -res_u - res_l
342 measures = []
343 for measure in proximity_measures:
344 if isinstance(measure, tuple):
345 if measure[0] == "p_norm":
346 measures.append(self.weights @ (res ** measure[1]))
347 else:
348 raise ValueError("Invalid proximity measure")
349 elif isinstance(measure, str) and measure == "max_norm":
350 measures.append(res.max())
351 else:
352 raise ValueError("Invalid proximity measure)")
353 return measures