Coverage for suppy\feasibility\_halfspaces\_ams_algorithms.py: 92%
137 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
1import warnings
2from abc import ABC
3from typing import List
4import numpy as np
5import numpy.typing as npt
7try:
8 import cupy as cp
10 NO_GPU = False
12except ImportError:
13 NO_GPU = True
14 cp = np
16from suppy.feasibility._linear_algorithms import HalfspaceFeasibility
19class HalfspaceAlgorithm(HalfspaceFeasibility, ABC):
20 """
21 The HalfspaceAlgorithm class is used to find a feasible solution to a
22 set of linear inequalities.
24 Parameters
25 ----------
26 A : npt.NDArray or sparse.sparray
27 Matrix for linear systems
28 b : npt.NDArray
29 Bound for linear systems
30 algorithmic_relaxation : npt.NDArray or float, optional
31 The relaxation parameter used by the algorithm, by default 1.0.
32 relaxation : float, optional
33 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
34 proximity_flag : bool, optional
35 A flag indicating whether to use proximity in the algorithm, by default True.
36 """
38 def __init__(
39 self,
40 A: npt.NDArray,
41 b: npt.NDArray,
42 algorithmic_relaxation: npt.NDArray | float = 1.0,
43 relaxation: float = 1.0,
44 proximity_flag: bool = True,
45 ):
46 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag)
49class SequentialAMSHalfspace(HalfspaceAlgorithm):
50 """
51 SequentialAMS class for sequentially applying the AMS algorithm.
53 Parameters
54 ----------
55 A : npt.NDArray or sparse.sparray
56 Matrix for linear systems
57 b : npt.NDArray
58 Bound for linear systems
59 algorithmic_relaxation : npt.NDArray or float, optional
60 The relaxation parameter used by the algorithm, by default 1.0.
61 relaxation : float, optional
62 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
63 cs : None or List[int], optional
64 The list of indices for the constraints, by default None.
65 proximity_flag : bool, optional
66 Flag to indicate if proximity should be considered, by default True.
67 """
69 def __init__(
70 self,
71 A: npt.NDArray,
72 b: npt.NDArray,
73 algorithmic_relaxation: npt.NDArray | float = 1.0,
74 relaxation: float = 1.0,
75 cs: None | List[int] = None,
76 proximity_flag: bool = True,
77 ):
78 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag)
79 xp = cp if self._use_gpu else np
80 if cs is None:
81 self.cs = xp.arange(self.A.shape[0])
82 else:
83 self.cs = cs
85 def _project(self, x: npt.NDArray) -> np.ndarray:
86 """
87 Projects the input array `x` onto the feasible region defined by the
88 constraints.
90 Parameters
91 ----------
92 x : npt.NDArray
93 The input array to be projected.
95 Returns
96 -------
97 npt.NDArray
98 The projected array.
99 """
100 for i in self.cs:
101 p_i = self.single_map(x, i)
102 res = self.b[i] - p_i
103 if res < 0:
104 self.A.update_step(
105 x, self.algorithmic_relaxation * self.inverse_row_norm[i] * res, i
106 )
107 return x
110class SequentialWeightedAMSHalfspace(SequentialAMSHalfspace):
111 """
112 Parameters
113 ----------
114 A : npt.NDArray or sparse.sparray
115 Matrix for linear systems
116 b : npt.NDArray
117 Bound for linear systems
118 weights : None, list of float, or npt.NDArray, optional
119 The weights assigned to each constraint. If None, default weights are
120 used.
121 algorithmic_relaxation : npt.NDArray or float, optional
122 The relaxation parameter used by the algorithm, by default 1.0.
123 relaxation : float, optional
124 Outer relaxation parameter, applied to the entire solution of the
125 iterate by default 1.0.
126 weight_decay : float, optional
127 Parameter that determines the rate at which the weights are reduced
128 after each phase (weights * weight_decay). Default is 1.0.
129 cs : None or list of int, optional
130 The indices of the constraints to be considered. Default is None.
131 proximity_flag : bool, optional
132 Flag to indicate if proximity should be considered. Default is True.
134 Attributes
135 ----------
136 weights : npt.NDArray
137 The weights assigned to each constraint.
138 weight_decay : float
139 Decay rate for the weights.
140 temp_weight_decay : float
141 Initial value for weight decay.
142 """
144 def __init__(
145 self,
146 A: npt.NDArray,
147 b: npt.NDArray,
148 weights: None | List[float] | npt.NDArray = None,
149 algorithmic_relaxation: npt.NDArray | float = 1.0,
150 relaxation: float = 1.0,
151 weight_decay: float = 1.0,
152 cs: None | List[int] = None,
153 proximity_flag: bool = True,
154 ):
155 super().__init__(A, b, algorithmic_relaxation, relaxation, cs, proximity_flag)
156 xp = cp if self._use_gpu else np
157 self.weight_decay = weight_decay
158 self.temp_weight_decay = 1.0
160 if weights is None:
161 self.weights = xp.ones(self.A.shape[0])
162 else:
163 self.weights = weights
165 def _project(self, x: npt.NDArray) -> np.ndarray:
166 """
167 Projects the input array `x` onto a feasible region defined by the
168 constraints.
170 Parameters
171 ----------
172 x : npt.NDArray
173 The input array to be projected.
175 Returns
176 -------
177 npt.NDArray
178 The projected array.
180 Notes
181 -----
182 This method iteratively adjusts the input array `x` based on the constraints
183 defined in `self.cs`. For each constraint, it computes the projection and
184 checks if the constraints are violated. If a constraint is violated, it updates
185 the array `x` using a weighted relaxation factor. The weight decay is applied
186 to the temporary weight decay after each iteration.
187 """
188 weighted_relaxation = self.algorithmic_relaxation * self.temp_weight_decay
190 for i in self.cs:
191 p_i = self.single_map(x, i)
192 res = self.b[i] - p_i
193 if res < 0:
194 self.A.update_step(
195 x, weighted_relaxation * self.weights[i] * self.inverse_row_norm[i] * res, i
196 )
198 self.temp_weight_decay *= self.weight_decay
199 return x
202class SimultaneousAMSHalfspace(HalfspaceAlgorithm):
203 """
204 SimultaneousAMS is an implementation of the AMS (Alternating
205 Minimization
206 Scheme) algorithm that performs simultaneous projections and proximity
207 calculations.
209 Parameters
210 ----------
211 A : npt.NDArray or sparse.sparray
212 Matrix for linear systems
213 b : npt.NDArray
214 Bound for linear systems
215 algorithmic_relaxation : npt.NDArray or float, optional
216 The relaxation parameter used by the algorithm, by default 1.0.
217 relaxation : float, optional
218 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
219 weights : None or List[float], optional
220 The weights for the constraints, by default None.
221 proximity_flag : bool, optional
222 Flag to indicate if proximity calculations should be performed, by default True.
223 """
225 def __init__(
226 self,
227 A: npt.NDArray,
228 b: npt.NDArray,
229 algorithmic_relaxation: npt.NDArray | float = 1.0,
230 relaxation: float = 1.0,
231 weights: None | List[float] = None,
232 proximity_flag: bool = True,
233 ):
234 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag)
236 xp = cp if self._use_gpu else np
238 if weights is None:
239 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0]
240 elif xp.abs((weights.sum() - 1)) > 1e-10:
241 warnings.warn("Weights do not add up to 1! Renormalizing to 1...", stacklevel=2)
242 self.weights = weights / weights.sum()
243 else:
244 self.weights = weights
246 def _project(self, x: npt.NDArray) -> np.ndarray:
247 # simultaneous projection
248 p = self.map(x)
249 res = self.b - p
250 res_idx = res < 0
251 x += self.algorithmic_relaxation * (
252 self.weights[res_idx]
253 * self.inverse_row_norm[res_idx]
254 * res[res_idx]
255 @ self.A[res_idx, :]
256 )
257 return x
259 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
260 p = self.map(x)
261 res = self.b - p
262 res[res > 0] = 0
263 res = -res
265 measures = []
266 for measure in proximity_measures:
267 if isinstance(measure, tuple):
268 if measure[0] == "p_norm":
269 measures.append(self.weights @ (res ** measure[1]))
270 else:
271 raise ValueError("Invalid proximity measure")
272 elif isinstance(measure, str) and measure == "max_norm":
273 measures.append(res.max())
274 else:
275 raise ValueError("Invalid proximity measure")
276 return measures
279class BlockIterativeAMSHalfspace(HalfspaceAlgorithm):
280 """
281 Block Iterative AMS Algorithm.
282 This class implements a block iterative version of the AMS (Alternating
283 Minimization Scheme) algorithm. It is designed to handle constraints and
284 weights in a block-wise manner.
286 Parameters
287 ----------
288 A : npt.NDArray or sparse.sparray
289 Matrix for linear systems
290 b : npt.NDArray
291 Bound for linear systems
292 weights : List[List[float]] or List[npt.NDArray]
293 A list of lists or arrays representing the weights for each block. Each list/array should sum to 1.
294 algorithmic_relaxation : npt.NDArray or float, optional
295 The relaxation parameter used by the algorithm, by default 1.0.
296 relaxation : float, optional
297 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
298 proximity_flag : bool, optional
299 A flag indicating whether to use proximity measures, by default True.
301 Raises
302 ------
303 ValueError
304 If any of the weight lists do not sum to 1.
305 """
307 def __init__(
308 self,
309 A: npt.NDArray,
310 b: npt.NDArray,
311 weights: List[List[float]] | List[npt.NDArray],
312 algorithmic_relaxation: npt.NDArray | float = 1.0,
313 relaxation: float = 1.0,
314 proximity_flag: bool = True,
315 ):
316 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag)
318 xp = cp if self._use_gpu else np
320 # check that weights is a list of lists that add up to 1 each
321 for el in weights:
322 if xp.abs((xp.sum(el) - 1)) > 1e-10:
323 raise ValueError("Weights do not add up to 1!")
325 self.weights = []
326 self.block_idxs = [
327 xp.where(xp.array(el) > 0)[0] for el in weights
328 ] # get idxs that meet requirements
330 # assemble a list of general weights
331 self.total_weights = xp.zeros_like(weights[0])
332 for el in weights:
333 el = xp.asarray(el)
334 self.weights.append(el[xp.array(el) > 0]) # remove non zero weights
335 self.total_weights += el / len(weights)
337 def _project(self, x: npt.NDArray) -> np.ndarray:
338 # simultaneous projection
339 for el, block_idx in zip(self.weights, self.block_idxs):
340 p = self.indexed_map(x, block_idx)
341 res = self.b[block_idx] - p
343 res_idx = res < 0
344 full_idx = block_idx[res_idx]
346 x += self.algorithmic_relaxation * (
347 el[res_idx] * self.inverse_row_norm[full_idx] * res[res_idx] @ self.A[full_idx, :]
348 )
350 return x
352 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
353 p = self.map(x)
354 res = self.b - p
355 res[res > 0] = 0
356 res = -res
358 measures = []
359 for measure in proximity_measures:
360 if isinstance(measure, tuple):
361 if measure[0] == "p_norm":
362 measures.append(self.total_weights @ (res ** measure[1]))
363 else:
364 raise ValueError("Invalid proximity measure")
365 elif isinstance(measure, str) and measure == "max_norm":
366 measures.append(res.max())
367 else:
368 raise ValueError("Invalid proximity measure")
369 return measures
372class StringAveragedAMSHalfspace(HalfspaceAlgorithm):
373 """
374 StringAveragedAMS is an implementation of the HalfspaceAlgorithm that
375 performs string averaged projections.
377 Parameters
378 ----------
379 A : npt.NDArray or sparse.sparray
380 Matrix for linear systems
381 b : npt.NDArray
382 Bound for linear systems
383 strings : List[List[int]]
384 A list of lists, where each inner list represents a string of indices.
385 algorithmic_relaxation : npt.NDArray or float, optional
386 The relaxation parameter used by the algorithm, by default 1.0.
387 relaxation : float, optional
388 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
389 weights : None or List[float], optional
390 The weights for each string, by default None. If None, equal weights are assigned.
391 proximity_flag : bool, optional
392 A flag indicating whether to use proximity, by default True.
393 """
395 def __init__(
396 self,
397 A: npt.NDArray,
398 b: npt.NDArray,
399 strings: List[List[int]],
400 algorithmic_relaxation: npt.NDArray | float = 1.0,
401 relaxation: float = 1.0,
402 weights: None | List[float] = None,
403 proximity_flag: bool = True,
404 ):
405 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag)
406 xp = cp if self._use_gpu else np
407 self.strings = strings
408 if weights is None:
409 self.weights = xp.ones(len(strings)) / len(strings)
410 else:
411 if len(weights) != len(self.strings):
412 raise ValueError("The number of weights must be equal to the number of strings.")
413 self.weights = weights
415 def _project(self, x):
416 # string averaged projection
417 x_c = x.copy() # create a general copy of x
418 x -= x # reset x is this viable?
419 for string, weight in zip(self.strings, self.weights):
420 x_s = x_c.copy()
421 for i in string:
422 p_i = self.single_map(x_s, i)
423 res_i = self.b[i] - p_i
424 if res_i < 0:
425 self.A.update_step(
426 x_s, self.algorithmic_relaxation * self.inverse_row_norm[i] * res_i, i
427 )
428 x += weight * x_s
429 return x