Coverage for suppy\feasibility\_bands\_ams_extrapolations.py: 53%
104 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
1from abc import ABC
2from typing import List
3import numpy as np
4import numpy.typing as npt
5from scipy import sparse
7try:
8 import cupy as cp
10 NO_GPU = False
12except ImportError:
13 NO_GPU = True
14 cp = np
17from suppy.feasibility._bands._ams_algorithms import SimultaneousAMSHyperslab
20class ExtrapolatedLandweberHyperslab(SimultaneousAMSHyperslab):
21 """
22 Extrapolated Landweber algorithm for solving linear inequalities of the
23 form lb <= Ax <= ub.
25 Parameters
26 ----------
27 A : npt.NDArray or sparse.sparray
28 Matrix for linear systems
29 lb : npt.NDArray
30 The lower bounds for the hyperslab.
31 ub : npt.NDArray
32 The upper bounds for the hyperslab.
33 weights : None or List[float], optional
34 The weights for the constraints, by default None.
35 relaxation : float, optional
36 The outer relaxation parameter, by default 1.0.
37 proximity_flag : bool, optional
38 A flag indicating whether to use proximity measures, by default True.
40 References
41 ----------
42 - [1] https://doi.org/10.1007/s11075-025-02163-0
43 - [2] https://doi.org/10.1007/978-3-642-30901-4
44 """
46 def __init__(
47 self,
48 A: npt.NDArray | sparse.sparray,
49 lb: npt.NDArray,
50 ub: npt.NDArray,
51 relaxation: float = 1.0,
52 weights: None | List[float] | npt.NDArray = None,
53 proximity_flag: bool = True,
54 ):
55 super().__init__(A, lb, ub, 1.0, relaxation, weights, proximity_flag)
56 self.a_i = self.A.row_norm(2, 2)
57 # To avoid division by zero
58 self.weight_norm = self.weights / self.a_i
59 self.weight_norm[self.a_i == 0] = 0
60 # TODO: Division-by-zero guard above produces NaNs once all other criteria are met
62 def _project(self, x: npt.NDArray) -> np.ndarray:
63 xp = cp if self._use_gpu else np
64 p = self.map(x)
65 (res_l, res_u) = self.bounds.residual(p)
66 d_idx = res_u < 0
67 c_idx = res_l < 0
68 if not (xp.any(d_idx)) and not (xp.any(c_idx)):
69 return x
70 t_u = self.weight_norm[d_idx] * res_u[d_idx]
71 t_l = self.weight_norm[c_idx] * res_l[c_idx]
72 t_u_2 = t_u @ self.A[d_idx, :]
73 t_l_2 = t_l @ self.A[c_idx, :]
75 sig = ((res_l[c_idx] @ t_l) + (res_u[d_idx] @ t_u)) / ((t_u_2 - t_l_2) @ (t_u_2 - t_l_2))
76 x += sig * (t_u_2 - t_l_2)
78 if xp.isnan(x).any():
79 raise ValueError("NaN encountered in Extrapolated Landweber Hyperslab projection.")
81 return x
84class BlockIterativeExtrapolatedLandweberHyperslab(ExtrapolatedLandweberHyperslab):
85 """
86 Block-iterative variant of the Extrapolated Landweber algorithm for
87 hyperslabs, solving lb <= Ax <= ub block by block.
89 Parameters
90 ----------
91 A : npt.NDArray or sparse.sparray
92 Matrix for linear systems.
93 lb : npt.NDArray
94 The lower bounds for the hyperslab.
95 ub : npt.NDArray
96 The upper bounds for the hyperslab.
97 length_blocks : List[int]
98 Number of rows in each block. Must sum to the total number of rows in A.
99 relaxation : float, optional
100 The outer relaxation parameter, by default 1.0.
101 weights : None or List[float], optional
102 The weights for the constraints, by default None.
103 proximity_flag : bool, optional
104 A flag indicating whether to use proximity measures, by default True.
105 """
107 def __init__(
108 self,
109 A: npt.NDArray | sparse.sparray,
110 lb: npt.NDArray,
111 ub: npt.NDArray,
112 length_blocks: List[int],
113 relaxation: float = 1.0,
114 weights: None | List[float] | npt.NDArray = None,
115 proximity_flag: bool = True,
116 ):
117 super().__init__(A, lb, ub, relaxation, weights, proximity_flag)
118 self.length_blocks = length_blocks
120 self.As = []
121 self.ls = []
122 self.us = []
123 self.block_bounds = []
124 self.weight_norms = []
125 _lower_bound = 0
126 for i in range(len(self.length_blocks)):
127 self.As.append(A[_lower_bound : _lower_bound + self.length_blocks[i]])
128 self.ls.append(lb[_lower_bound : _lower_bound + self.length_blocks[i]])
129 self.us.append(ub[_lower_bound : _lower_bound + self.length_blocks[i]])
130 self.weight_norms.append(
131 self.weight_norm[_lower_bound : _lower_bound + self.length_blocks[i]]
132 )
133 _lower_bound += self.length_blocks[i]
135 def _project(self, x: npt.NDArray) -> np.ndarray:
136 xp = cp if self._use_gpu else np
137 for i, A_block in enumerate(self.As):
138 p = A_block @ x
139 res_l, res_u = p - self.ls[i], self.us[i] - p
140 d_idx = res_u < 0
141 c_idx = res_l < 0
142 if not (xp.any(d_idx)) and not (xp.any(c_idx)):
143 continue
144 t_u = self.weight_norms[i][d_idx] * res_u[d_idx]
145 t_l = self.weight_norms[i][c_idx] * res_l[c_idx]
146 t_u_2 = t_u @ A_block[d_idx, :]
147 t_l_2 = t_l @ A_block[c_idx, :]
149 sig = ((res_l[c_idx] @ t_l) + (res_u[d_idx] @ t_u)) / (
150 (t_u_2 - t_l_2) @ (t_u_2 - t_l_2)
151 )
152 x += sig * (t_u_2 - t_l_2)
153 return x
156class AdaptiveStepLandweberHyperslab(SimultaneousAMSHyperslab):
157 """
158 Adaptive step-size Landweber algorithm for solving linear inequalities
159 of
160 the form lb <= Ax <= ub.
162 Parameters
163 ----------
164 A : npt.NDArray or sparse.sparray
165 Matrix for linear systems
166 lb : npt.NDArray
167 The lower bounds for the hyperslab.
168 ub : npt.NDArray
169 The upper bounds for the hyperslab.
170 weights : None or List[float], optional
171 The weights for the constraints, by default None.
172 relaxation : float, optional
173 The outer relaxation parameter, by default 1.0.
174 proximity_flag : bool, optional
175 A flag indicating whether to use proximity measures, by default True.
177 References
178 ----------
179 - [1] https://doi.org/10.1515/jiip-2015-0082
180 """
182 def __init__(
183 self,
184 A: npt.NDArray | sparse.sparray,
185 lb: npt.NDArray,
186 ub: npt.NDArray,
187 weights: None | List[float] | npt.NDArray = None,
188 relaxation: float = 1.0,
189 proximity_flag: bool = True,
190 ):
191 super().__init__(A, lb, ub, 1.0, relaxation, weights, proximity_flag)
193 def _project(self, x: npt.NDArray) -> np.ndarray:
194 xp = cp if self._use_gpu else np
195 p = self.map(x)
196 (res_l, res_u) = self.bounds.residual(p)
197 d_idx = res_u < 0
198 c_idx = res_l < 0
200 if not (xp.any(d_idx)) and not (xp.any(c_idx)):
201 return x
203 u_t_u = (self.weights[d_idx] * res_u[d_idx]) @ self.A[d_idx, :]
204 u_t_l = (self.weights[c_idx] * res_l[c_idx]) @ self.A[c_idx, :]
206 u_diff = u_t_u - u_t_l
207 Au_t = self.A @ u_diff
208 sig = (u_diff @ u_diff) / (Au_t @ (self.weights * Au_t))
209 x += sig * u_diff
211 return x
214class AdaptiveStepLandweberHyperslab2(SimultaneousAMSHyperslab):
215 """
216 Adaptive step-size Landweber algorithm (variant 2) for solving linear
217 inequalities of the form lb <= Ax <= ub.
219 Parameters
220 ----------
221 A : npt.NDArray or sparse.sparray
222 Matrix for linear systems
223 lb : npt.NDArray
224 The lower bounds for the hyperslab.
225 ub : npt.NDArray
226 The upper bounds for the hyperslab.
227 relaxation : float, optional
228 The outer relaxation parameter, by default 1.0.
229 weights : None or List[float], optional
230 The weights for the constraints, by default None.
231 proximity_flag : bool, optional
232 A flag indicating whether to use proximity measures, by default True.
234 References
235 ----------
236 - [1] https://doi.org/10.1515/jiip-2015-0082
237 """
239 def __init__(
240 self,
241 A: npt.NDArray | sparse.sparray,
242 lb: npt.NDArray,
243 ub: npt.NDArray,
244 relaxation: float = 1.0,
245 weights: None | List[float] | npt.NDArray = None,
246 proximity_flag: bool = True,
247 ):
248 super().__init__(A, lb, ub, 1.0, relaxation, weights, proximity_flag)
250 def _project(self, x: npt.NDArray) -> np.ndarray:
251 xp = cp if self._use_gpu else np
252 p = self.map(x)
253 (res_l, res_u) = self.bounds.residual(p)
254 d_idx = res_u < 0
255 c_idx = res_l < 0
257 res_u[~d_idx] = 0
258 res_l[~c_idx] = 0
260 if not (xp.any(d_idx)) and not (xp.any(c_idx)):
261 return x
263 idx = d_idx | c_idx
264 # Note: weights[idx] has shape (k,); result of @ has shape (n,).
265 # This formula is designed for cases where the weighted combination is applied element-wise.
266 u_diff = self.weights[idx] * (res_u[idx] - res_l[idx]) @ self.A[idx, :]
268 Au_t = self.A @ u_diff
269 sig = (u_diff @ u_diff) / (Au_t @ (self.weights * Au_t))
270 x += sig * u_diff
272 return x