Coverage for suppy\feasibility\_hyperplanes\_variants.py: 88%
24 statements
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 15:16 +0200
« prev ^ index » next coverage.py v7.6.4, created at 2026-05-08 15:16 +0200
1from typing import List
2import numpy as np
3import numpy.typing as npt
4from scipy import sparse
6try:
7 import cupy as cp
9 NO_GPU = False
11except ImportError:
12 NO_GPU = True
13 cp = np
16from suppy.feasibility._hyperplanes._kaczmarz_algorithms import SimultaneousKaczmarzMethod
19class DROPHyperplane(SimultaneousKaczmarzMethod):
20 """
21 Diagonally Relaxed Orthogonal Projections (DROP) algorithm for solving
22 linear equalities of the form Ax = b.
24 Parameters
25 ----------
26 A : npt.NDArray or sparse.sparray
27 Sparse matrix for linear systems. Must support ``getnnz(axis=0)``.
28 b : npt.NDArray
29 Bound for linear systems
30 algorithmic_relaxation : npt.NDArray or float, optional
31 The relaxation parameter used by the algorithm, by default 1.0.
32 relaxation : float, optional
33 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0.
34 proximity_flag : bool, optional
35 Flag to indicate if proximity calculations should be performed, by default True.
37 References
38 ----------
39 - [1] https://epubs.siam.org/doi/10.1137/050639399
40 """
42 def __init__(
43 self,
44 A: npt.NDArray | sparse.sparray,
45 b: npt.NDArray,
46 algorithmic_relaxation: npt.NDArray | float = 1.0,
47 relaxation: float = 1.0,
48 proximity_flag: bool = True,
49 ):
50 # not NO_GPU guard ensures cp.sparse is never accessed when CuPy is unavailable
51 A_cpu = A.get() if (not NO_GPU and isinstance(A, cp.sparse.csr_matrix)) else A
52 xp = np if (isinstance(A_cpu, np.ndarray) or isinstance(A_cpu, sparse.sparray) or isinstance(A_cpu, sparse.spmatrix)) else cp
53 nnz_counts = xp.array(
54 A_cpu.getnnz(axis=0), dtype=xp.float32
55 ) # number of non-zeros per column
56 del A_cpu
58 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag=proximity_flag)
60 self.inv_nnz_counts = 1 / nnz_counts
62 def _project(self, x: npt.NDArray) -> np.ndarray:
63 # simultaneous projection
64 p = self.map(x)
65 res = self.b - p
66 x += (
67 self.inv_nnz_counts
68 * self.algorithmic_relaxation
69 * (self.inverse_row_norm * res @ self.A)
70 )
71 return x