Coverage for suppy\feasibility\_halfspaces\_ams_algorithms.py: 92%

137 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2026-05-08 13:56 +0200

1import warnings 

2from abc import ABC 

3from typing import List 

4import numpy as np 

5import numpy.typing as npt 

6 

7try: 

8 import cupy as cp 

9 

10 NO_GPU = False 

11 

12except ImportError: 

13 NO_GPU = True 

14 cp = np 

15 

16from suppy.feasibility._linear_algorithms import HalfspaceFeasibility 

17 

18 

19class HalfspaceAlgorithm(HalfspaceFeasibility, ABC): 

20 """ 

21 The HalfspaceAlgorithm class is used to find a feasible solution to a 

22 set of linear inequalities. 

23 

24 Parameters 

25 ---------- 

26 A : npt.NDArray or sparse.sparray 

27 Matrix for linear systems 

28 b : npt.NDArray 

29 Bound for linear systems 

30 algorithmic_relaxation : npt.NDArray or float, optional 

31 The relaxation parameter used by the algorithm, by default 1.0. 

32 relaxation : float, optional 

33 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

34 proximity_flag : bool, optional 

35 A flag indicating whether to use proximity in the algorithm, by default True. 

36 """ 

37 

38 def __init__( 

39 self, 

40 A: npt.NDArray, 

41 b: npt.NDArray, 

42 algorithmic_relaxation: npt.NDArray | float = 1.0, 

43 relaxation: float = 1.0, 

44 proximity_flag: bool = True, 

45 ): 

46 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) 

47 

48 

49class SequentialAMSHalfspace(HalfspaceAlgorithm): 

50 """ 

51 SequentialAMS class for sequentially applying the AMS algorithm. 

52 

53 Parameters 

54 ---------- 

55 A : npt.NDArray or sparse.sparray 

56 Matrix for linear systems 

57 b : npt.NDArray 

58 Bound for linear systems 

59 algorithmic_relaxation : npt.NDArray or float, optional 

60 The relaxation parameter used by the algorithm, by default 1.0. 

61 relaxation : float, optional 

62 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

63 cs : None or List[int], optional 

64 The list of indices for the constraints, by default None. 

65 proximity_flag : bool, optional 

66 Flag to indicate if proximity should be considered, by default True. 

67 """ 

68 

69 def __init__( 

70 self, 

71 A: npt.NDArray, 

72 b: npt.NDArray, 

73 algorithmic_relaxation: npt.NDArray | float = 1.0, 

74 relaxation: float = 1.0, 

75 cs: None | List[int] = None, 

76 proximity_flag: bool = True, 

77 ): 

78 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) 

79 xp = cp if self._use_gpu else np 

80 if cs is None: 

81 self.cs = xp.arange(self.A.shape[0]) 

82 else: 

83 self.cs = cs 

84 

85 def _project(self, x: npt.NDArray) -> np.ndarray: 

86 """ 

87 Projects the input array `x` onto the feasible region defined by the 

88 constraints. 

89 

90 Parameters 

91 ---------- 

92 x : npt.NDArray 

93 The input array to be projected. 

94 

95 Returns 

96 ------- 

97 npt.NDArray 

98 The projected array. 

99 """ 

100 for i in self.cs: 

101 p_i = self.single_map(x, i) 

102 res = self.b[i] - p_i 

103 if res < 0: 

104 self.A.update_step( 

105 x, self.algorithmic_relaxation * self.inverse_row_norm[i] * res, i 

106 ) 

107 return x 

108 

109 

110class SequentialWeightedAMSHalfspace(SequentialAMSHalfspace): 

111 """ 

112 Parameters 

113 ---------- 

114 A : npt.NDArray or sparse.sparray 

115 Matrix for linear systems 

116 b : npt.NDArray 

117 Bound for linear systems 

118 weights : None, list of float, or npt.NDArray, optional 

119 The weights assigned to each constraint. If None, default weights are 

120 used. 

121 algorithmic_relaxation : npt.NDArray or float, optional 

122 The relaxation parameter used by the algorithm, by default 1.0. 

123 relaxation : float, optional 

124 Outer relaxation parameter, applied to the entire solution of the 

125 iterate by default 1.0. 

126 weight_decay : float, optional 

127 Parameter that determines the rate at which the weights are reduced 

128 after each phase (weights * weight_decay). Default is 1.0. 

129 cs : None or list of int, optional 

130 The indices of the constraints to be considered. Default is None. 

131 proximity_flag : bool, optional 

132 Flag to indicate if proximity should be considered. Default is True. 

133 

134 Attributes 

135 ---------- 

136 weights : npt.NDArray 

137 The weights assigned to each constraint. 

138 weight_decay : float 

139 Decay rate for the weights. 

140 temp_weight_decay : float 

141 Initial value for weight decay. 

142 """ 

143 

144 def __init__( 

145 self, 

146 A: npt.NDArray, 

147 b: npt.NDArray, 

148 weights: None | List[float] | npt.NDArray = None, 

149 algorithmic_relaxation: npt.NDArray | float = 1.0, 

150 relaxation: float = 1.0, 

151 weight_decay: float = 1.0, 

152 cs: None | List[int] = None, 

153 proximity_flag: bool = True, 

154 ): 

155 super().__init__(A, b, algorithmic_relaxation, relaxation, cs, proximity_flag) 

156 xp = cp if self._use_gpu else np 

157 self.weight_decay = weight_decay 

158 self.temp_weight_decay = 1.0 

159 

160 if weights is None: 

161 self.weights = xp.ones(self.A.shape[0]) 

162 else: 

163 self.weights = weights 

164 

165 def _project(self, x: npt.NDArray) -> np.ndarray: 

166 """ 

167 Projects the input array `x` onto a feasible region defined by the 

168 constraints. 

169 

170 Parameters 

171 ---------- 

172 x : npt.NDArray 

173 The input array to be projected. 

174 

175 Returns 

176 ------- 

177 npt.NDArray 

178 The projected array. 

179 

180 Notes 

181 ----- 

182 This method iteratively adjusts the input array `x` based on the constraints 

183 defined in `self.cs`. For each constraint, it computes the projection and 

184 checks if the constraints are violated. If a constraint is violated, it updates 

185 the array `x` using a weighted relaxation factor. The weight decay is applied 

186 to the temporary weight decay after each iteration. 

187 """ 

188 weighted_relaxation = self.algorithmic_relaxation * self.temp_weight_decay 

189 

190 for i in self.cs: 

191 p_i = self.single_map(x, i) 

192 res = self.b[i] - p_i 

193 if res < 0: 

194 self.A.update_step( 

195 x, weighted_relaxation * self.weights[i] * self.inverse_row_norm[i] * res, i 

196 ) 

197 

198 self.temp_weight_decay *= self.weight_decay 

199 return x 

200 

201 

202class SimultaneousAMSHalfspace(HalfspaceAlgorithm): 

203 """ 

204 SimultaneousAMS is an implementation of the AMS (Alternating 

205 Minimization 

206 Scheme) algorithm that performs simultaneous projections and proximity 

207 calculations. 

208 

209 Parameters 

210 ---------- 

211 A : npt.NDArray or sparse.sparray 

212 Matrix for linear systems 

213 b : npt.NDArray 

214 Bound for linear systems 

215 algorithmic_relaxation : npt.NDArray or float, optional 

216 The relaxation parameter used by the algorithm, by default 1.0. 

217 relaxation : float, optional 

218 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

219 weights : None or List[float], optional 

220 The weights for the constraints, by default None. 

221 proximity_flag : bool, optional 

222 Flag to indicate if proximity calculations should be performed, by default True. 

223 """ 

224 

225 def __init__( 

226 self, 

227 A: npt.NDArray, 

228 b: npt.NDArray, 

229 algorithmic_relaxation: npt.NDArray | float = 1.0, 

230 relaxation: float = 1.0, 

231 weights: None | List[float] = None, 

232 proximity_flag: bool = True, 

233 ): 

234 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) 

235 

236 xp = cp if self._use_gpu else np 

237 

238 if weights is None: 

239 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0] 

240 elif xp.abs((weights.sum() - 1)) > 1e-10: 

241 warnings.warn("Weights do not add up to 1! Renormalizing to 1...", stacklevel=2) 

242 self.weights = weights / weights.sum() 

243 else: 

244 self.weights = weights 

245 

246 def _project(self, x: npt.NDArray) -> np.ndarray: 

247 # simultaneous projection 

248 p = self.map(x) 

249 res = self.b - p 

250 res_idx = res < 0 

251 x += self.algorithmic_relaxation * ( 

252 self.weights[res_idx] 

253 * self.inverse_row_norm[res_idx] 

254 * res[res_idx] 

255 @ self.A[res_idx, :] 

256 ) 

257 return x 

258 

259 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

260 p = self.map(x) 

261 res = self.b - p 

262 res[res > 0] = 0 

263 res = -res 

264 

265 measures = [] 

266 for measure in proximity_measures: 

267 if isinstance(measure, tuple): 

268 if measure[0] == "p_norm": 

269 measures.append(self.weights @ (res ** measure[1])) 

270 else: 

271 raise ValueError("Invalid proximity measure") 

272 elif isinstance(measure, str) and measure == "max_norm": 

273 measures.append(res.max()) 

274 else: 

275 raise ValueError("Invalid proximity measure") 

276 return measures 

277 

278 

279class BlockIterativeAMSHalfspace(HalfspaceAlgorithm): 

280 """ 

281 Block Iterative AMS Algorithm. 

282 This class implements a block iterative version of the AMS (Alternating 

283 Minimization Scheme) algorithm. It is designed to handle constraints and 

284 weights in a block-wise manner. 

285 

286 Parameters 

287 ---------- 

288 A : npt.NDArray or sparse.sparray 

289 Matrix for linear systems 

290 b : npt.NDArray 

291 Bound for linear systems 

292 weights : List[List[float]] or List[npt.NDArray] 

293 A list of lists or arrays representing the weights for each block. Each list/array should sum to 1. 

294 algorithmic_relaxation : npt.NDArray or float, optional 

295 The relaxation parameter used by the algorithm, by default 1.0. 

296 relaxation : float, optional 

297 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

298 proximity_flag : bool, optional 

299 A flag indicating whether to use proximity measures, by default True. 

300 

301 Raises 

302 ------ 

303 ValueError 

304 If any of the weight lists do not sum to 1. 

305 """ 

306 

307 def __init__( 

308 self, 

309 A: npt.NDArray, 

310 b: npt.NDArray, 

311 weights: List[List[float]] | List[npt.NDArray], 

312 algorithmic_relaxation: npt.NDArray | float = 1.0, 

313 relaxation: float = 1.0, 

314 proximity_flag: bool = True, 

315 ): 

316 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) 

317 

318 xp = cp if self._use_gpu else np 

319 

320 # check that weights is a list of lists that add up to 1 each 

321 for el in weights: 

322 if xp.abs((xp.sum(el) - 1)) > 1e-10: 

323 raise ValueError("Weights do not add up to 1!") 

324 

325 self.weights = [] 

326 self.block_idxs = [ 

327 xp.where(xp.array(el) > 0)[0] for el in weights 

328 ] # get idxs that meet requirements 

329 

330 # assemble a list of general weights 

331 self.total_weights = xp.zeros_like(weights[0]) 

332 for el in weights: 

333 el = xp.asarray(el) 

334 self.weights.append(el[xp.array(el) > 0]) # remove non zero weights 

335 self.total_weights += el / len(weights) 

336 

337 def _project(self, x: npt.NDArray) -> np.ndarray: 

338 # simultaneous projection 

339 for el, block_idx in zip(self.weights, self.block_idxs): 

340 p = self.indexed_map(x, block_idx) 

341 res = self.b[block_idx] - p 

342 

343 res_idx = res < 0 

344 full_idx = block_idx[res_idx] 

345 

346 x += self.algorithmic_relaxation * ( 

347 el[res_idx] * self.inverse_row_norm[full_idx] * res[res_idx] @ self.A[full_idx, :] 

348 ) 

349 

350 return x 

351 

352 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

353 p = self.map(x) 

354 res = self.b - p 

355 res[res > 0] = 0 

356 res = -res 

357 

358 measures = [] 

359 for measure in proximity_measures: 

360 if isinstance(measure, tuple): 

361 if measure[0] == "p_norm": 

362 measures.append(self.total_weights @ (res ** measure[1])) 

363 else: 

364 raise ValueError("Invalid proximity measure") 

365 elif isinstance(measure, str) and measure == "max_norm": 

366 measures.append(res.max()) 

367 else: 

368 raise ValueError("Invalid proximity measure") 

369 return measures 

370 

371 

372class StringAveragedAMSHalfspace(HalfspaceAlgorithm): 

373 """ 

374 StringAveragedAMS is an implementation of the HalfspaceAlgorithm that 

375 performs string averaged projections. 

376 

377 Parameters 

378 ---------- 

379 A : npt.NDArray or sparse.sparray 

380 Matrix for linear systems 

381 b : npt.NDArray 

382 Bound for linear systems 

383 strings : List[List[int]] 

384 A list of lists, where each inner list represents a string of indices. 

385 algorithmic_relaxation : npt.NDArray or float, optional 

386 The relaxation parameter used by the algorithm, by default 1.0. 

387 relaxation : float, optional 

388 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

389 weights : None or List[float], optional 

390 The weights for each string, by default None. If None, equal weights are assigned. 

391 proximity_flag : bool, optional 

392 A flag indicating whether to use proximity, by default True. 

393 """ 

394 

395 def __init__( 

396 self, 

397 A: npt.NDArray, 

398 b: npt.NDArray, 

399 strings: List[List[int]], 

400 algorithmic_relaxation: npt.NDArray | float = 1.0, 

401 relaxation: float = 1.0, 

402 weights: None | List[float] = None, 

403 proximity_flag: bool = True, 

404 ): 

405 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) 

406 xp = cp if self._use_gpu else np 

407 self.strings = strings 

408 if weights is None: 

409 self.weights = xp.ones(len(strings)) / len(strings) 

410 else: 

411 if len(weights) != len(self.strings): 

412 raise ValueError("The number of weights must be equal to the number of strings.") 

413 self.weights = weights 

414 

415 def _project(self, x): 

416 # string averaged projection 

417 x_c = x.copy() # create a general copy of x 

418 x -= x # reset x is this viable? 

419 for string, weight in zip(self.strings, self.weights): 

420 x_s = x_c.copy() 

421 for i in string: 

422 p_i = self.single_map(x_s, i) 

423 res_i = self.b[i] - p_i 

424 if res_i < 0: 

425 self.A.update_step( 

426 x_s, self.algorithmic_relaxation * self.inverse_row_norm[i] * res_i, i 

427 ) 

428 x += weight * x_s 

429 return x