Coverage for suppy\feasibility\_bands\_ams_algorithms.py: 88%

177 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2026-05-08 13:56 +0200

1import warnings 

2from abc import ABC 

3from typing import List 

4import numpy as np 

5import numpy.typing as npt 

6 

7try: 

8 import cupy as cp 

9 

10 NO_GPU = False 

11 

12except ImportError: 

13 NO_GPU = True 

14 cp = np 

15 

16from suppy.feasibility._linear_algorithms import HyperslabFeasibility 

17from suppy.utils import LinearMapping 

18 

19 

20class HyperslabAlgorithm(HyperslabFeasibility, ABC): 

21 """ 

22 The HyperslabAlgorithm class is used to find a feasible solution to a 

23 set of linear inequalities. 

24 

25 Parameters 

26 ---------- 

27 A : npt.NDArray or sparse.sparray 

28 Matrix for linear systems 

29 lb : npt.NDArray 

30 The lower bounds for the hyperslab. 

31 ub : npt.NDArray 

32 The upper bounds for the hyperslab. 

33 algorithmic_relaxation : npt.NDArray or float, optional 

34 The relaxation parameter used by the algorithm, by default 1.0. 

35 relaxation : float, optional 

36 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

37 proximity_flag : bool, optional 

38 A flag indicating whether to use proximity in the algorithm, by default True. 

39 """ 

40 

41 def __init__( 

42 self, 

43 A: npt.NDArray, 

44 lb: npt.NDArray, 

45 ub: npt.NDArray, 

46 algorithmic_relaxation: npt.NDArray | float = 1.0, 

47 relaxation: float = 1.0, 

48 proximity_flag: bool = True, 

49 ): 

50 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag) 

51 

52 

53class SequentialAMSHyperslab(HyperslabAlgorithm): 

54 """ 

55 SequentialAMSHyperslab class for sequentially applying the AMS algorithm 

56 on hyperslabs. 

57 

58 Parameters 

59 ---------- 

60 A : npt.NDArray or sparse.sparray 

61 Matrix for linear systems 

62 lb : npt.NDArray 

63 The lower bounds for the hyperslab. 

64 ub : npt.NDArray 

65 The upper bounds for the hyperslab. 

66 algorithmic_relaxation : npt.NDArray or float, optional 

67 The relaxation parameter used by the algorithm, by default 1.0. 

68 relaxation : float, optional 

69 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

70 cs : None or List[int], optional 

71 The list of indices for the constraints, by default None. 

72 proximity_flag : bool, optional 

73 Flag to indicate if proximity should be considered, by default True. 

74 """ 

75 

76 def __init__( 

77 self, 

78 A: npt.NDArray, 

79 lb: npt.NDArray, 

80 ub: npt.NDArray, 

81 algorithmic_relaxation: npt.NDArray | float = 1.0, 

82 relaxation: float = 1.0, 

83 cs: None | List[int] = None, 

84 proximity_flag: bool = True, 

85 ): 

86 

87 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag) 

88 xp = cp if self._use_gpu else np 

89 if cs is None: 

90 self.cs = xp.arange(self.A.shape[0]) 

91 else: 

92 self.cs = cs 

93 

94 def _project(self, x: npt.NDArray) -> np.ndarray: 

95 """ 

96 Projects the input array `x` onto the feasible region defined by the 

97 constraints. 

98 

99 Parameters 

100 ---------- 

101 x : npt.NDArray 

102 The input array to be projected. 

103 

104 Returns 

105 ------- 

106 npt.NDArray 

107 The projected array. 

108 """ 

109 

110 for i in self.cs: 

111 p_i = self.single_map(x, i) 

112 (res_li, res_ui) = self.bounds.single_residual(p_i, i) # returns floats 

113 

114 if res_ui < 0: 

115 self.A.update_step( 

116 x, self.algorithmic_relaxation * self.inverse_row_norm[i] * res_ui, i 

117 ) 

118 elif res_li < 0: 

119 self.A.update_step( 

120 x, -1 * self.algorithmic_relaxation * self.inverse_row_norm[i] * res_li, i 

121 ) 

122 return x 

123 

124 

125class SequentialWeightedAMSHyperslab(SequentialAMSHyperslab): 

126 """ 

127 Parameters 

128 ---------- 

129 A : npt.NDArray or sparse.sparray 

130 Matrix for linear systems 

131 lb : npt.NDArray 

132 The lower bounds of the hyperslab. 

133 ub : npt.NDArray 

134 The upper bounds of the hyperslab. 

135 weights : None, list of float, or npt.NDArray, optional 

136 The weights assigned to each constraint. If None, default weights are 

137 used. 

138 algorithmic_relaxation : npt.NDArray or float, optional 

139 The relaxation parameter used by the algorithm, by default 1.0. 

140 relaxation : float, optional 

141 Outer relaxation parameter, applied to the entire solution of the 

142 iterate by default 1.0. 

143 weight_decay : float, optional 

144 Parameter that determines the rate at which the weights are reduced 

145 after each phase (weights * weight_decay). Default is 1.0. 

146 cs : None or list of int, optional 

147 The indices of the constraints to be considered. Default is None. 

148 proximity_flag : bool, optional 

149 Flag to indicate if proximity should be considered. Default is True. 

150 

151 Attributes 

152 ---------- 

153 weights : npt.NDArray 

154 The weights assigned to each constraint. 

155 weight_decay : float 

156 Decay rate for the weights. 

157 temp_weight_decay : float 

158 Initial value for weight decay. 

159 """ 

160 

161 def __init__( 

162 self, 

163 A: npt.NDArray, 

164 lb: npt.NDArray, 

165 ub: npt.NDArray, 

166 weights: None | List[float] | npt.NDArray = None, 

167 algorithmic_relaxation: npt.NDArray | float = 1.0, 

168 relaxation: float = 1.0, 

169 weight_decay: float = 1.0, 

170 cs: None | List[int] = None, 

171 proximity_flag: bool = True, 

172 ): 

173 

174 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, cs, proximity_flag) 

175 xp = cp if self._use_gpu else np 

176 self.weight_decay = weight_decay 

177 self.temp_weight_decay = 1.0 

178 

179 if weights is None: 

180 self.weights = xp.ones(self.A.shape[0]) 

181 else: 

182 self.weights = weights 

183 

184 def _project(self, x: npt.NDArray) -> np.ndarray: 

185 """ 

186 Projects the input array `x` onto a feasible region defined by the 

187 constraints. 

188 

189 Parameters 

190 ---------- 

191 x : npt.NDArray 

192 The input array to be projected. 

193 

194 Returns 

195 ------- 

196 npt.NDArray 

197 The projected array. 

198 

199 Notes 

200 ----- 

201 This method iteratively adjusts the input array `x` based on the constraints 

202 defined in `self.cs`. For each constraint, it computes the projection and 

203 checks if the constraints are violated. If a constraint is violated, it updates 

204 the array `x` using a weighted relaxation factor. The weight decay is applied 

205 to the temporary weight decay after each iteration. 

206 """ 

207 

208 weighted_relaxation = self.algorithmic_relaxation * self.temp_weight_decay 

209 

210 for i in self.cs: 

211 

212 p_i = self.single_map(x, i) 

213 

214 (res_li, res_ui) = self.bounds.single_residual(p_i, i) # returns floats 

215 

216 if res_ui < 0: 

217 self.A.update_step( 

218 x, weighted_relaxation * self.weights[i] * self.inverse_row_norm[i] * res_ui, i 

219 ) 

220 elif res_li < 0: 

221 self.A.update_step( 

222 x, 

223 -1 * weighted_relaxation * self.weights[i] * self.inverse_row_norm[i] * res_li, 

224 i, 

225 ) 

226 

227 self.temp_weight_decay *= self.weight_decay 

228 return x 

229 

230 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

231 p = self.map(x) 

232 # residuals are positive if constraints are met 

233 (res_l, res_u) = self.bounds.residual(p) 

234 res_u[res_u > 0] = 0 

235 res_l[res_l > 0] = 0 

236 res = -res_u - res_l 

237 measures = [] 

238 for measure in proximity_measures: 

239 if isinstance(measure, tuple): 

240 if measure[0] == "p_norm": 

241 measures.append((self.weights @ (res ** measure[1])) / (self.weights.sum())) 

242 else: 

243 raise ValueError("Invalid proximity measure") 

244 elif isinstance(measure, str) and measure == "max_norm": 

245 measures.append(res.max()) 

246 else: 

247 raise ValueError("Invalid proximity measure") 

248 return measures 

249 

250 

251class SimultaneousAMSHyperslab(HyperslabAlgorithm): 

252 """ 

253 SimultaneousAMSHyperslab class for simultaneous application of the AMS 

254 algorithm on hyperslabs. 

255 

256 Parameters 

257 ---------- 

258 A : npt.NDArray or sparse.sparray 

259 Matrix for linear systems 

260 lb : npt.NDArray 

261 The lower bounds for the hyperslab. 

262 ub : npt.NDArray 

263 The upper bounds for the hyperslab. 

264 algorithmic_relaxation : npt.NDArray or float, optional 

265 The relaxation parameter used by the algorithm, by default 1.0. 

266 relaxation : float, optional 

267 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

268 weights : None or List[float], optional 

269 The weights for the constraints, by default None. 

270 proximity_flag : bool, optional 

271 Flag to indicate if proximity calculations should be performed, by default True. 

272 """ 

273 

274 def __init__( 

275 self, 

276 A: npt.NDArray, 

277 lb: npt.NDArray, 

278 ub: npt.NDArray, 

279 algorithmic_relaxation: npt.NDArray | float = 1.0, 

280 relaxation: float = 1.0, 

281 weights: None | List[float] = None, 

282 proximity_flag: bool = True, 

283 ): 

284 

285 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag) 

286 

287 xp = cp if self._use_gpu else np 

288 

289 if weights is None: 

290 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0] 

291 elif xp.abs((weights.sum() - 1)) > 1e-10: 

292 warnings.warn("Weights do not add up to 1! Renormalizing to 1...", stacklevel=2) 

293 self.weights = weights / weights.sum() 

294 else: 

295 self.weights = weights 

296 

297 def _project(self, x: npt.NDArray) -> np.ndarray: 

298 # simultaneous projection 

299 p = self.map(x) 

300 (res_l, res_u) = self.bounds.residual(p) 

301 d_idx = res_u < 0 

302 c_idx = res_l < 0 

303 x += self.algorithmic_relaxation * ( 

304 (self.weights * self.inverse_row_norm)[d_idx] * res_u[d_idx] @ self.A[d_idx, :] 

305 - (self.weights * self.inverse_row_norm)[c_idx] * res_l[c_idx] @ self.A[c_idx, :] 

306 ) 

307 

308 return x 

309 

310 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

311 p = self.map(x) 

312 # residuals are positive if constraints are met 

313 (res_l, res_u) = self.bounds.residual(p) 

314 res_u[res_u > 0] = 0 

315 res_l[res_l > 0] = 0 

316 res = -res_u - res_l 

317 measures = [] 

318 for measure in proximity_measures: 

319 if isinstance(measure, tuple): 

320 if measure[0] == "p_norm": 

321 measures.append(self.weights @ (res ** measure[1])) 

322 else: 

323 raise ValueError("Invalid proximity measure") 

324 elif isinstance(measure, str) and measure == "max_norm": 

325 measures.append(res.max()) 

326 else: 

327 raise ValueError("Invalid proximity measure") 

328 return measures 

329 

330 

331class SARTHyperslab(SimultaneousAMSHyperslab): 

332 """ 

333 SART Hyperslab class for simultaneous application of the SART 

334 algorithm on hyperslabs. 

335 

336 Parameters 

337 ---------- 

338 A : npt.NDArray or sparse.sparray 

339 Matrix for linear systems 

340 lb : npt.NDArray 

341 The lower bounds for the hyperslab. 

342 ub : npt.NDArray 

343 The upper bounds for the hyperslab. 

344 algorithmic_relaxation : npt.NDArray or float, optional 

345 The relaxation parameter used by the algorithm, by default 1.0. 

346 relaxation : float, optional 

347 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

348 weights : None or List[float], optional 

349 The weights for the constraints, by default None. 

350 proximity_flag : bool, optional 

351 Flag to indicate if proximity calculations should be performed, by default True. 

352 """ 

353 

354 def __init__( 

355 self, 

356 A: npt.NDArray, 

357 lb: npt.NDArray, 

358 ub: npt.NDArray, 

359 algorithmic_relaxation: npt.NDArray | float = 1.0, 

360 relaxation: float = 1.0, 

361 weights: None | List[float] = None, 

362 proximity_flag: bool = True, 

363 ): 

364 

365 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, weights, proximity_flag) 

366 self.inverse_linear_row_norm = 1 / self.A.row_norm(1, 1) 

367 self.inverse_linear_column_norm = 1 / self.A.column_norm(1, 1) 

368 

369 def _project(self, x: npt.NDArray) -> np.ndarray: 

370 # SART projection 

371 p = self.map(x) 

372 (res_l, res_u) = self.bounds.residual(p) 

373 d_idx = res_u < 0 

374 c_idx = res_l < 0 

375 x += ( 

376 self.algorithmic_relaxation 

377 * self.inverse_linear_column_norm 

378 * ( 

379 (self.weights * self.inverse_linear_row_norm)[d_idx] 

380 * res_u[d_idx] 

381 @ self.A[d_idx, :] 

382 - (self.weights * self.inverse_linear_row_norm)[c_idx] 

383 * res_l[c_idx] 

384 @ self.A[c_idx, :] 

385 ) 

386 ) 

387 

388 return x 

389 

390 

391class BlockIterativeAMSHyperslab(HyperslabAlgorithm): 

392 """ 

393 Block Iterative AMS Algorithm for hyperslabs. 

394 

395 Parameters 

396 ---------- 

397 A : npt.NDArray or sparse.sparray 

398 Matrix for linear systems 

399 lb : npt.NDArray 

400 The lower bounds for the hyperslab. 

401 ub : npt.NDArray 

402 The upper bounds for the hyperslab. 

403 weights : List[List[float]] or List[npt.NDArray] 

404 A list of lists or arrays representing the weights for each block. Each list/array should sum to 1. 

405 algorithmic_relaxation : npt.NDArray or float, optional 

406 The relaxation parameter used by the algorithm, by default 1.0. 

407 relaxation : float, optional 

408 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

409 proximity_flag : bool, optional 

410 A flag indicating whether to use proximity measures, by default True. 

411 

412 Raises 

413 ------ 

414 ValueError 

415 If any of the weight lists do not sum to 1. 

416 """ 

417 

418 def __init__( 

419 self, 

420 A: npt.NDArray, 

421 lb: npt.NDArray, 

422 ub: npt.NDArray, 

423 weights: List[List[float]] | List[npt.NDArray], 

424 algorithmic_relaxation: npt.NDArray | float = 1.0, 

425 relaxation: float = 1.0, 

426 proximity_flag: bool = True, 

427 ): 

428 

429 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag) 

430 

431 xp = cp if self._use_gpu else np 

432 

433 # check that weights is a list of lists that add up to 1 each 

434 for el in weights: 

435 if xp.abs((xp.sum(el) - 1)) > 1e-10: 

436 raise ValueError("Weights do not add up to 1!") 

437 

438 self.weights = [] 

439 self.block_idxs = [ 

440 xp.where(xp.array(el) > 0)[0] for el in weights 

441 ] # get idxs that meet requirements 

442 

443 # assemble a list of general weights 

444 self.total_weights = xp.zeros_like(weights[0]) 

445 for el in weights: 

446 el = xp.asarray(el) 

447 self.weights.append(el[xp.array(el) > 0]) # remove non zero weights 

448 self.total_weights += el / len(weights) 

449 

450 def _project(self, x: npt.NDArray) -> np.ndarray: 

451 # simultaneous projection 

452 for el, block_idx in zip(self.weights, self.block_idxs): # get mask and associated weights 

453 p = self.indexed_map(x, block_idx) 

454 (res_l, res_u) = self.bounds.indexed_residual(p, block_idx) 

455 d_idx = res_u < 0 

456 c_idx = res_l < 0 

457 full_d_idx = block_idx[d_idx] 

458 full_c_idx = block_idx[c_idx] 

459 

460 x += self.algorithmic_relaxation * ( 

461 self.inverse_row_norm[full_d_idx] 

462 * el[d_idx] 

463 * res_u[d_idx] 

464 @ self.A[full_d_idx, :] 

465 - self.inverse_row_norm[full_c_idx] 

466 * el[c_idx] 

467 * res_l[c_idx] 

468 @ self.A[full_c_idx, :] 

469 ) 

470 

471 return x 

472 

473 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

474 p = self.map(x) 

475 # residuals are positive if constraints are met 

476 (res_l, res_u) = self.bounds.residual(p) 

477 res_u[res_u > 0] = 0 

478 res_l[res_l > 0] = 0 

479 res = -res_u - res_l 

480 measures = [] 

481 for measure in proximity_measures: 

482 if isinstance(measure, tuple): 

483 if measure[0] == "p_norm": 

484 measures.append(self.total_weights @ (res ** measure[1])) 

485 else: 

486 raise ValueError("Invalid proximity measure") 

487 elif isinstance(measure, str) and measure == "max_norm": 

488 measures.append(res.max()) 

489 else: 

490 raise ValueError("Invalid proximity measure") 

491 return measures 

492 

493 

494class StringAveragedAMSHyperslab(HyperslabAlgorithm): 

495 """ 

496 StringAveragedAMSHyperslab is a string averaged implementation of the 

497 AMS algorithm. 

498 

499 Parameters 

500 ---------- 

501 A : npt.NDArray or sparse.sparray 

502 Matrix for linear systems 

503 lb : npt.NDArray 

504 The lower bounds for the hyperslab. 

505 ub : npt.NDArray 

506 The upper bounds for the hyperslab. 

507 strings : List[List[int]] 

508 A list of lists, where each inner list represents a string of indices. 

509 algorithmic_relaxation : npt.NDArray or float, optional 

510 The relaxation parameter used by the algorithm, by default 1.0. 

511 relaxation : float, optional 

512 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

513 weights : None or List[float], optional 

514 The weights for each string, by default None. If None, equal weights are assigned. 

515 proximity_flag : bool, optional 

516 A flag indicating whether to use proximity, by default True. 

517 """ 

518 

519 def __init__( 

520 self, 

521 A: npt.NDArray, 

522 lb: npt.NDArray, 

523 ub: npt.NDArray, 

524 strings: List[List[int]], 

525 algorithmic_relaxation: npt.NDArray | float = 1.0, 

526 relaxation: float = 1.0, 

527 weights: None | List[float] = None, 

528 proximity_flag: bool = True, 

529 ): 

530 

531 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag) 

532 xp = cp if self._use_gpu else np 

533 self.strings = strings 

534 if weights is None: 

535 self.weights = xp.ones(len(strings)) / len(strings) 

536 else: 

537 if len(weights) != len(self.strings): 

538 raise ValueError("The number of weights must be equal to the number of strings.") 

539 self.weights = weights 

540 

541 def _project(self, x): 

542 # string averaged projection 

543 x_c = x.copy() # create a general copy of x 

544 x -= x # reset x is this viable? 

545 for string, weight in zip(self.strings, self.weights): 

546 x_s = x_c.copy() # generate a copy for individual strings 

547 for i in string: 

548 p_i = self.single_map(x_s, i) 

549 (res_li, res_ui) = self.bounds.single_residual(p_i, i) 

550 if res_ui < 0: 

551 self.A.update_step( 

552 x_s, self.algorithmic_relaxation * self.inverse_row_norm[i] * res_ui, i 

553 ) 

554 elif res_li < 0: 

555 self.A.update_step( 

556 x_s, 

557 -1 * self.algorithmic_relaxation * self.inverse_row_norm[i] * res_li, 

558 i, 

559 ) 

560 

561 x += weight * x_s 

562 return x