Coverage for suppy\feasibility\_bands\_ams_extrapolations.py: 53%

104 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2026-05-08 13:56 +0200

1from abc import ABC 

2from typing import List 

3import numpy as np 

4import numpy.typing as npt 

5from scipy import sparse 

6 

7try: 

8 import cupy as cp 

9 

10 NO_GPU = False 

11 

12except ImportError: 

13 NO_GPU = True 

14 cp = np 

15 

16 

17from suppy.feasibility._bands._ams_algorithms import SimultaneousAMSHyperslab 

18 

19 

20class ExtrapolatedLandweberHyperslab(SimultaneousAMSHyperslab): 

21 """ 

22 Extrapolated Landweber algorithm for solving linear inequalities of the 

23 form lb <= Ax <= ub. 

24 

25 Parameters 

26 ---------- 

27 A : npt.NDArray or sparse.sparray 

28 Matrix for linear systems 

29 lb : npt.NDArray 

30 The lower bounds for the hyperslab. 

31 ub : npt.NDArray 

32 The upper bounds for the hyperslab. 

33 weights : None or List[float], optional 

34 The weights for the constraints, by default None. 

35 relaxation : float, optional 

36 The outer relaxation parameter, by default 1.0. 

37 proximity_flag : bool, optional 

38 A flag indicating whether to use proximity measures, by default True. 

39 

40 References 

41 ---------- 

42 - [1] https://doi.org/10.1007/s11075-025-02163-0 

43 - [2] https://doi.org/10.1007/978-3-642-30901-4 

44 """ 

45 

46 def __init__( 

47 self, 

48 A: npt.NDArray | sparse.sparray, 

49 lb: npt.NDArray, 

50 ub: npt.NDArray, 

51 relaxation: float = 1.0, 

52 weights: None | List[float] | npt.NDArray = None, 

53 proximity_flag: bool = True, 

54 ): 

55 super().__init__(A, lb, ub, 1.0, relaxation, weights, proximity_flag) 

56 self.a_i = self.A.row_norm(2, 2) 

57 # To avoid division by zero 

58 self.weight_norm = self.weights / self.a_i 

59 self.weight_norm[self.a_i == 0] = 0 

60 # TODO: Division-by-zero guard above produces NaNs once all other criteria are met 

61 

62 def _project(self, x: npt.NDArray) -> np.ndarray: 

63 xp = cp if self._use_gpu else np 

64 p = self.map(x) 

65 (res_l, res_u) = self.bounds.residual(p) 

66 d_idx = res_u < 0 

67 c_idx = res_l < 0 

68 if not (xp.any(d_idx)) and not (xp.any(c_idx)): 

69 return x 

70 t_u = self.weight_norm[d_idx] * res_u[d_idx] 

71 t_l = self.weight_norm[c_idx] * res_l[c_idx] 

72 t_u_2 = t_u @ self.A[d_idx, :] 

73 t_l_2 = t_l @ self.A[c_idx, :] 

74 

75 sig = ((res_l[c_idx] @ t_l) + (res_u[d_idx] @ t_u)) / ((t_u_2 - t_l_2) @ (t_u_2 - t_l_2)) 

76 x += sig * (t_u_2 - t_l_2) 

77 

78 if xp.isnan(x).any(): 

79 raise ValueError("NaN encountered in Extrapolated Landweber Hyperslab projection.") 

80 

81 return x 

82 

83 

84class BlockIterativeExtrapolatedLandweberHyperslab(ExtrapolatedLandweberHyperslab): 

85 """ 

86 Block-iterative variant of the Extrapolated Landweber algorithm for 

87 hyperslabs, solving lb <= Ax <= ub block by block. 

88 

89 Parameters 

90 ---------- 

91 A : npt.NDArray or sparse.sparray 

92 Matrix for linear systems. 

93 lb : npt.NDArray 

94 The lower bounds for the hyperslab. 

95 ub : npt.NDArray 

96 The upper bounds for the hyperslab. 

97 length_blocks : List[int] 

98 Number of rows in each block. Must sum to the total number of rows in A. 

99 relaxation : float, optional 

100 The outer relaxation parameter, by default 1.0. 

101 weights : None or List[float], optional 

102 The weights for the constraints, by default None. 

103 proximity_flag : bool, optional 

104 A flag indicating whether to use proximity measures, by default True. 

105 """ 

106 

107 def __init__( 

108 self, 

109 A: npt.NDArray | sparse.sparray, 

110 lb: npt.NDArray, 

111 ub: npt.NDArray, 

112 length_blocks: List[int], 

113 relaxation: float = 1.0, 

114 weights: None | List[float] | npt.NDArray = None, 

115 proximity_flag: bool = True, 

116 ): 

117 super().__init__(A, lb, ub, relaxation, weights, proximity_flag) 

118 self.length_blocks = length_blocks 

119 

120 self.As = [] 

121 self.ls = [] 

122 self.us = [] 

123 self.block_bounds = [] 

124 self.weight_norms = [] 

125 _lower_bound = 0 

126 for i in range(len(self.length_blocks)): 

127 self.As.append(A[_lower_bound : _lower_bound + self.length_blocks[i]]) 

128 self.ls.append(lb[_lower_bound : _lower_bound + self.length_blocks[i]]) 

129 self.us.append(ub[_lower_bound : _lower_bound + self.length_blocks[i]]) 

130 self.weight_norms.append( 

131 self.weight_norm[_lower_bound : _lower_bound + self.length_blocks[i]] 

132 ) 

133 _lower_bound += self.length_blocks[i] 

134 

135 def _project(self, x: npt.NDArray) -> np.ndarray: 

136 xp = cp if self._use_gpu else np 

137 for i, A_block in enumerate(self.As): 

138 p = A_block @ x 

139 res_l, res_u = p - self.ls[i], self.us[i] - p 

140 d_idx = res_u < 0 

141 c_idx = res_l < 0 

142 if not (xp.any(d_idx)) and not (xp.any(c_idx)): 

143 continue 

144 t_u = self.weight_norms[i][d_idx] * res_u[d_idx] 

145 t_l = self.weight_norms[i][c_idx] * res_l[c_idx] 

146 t_u_2 = t_u @ A_block[d_idx, :] 

147 t_l_2 = t_l @ A_block[c_idx, :] 

148 

149 sig = ((res_l[c_idx] @ t_l) + (res_u[d_idx] @ t_u)) / ( 

150 (t_u_2 - t_l_2) @ (t_u_2 - t_l_2) 

151 ) 

152 x += sig * (t_u_2 - t_l_2) 

153 return x 

154 

155 

156class AdaptiveStepLandweberHyperslab(SimultaneousAMSHyperslab): 

157 """ 

158 Adaptive step-size Landweber algorithm for solving linear inequalities 

159 of 

160 the form lb <= Ax <= ub. 

161 

162 Parameters 

163 ---------- 

164 A : npt.NDArray or sparse.sparray 

165 Matrix for linear systems 

166 lb : npt.NDArray 

167 The lower bounds for the hyperslab. 

168 ub : npt.NDArray 

169 The upper bounds for the hyperslab. 

170 weights : None or List[float], optional 

171 The weights for the constraints, by default None. 

172 relaxation : float, optional 

173 The outer relaxation parameter, by default 1.0. 

174 proximity_flag : bool, optional 

175 A flag indicating whether to use proximity measures, by default True. 

176 

177 References 

178 ---------- 

179 - [1] https://doi.org/10.1515/jiip-2015-0082 

180 """ 

181 

182 def __init__( 

183 self, 

184 A: npt.NDArray | sparse.sparray, 

185 lb: npt.NDArray, 

186 ub: npt.NDArray, 

187 weights: None | List[float] | npt.NDArray = None, 

188 relaxation: float = 1.0, 

189 proximity_flag: bool = True, 

190 ): 

191 super().__init__(A, lb, ub, 1.0, relaxation, weights, proximity_flag) 

192 

193 def _project(self, x: npt.NDArray) -> np.ndarray: 

194 xp = cp if self._use_gpu else np 

195 p = self.map(x) 

196 (res_l, res_u) = self.bounds.residual(p) 

197 d_idx = res_u < 0 

198 c_idx = res_l < 0 

199 

200 if not (xp.any(d_idx)) and not (xp.any(c_idx)): 

201 return x 

202 

203 u_t_u = (self.weights[d_idx] * res_u[d_idx]) @ self.A[d_idx, :] 

204 u_t_l = (self.weights[c_idx] * res_l[c_idx]) @ self.A[c_idx, :] 

205 

206 u_diff = u_t_u - u_t_l 

207 Au_t = self.A @ u_diff 

208 sig = (u_diff @ u_diff) / (Au_t @ (self.weights * Au_t)) 

209 x += sig * u_diff 

210 

211 return x 

212 

213 

214class AdaptiveStepLandweberHyperslab2(SimultaneousAMSHyperslab): 

215 """ 

216 Adaptive step-size Landweber algorithm (variant 2) for solving linear 

217 inequalities of the form lb <= Ax <= ub. 

218 

219 Parameters 

220 ---------- 

221 A : npt.NDArray or sparse.sparray 

222 Matrix for linear systems 

223 lb : npt.NDArray 

224 The lower bounds for the hyperslab. 

225 ub : npt.NDArray 

226 The upper bounds for the hyperslab. 

227 relaxation : float, optional 

228 The outer relaxation parameter, by default 1.0. 

229 weights : None or List[float], optional 

230 The weights for the constraints, by default None. 

231 proximity_flag : bool, optional 

232 A flag indicating whether to use proximity measures, by default True. 

233 

234 References 

235 ---------- 

236 - [1] https://doi.org/10.1515/jiip-2015-0082 

237 """ 

238 

239 def __init__( 

240 self, 

241 A: npt.NDArray | sparse.sparray, 

242 lb: npt.NDArray, 

243 ub: npt.NDArray, 

244 relaxation: float = 1.0, 

245 weights: None | List[float] | npt.NDArray = None, 

246 proximity_flag: bool = True, 

247 ): 

248 super().__init__(A, lb, ub, 1.0, relaxation, weights, proximity_flag) 

249 

250 def _project(self, x: npt.NDArray) -> np.ndarray: 

251 xp = cp if self._use_gpu else np 

252 p = self.map(x) 

253 (res_l, res_u) = self.bounds.residual(p) 

254 d_idx = res_u < 0 

255 c_idx = res_l < 0 

256 

257 res_u[~d_idx] = 0 

258 res_l[~c_idx] = 0 

259 

260 if not (xp.any(d_idx)) and not (xp.any(c_idx)): 

261 return x 

262 

263 idx = d_idx | c_idx 

264 # Note: weights[idx] has shape (k,); result of @ has shape (n,). 

265 # This formula is designed for cases where the weighted combination is applied element-wise. 

266 u_diff = self.weights[idx] * (res_u[idx] - res_l[idx]) @ self.A[idx, :] 

267 

268 Au_t = self.A @ u_diff 

269 sig = (u_diff @ u_diff) / (Au_t @ (self.weights * Au_t)) 

270 x += sig * u_diff 

271 

272 return x