Coverage for suppy\feasibility\_bands\_arm_algorithms.py: 68%

123 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2026-05-08 13:56 +0200

1import warnings 

2from abc import ABC 

3from typing import List 

4import numpy as np 

5import numpy.typing as npt 

6from suppy.feasibility._linear_algorithms import HyperslabFeasibility 

7 

8try: 

9 import cupy as cp 

10 

11 NO_GPU = False 

12 

13except ImportError: 

14 NO_GPU = True 

15 cp = np 

16 

17 

18class ARMAlgorithm(HyperslabFeasibility, ABC): 

19 """ 

20 ARMAlgorithm class for handling feasibility problems with additional 

21 algorithmic relaxation. 

22 

23 Parameters 

24 ---------- 

25 A : npt.NDArray or sparse.sparray 

26 Matrix for linear systems 

27 lb : npt.NDArray 

28 The lower bounds for the hyperslab. 

29 ub : npt.NDArray 

30 The upper bounds for the hyperslab. 

31 algorithmic_relaxation : npt.NDArray or float, optional 

32 The relaxation parameter used by the algorithm, by default 1.0. 

33 relaxation : float, optional 

34 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

35 proximity_flag : bool, optional 

36 Flag to indicate if proximity constraints should be considered, by default True. 

37 """ 

38 

39 def __init__( 

40 self, 

41 A: npt.NDArray, 

42 lb: npt.NDArray, 

43 ub: npt.NDArray, 

44 algorithmic_relaxation: npt.NDArray | float = 1.0, 

45 relaxation: float = 1.0, 

46 proximity_flag: bool = True, 

47 ): 

48 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag) 

49 

50 

51class SequentialARM(ARMAlgorithm): 

52 """ 

53 SequentialARM is a class that implements a sequential algorithm for 

54 Adaptive Relaxation Method (ARM). 

55 

56 Parameters 

57 ---------- 

58 A : npt.NDArray or sparse.sparray 

59 Matrix for linear systems 

60 lb : npt.NDArray 

61 The lower bounds for the hyperslab. 

62 ub : npt.NDArray 

63 The upper bounds for the hyperslab. 

64 algorithmic_relaxation : npt.NDArray or float, optional 

65 The relaxation parameter used by the algorithm, by default 1.0. 

66 relaxation : float, optional 

67 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

68 cs : None or List[int], optional 

69 The list of indices for the constraints, by default None. 

70 proximity_flag : bool, optional 

71 Flag to indicate if proximity should be considered, by default True. 

72 """ 

73 

74 def __init__( 

75 self, 

76 A: npt.NDArray, 

77 lb: npt.NDArray, 

78 ub: npt.NDArray, 

79 algorithmic_relaxation: npt.NDArray | float = 1.0, 

80 relaxation: float = 1.0, 

81 cs: None | List[int] = None, 

82 proximity_flag: bool = True, 

83 ): 

84 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag) 

85 xp = cp if self._use_gpu else np 

86 if cs is None: 

87 self.cs = xp.arange(self.A.shape[0]) 

88 else: 

89 self.cs = cs 

90 

91 def _project(self, x: npt.NDArray) -> np.ndarray: 

92 xp = cp if self._use_gpu else np 

93 

94 for i in self.cs: 

95 p_i = self.single_map(x, i) 

96 d = p_i - self.bounds.center[i] 

97 psi = (self.bounds.u[i] - self.bounds.l[i]) / 2 

98 if xp.abs(d) > psi: 

99 self.A.update_step( 

100 x, 

101 -1 

102 * self.algorithmic_relaxation 

103 / 2 

104 * self.inverse_row_norm[i] 

105 * ((d**2 - psi**2) / d), 

106 i, 

107 ) 

108 return x 

109 

110 

111class SimultaneousARM(ARMAlgorithm): 

112 """ 

113 SimultaneousARM is a class that implements an ARM (Adaptive Relaxation 

114 Method) algorithm 

115 for solving feasibility problems. 

116 

117 Parameters 

118 ---------- 

119 A : npt.NDArray or sparse.sparray 

120 Matrix for linear systems 

121 lb : npt.NDArray 

122 The lower bounds for the hyperslab. 

123 ub : npt.NDArray 

124 The upper bounds for the hyperslab. 

125 algorithmic_relaxation : npt.NDArray or float, optional 

126 The relaxation parameter used by the algorithm, by default 1.0. 

127 relaxation : float, optional 

128 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

129 weights : None, List[float], or npt.NDArray, optional 

130 The weights for the constraints. If None, weights are set to be uniform. Default is None. 

131 proximity_flag : bool, optional 

132 Flag to indicate whether to use proximity in the algorithm. Default is True. 

133 

134 Methods 

135 ------- 

136 _project(x) 

137 Performs the simultaneous projection of the input vector x. 

138 _proximity(x) 

139 Computes the proximity measure of the input vector x. 

140 """ 

141 

142 def __init__( 

143 self, 

144 A: npt.NDArray, 

145 lb: npt.NDArray, 

146 ub: npt.NDArray, 

147 algorithmic_relaxation: npt.NDArray | float = 1.0, 

148 relaxation: float = 1.0, 

149 weights: None | List[float] | npt.NDArray = None, 

150 proximity_flag: bool = True, 

151 ): 

152 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag) 

153 xp = cp if self._use_gpu else np 

154 

155 if weights is None: 

156 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0] 

157 elif xp.abs((weights.sum() - 1)) > 1e-10: 

158 warnings.warn("Weights do not add up to 1! Renormalizing to 1...", stacklevel=2) 

159 self.weights = weights / weights.sum() 

160 else: 

161 self.weights = weights 

162 

163 def _project(self, x): 

164 xp = cp if self._use_gpu else np 

165 # simultaneous projection 

166 p = self.map(x) 

167 d = p - self.bounds.center 

168 psi = self.bounds.half_distance 

169 d_idx = xp.abs(d) > psi 

170 x -= ( 

171 self.algorithmic_relaxation 

172 / 2 

173 * ( 

174 self.weights[d_idx] 

175 * self.inverse_row_norm[d_idx] 

176 * (d[d_idx] - (psi[d_idx] ** 2) / d[d_idx]) 

177 ) 

178 @ self.A[d_idx, :] 

179 ) 

180 

181 return x 

182 

183 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

184 p = self.map(x) 

185 (res_l, res_u) = self.bounds.residual(p) 

186 res_u[res_u > 0] = 0 

187 res_l[res_l > 0] = 0 

188 res = -res_u - res_l 

189 measures = [] 

190 for measure in proximity_measures: 

191 if isinstance(measure, tuple): 

192 if measure[0] == "p_norm": 

193 measures.append(self.weights @ (res ** measure[1])) 

194 else: 

195 raise ValueError("Invalid proximity measure") 

196 elif isinstance(measure, str) and measure == "max_norm": 

197 measures.append(res.max()) 

198 else: 

199 raise ValueError("Invalid proximity measure") 

200 return measures 

201 

202 

203class BIPARM(ARMAlgorithm): 

204 """ 

205 BIPARM Algorithm for feasibility problems. 

206 

207 Parameters 

208 ---------- 

209 A : npt.NDArray or sparse.sparray 

210 Matrix for linear systems 

211 lb : npt.NDArray 

212 The lower bounds for the hyperslab. 

213 ub : npt.NDArray 

214 The upper bounds for the hyperslab. 

215 algorithmic_relaxation : npt.NDArray or float, optional 

216 The relaxation parameter used by the algorithm, by default 1.0. 

217 relaxation : float, optional 

218 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

219 weights : None, List[float], or npt.NDArray, optional 

220 The weights for the constraints, by default None. 

221 proximity_flag : bool, optional 

222 Flag to indicate if proximity should be considered, by default True. 

223 

224 Methods 

225 ------- 

226 _project(x) 

227 Perform the simultaneous projection of x. 

228 _proximity(x) 

229 Calculate the proximity measure for x. 

230 """ 

231 

232 def __init__( 

233 self, 

234 A: npt.NDArray, 

235 lb: npt.NDArray, 

236 ub: npt.NDArray, 

237 algorithmic_relaxation: npt.NDArray | float = 1.0, 

238 relaxation: float = 1.0, 

239 weights: None | List[float] | npt.NDArray = None, 

240 proximity_flag: bool = True, 

241 ): 

242 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag) 

243 xp = cp if self._use_gpu else np 

244 if weights is None: 

245 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0] 

246 elif xp.abs((weights.sum() - 1)) > 1e-10: 

247 warnings.warn("Weights do not add up to 1! Renormalizing to 1...", stacklevel=2) 

248 self.weights = weights / weights.sum() 

249 else: 

250 self.weights = weights 

251 

252 def _project(self, x): 

253 xp = cp if self._use_gpu else np 

254 p = self.map(x) 

255 d = p - self.bounds.center 

256 psi = self.bounds.half_distance 

257 d_idx = xp.abs(d) > psi 

258 x -= ( 

259 self.algorithmic_relaxation 

260 / 2 

261 * ( 

262 self.weights[d_idx] 

263 * self.inverse_row_norm[d_idx] 

264 * (d[d_idx] - (psi[d_idx] ** 2) / d[d_idx]) 

265 ) 

266 @ self.A[d_idx, :] 

267 ) 

268 

269 return x 

270 

271 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

272 p = self.map(x) 

273 (res_l, res_u) = self.bounds.residual(p) 

274 res_u[res_u > 0] = 0 

275 res_l[res_l > 0] = 0 

276 res = -res_u - res_l 

277 measures = [] 

278 for measure in proximity_measures: 

279 if isinstance(measure, tuple): 

280 if measure[0] == "p_norm": 

281 measures.append(self.weights @ (res ** measure[1])) 

282 else: 

283 raise ValueError("Invalid proximity measure") 

284 elif isinstance(measure, str) and measure == "max_norm": 

285 measures.append(res.max()) 

286 else: 

287 raise ValueError("Invalid proximity measure") 

288 return measures 

289 

290 

291class StringAveragedARM(ARMAlgorithm): 

292 """ 

293 String Averaged ARM Algorithm. 

294 This class implements the String Averaged ARM (Adaptive Relaxation Method) 

295 algorithm, 

296 which is used for feasibility problems involving strings of indices. 

297 

298 Parameters 

299 ---------- 

300 A : npt.NDArray or sparse.sparray 

301 Matrix for linear systems 

302 lb : npt.NDArray 

303 The lower bounds for the hyperslab. 

304 ub : npt.NDArray 

305 The upper bounds for the hyperslab. 

306 strings : List[List[int]] 

307 A list of lists, where each inner list represents a string of indices. 

308 algorithmic_relaxation : npt.NDArray or float, optional 

309 The relaxation parameter used by the algorithm, by default 1.0. 

310 relaxation : float, optional 

311 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

312 weights : None or List[float], optional 

313 The weights for each string, by default None. If None, equal weights are assigned. 

314 proximity_flag : bool, optional 

315 A flag indicating whether to use proximity, by default True. 

316 

317 Methods 

318 ------- 

319 _project(x) 

320 Projects the input vector x using the string averaged projection method. 

321 

322 Raises 

323 ------ 

324 ValueError 

325 If the number of weights does not match the number of strings. 

326 """ 

327 

328 def __init__( 

329 self, 

330 A: npt.NDArray, 

331 lb: npt.NDArray, 

332 ub: npt.NDArray, 

333 strings: List[List[int]], 

334 algorithmic_relaxation: npt.NDArray | float = 1.0, 

335 relaxation: float = 1.0, 

336 weights: None | List[float] = None, 

337 proximity_flag: bool = True, 

338 ): 

339 super().__init__(A, lb, ub, algorithmic_relaxation, relaxation, proximity_flag) 

340 xp = cp if self._use_gpu else np 

341 self.strings = strings 

342 

343 if weights is None: 

344 self.weights = xp.ones(len(strings)) / len(strings) 

345 else: 

346 if len(weights) != len(self.strings): 

347 raise ValueError("The number of weights must be equal to the number of strings.") 

348 self.weights = weights 

349 

350 def _project(self, x): 

351 xp = cp if self._use_gpu else np 

352 # string averaged projection 

353 x_c = x.copy() # create a general copy of x 

354 x -= x # reset x is this viable? 

355 for string, weight in zip(self.strings, self.weights): 

356 x_s = x_c.copy() # generate a copy for individual strings 

357 for i in string: 

358 p_i = self.single_map(x_s, i) 

359 d = p_i - self.bounds.center[i] 

360 psi = (self.bounds.u[i] - self.bounds.l[i]) / 2 

361 if xp.abs(d) > psi: 

362 self.A.update_step( 

363 x_s, 

364 -1 

365 * self.algorithmic_relaxation 

366 / 2 

367 * ((d**2 - psi**2) / d) 

368 * self.inverse_row_norm[i], 

369 i, 

370 ) 

371 x += weight * x_s 

372 return x