Coverage for suppy\feasibility\_hyperplanes\_kaczmarz_algorithms.py: 91%

127 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2026-05-08 13:56 +0200

1import warnings 

2from abc import ABC 

3from typing import List 

4import numpy as np 

5import numpy.typing as npt 

6 

7try: 

8 import cupy as cp 

9 

10 NO_GPU = False 

11 

12except ImportError: 

13 NO_GPU = True 

14 cp = np 

15 

16from suppy.feasibility._linear_algorithms import HyperplaneFeasibility 

17 

18 

19class HyperplaneAlgorithm(HyperplaneFeasibility, ABC): 

20 """ 

21 The HyperplaneAlgorithm class is used to find a feasible solution to 

22 a set of linear equalities. 

23 

24 Parameters 

25 ---------- 

26 A : npt.NDArray or sparse.sparray 

27 Matrix for linear systems 

28 b : npt.NDArray 

29 Bound for linear systems 

30 algorithmic_relaxation : npt.NDArray or float, optional 

31 The relaxation parameter used by the algorithm, by default 1.0. 

32 relaxation : float, optional 

33 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

34 proximity_flag : bool, optional 

35 A flag indicating whether to use proximity in the algorithm, by default True. 

36 """ 

37 

38 def __init__( 

39 self, 

40 A: npt.NDArray, 

41 b: npt.NDArray, 

42 algorithmic_relaxation: npt.NDArray | float = 1.0, 

43 relaxation: float = 1.0, 

44 proximity_flag: bool = True, 

45 ): 

46 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) 

47 

48 

49class KaczmarzMethod(HyperplaneAlgorithm): 

50 """ 

51 Kaczmarz method for sequentially solving linear equality constraints. 

52 

53 Parameters 

54 ---------- 

55 A : npt.NDArray or sparse.sparray 

56 Matrix for linear systems 

57 b : npt.NDArray 

58 Bound for linear systems 

59 algorithmic_relaxation : npt.NDArray or float, optional 

60 The relaxation parameter used by the algorithm, by default 1.0. 

61 relaxation : float, optional 

62 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

63 cs : None or List[int], optional 

64 The list of indices for the constraints, by default None. 

65 proximity_flag : bool, optional 

66 Flag to indicate if proximity should be considered, by default True. 

67 """ 

68 

69 def __init__( 

70 self, 

71 A: npt.NDArray, 

72 b: npt.NDArray, 

73 algorithmic_relaxation: npt.NDArray | float = 1.0, 

74 relaxation: float = 1.0, 

75 cs: None | List[int] = None, 

76 proximity_flag: bool = True, 

77 ): 

78 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) 

79 xp = cp if self._use_gpu else np 

80 if cs is None: 

81 self.cs = xp.arange(self.A.shape[0]) 

82 else: 

83 self.cs = cs 

84 

85 def _project(self, x: npt.NDArray) -> np.ndarray: 

86 """ 

87 Projects the input array `x` onto the feasible region defined by the 

88 constraints. 

89 

90 Parameters 

91 ---------- 

92 x : npt.NDArray 

93 The input array to be projected. 

94 

95 Returns 

96 ------- 

97 npt.NDArray 

98 The projected array. 

99 """ 

100 for i in self.cs: 

101 p_i = self.single_map(x, i) 

102 res = self.b[i] - p_i 

103 self.A.update_step(x, self.algorithmic_relaxation * self.inverse_row_norm[i] * res, i) 

104 return x 

105 

106 

107class SequentialWeightedKaczmarz(KaczmarzMethod): 

108 """ 

109 Parameters 

110 ---------- 

111 A : npt.NDArray or sparse.sparray 

112 Matrix for linear systems 

113 b : npt.NDArray 

114 Bound for linear systems 

115 weights : None, list of float, or npt.NDArray, optional 

116 The weights assigned to each constraint. If None, default weights are 

117 used. 

118 algorithmic_relaxation : npt.NDArray or float, optional 

119 The relaxation parameter used by the algorithm, by default 1.0. 

120 relaxation : float, optional 

121 Outer relaxation parameter, applied to the entire solution of the 

122 iterate by default 1.0. 

123 weight_decay : float, optional 

124 Parameter that determines the rate at which the weights are reduced 

125 after each phase (weights * weight_decay). Default is 1.0. 

126 cs : None or list of int, optional 

127 The indices of the constraints to be considered. Default is None. 

128 proximity_flag : bool, optional 

129 Flag to indicate if proximity should be considered. Default is True. 

130 

131 Attributes 

132 ---------- 

133 weights : npt.NDArray 

134 The weights assigned to each constraint. 

135 weight_decay : float 

136 Decay rate for the weights. 

137 temp_weight_decay : float 

138 Initial value for weight decay. 

139 """ 

140 

141 def __init__( 

142 self, 

143 A: npt.NDArray, 

144 b: npt.NDArray, 

145 weights: None | List[float] | npt.NDArray = None, 

146 algorithmic_relaxation: npt.NDArray | float = 1.0, 

147 relaxation: float = 1.0, 

148 weight_decay: float = 1.0, 

149 cs: None | List[int] = None, 

150 proximity_flag: bool = True, 

151 ): 

152 super().__init__(A, b, algorithmic_relaxation, relaxation, cs, proximity_flag) 

153 xp = cp if self._use_gpu else np 

154 self.weight_decay = weight_decay 

155 self.temp_weight_decay = 1.0 

156 

157 if weights is None: 

158 self.weights = xp.ones(self.A.shape[0]) 

159 else: 

160 self.weights = weights 

161 

162 def _project(self, x: npt.NDArray) -> np.ndarray: 

163 """ 

164 Projects the input array `x` onto a feasible region defined by the 

165 constraints. 

166 

167 Parameters 

168 ---------- 

169 x : npt.NDArray 

170 The input array to be projected. 

171 

172 Returns 

173 ------- 

174 npt.NDArray 

175 The projected array. 

176 

177 Notes 

178 ----- 

179 This method iteratively adjusts the input array `x` based on the constraints 

180 defined in `self.cs`. For each constraint, it computes the projection and 

181 checks if the constraints are violated. If a constraint is violated, it updates 

182 the array `x` using a weighted relaxation factor. The weight decay is applied 

183 to the temporary weight decay after each iteration. 

184 """ 

185 weighted_relaxation = self.algorithmic_relaxation * self.temp_weight_decay 

186 

187 for i in self.cs: 

188 p_i = self.single_map(x, i) 

189 res = self.b[i] - p_i 

190 self.A.update_step( 

191 x, weighted_relaxation * self.weights[i] * self.inverse_row_norm[i] * res, i 

192 ) 

193 

194 self.temp_weight_decay *= self.weight_decay 

195 return x 

196 

197 

198class SimultaneousKaczmarzMethod(HyperplaneAlgorithm): 

199 """ 

200 SimultaneousKaczmarzMethod is an implementation of the Kaczmarz 

201 algorithm 

202 that performs simultaneous projections and proximity calculations. 

203 

204 Parameters 

205 ---------- 

206 A : npt.NDArray or sparse.sparray 

207 Matrix for linear systems 

208 b : npt.NDArray 

209 Bound for linear systems 

210 algorithmic_relaxation : npt.NDArray or float, optional 

211 The relaxation parameter used by the algorithm, by default 1.0. 

212 relaxation : float, optional 

213 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

214 weights : None or List[float], optional 

215 The weights for the constraints, by default None. 

216 proximity_flag : bool, optional 

217 Flag to indicate if proximity calculations should be performed, by default True. 

218 """ 

219 

220 def __init__( 

221 self, 

222 A: npt.NDArray, 

223 b: npt.NDArray, 

224 algorithmic_relaxation: npt.NDArray | float = 1.0, 

225 relaxation: float = 1.0, 

226 weights: None | List[float] = None, 

227 proximity_flag: bool = True, 

228 ): 

229 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) 

230 

231 xp = cp if self._use_gpu else np 

232 

233 if weights is None: 

234 self.weights = xp.ones(self.A.shape[0]) / self.A.shape[0] 

235 elif xp.abs((weights.sum() - 1)) > 1e-10: 

236 warnings.warn("Weights do not add up to 1! Renormalizing to 1...", stacklevel=2) 

237 self.weights = weights / weights.sum() 

238 else: 

239 self.weights = weights 

240 

241 def _project(self, x: npt.NDArray) -> np.ndarray: 

242 # simultaneous projection 

243 p = self.map(x) 

244 res = self.b - p 

245 x += self.algorithmic_relaxation * (self.weights * self.inverse_row_norm * res @ self.A) 

246 return x 

247 

248 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

249 p = self.map(x) 

250 res = abs(self.b - p) 

251 measures = [] 

252 for measure in proximity_measures: 

253 if isinstance(measure, tuple): 

254 if measure[0] == "p_norm": 

255 measures.append(self.weights @ (res ** measure[1])) 

256 else: 

257 raise ValueError("Invalid proximity measure") 

258 elif isinstance(measure, str) and measure == "max_norm": 

259 measures.append(res.max()) 

260 else: 

261 raise ValueError("Invalid proximity measure") 

262 return measures 

263 

264 

265class BlockIterativeKaczmarz(HyperplaneAlgorithm): 

266 """ 

267 Block iterative Kaczmarz algorithm for solving linear equality 

268 constraints 

269 in a block-wise manner. 

270 

271 Parameters 

272 ---------- 

273 A : npt.NDArray or sparse.sparray 

274 Matrix for linear systems 

275 b : npt.NDArray 

276 Bound for linear systems 

277 weights : List[List[float]] or List[npt.NDArray] 

278 A list of lists or arrays representing the weights for each block. Each list/array should sum to 1. 

279 algorithmic_relaxation : npt.NDArray or float, optional 

280 The relaxation parameter used by the algorithm, by default 1.0. 

281 relaxation : float, optional 

282 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

283 proximity_flag : bool, optional 

284 A flag indicating whether to use proximity measures, by default True. 

285 

286 Raises 

287 ------ 

288 ValueError 

289 If any of the weight lists do not sum to 1. 

290 """ 

291 

292 def __init__( 

293 self, 

294 A: npt.NDArray, 

295 b: npt.NDArray, 

296 weights: List[List[float]] | List[npt.NDArray], 

297 algorithmic_relaxation: npt.NDArray | float = 1.0, 

298 relaxation: float = 1.0, 

299 proximity_flag: bool = True, 

300 ): 

301 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) 

302 

303 xp = cp if self._use_gpu else np 

304 

305 # check that weights is a list of lists that add up to 1 each 

306 for el in weights: 

307 if xp.abs((xp.sum(el) - 1)) > 1e-10: 

308 raise ValueError("Weights do not add up to 1!") 

309 

310 self.weights = [] 

311 self.block_idxs = [ 

312 xp.where(xp.array(el) > 0)[0] for el in weights 

313 ] # get idxs that meet requirements 

314 

315 # assemble a list of general weights 

316 self.total_weights = xp.zeros_like(weights[0]) 

317 for el in weights: 

318 el = xp.asarray(el) 

319 self.weights.append(el[xp.array(el) > 0]) # remove non zero weights 

320 self.total_weights += el / len(weights) 

321 

322 def _project(self, x: npt.NDArray) -> np.ndarray: 

323 # simultaneous projection 

324 for el, block_idx in zip(self.weights, self.block_idxs): 

325 p = self.indexed_map(x, block_idx) 

326 res = self.b[block_idx] - p 

327 

328 x += self.algorithmic_relaxation * ( 

329 el * self.inverse_row_norm[block_idx] * res @ self.A[block_idx, :] 

330 ) 

331 return x 

332 

333 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

334 p = self.map(x) 

335 res = abs(self.b - p) 

336 measures = [] 

337 for measure in proximity_measures: 

338 if isinstance(measure, tuple): 

339 if measure[0] == "p_norm": 

340 measures.append(self.total_weights @ (res ** measure[1])) 

341 else: 

342 raise ValueError("Invalid proximity measure") 

343 elif isinstance(measure, str) and measure == "max_norm": 

344 measures.append(res.max()) 

345 else: 

346 raise ValueError("Invalid proximity measure") 

347 return measures 

348 

349 

350class StringAveragedKaczmarz(HyperplaneAlgorithm): 

351 """ 

352 StringAveragedKaczmarz is an implementation of the HyperplaneAlgorithm 

353 that performs string averaged projections. 

354 

355 Parameters 

356 ---------- 

357 A : npt.NDArray or sparse.sparray 

358 Matrix for linear systems 

359 b : npt.NDArray 

360 Bound for linear systems 

361 strings : List[List[int]] 

362 A list of lists, where each inner list represents a string of indices. 

363 algorithmic_relaxation : npt.NDArray or float, optional 

364 The relaxation parameter used by the algorithm, by default 1.0. 

365 relaxation : float, optional 

366 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

367 weights : None or List[float], optional 

368 The weights for each string, by default None. If None, equal weights are assigned. 

369 proximity_flag : bool, optional 

370 A flag indicating whether to use proximity, by default True. 

371 """ 

372 

373 def __init__( 

374 self, 

375 A: npt.NDArray, 

376 b: npt.NDArray, 

377 strings: List[List[int]], 

378 algorithmic_relaxation: npt.NDArray | float = 1.0, 

379 relaxation: float = 1.0, 

380 weights: None | List[float] = None, 

381 proximity_flag: bool = True, 

382 ): 

383 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag) 

384 xp = cp if self._use_gpu else np 

385 self.strings = strings 

386 if weights is None: 

387 self.weights = xp.ones(len(strings)) / len(strings) 

388 else: 

389 if len(weights) != len(self.strings): 

390 raise ValueError("The number of weights must be equal to the number of strings.") 

391 self.weights = weights 

392 

393 def _project(self, x: npt.NDArray) -> np.ndarray: 

394 # string averaged projection 

395 x_c = x.copy() # create a general copy of x 

396 x -= x # reset x is this viable? 

397 for string, weight in zip(self.strings, self.weights): 

398 x_s = x_c.copy() # generate a copy for individual strings 

399 for i in string: 

400 p_i = self.single_map(x_s, i) 

401 res_i = self.b[i] - p_i 

402 self.A.update_step( 

403 x_s, self.algorithmic_relaxation * self.inverse_row_norm[i] * res_i, i 

404 ) 

405 x += weight * x_s 

406 return x