Coverage for suppy\feasibility\_linear_algorithms.py: 83%

151 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2026-05-08 13:56 +0200

1"""Base classes for linear feasibility problems.""" 

2from abc import ABC 

3from typing import List, Callable 

4import numpy as np 

5import numpy.typing as npt 

6 

7from scipy import sparse 

8 

9from suppy.utils import Bounds 

10from suppy.utils import LinearMapping 

11from suppy.utils import ensure_float_array 

12from suppy.projections._projections import Projection 

13 

14try: 

15 import cupy as cp 

16 

17 NO_GPU = False 

18 

19except ImportError: 

20 NO_GPU = True 

21 cp = np 

22 

23 

24class Feasibility(Projection, ABC): 

25 """ 

26 Parameters 

27 ---------- 

28 algorithmic_relaxation : npt.NDArray or float, optional 

29 The relaxation parameter used by the algorithm, by default 1.0. 

30 relaxation : float, optional 

31 Outer relaxation parameter, applied to the entire solution of the 

32 iterate by default 1.0. 

33 proximity_flag : bool, optional 

34 A flag indicating whether to use this object for proximity 

35 calculations, by default True. 

36 

37 Attributes 

38 ---------- 

39 algorithmic_relaxation : npt.NDArray or float, optional 

40 The relaxation parameter used by the algorithm, by default 1.0. 

41 relaxation : float, optional 

42 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

43 proximity_flag : bool, optional 

44 Flag to indicate whether to calculate proximity, by default True. 

45 _use_gpu : bool, optional 

46 Flag to indicate whether to use GPU for computations, by default False. 

47 """ 

48 

49 def __init__( 

50 self, 

51 algorithmic_relaxation: npt.NDArray | float = 1.0, 

52 relaxation: float = 1.0, 

53 proximity_flag: bool = True, 

54 _use_gpu: bool = False, 

55 ): 

56 super().__init__(relaxation, proximity_flag, _use_gpu) 

57 self.algorithmic_relaxation = algorithmic_relaxation 

58 self.all_x = None 

59 self.proximities = None 

60 # self._n_tol = 0 # counter for stopping criterion 

61 

62 @ensure_float_array 

63 def solve( 

64 self, 

65 x: npt.NDArray, 

66 max_iter: int = 500, 

67 prox_tol: float = 1e-6, 

68 del_prox_tol: float = 1e-8, 

69 del_prox_n: int = 5, 

70 storage: bool = False, 

71 storage_iters: List[int] | int | None = None, 

72 proximity_measures: List | None = None, 

73 alternative_stopping_criterion: Callable | None = None, 

74 alternative_stopping_criterion_initial_call: Callable | None = None, 

75 ) -> np.ndarray: 

76 """ 

77 Solves the optimization problem using an iterative approach. 

78 

79 Parameters 

80 ---------- 

81 x : npt.NDArray 

82 Starting point for the algorithm. 

83 max_iter : int, optional 

84 Maximum number of iterations to perform, by default 500. 

85 prox_tol : float, optional 

86 The tolerance for the proximity on the constraints, by default 1e-6. 

87 del_prox_tol : float, optional 

88 The tolerance for the change in proximity over the last del_prox_n iterations, by default 1e-8. 

89 del_prox_n : int, optional 

90 The number of iterations that del_prox_tol needs to be met in a row, by default 5. 

91 proximity_measures : List, optional 

92 The proximity measures to calculate, by default a l2 norm measure is used. Right now only the first in the list is used to check the feasibility. 

93 storage : bool, optional 

94 Flag indicating whether to store intermediate solutions, by default False. 

95 storage_iters : List[int] or int, optional 

96 Controls which iterations are stored (when storage=True). If None, all iterations are stored. 

97 If a list of ints, only those iteration indices are stored (0 = initial point). 

98 If an int, storage occurs every that many iterations. 

99 alternative_stopping_criterion : callable, optional 

100 Alternative stopping criterion 

101 alternative_stopping_criterion_initial_call : callable, optional 

102 Initial call for an alternative stopping criterion 

103 

104 Returns 

105 ------- 

106 npt.NDArray 

107 The solution after the iterative process. 

108 """ 

109 xp = cp if isinstance(x, cp.ndarray) else np 

110 self._n_tol = 0 

111 

112 if proximity_measures is None: 

113 proximity_measures = [("p_norm", 2)] 

114 

115 self.proximities = [self.proximity(x, proximity_measures)] 

116 i = 0 

117 

118 def _should_store(idx): 

119 if storage_iters is None: 

120 return True 

121 if isinstance(storage_iters, int): 

122 return idx % storage_iters == 0 

123 return idx in storage_iters 

124 

125 to_host = ( 

126 (lambda v: np.array(v.copy())) if isinstance(x, np.ndarray) else (lambda v: v.get()) 

127 ) 

128 

129 if storage is True: 

130 self.all_x = [] 

131 if _should_store(0): 

132 self.all_x.append(to_host(x)) 

133 

134 if alternative_stopping_criterion_initial_call is not None: 

135 stop = alternative_stopping_criterion_initial_call(x, self) 

136 else: 

137 stop = False # criterion for stopping the algorithm 

138 

139 while i < max_iter and not stop: 

140 x = self.project(x) 

141 if storage is True and _should_store(i + 1): 

142 self.all_x.append(to_host(x)) 

143 self.proximities.append(self.proximity(x, proximity_measures)) 

144 

145 if alternative_stopping_criterion is not None: 

146 stop = alternative_stopping_criterion(x, self) 

147 else: 

148 stop = self._stopping_criterion(prox_tol, del_prox_tol, del_prox_n) 

149 

150 i += 1 

151 

152 if self.all_x is not None: 

153 self.all_x = np.array(self.all_x) 

154 

155 self.proximities = xp.array(self.proximities) 

156 

157 return x 

158 

159 def _stopping_criterion(self, prox_tol: float, del_prox_tol: float, del_prox_n: float): 

160 """""" 

161 if self.proximities[-1][0] < prox_tol: # proximity below goal/tolerance 

162 return True 

163 else: # check that last n proximity changes are below a threshold 

164 if self.proximities[-2][0] - self.proximities[-1][0] < del_prox_tol: 

165 self._n_tol += 1 

166 else: 

167 self._n_tol = 0 

168 if self._n_tol >= del_prox_n: # n proximity changes below threshold 

169 return True 

170 return False 

171 

172 

173class LinearFeasibility(Feasibility, ABC): 

174 """ 

175 LinearFeasibility class for handling linear feasibility problems. 

176 

177 Parameters 

178 ---------- 

179 A : npt.NDArray or sparse.sparray 

180 Matrix for linear systems 

181 algorithmic_relaxation : npt.NDArray or float, optional 

182 The relaxation parameter used by the algorithm, by default 1.0. 

183 relaxation : float, optional 

184 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

185 proximity_flag : bool, optional 

186 Flag indicating whether to consider this object for proximity calculations. If False proximity calculations will always return 0, by default True. 

187 

188 Attributes 

189 ---------- 

190 A : LinearMapping 

191 Matrix for linear system, represented through internal object. 

192 algorithmic_relaxation : npt.NDArray or float, optional 

193 The relaxation parameter used by the algorithm, by default 1.0. 

194 relaxation : float, optional 

195 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

196 proximity_flag : bool, optional 

197 Flag indicating whether to consider this object for proximity calculations. If False proximity calculations will always return 0, by default True. 

198 _use_gpu : bool, optional 

199 Flag to indicate whether to use GPU for computations, by default False. 

200 """ 

201 

202 def __init__( 

203 self, 

204 A: npt.NDArray | sparse.sparray, 

205 algorithmic_relaxation: npt.NDArray | float = 1.0, 

206 relaxation: float = 1.0, 

207 proximity_flag: bool = True, 

208 ): 

209 _, _use_gpu = LinearMapping.get_flags(A) 

210 super().__init__(algorithmic_relaxation, relaxation, proximity_flag, _use_gpu) 

211 self.A = LinearMapping(A) 

212 self.inverse_row_norm = 1 / self.A.row_norm(2, 2) 

213 

214 def map(self, x: npt.NDArray) -> np.ndarray: 

215 """ 

216 Applies the linear mapping to the input array x. 

217 

218 Parameters 

219 ---------- 

220 x : npt.NDArray 

221 The input array to which the linear mapping is applied. 

222 

223 Returns 

224 ------- 

225 npt.NDArray 

226 The result of applying the linear mapping to the input array. 

227 """ 

228 return self.A @ x 

229 

230 def single_map(self, x: npt.NDArray, i: int) -> np.ndarray: 

231 """ 

232 Applies the linear mapping to the input array x at a specific index 

233 i. 

234 

235 Parameters 

236 ---------- 

237 x : npt.NDArray 

238 The input array to which the linear mapping is applied. 

239 i : int 

240 The specific index at which the linear mapping is applied. 

241 

242 Returns 

243 ------- 

244 npt.NDArray 

245 The result of applying the linear mapping to the input array at the specified index. 

246 """ 

247 return self.A.single_map(x, i) 

248 

249 def indexed_map(self, x: npt.NDArray, idx: List[int] | npt.NDArray) -> np.ndarray: 

250 """ 

251 Applies the linear mapping to the input array x at multiple 

252 specified indices. 

253 

254 Parameters 

255 ---------- 

256 x : npt.NDArray 

257 The input array to which the linear mapping is applied. 

258 idx : List[int] or npt.NDArray 

259 The indices at which the linear mapping is applied. 

260 

261 Returns 

262 ------- 

263 npt.NDArray 

264 The result of applying the linear mapping to the input array at the specified indices. 

265 """ 

266 return self.A.index_map(x, idx) 

267 

268 

269class HyperplaneFeasibility(LinearFeasibility, ABC): 

270 """ 

271 HyperplaneFeasibility class for solving halfspace feasibility problems. 

272 

273 Parameters 

274 ---------- 

275 A : npt.NDArray or sparse.sparray 

276 Matrix for linear systems 

277 b : npt.NDArray 

278 Bound for linear systems 

279 algorithmic_relaxation : npt.NDArray or float, optional 

280 The relaxation parameter used by the algorithm, by default 1.0. 

281 relaxation : float, optional 

282 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

283 proximity_flag : bool, optional 

284 Flag indicating whether to use proximity, by default True. 

285 

286 Attributes 

287 ---------- 

288 A : LinearMapping 

289 Matrix for linear system (stored in internal LinearMapping object). 

290 b : npt.NDArray 

291 Bound for linear systems 

292 algorithmic_relaxation : npt.NDArray or float, optional 

293 The relaxation parameter used by the algorithm, by default 1.0. 

294 relaxation : float, optional 

295 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

296 proximity_flag : bool, optional 

297 Flag to indicate whether to calculate proximity, by default True. 

298 _use_gpu : bool, optional 

299 Flag to indicate whether to use GPU for computations, by default False. 

300 """ 

301 

302 def __init__( 

303 self, 

304 A: npt.NDArray | sparse.sparray, 

305 b: npt.NDArray, 

306 algorithmic_relaxation: npt.NDArray | float = 1.0, 

307 relaxation: float = 1.0, 

308 proximity_flag: bool = True, 

309 ): 

310 super().__init__(A, algorithmic_relaxation, relaxation, proximity_flag) 

311 try: 

312 len(b) 

313 if A.shape[0] != len(b): 

314 raise ValueError("Matrix A and vector b must have the same number of rows.") 

315 except TypeError: 

316 # create an array for b if it is a scalar 

317 if not self.A.gpu: 

318 b = np.ones(A.shape[0]) * b 

319 else: 

320 b = cp.ones(A.shape[0]) * b 

321 self.b = b 

322 

323 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

324 p = self.map(x) 

325 res = abs(self.b - p) 

326 measures = [] 

327 for measure in proximity_measures: 

328 if isinstance(measure, tuple): 

329 if measure[0] == "p_norm": 

330 measures.append(1 / len(res) * (res ** measure[1]).sum()) 

331 else: 

332 raise ValueError("Invalid proximity measure") 

333 elif isinstance(measure, str) and measure == "max_norm": 

334 measures.append(res.max()) 

335 else: 

336 raise ValueError("Invalid proximity measure") 

337 return measures 

338 

339 

340class HalfspaceFeasibility(LinearFeasibility, ABC): 

341 """ 

342 HalfspaceFeasibility class for solving halfspace feasibility problems. 

343 

344 Parameters 

345 ---------- 

346 A : npt.NDArray or sparse.sparray 

347 Matrix for linear systems 

348 b : npt.NDArray 

349 Bound for linear systems 

350 algorithmic_relaxation : npt.NDArray or float, optional 

351 The relaxation parameter used by the algorithm, by default 1.0. 

352 relaxation : float, optional 

353 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

354 proximity_flag : bool, optional 

355 Flag indicating whether to use proximity, by default True. 

356 

357 Attributes 

358 ---------- 

359 A : LinearMapping 

360 Matrix for linear system (stored in internal LinearMapping object). 

361 b : npt.NDArray 

362 Bound for linear systems 

363 algorithmic_relaxation : npt.NDArray or float, optional 

364 The relaxation parameter used by the algorithm, by default 1.0. 

365 relaxation : float, optional 

366 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

367 proximity_flag : bool, optional 

368 Flag to indicate whether to calculate proximity, by default True. 

369 _use_gpu : bool, optional 

370 Flag to indicate whether to use GPU for computations, by default False. 

371 """ 

372 

373 def __init__( 

374 self, 

375 A: npt.NDArray | sparse.sparray, 

376 b: npt.NDArray, 

377 algorithmic_relaxation: npt.NDArray | float = 1.0, 

378 relaxation: float = 1.0, 

379 proximity_flag: bool = True, 

380 ): 

381 super().__init__(A, algorithmic_relaxation, relaxation, proximity_flag) 

382 try: 

383 len(b) 

384 if A.shape[0] != len(b): 

385 raise ValueError("Matrix A and vector b must have the same number of rows.") 

386 except TypeError: 

387 # create an array for b if it is a scalar 

388 if not self.A.gpu: 

389 b = np.ones(A.shape[0]) * b 

390 else: 

391 b = cp.ones(A.shape[0]) * b 

392 self.b = b 

393 

394 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

395 p = self.map(x) 

396 res = self.b - p 

397 res[res > 0] = 0 

398 res = -res 

399 

400 measures = [] 

401 for measure in proximity_measures: 

402 if isinstance(measure, tuple): 

403 if measure[0] == "p_norm": 

404 measures.append(1 / len(res) * (res ** measure[1]).sum()) 

405 else: 

406 raise ValueError("Invalid proximity measure") 

407 elif isinstance(measure, str) and measure == "max_norm": 

408 measures.append(res.max()) 

409 else: 

410 raise ValueError("Invalid proximity measure") 

411 return measures 

412 

413 

414class HyperslabFeasibility(LinearFeasibility, ABC): 

415 """ 

416 A class used for solving feasibility problems for hyperslabs. 

417 

418 Parameters 

419 ---------- 

420 A : npt.NDArray or sparse.sparray 

421 Matrix for linear systems 

422 lb : npt.NDArray 

423 The lower bounds for the hyperslab. 

424 ub : npt.NDArray 

425 The upper bounds for the hyperslab. 

426 algorithmic_relaxation : npt.NDArray or float, optional 

427 The relaxation parameter used by the algorithm, by default 1.0. 

428 relaxation : float, optional 

429 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

430 proximity_flag : bool, optional 

431 A flag indicating whether to use proximity, by default True. 

432 

433 Attributes 

434 ---------- 

435 bounds : Bounds 

436 Objective for handling the upper and lower bounds of the hyperslab. 

437 A : LinearMapping 

438 Matrix for linear system (stored in internal LinearMapping object). 

439 algorithmic_relaxation : npt.NDArray or float, optional 

440 The relaxation parameter used by the algorithm, by default 1.0. 

441 relaxation : float, optional 

442 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

443 proximity_flag : bool, optional 

444 Flag to indicate whether to calculate proximity, by default True. 

445 _use_gpu : bool, optional 

446 Flag to indicate whether to use GPU for computations, by default False. 

447 """ 

448 

449 def __init__( 

450 self, 

451 A: npt.NDArray, 

452 lb: npt.NDArray, 

453 ub: npt.NDArray, 

454 algorithmic_relaxation: npt.NDArray | float = 1.0, 

455 relaxation: float = 1.0, 

456 proximity_flag: bool = True, 

457 ): 

458 super().__init__(A, algorithmic_relaxation, relaxation, proximity_flag) 

459 self.bounds = Bounds(lb, ub) 

460 if self.A.shape[0] != len(self.bounds.l): 

461 raise ValueError("Matrix A and bound vector must have the same number of rows.") 

462 

463 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

464 p = self.map(x) 

465 (res_l, res_u) = self.bounds.residual(p) 

466 res_l[res_l > 0] = 0 

467 res_u[res_u > 0] = 0 

468 res = -res_l - res_u 

469 

470 measures = [] 

471 for measure in proximity_measures: 

472 if isinstance(measure, tuple): 

473 if measure[0] == "p_norm": 

474 measures.append(1 / len(res) * (res ** measure[1]).sum()) 

475 else: 

476 raise ValueError("Invalid proximity measure") 

477 elif isinstance(measure, str) and measure == "max_norm": 

478 measures.append(res.max()) 

479 else: 

480 raise ValueError("Invalid proximity measure") 

481 return measures