Coverage for suppy\feasibility\_split_algorithms.py: 57%
127 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 13:56 +0200
1"""Algorithms for split feasibility problem."""
2import warnings
3from abc import ABC, abstractmethod
4from typing import List, Callable
5import numpy as np
6import numpy.typing as npt
7from scipy import sparse
9try:
10 import cupy as cp
12 NO_GPU = False
13except ImportError:
14 cp = np
15 NO_GPU = True
17from suppy.utils import LinearMapping
18from suppy.utils import ensure_float_array
19from suppy.projections._projections import Projection
21from suppy.feasibility._linear_algorithms import Feasibility
24class SplitFeasibility(Feasibility, ABC):
25 """
26 Abstract base class used to represent split feasibility problems.
28 Parameters
29 ----------
30 A : npt.NDArray
31 Matrix connecting input and target space.
32 algorithmic_relaxation : npt.NDArray or float, optional
33 The relaxation parameter used by the algorithm, by default 1.0.
34 proximity_flag : bool, optional
35 A flag indicating whether to use this object for proximity calculations, by default True.
37 Attributes
38 ----------
39 A : LinearMapping
40 Linear mapping between input and target space.
41 proximities : list
42 A list to store proximity values during the solve process.
43 algorithmic_relaxation : npt.NDArray or float, optional
44 The relaxation parameter used by the algorithm, by default 1.0.
45 proximity_flag : bool, optional
46 A flag indicating whether to use this object for proximity calculations.
47 relaxation : float, optional
48 The relaxation parameter for the projection, by default 1.0.
49 """
51 def __init__(
52 self,
53 A: npt.NDArray | sparse.sparray,
54 algorithmic_relaxation: npt.NDArray | float = 1.0,
55 proximity_flag: bool = True,
56 ):
57 _, _use_gpu = LinearMapping.get_flags(A)
58 super().__init__(algorithmic_relaxation, 1, proximity_flag, _use_gpu=_use_gpu)
59 self.A = LinearMapping(A)
60 self.proximities = []
61 self.all_x = None
63 @ensure_float_array
64 def solve(
65 self,
66 x: npt.NDArray,
67 max_iter: int = 10,
68 prox_tol: float = 1e-6,
69 del_prox_tol: float = 1e-8,
70 del_prox_n: int = 5,
71 storage: bool = False,
72 storage_iters: List[int] | int | None = None,
73 proximity_measures: List | None = None,
74 alternative_stopping_criterion: Callable | None = None,
75 alternative_stopping_criterion_initial_call: Callable | None = None,
76 ) -> np.ndarray:
77 """
78 Solves the split feasibility problem for a given input array.
80 Parameters
81 ----------
82 x : npt.NDArray
83 Starting point for the algorithm.
84 max_iter : int, optional
85 The maximum number of iterations (default is 10).
86 prox_tol : float, optional
87 Stopping criterium for the feasibility seeking algorithm.
88 Solution deemed feasible if the proximity drops below this value (default is 1e-6).
89 del_prox_tol : float, optional
90 The tolerance for the change in proximity over the last del_prox_n iterations, by default 1e-8.
91 del_prox_n : int, optional
92 The number of iterations to check for the change in proximity, by default 5.
93 storage : bool, optional
94 A flag indicating whether to store all intermediate solutions (default is False).
95 storage_iters : List[int] or int, optional
96 Controls which iterations are stored (when storage=True). If None, all iterations are stored.
97 If a list of ints, only those iteration indices are stored (0 = initial point).
98 If an int, storage occurs every that many iterations.
99 proximity_measures : List, optional
100 The proximity measures to calculate, by default None.
101 Right now only the first in the list is used to check the feasibility.
102 alternative_stopping_criterion : callable, optional
103 Alternative stopping criterion.
104 alternative_stopping_criterion_initial_call : callable, optional
105 Initial call for an alternative stopping criterion.
107 Returns
108 -------
109 npt.NDArray
110 The solution after applying the feasibility seeking algorithm.
111 """
112 xp = cp if isinstance(x, cp.ndarray) else np
113 self._n_tol = 0
115 if proximity_measures is None:
116 proximity_measures = [("p_norm", 2)]
117 else:
118 # TODO: Check if the proximity measures are valid
119 _ = None
121 self.proximities = [self.proximity(x, proximity_measures)]
122 i = 0
124 def _should_store(idx):
125 if storage_iters is None:
126 return True
127 if isinstance(storage_iters, int):
128 return idx % storage_iters == 0
129 return idx in storage_iters
131 if storage is True:
132 self.all_x = []
133 if _should_store(0):
134 if isinstance(x, cp.ndarray) and not NO_GPU:
135 self.all_x.append((x.get()))
136 else:
137 self.all_x.append(np.array(x.copy()))
139 if alternative_stopping_criterion_initial_call is not None:
140 stop = alternative_stopping_criterion_initial_call(x, self)
141 else:
142 stop = False
144 while i < max_iter and not stop:
145 x, _ = self.step(x)
146 if storage is True and _should_store(i + 1):
147 if isinstance(x, np.ndarray): # convert to np array if cp
148 self.all_x.append(np.array(x.copy()))
149 else:
150 self.all_x.append((x.get()))
151 self.proximities.append(self.proximity(x, proximity_measures))
153 if alternative_stopping_criterion is not None:
154 stop = alternative_stopping_criterion(x, self)
155 else:
156 stop = self._stopping_criterion(prox_tol, del_prox_tol, del_prox_n)
158 i += 1
160 if self.all_x is not None:
161 self.all_x = np.array(self.all_x)
163 self.proximities = xp.array(self.proximities)
165 return x
167 def _stopping_criterion(self, prox_tol: float, del_prox_tol: float, del_prox_n: int) -> bool:
168 """Returns True when convergence is detected, False otherwise."""
169 if self.proximities[-1][0] < prox_tol:
170 return True
171 else: # check that last n proximity changes are below a threshold
172 if self.proximities[-2][0] - self.proximities[-1][0] < del_prox_tol:
173 self._n_tol += 1
174 else:
175 self._n_tol = 0
176 if self._n_tol >= del_prox_n:
177 return True
178 return False
180 def project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple:
181 """
182 Projects the input array onto the feasible set.
184 Parameters
185 ----------
186 x : npt.NDArray
187 The input array to project.
188 y : npt.NDArray, optional
189 An optional array for projection (default is None).
191 Returns
192 -------
193 tuple
194 A (x, y) pair of projected arrays; y may be None.
195 """
196 return self._project(x, y)
198 def step(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple:
199 return self.project(x, y)
201 @abstractmethod
202 def _project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple:
203 pass
205 def map(self, x: npt.NDArray) -> np.ndarray:
206 """
207 Maps the input space array to the target space via matrix
208 multiplication.
210 Parameters
211 ----------
212 x : npt.NDArray
213 The input space array to be mapped.
215 Returns
216 -------
217 npt.NDArray
218 The corresponding target space array.
219 """
220 return self.A @ x
222 def map_back(self, y: npt.NDArray) -> np.ndarray:
223 """
224 Transposed map of the target space array to the input space.
226 Parameters
227 ----------
228 y : npt.NDArray
229 The target space array to map.
231 Returns
232 -------
233 npt.NDArray
234 The corresponding array in input space.
235 """
236 return self.A.T @ y
239class CQAlgorithm(SplitFeasibility):
240 """
241 Implementation for the CQ algorithm to solve split feasibility problems.
243 Parameters
244 ----------
245 A : npt.NDArray
246 Matrix connecting input and target space.
247 C_projection : Projection
248 The projection operator onto the set C.
249 Q_projection : Projection
250 The projection operator onto the set Q.
251 algorithmic_relaxation : npt.NDArray or float, optional
252 The relaxation parameter used by the algorithm, by default 1.0.
253 proximity_flag : bool, optional
254 A flag indicating whether to use this object for proximity calculations, by default True.
256 Attributes
257 ----------
258 A : LinearMapping
259 Linear mapping between input and target space.
260 C_projection : Projection
261 The projection operator onto the set C.
262 Q_projection : Projection
263 The projection operator onto the set Q.
264 proximities : list
265 A list to store proximity values during the solve process.
266 algorithmic_relaxation : npt.NDArray or float, optional
267 The relaxation parameter used by the algorithm, by default 1.0.
268 proximity_flag : bool
269 A flag indicating whether to use this object for proximity calculations.
270 """
272 def __init__(
273 self,
274 A: npt.NDArray | sparse.sparray,
275 C_projection: Projection,
276 Q_projection: Projection,
277 algorithmic_relaxation: float = 1.0,
278 proximity_flag: bool = True,
279 ):
280 super().__init__(A, algorithmic_relaxation, proximity_flag)
281 self.c_projection = C_projection
282 self.q_projection = Q_projection
284 def _project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple:
285 """
286 Perform one step of the CQ algorithm.
288 Parameters
289 ----------
290 x : npt.NDArray
291 The point in the input space to be projected.
292 y : npt.NDArray or None, optional
293 The point in the target space to be projected,
294 obtained through e.g. a perturbation step.
295 If None, it is calculated from x.
297 Returns
298 -------
299 tuple
300 A (x, None) pair with the updated input-space point.
301 """
302 if y is None:
303 y = self.map(x)
305 y_p = self.q_projection.project(y.copy())
306 x = x - self.algorithmic_relaxation * self.map_back(y - y_p)
308 return self.c_projection.project(x), None
310 def _proximity(self, x: npt.NDArray, proximity_measures: List) -> list[float]:
311 return [
312 self.c_projection.proximity(x, proximity_measures),
313 self.q_projection.proximity(self.map(x), proximity_measures),
314 ]
316 def _stopping_criterion(self, prox_tol: float, del_prox_tol: float, del_prox_n: int) -> bool:
317 """Returns True when convergence is detected, False otherwise."""
318 if self.proximities[-1][1][0] < prox_tol:
319 return True
320 else: # check that last n proximity changes of are below a threshold
321 if self.proximities[-2][1][0] - self.proximities[-1][1][0] < del_prox_tol:
322 self._n_tol += 1
323 else:
324 self._n_tol = 0
325 if self._n_tol >= del_prox_n:
326 return True
327 return False
330class ProductSpaceAlgorithm(SplitFeasibility):
331 """
332 Implementation for a product space algorithm to solve split feasibility
333 problems.
335 Parameters
336 ----------
337 A : npt.NDArray
338 Matrix connecting input and target space.
339 C_projections : List[Projection]
340 The projection operators onto the sets C.
341 Q_projections : List[Projection]
342 The projection operators onto the sets Q.
343 algorithmic_relaxation : npt.NDArray or float, optional
344 The relaxation parameter used by the algorithm, by default 1.0.
345 proximity_flag : bool, optional
346 A flag indicating whether to use this object for proximity calculations, by default True.
348 Attributes
349 ----------
350 Pv : npt.NDArray
351 Projection matrix onto the constraint manifold.
352 xs : list
353 Input-space iterates accumulated during solve.
354 ys : list
355 Target-space iterates accumulated during solve.
356 """
358 def __init__(
359 self,
360 A: npt.NDArray | sparse.sparray,
361 C_projections: List[Projection],
362 Q_projections: List[Projection],
363 algorithmic_relaxation: npt.NDArray | float = 1.0,
364 proximity_flag: bool = True,
365 ):
366 super().__init__(A, algorithmic_relaxation, proximity_flag)
367 self.c_projections = C_projections
368 self.q_projections = Q_projections
370 # calculate projection back into Ax=b space
371 Z = np.concatenate([A, -1 * np.eye(A.shape[0])], axis=1)
372 self.Pv = np.eye(Z.shape[1]) - LinearMapping(Z.T @ (np.linalg.inv(Z @ Z.T)) @ Z)
374 warnings.warn(
375 "ProductSpaceAlgorithm is only suitable for small scale problems. "
376 "Use CQAlgorithm for larger problems.",
377 stacklevel=2,
378 )
379 self.xs = []
380 self.ys = []
382 def _project(self, x: npt.NDArray, y: npt.NDArray | None = None) -> tuple:
383 """
384 Perform one step of the product space algorithm.
386 Parameters
387 ----------
388 x : npt.NDArray
389 The point in the input space to be projected.
390 y : npt.NDArray or None, optional
391 The point in the target space to be projected, obtained through e.g. a perturbation step.
392 If None, it is calculated from x.
394 Returns
395 -------
396 tuple
397 A (x, None) pair with the updated input-space point.
398 """
399 if y is None:
400 y = self.map(x)
401 for el in self.c_projections:
402 x = el.project(x)
403 for el in self.q_projections:
404 y = el.project(y)
405 xy = self.Pv @ np.concatenate([x, y])
406 self.xs.append(xy[: len(x)].copy())
407 self.ys.append(xy[len(x) :].copy())
408 return xy[: len(x)], None
410 def _proximity(self, x: npt.NDArray, _proximity_measures: List) -> list[float]:
411 raise NotImplementedError("Proximity not implemented for ProductSpaceAlgorithm.")