Coverage for suppy\feasibility\_split_algorithms.py: 57%

127 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2026-05-08 13:56 +0200

1"""Algorithms for split feasibility problem.""" 

2import warnings 

3from abc import ABC, abstractmethod 

4from typing import List, Callable 

5import numpy as np 

6import numpy.typing as npt 

7from scipy import sparse 

8 

9try: 

10 import cupy as cp 

11 

12 NO_GPU = False 

13except ImportError: 

14 cp = np 

15 NO_GPU = True 

16 

17from suppy.utils import LinearMapping 

18from suppy.utils import ensure_float_array 

19from suppy.projections._projections import Projection 

20 

21from suppy.feasibility._linear_algorithms import Feasibility 

22 

23 

24class SplitFeasibility(Feasibility, ABC): 

25 """ 

26 Abstract base class used to represent split feasibility problems. 

27 

28 Parameters 

29 ---------- 

30 A : npt.NDArray 

31 Matrix connecting input and target space. 

32 algorithmic_relaxation : npt.NDArray or float, optional 

33 The relaxation parameter used by the algorithm, by default 1.0. 

34 proximity_flag : bool, optional 

35 A flag indicating whether to use this object for proximity calculations, by default True. 

36 

37 Attributes 

38 ---------- 

39 A : LinearMapping 

40 Linear mapping between input and target space. 

41 proximities : list 

42 A list to store proximity values during the solve process. 

43 algorithmic_relaxation : npt.NDArray or float, optional 

44 The relaxation parameter used by the algorithm, by default 1.0. 

45 proximity_flag : bool, optional 

46 A flag indicating whether to use this object for proximity calculations. 

47 relaxation : float, optional 

48 The relaxation parameter for the projection, by default 1.0. 

49 """ 

50 

51 def __init__( 

52 self, 

53 A: npt.NDArray | sparse.sparray, 

54 algorithmic_relaxation: npt.NDArray | float = 1.0, 

55 proximity_flag: bool = True, 

56 ): 

57 _, _use_gpu = LinearMapping.get_flags(A) 

58 super().__init__(algorithmic_relaxation, 1, proximity_flag, _use_gpu=_use_gpu) 

59 self.A = LinearMapping(A) 

60 self.proximities = [] 

61 self.all_x = None 

62 

63 @ensure_float_array 

64 def solve( 

65 self, 

66 x: npt.NDArray, 

67 max_iter: int = 10, 

68 prox_tol: float = 1e-6, 

69 del_prox_tol: float = 1e-8, 

70 del_prox_n: int = 5, 

71 storage: bool = False, 

72 storage_iters: List[int] | int | None = None, 

73 proximity_measures: List | None = None, 

74 alternative_stopping_criterion: Callable | None = None, 

75 alternative_stopping_criterion_initial_call: Callable | None = None, 

76 ) -> np.ndarray: 

77 """ 

78 Solves the split feasibility problem for a given input array. 

79 

80 Parameters 

81 ---------- 

82 x : npt.NDArray 

83 Starting point for the algorithm. 

84 max_iter : int, optional 

85 The maximum number of iterations (default is 10). 

86 prox_tol : float, optional 

87 Stopping criterium for the feasibility seeking algorithm. 

88 Solution deemed feasible if the proximity drops below this value (default is 1e-6). 

89 del_prox_tol : float, optional 

90 The tolerance for the change in proximity over the last del_prox_n iterations, by default 1e-8. 

91 del_prox_n : int, optional 

92 The number of iterations to check for the change in proximity, by default 5. 

93 storage : bool, optional 

94 A flag indicating whether to store all intermediate solutions (default is False). 

95 storage_iters : List[int] or int, optional 

96 Controls which iterations are stored (when storage=True). If None, all iterations are stored. 

97 If a list of ints, only those iteration indices are stored (0 = initial point). 

98 If an int, storage occurs every that many iterations. 

99 proximity_measures : List, optional 

100 The proximity measures to calculate, by default None. 

101 Right now only the first in the list is used to check the feasibility. 

102 alternative_stopping_criterion : callable, optional 

103 Alternative stopping criterion. 

104 alternative_stopping_criterion_initial_call : callable, optional 

105 Initial call for an alternative stopping criterion. 

106 

107 Returns 

108 ------- 

109 npt.NDArray 

110 The solution after applying the feasibility seeking algorithm. 

111 """ 

112 xp = cp if isinstance(x, cp.ndarray) else np 

113 self._n_tol = 0 

114 

115 if proximity_measures is None: 

116 proximity_measures = [("p_norm", 2)] 

117 else: 

118 # TODO: Check if the proximity measures are valid 

119 _ = None 

120 

121 self.proximities = [self.proximity(x, proximity_measures)] 

122 i = 0 

123 

124 def _should_store(idx): 

125 if storage_iters is None: 

126 return True 

127 if isinstance(storage_iters, int): 

128 return idx % storage_iters == 0 

129 return idx in storage_iters 

130 

131 if storage is True: 

132 self.all_x = [] 

133 if _should_store(0): 

134 if isinstance(x, cp.ndarray) and not NO_GPU: 

135 self.all_x.append((x.get())) 

136 else: 

137 self.all_x.append(np.array(x.copy())) 

138 

139 if alternative_stopping_criterion_initial_call is not None: 

140 stop = alternative_stopping_criterion_initial_call(x, self) 

141 else: 

142 stop = False 

143 

144 while i < max_iter and not stop: 

145 x, _ = self.step(x) 

146 if storage is True and _should_store(i + 1): 

147 if isinstance(x, np.ndarray): # convert to np array if cp 

148 self.all_x.append(np.array(x.copy())) 

149 else: 

150 self.all_x.append((x.get())) 

151 self.proximities.append(self.proximity(x, proximity_measures)) 

152 

153 if alternative_stopping_criterion is not None: 

154 stop = alternative_stopping_criterion(x, self) 

155 else: 

156 stop = self._stopping_criterion(prox_tol, del_prox_tol, del_prox_n) 

157 

158 i += 1 

159 

160 if self.all_x is not None: 

161 self.all_x = np.array(self.all_x) 

162 

163 self.proximities = xp.array(self.proximities) 

164 

165 return x 

166 

167 def _stopping_criterion(self, prox_tol: float, del_prox_tol: float, del_prox_n: int) -> bool: 

168 """Returns True when convergence is detected, False otherwise.""" 

169 if self.proximities[-1][0] < prox_tol: 

170 return True 

171 else: # check that last n proximity changes are below a threshold 

172 if self.proximities[-2][0] - self.proximities[-1][0] < del_prox_tol: 

173 self._n_tol += 1 

174 else: 

175 self._n_tol = 0 

176 if self._n_tol >= del_prox_n: 

177 return True 

178 return False 

179 

180 def project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple: 

181 """ 

182 Projects the input array onto the feasible set. 

183 

184 Parameters 

185 ---------- 

186 x : npt.NDArray 

187 The input array to project. 

188 y : npt.NDArray, optional 

189 An optional array for projection (default is None). 

190 

191 Returns 

192 ------- 

193 tuple 

194 A (x, y) pair of projected arrays; y may be None. 

195 """ 

196 return self._project(x, y) 

197 

198 def step(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple: 

199 return self.project(x, y) 

200 

201 @abstractmethod 

202 def _project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple: 

203 pass 

204 

205 def map(self, x: npt.NDArray) -> np.ndarray: 

206 """ 

207 Maps the input space array to the target space via matrix 

208 multiplication. 

209 

210 Parameters 

211 ---------- 

212 x : npt.NDArray 

213 The input space array to be mapped. 

214 

215 Returns 

216 ------- 

217 npt.NDArray 

218 The corresponding target space array. 

219 """ 

220 return self.A @ x 

221 

222 def map_back(self, y: npt.NDArray) -> np.ndarray: 

223 """ 

224 Transposed map of the target space array to the input space. 

225 

226 Parameters 

227 ---------- 

228 y : npt.NDArray 

229 The target space array to map. 

230 

231 Returns 

232 ------- 

233 npt.NDArray 

234 The corresponding array in input space. 

235 """ 

236 return self.A.T @ y 

237 

238 

239class CQAlgorithm(SplitFeasibility): 

240 """ 

241 Implementation for the CQ algorithm to solve split feasibility problems. 

242 

243 Parameters 

244 ---------- 

245 A : npt.NDArray 

246 Matrix connecting input and target space. 

247 C_projection : Projection 

248 The projection operator onto the set C. 

249 Q_projection : Projection 

250 The projection operator onto the set Q. 

251 algorithmic_relaxation : npt.NDArray or float, optional 

252 The relaxation parameter used by the algorithm, by default 1.0. 

253 proximity_flag : bool, optional 

254 A flag indicating whether to use this object for proximity calculations, by default True. 

255 

256 Attributes 

257 ---------- 

258 A : LinearMapping 

259 Linear mapping between input and target space. 

260 C_projection : Projection 

261 The projection operator onto the set C. 

262 Q_projection : Projection 

263 The projection operator onto the set Q. 

264 proximities : list 

265 A list to store proximity values during the solve process. 

266 algorithmic_relaxation : npt.NDArray or float, optional 

267 The relaxation parameter used by the algorithm, by default 1.0. 

268 proximity_flag : bool 

269 A flag indicating whether to use this object for proximity calculations. 

270 """ 

271 

272 def __init__( 

273 self, 

274 A: npt.NDArray | sparse.sparray, 

275 C_projection: Projection, 

276 Q_projection: Projection, 

277 algorithmic_relaxation: float = 1.0, 

278 proximity_flag: bool = True, 

279 ): 

280 super().__init__(A, algorithmic_relaxation, proximity_flag) 

281 self.c_projection = C_projection 

282 self.q_projection = Q_projection 

283 

284 def _project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple: 

285 """ 

286 Perform one step of the CQ algorithm. 

287 

288 Parameters 

289 ---------- 

290 x : npt.NDArray 

291 The point in the input space to be projected. 

292 y : npt.NDArray or None, optional 

293 The point in the target space to be projected, 

294 obtained through e.g. a perturbation step. 

295 If None, it is calculated from x. 

296 

297 Returns 

298 ------- 

299 tuple 

300 A (x, None) pair with the updated input-space point. 

301 """ 

302 if y is None: 

303 y = self.map(x) 

304 

305 y_p = self.q_projection.project(y.copy()) 

306 x = x - self.algorithmic_relaxation * self.map_back(y - y_p) 

307 

308 return self.c_projection.project(x), None 

309 

310 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]: 

311 return [ 

312 self.c_projection.proximity(x, proximity_measures), 

313 self.q_projection.proximity(self.map(x), proximity_measures), 

314 ] 

315 

316 def _stopping_criterion(self, prox_tol: float, del_prox_tol: float, del_prox_n: int) -> bool: 

317 """Returns True when convergence is detected, False otherwise.""" 

318 if self.proximities[-1][1][0] < prox_tol: 

319 return True 

320 else: # check that last n proximity changes of are below a threshold 

321 if self.proximities[-2][1][0] - self.proximities[-1][1][0] < del_prox_tol: 

322 self._n_tol += 1 

323 else: 

324 self._n_tol = 0 

325 if self._n_tol >= del_prox_n: 

326 return True 

327 return False 

328 

329 

330class ProductSpaceAlgorithm(SplitFeasibility): 

331 """ 

332 Implementation for a product space algorithm to solve split feasibility 

333 problems. 

334 

335 Parameters 

336 ---------- 

337 A : npt.NDArray 

338 Matrix connecting input and target space. 

339 C_projections : List[Projection] 

340 The projection operators onto the sets C. 

341 Q_projections : List[Projection] 

342 The projection operators onto the sets Q. 

343 algorithmic_relaxation : npt.NDArray or float, optional 

344 The relaxation parameter used by the algorithm, by default 1.0. 

345 proximity_flag : bool, optional 

346 A flag indicating whether to use this object for proximity calculations, by default True. 

347 

348 Attributes 

349 ---------- 

350 Pv : npt.NDArray 

351 Projection matrix onto the constraint manifold. 

352 xs : list 

353 Input-space iterates accumulated during solve. 

354 ys : list 

355 Target-space iterates accumulated during solve. 

356 """ 

357 

358 def __init__( 

359 self, 

360 A: npt.NDArray | sparse.sparray, 

361 C_projections: List[Projection], 

362 Q_projections: List[Projection], 

363 algorithmic_relaxation: npt.NDArray | float = 1.0, 

364 proximity_flag: bool = True, 

365 ): 

366 super().__init__(A, algorithmic_relaxation, proximity_flag) 

367 self.c_projections = C_projections 

368 self.q_projections = Q_projections 

369 

370 # calculate projection back into Ax=b space 

371 Z = np.concatenate([A, -1 * np.eye(A.shape[0])], axis=1) 

372 self.Pv = np.eye(Z.shape[1]) - LinearMapping(Z.T @ (np.linalg.inv(Z @ Z.T)) @ Z) 

373 

374 warnings.warn( 

375 "ProductSpaceAlgorithm is only suitable for small scale problems. " 

376 "Use CQAlgorithm for larger problems.", 

377 stacklevel=2, 

378 ) 

379 self.xs = [] 

380 self.ys = [] 

381 

382 def _project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple: 

383 """ 

384 Perform one step of the product space algorithm. 

385 

386 Parameters 

387 ---------- 

388 x : npt.NDArray 

389 The point in the input space to be projected. 

390 y : npt.NDArray or None, optional 

391 The point in the target space to be projected, obtained through e.g. a perturbation step. 

392 If None, it is calculated from x. 

393 

394 Returns 

395 ------- 

396 tuple 

397 A (x, None) pair with the updated input-space point. 

398 """ 

399 if y is None: 

400 y = self.map(x) 

401 for el in self.c_projections: 

402 x = el.project(x) 

403 for el in self.q_projections: 

404 y = el.project(y) 

405 xy = self.Pv @ np.concatenate([x, y]) 

406 self.xs.append(xy[: len(x)].copy()) 

407 self.ys.append(xy[len(x) :].copy()) 

408 return xy[: len(x)], None 

409 

410 def _proximity(self, x: npt.NDArray, _proximity_measures: List) -> list[float]: 

411 raise NotImplementedError("Proximity not implemented for ProductSpaceAlgorithm.")