Coverage for suppy\feasibility\_bands\_art3_algorithms.py: 57%

119 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2026-05-08 13:56 +0200

1from abc import ABC 

2from typing import List 

3import numpy as np 

4import numpy.typing as npt 

5 

6try: 

7 import cupy as cp 

8 

9 NO_GPU = False 

10 

11except ImportError: 

12 NO_GPU = True 

13 cp = np 

14 

15from suppy.utils import ensure_float_array 

16from suppy.feasibility._linear_algorithms import HyperslabFeasibility 

17 

18 

19class ART3plusAlgorithm(HyperslabFeasibility, ABC): 

20 """ 

21 ART3plusAlgorithm class for implementing the ART3+ algorithm. 

22 

23 Parameters 

24 ---------- 

25 A : npt.NDArray or sparse.sparray 

26 Matrix for linear systems 

27 lb : npt.NDArray 

28 The lower bounds for the hyperslab. 

29 ub : npt.NDArray 

30 The upper bounds for the hyperslab. 

31 algorithmic_relaxation : npt.NDArray or float, optional 

32 The relaxation parameter used by the algorithm, by default 1.0. 

33 relaxation : float, optional 

34 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

35 proximity_flag : bool, optional 

36 Flag to indicate whether to use proximity in the algorithm, by default True. 

37 """ 

38 

39 def __init__( 

40 self, 

41 A: npt.NDArray, 

42 lb: npt.NDArray, 

43 ub: npt.NDArray, 

44 algorithmic_relaxation: npt.NDArray | float = 1, 

45 relaxation: float = 1, 

46 proximity_flag=True, 

47 ): 

48 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag) 

49 

50 

51class SequentialART3plus(ART3plusAlgorithm): 

52 """ 

53 SequentialART3plus is an implementation of the ART3plus algorithm for 

54 solving feasibility problems. 

55 

56 Parameters 

57 ---------- 

58 A : npt.NDArray or sparse.sparray 

59 Matrix for linear systems 

60 lb : npt.NDArray 

61 The lower bounds for the hyperslab. 

62 ub : npt.NDArray 

63 The upper bounds for the hyperslab. 

64 cs : None or List[int], optional 

65 The control sequence for the algorithm. If None, it will be initialized to the range of the number of rows in A. 

66 proximity_flag : bool, optional 

67 A flag indicating whether to use proximity in the algorithm. Default is True. 

68 

69 Attributes 

70 ---------- 

71 initial_cs : List[int] 

72 The initial control sequence. 

73 cs : List[int] 

74 The current control sequence. 

75 _feasible : bool 

76 A flag indicating whether the current solution is feasible. 

77 

78 Methods 

79 ------- 

80 _project(x) 

81 Projects the point x onto the feasible region defined by the constraints. 

82 solve(x, max_iter) 

83 Solves the feasibility problem using the ART3plus algorithm. 

84 """ 

85 

86 def __init__( 

87 self, 

88 A: npt.NDArray, 

89 lb: npt.NDArray, 

90 ub: npt.NDArray, 

91 cs: None | List[int] = None, 

92 proximity_flag=True, 

93 ): 

94 

95 super().__init__(A, lb, ub, 1, 1, proximity_flag) 

96 xp = cp if self.A.gpu else np 

97 if cs is None: 

98 self.initial_cs = xp.arange(self.A.shape[0]) 

99 else: 

100 self.initial_cs = cs 

101 

102 self.cs = self.initial_cs.copy() 

103 

104 def _project(self, x: npt.NDArray) -> np.ndarray: 

105 to_remove = [] 

106 for i in self.cs: 

107 # TODO: add a boolean variable that skips this if the projection did not move the point? 

108 p_i = self.single_map(x, i) 

109 # should be precomputed 

110 if ( 

111 3 / 2 * self.bounds.l[i] - 1 / 2 * self.bounds.u[i] <= p_i < self.bounds.l[i] 

112 ): # lowe bound reflection 

113 self.A.update_step( 

114 x, 2 * self.inverse_row_norm[i] * (self.bounds.l[i] - p_i), i 

115 ) # reflection 

116 

117 elif ( 

118 self.bounds.u[i] < p_i <= 3 / 2 * self.bounds.u[i] - 1 / 2 * self.bounds.l[i] 

119 ): # upper bound reflection 

120 self.A.update_step( 

121 x, 2 * self.inverse_row_norm[i] * (self.bounds.u[i] - p_i), i 

122 ) # reflection 

123 

124 elif self.bounds.u[i] - self.bounds.l[i] < abs( 

125 p_i - (self.bounds.l[i] + self.bounds.u[i]) / 2 

126 ): 

127 self.A.update_step( 

128 x, 

129 self.inverse_row_norm[i] * ((self.bounds.l[i] + self.bounds.u[i]) / 2 - p_i), 

130 i, 

131 ) # projection onto center of hyperslab 

132 

133 else: # constraint is already met 

134 to_remove.append(i) 

135 

136 # after loop remove constraints that are already met 

137 self.cs = [i for i in self.cs if i not in to_remove] # is this fast? 

138 return x 

139 

140 @ensure_float_array 

141 def solve( 

142 self, 

143 x: npt.NDArray, 

144 max_iter: int = 500, 

145 prox_tol: float = 1e-6, 

146 del_prox_tol: float = 1e-8, 

147 del_prox_n: int = 5, 

148 storage: bool = False, 

149 storage_iters: List[int] | int | None = None, 

150 proximity_measures: List | None = None, 

151 ) -> np.ndarray: 

152 """ 

153 Solves the optimization problem using an iterative approach. 

154 

155 Parameters 

156 ---------- 

157 x : npt.NDArray 

158 Starting point for the algorithm. 

159 max_iter : int, optional 

160 Maximum number of iterations to perform. 

161 prox_tol : float, optional 

162 The tolerance for the proximity on the constraints, by default 1e-6. 

163 del_prox_tol : float, optional 

164 The tolerance for the change in proximity over the last del_prox_n iterations, by default 1e-8. 

165 del_prox_n : int, optional 

166 The number of iterations that del_prox_tol needs to be met in a row, by default 5. 

167 proximity_measures : List, optional 

168 The proximity measures to calculate, by default a l2 norm measure is used. Right now only the first in the list is used to check the feasibility. 

169 storage : bool, optional 

170 Flag indicating whether to store intermediate solutions, by default False. 

171 storage_iters : List[int] or int, optional 

172 Controls which iterations are stored (when storage=True). If None, all iterations are stored. 

173 If a list of ints, only those iteration indices are stored (0 = initial point). 

174 If an int, storage occurs every that many iterations. 

175 

176 Returns 

177 ------- 

178 npt.NDArray 

179 The solution after the iterative process. 

180 """ 

181 self.cs = self.initial_cs.copy() 

182 xp = cp if isinstance(x, cp.ndarray) else np 

183 

184 if proximity_measures is None: 

185 proximity_measures = [("p_norm", 2)] 

186 else: 

187 # TODO: Check if the proximity measures are valid 

188 _ = None 

189 

190 self.proximities = [self.proximity(x, proximity_measures)] 

191 i = 0 

192 

193 def _should_store(idx): 

194 if storage_iters is None: 

195 return True 

196 if isinstance(storage_iters, int): 

197 return idx % storage_iters == 0 

198 return idx in storage_iters 

199 

200 if storage is True: 

201 self.all_x = [] 

202 if _should_store(0): 

203 if isinstance(x, np.ndarray): 

204 self.all_x.append(np.array(x.copy())) 

205 else: 

206 self.all_x.append((x.get())) 

207 

208 stop = False # criterion for stopping the algorithm 

209 self._n_tol = 0 

210 

211 while i < max_iter and not stop: 

212 

213 if len(self.cs) == 0: 

214 self.cs = self.initial_cs.copy() 

215 

216 x = self.project(x) 

217 if storage is True and _should_store(i + 1): 

218 if isinstance(x, np.ndarray): # convert to np array if cp 

219 self.all_x.append(np.array(x.copy())) 

220 else: 

221 self.all_x.append((x.get())) 

222 self.proximities.append(self.proximity(x, proximity_measures)) 

223 

224 # TODO: If proximity changes x some potential issues! 

225 stop = self._stopping_criterion(prox_tol, del_prox_tol, del_prox_n) 

226 

227 i += 1 

228 

229 if self.all_x is not None: 

230 self.all_x = np.array(self.all_x) 

231 

232 self.proximities = xp.array(self.proximities) 

233 

234 return x 

235 

236 

237class SimultaneousART3plus(ART3plusAlgorithm): 

238 """ 

239 SimultaneousART3plus is an implementation of the ART3plus algorithm for 

240 solving feasibility problems. 

241 

242 Parameters 

243 ---------- 

244 A : npt.NDArray or sparse.sparray 

245 Matrix for linear systems 

246 lb : npt.NDArray 

247 The lower bounds for the hyperslab. 

248 ub : npt.NDArray 

249 The upper bounds for the hyperslab. 

250 weights : None | List[float] | npt.NDArray, optional 

251 The weights for the constraints. If None, default weights are used. Default is None. 

252 proximity_flag : bool, optional 

253 Flag to indicate whether to use proximity measure. Default is True. 

254 

255 Attributes 

256 ---------- 

257 weights : npt.NDArray 

258 The weights for the constraints. 

259 """ 

260 

261 def __init__( 

262 self, 

263 A: npt.NDArray, 

264 lb: npt.NDArray, 

265 ub: npt.NDArray, 

266 weights: None | List[float] | npt.NDArray = None, 

267 proximity_flag=True, 

268 ): 

269 

270 super().__init__(A, lb, ub, 1, 1, proximity_flag) 

271 xp = cp if self.A.gpu else np 

272 if weights is None: 

273 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0] 

274 elif abs((weights.sum() - 1)) > 1e-10: 

275 print("Weights do not add up to 1! Choosing default weight vector...") 

276 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0] 

277 else: 

278 self.weights = weights 

279 

280 self._not_met = xp.arange(self.A.shape[0]) 

281 

282 self._not_met_init = self._not_met.copy() 

283 

284 def _project(self, x: npt.NDArray) -> np.ndarray: 

285 """ 

286 Perform one step of the ART3plus algorithm. 

287 

288 Args: 

289 x (npt.NDArray): The point to be projected. 

290 

291 Returns: 

292 npt.NDArray: The projected point. 

293 """ 

294 p = self.map(x) 

295 p = p[self._not_met] 

296 l_redux = self.bounds.l[self._not_met] 

297 u_redux = self.bounds.u[self._not_met] 

298 

299 # following calculations are performed on subarrays 

300 # assign different subsets 

301 idx_1 = p < l_redux 

302 idx_2 = p > u_redux 

303 idx_3 = p < l_redux - (u_redux - l_redux) / 2 

304 idx_4 = p > u_redux + (u_redux - l_redux) / 2 

305 

306 # sets on subarrays 

307 set_1 = idx_1 & (not idx_3) # idxs for lower bound reflection 

308 set_2 = idx_2 & (not idx_4) # idxs for upper bound reflection 

309 set_3 = idx_3 | idx_4 # idxs for projections 

310 # there should be no overlap between the different regions here! 

311 x += ( 

312 self.weights[self._not_met][set_1] 

313 * self.inverse_row_norm[self._not_met][set_1] 

314 * (2 * (l_redux - p))[set_1] 

315 @ self.A[self._not_met][set_1, :] 

316 ) 

317 x += ( 

318 self.weights[self._not_met][set_2] 

319 * self.inverse_row_norm[self._not_met][set_2] 

320 * (2 * (u_redux - p))[set_2] 

321 @ self.A[self._not_met][set_2, :] 

322 ) 

323 x += ( 

324 self.weights[self._not_met][set_3] 

325 * self.inverse_row_norm[self._not_met][set_3] 

326 * ((l_redux + u_redux) / 2 - p)[set_3] 

327 @ self.A[self._not_met][set_3, :] 

328 ) 

329 

330 # remove constraints that were already met before 

331 self._not_met = self._not_met[(idx_1 | idx_2)] 

332 

333 return x 

334 

335 def _proximity(self, x: npt.NDArray, proximity_measures: List[str]) -> float: 

336 p = self.map(x) 

337 # residuals are positive if constraints are met 

338 (res_l, res_u) = self.bounds.residual(p) 

339 res_u[res_u > 0] = 0 

340 res_l[res_l > 0] = 0 

341 res = -res_u - res_l 

342 measures = [] 

343 for measure in proximity_measures: 

344 if isinstance(measure, tuple): 

345 if measure[0] == "p_norm": 

346 measures.append(self.weights @ (res ** measure[1])) 

347 else: 

348 raise ValueError("Invalid proximity measure") 

349 elif isinstance(measure, str) and measure == "max_norm": 

350 measures.append(res.max()) 

351 else: 

352 raise ValueError("Invalid proximity measure)") 

353 return measures