Coverage for suppy\feasibility\_hyperplanes\_kaczmarz_algorithms.py: 91%
127 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
1import warnings
2from abc import ABC
3from typing import List
4import numpy as np
5import numpy.typing as npt
7try:
8 import cupy as cp
10 NO_GPU = False
12except ImportError:
13 NO_GPU = True
14 cp = np
16from suppy.feasibility._linear_algorithms import HyperplaneFeasibility
19class HyperplaneAlgorithm(HyperplaneFeasibility, ABC):
20 """
21 The HyperplaneAlgorithm class is used to find a feasible solution to
22 a set of linear equalities.
24 Parameters
25 ----------
26 A : npt.NDArray or sparse.sparray
27 Matrix for linear systems
28 b : npt.NDArray
29 Bound for linear systems
30 algorithmic_relaxation : npt.NDArray or float, optional
31 The relaxation parameter used by the algorithm, by default 1.0.
32 relaxation : float, optional
33 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
34 proximity_flag : bool, optional
35 A flag indicating whether to use proximity in the algorithm, by default True.
36 """
38 def __init__(
39 self,
40 A: npt.NDArray,
41 b: npt.NDArray,
42 algorithmic_relaxation: npt.NDArray | float = 1.0,
43 relaxation: float = 1.0,
44 proximity_flag: bool = True,
45 ):
46 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag)
49class KaczmarzMethod(HyperplaneAlgorithm):
50 """
51 Kaczmarz method for sequentially solving linear equality constraints.
53 Parameters
54 ----------
55 A : npt.NDArray or sparse.sparray
56 Matrix for linear systems
57 b : npt.NDArray
58 Bound for linear systems
59 algorithmic_relaxation : npt.NDArray or float, optional
60 The relaxation parameter used by the algorithm, by default 1.0.
61 relaxation : float, optional
62 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
63 cs : None or List[int], optional
64 The list of indices for the constraints, by default None.
65 proximity_flag : bool, optional
66 Flag to indicate if proximity should be considered, by default True.
67 """
69 def __init__(
70 self,
71 A: npt.NDArray,
72 b: npt.NDArray,
73 algorithmic_relaxation: npt.NDArray | float = 1.0,
74 relaxation: float = 1.0,
75 cs: None | List[int] = None,
76 proximity_flag: bool = True,
77 ):
78 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag)
79 xp = cp if self._use_gpu else np
80 if cs is None:
81 self.cs = xp.arange(self.A.shape[0])
82 else:
83 self.cs = cs
85 def _project(self, x: npt.NDArray) -> np.ndarray:
86 """
87 Projects the input array `x` onto the feasible region defined by the
88 constraints.
90 Parameters
91 ----------
92 x : npt.NDArray
93 The input array to be projected.
95 Returns
96 -------
97 npt.NDArray
98 The projected array.
99 """
100 for i in self.cs:
101 p_i = self.single_map(x, i)
102 res = self.b[i] - p_i
103 self.A.update_step(x, self.algorithmic_relaxation * self.inverse_row_norm[i] * res, i)
104 return x
107class SequentialWeightedKaczmarz(KaczmarzMethod):
108 """
109 Parameters
110 ----------
111 A : npt.NDArray or sparse.sparray
112 Matrix for linear systems
113 b : npt.NDArray
114 Bound for linear systems
115 weights : None, list of float, or npt.NDArray, optional
116 The weights assigned to each constraint. If None, default weights are
117 used.
118 algorithmic_relaxation : npt.NDArray or float, optional
119 The relaxation parameter used by the algorithm, by default 1.0.
120 relaxation : float, optional
121 Outer relaxation parameter, applied to the entire solution of the
122 iterate by default 1.0.
123 weight_decay : float, optional
124 Parameter that determines the rate at which the weights are reduced
125 after each phase (weights * weight_decay). Default is 1.0.
126 cs : None or list of int, optional
127 The indices of the constraints to be considered. Default is None.
128 proximity_flag : bool, optional
129 Flag to indicate if proximity should be considered. Default is True.
131 Attributes
132 ----------
133 weights : npt.NDArray
134 The weights assigned to each constraint.
135 weight_decay : float
136 Decay rate for the weights.
137 temp_weight_decay : float
138 Initial value for weight decay.
139 """
141 def __init__(
142 self,
143 A: npt.NDArray,
144 b: npt.NDArray,
145 weights: None | List[float] | npt.NDArray = None,
146 algorithmic_relaxation: npt.NDArray | float = 1.0,
147 relaxation: float = 1.0,
148 weight_decay: float = 1.0,
149 cs: None | List[int] = None,
150 proximity_flag: bool = True,
151 ):
152 super().__init__(A, b, algorithmic_relaxation, relaxation, cs, proximity_flag)
153 xp = cp if self._use_gpu else np
154 self.weight_decay = weight_decay
155 self.temp_weight_decay = 1.0
157 if weights is None:
158 self.weights = xp.ones(self.A.shape[0])
159 else:
160 self.weights = weights
162 def _project(self, x: npt.NDArray) -> np.ndarray:
163 """
164 Projects the input array `x` onto a feasible region defined by the
165 constraints.
167 Parameters
168 ----------
169 x : npt.NDArray
170 The input array to be projected.
172 Returns
173 -------
174 npt.NDArray
175 The projected array.
177 Notes
178 -----
179 This method iteratively adjusts the input array `x` based on the constraints
180 defined in `self.cs`. For each constraint, it computes the projection and
181 checks if the constraints are violated. If a constraint is violated, it updates
182 the array `x` using a weighted relaxation factor. The weight decay is applied
183 to the temporary weight decay after each iteration.
184 """
185 weighted_relaxation = self.algorithmic_relaxation * self.temp_weight_decay
187 for i in self.cs:
188 p_i = self.single_map(x, i)
189 res = self.b[i] - p_i
190 self.A.update_step(
191 x, weighted_relaxation * self.weights[i] * self.inverse_row_norm[i] * res, i
192 )
194 self.temp_weight_decay *= self.weight_decay
195 return x
198class SimultaneousKaczmarzMethod(HyperplaneAlgorithm):
199 """
200 SimultaneousKaczmarzMethod is an implementation of the Kaczmarz
201 algorithm
202 that performs simultaneous projections and proximity calculations.
204 Parameters
205 ----------
206 A : npt.NDArray or sparse.sparray
207 Matrix for linear systems
208 b : npt.NDArray
209 Bound for linear systems
210 algorithmic_relaxation : npt.NDArray or float, optional
211 The relaxation parameter used by the algorithm, by default 1.0.
212 relaxation : float, optional
213 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
214 weights : None or List[float], optional
215 The weights for the constraints, by default None.
216 proximity_flag : bool, optional
217 Flag to indicate if proximity calculations should be performed, by default True.
218 """
220 def __init__(
221 self,
222 A: npt.NDArray,
223 b: npt.NDArray,
224 algorithmic_relaxation: npt.NDArray | float = 1.0,
225 relaxation: float = 1.0,
226 weights: None | List[float] = None,
227 proximity_flag: bool = True,
228 ):
229 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag)
231 xp = cp if self._use_gpu else np
233 if weights is None:
234 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0]
235 elif xp.abs((weights.sum() - 1)) > 1e-10:
236 warnings.warn("Weights do not add up to 1! Renormalizing to 1...", stacklevel=2)
237 self.weights = weights / weights.sum()
238 else:
239 self.weights = weights
241 def _project(self, x: npt.NDArray) -> np.ndarray:
242 # simultaneous projection
243 p = self.map(x)
244 res = self.b - p
245 x += self.algorithmic_relaxation * (self.weights * self.inverse_row_norm * res @ self.A)
246 return x
248 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
249 p = self.map(x)
250 res = abs(self.b - p)
251 measures = []
252 for measure in proximity_measures:
253 if isinstance(measure, tuple):
254 if measure[0] == "p_norm":
255 measures.append(self.weights @ (res ** measure[1]))
256 else:
257 raise ValueError("Invalid proximity measure")
258 elif isinstance(measure, str) and measure == "max_norm":
259 measures.append(res.max())
260 else:
261 raise ValueError("Invalid proximity measure")
262 return measures
265class BlockIterativeKaczmarz(HyperplaneAlgorithm):
266 """
267 Block iterative Kaczmarz algorithm for solving linear equality
268 constraints
269 in a block-wise manner.
271 Parameters
272 ----------
273 A : npt.NDArray or sparse.sparray
274 Matrix for linear systems
275 b : npt.NDArray
276 Bound for linear systems
277 weights : List[List[float]] or List[npt.NDArray]
278 A list of lists or arrays representing the weights for each block. Each list/array should sum to 1.
279 algorithmic_relaxation : npt.NDArray or float, optional
280 The relaxation parameter used by the algorithm, by default 1.0.
281 relaxation : float, optional
282 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
283 proximity_flag : bool, optional
284 A flag indicating whether to use proximity measures, by default True.
286 Raises
287 ------
288 ValueError
289 If any of the weight lists do not sum to 1.
290 """
292 def __init__(
293 self,
294 A: npt.NDArray,
295 b: npt.NDArray,
296 weights: List[List[float]] | List[npt.NDArray],
297 algorithmic_relaxation: npt.NDArray | float = 1.0,
298 relaxation: float = 1.0,
299 proximity_flag: bool = True,
300 ):
301 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag)
303 xp = cp if self._use_gpu else np
305 # check that weights is a list of lists that add up to 1 each
306 for el in weights:
307 if xp.abs((xp.sum(el) - 1)) > 1e-10:
308 raise ValueError("Weights do not add up to 1!")
310 self.weights = []
311 self.block_idxs = [
312 xp.where(xp.array(el) > 0)[0] for el in weights
313 ] # get idxs that meet requirements
315 # assemble a list of general weights
316 self.total_weights = xp.zeros_like(weights[0])
317 for el in weights:
318 el = xp.asarray(el)
319 self.weights.append(el[xp.array(el) > 0]) # remove non zero weights
320 self.total_weights += el / len(weights)
322 def _project(self, x: npt.NDArray) -> np.ndarray:
323 # simultaneous projection
324 for el, block_idx in zip(self.weights, self.block_idxs):
325 p = self.indexed_map(x, block_idx)
326 res = self.b[block_idx] - p
328 x += self.algorithmic_relaxation * (
329 el * self.inverse_row_norm[block_idx] * res @ self.A[block_idx, :]
330 )
331 return x
333 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
334 p = self.map(x)
335 res = abs(self.b - p)
336 measures = []
337 for measure in proximity_measures:
338 if isinstance(measure, tuple):
339 if measure[0] == "p_norm":
340 measures.append(self.total_weights @ (res ** measure[1]))
341 else:
342 raise ValueError("Invalid proximity measure")
343 elif isinstance(measure, str) and measure == "max_norm":
344 measures.append(res.max())
345 else:
346 raise ValueError("Invalid proximity measure")
347 return measures
350class StringAveragedKaczmarz(HyperplaneAlgorithm):
351 """
352 StringAveragedKaczmarz is an implementation of the HyperplaneAlgorithm
353 that performs string averaged projections.
355 Parameters
356 ----------
357 A : npt.NDArray or sparse.sparray
358 Matrix for linear systems
359 b : npt.NDArray
360 Bound for linear systems
361 strings : List[List[int]]
362 A list of lists, where each inner list represents a string of indices.
363 algorithmic_relaxation : npt.NDArray or float, optional
364 The relaxation parameter used by the algorithm, by default 1.0.
365 relaxation : float, optional
366 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
367 weights : None or List[float], optional
368 The weights for each string, by default None. If None, equal weights are assigned.
369 proximity_flag : bool, optional
370 A flag indicating whether to use proximity, by default True.
371 """
373 def __init__(
374 self,
375 A: npt.NDArray,
376 b: npt.NDArray,
377 strings: List[List[int]],
378 algorithmic_relaxation: npt.NDArray | float = 1.0,
379 relaxation: float = 1.0,
380 weights: None | List[float] = None,
381 proximity_flag: bool = True,
382 ):
383 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag)
384 xp = cp if self._use_gpu else np
385 self.strings = strings
386 if weights is None:
387 self.weights = xp.ones(len(strings)) / len(strings)
388 else:
389 if len(weights) != len(self.strings):
390 raise ValueError("The number of weights must be equal to the number of strings.")
391 self.weights = weights
393 def _project(self, x: npt.NDArray) -> np.ndarray:
394 # string averaged projection
395 x_c = x.copy() # create a general copy of x
396 x -= x # reset x is this viable?
397 for string, weight in zip(self.strings, self.weights):
398 x_s = x_c.copy() # generate a copy for individual strings
399 for i in string:
400 p_i = self.single_map(x_s, i)
401 res_i = self.b[i] - p_i
402 self.A.update_step(
403 x_s, self.algorithmic_relaxation * self.inverse_row_norm[i] * res_i, i
404 )
405 x += weight * x_s
406 return x