Coverage for suppy\feasibility\_hyperplanes\_variants.py: 88%

24 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2026-05-08 15:16 +0200

1from typing import List 

2import numpy as np 

3import numpy.typing as npt 

4from scipy import sparse 

5 

6try: 

7 import cupy as cp 

8 

9 NO_GPU = False 

10 

11except ImportError: 

12 NO_GPU = True 

13 cp = np 

14 

15 

16from suppy.feasibility._hyperplanes._kaczmarz_algorithms import SimultaneousKaczmarzMethod 

17 

18 

19class DROPHyperplane(SimultaneousKaczmarzMethod): 

20 """ 

21 Diagonally Relaxed Orthogonal Projections (DROP) algorithm for solving 

22 linear equalities of the form Ax = b. 

23 

24 Parameters 

25 ---------- 

26 A : npt.NDArray or sparse.sparray 

27 Sparse matrix for linear systems. Must support ``getnnz(axis=0)``. 

28 b : npt.NDArray 

29 Bound for linear systems 

30 algorithmic_relaxation : npt.NDArray or float, optional 

31 The relaxation parameter used by the algorithm, by default 1.0. 

32 relaxation : float, optional 

33 Outer relaxation parameter, applied to the entire solution of the iterate by default 1.0. 

34 proximity_flag : bool, optional 

35 Flag to indicate if proximity calculations should be performed, by default True. 

36 

37 References 

38 ---------- 

39 - [1] https://epubs.siam.org/doi/10.1137/050639399 

40 """ 

41 

42 def __init__( 

43 self, 

44 A: npt.NDArray | sparse.sparray, 

45 b: npt.NDArray, 

46 algorithmic_relaxation: npt.NDArray | float = 1.0, 

47 relaxation: float = 1.0, 

48 proximity_flag: bool = True, 

49 ): 

50 # not NO_GPU guard ensures cp.sparse is never accessed when CuPy is unavailable 

51 A_cpu = A.get() if (not NO_GPU and isinstance(A, cp.sparse.csr_matrix)) else A 

52 xp = np if (isinstance(A_cpu, np.ndarray) or isinstance(A_cpu, sparse.sparray) or isinstance(A_cpu, sparse.spmatrix)) else cp 

53 nnz_counts = xp.array( 

54 A_cpu.getnnz(axis=0), dtype=xp.float32 

55 ) # number of non-zeros per column 

56 del A_cpu 

57 

58 super().__init__(A, b, algorithmic_relaxation, relaxation, proximity_flag=proximity_flag) 

59 

60 self.inv_nnz_counts = 1 / nnz_counts 

61 

62 def _project(self, x: npt.NDArray) -> np.ndarray: 

63 # simultaneous projection 

64 p = self.map(x) 

65 res = self.b - p 

66 x += ( 

67 self.inv_nnz_counts 

68 * self.algorithmic_relaxation 

69 * (self.inverse_row_norm * res @ self.A) 

70 ) 

71 return x 

72