Coverage for src/driada/dim_reduction/mvu.py: 26.23%

61 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-25 15:40 +0300

1import cvxpy as cp 

2import numpy as np 

3from scipy.sparse.csgraph import laplacian 

4from sklearn.neighbors import NearestNeighbors 

5 

6class MaximumVarianceUnfolding(object): 

7 

8 def __init__(self, equation="berkley", solver=cp.SCS, solver_tol=1e-2, 

9 eig_tol=1.0e-10, solver_iters=2500, warm_start=False, seed=None): 

10 """ 

11 :param equation: A string either "berkley" or "wikipedia" to represent 

12 two different equations for the same problem. 

13 :param solver: A CVXPY solver object. 

14 :param solver_tol: A float representing the tolerance the solver uses to know when to stop. 

15 :param eig_tol: The positive semi-definite constraint is only so accurate, this sets 

16 eigenvalues that lie in -eig_tol < 0 < eig_tol to 0. 

17 :param solver_iters: The max number of iterations the solver will go through. 

18 :param warm_start: Whether or not to use a warm start for the solver. 

19 Useful if you are running multiple tests on the same data. 

20 :param seed: The numpy seed for random numbers. 

21 """ 

22 self.equation = equation 

23 self.solver = solver 

24 self.solver_tol = solver_tol 

25 self.eig_tol = eig_tol 

26 self.solver_iters = solver_iters 

27 self.warm_start = warm_start 

28 self.seed = seed 

29 self.neighborhood_graph = None 

30 

31 def fit(self, data, k): 

32 """ 

33 The method to fit an MVU model to the data. 

34 :param data: The data to which the model will be fitted. 

35 :param k: The number of neighbors to fix. 

36 :param dropout_rate: The number of neighbors to discount. 

37 :return: Embedded Gramian: The Gramian matrix of the embedded data. 

38 """ 

39 # Number of data points in the set 

40 n = data.shape[0] 

41 

42 # Set the seed 

43 np.random.seed(self.seed) 

44 

45 # Calculate the nearest neighbors of each data point and build a graph 

46 N = NearestNeighbors(n_neighbors=k).fit(data).kneighbors_graph(data).todense() 

47 N = np.array(N) 

48 

49 # Save the neighborhood graph to be accessed latter 

50 self.neighborhood_graph = N 

51 

52 # To check for disconnected regions in the neighbor graph 

53 lap = laplacian(N, normed=True) 

54 eigvals, _ = np.linalg.eig(lap) 

55 

56 for e in eigvals: 

57 if e == 0. and self.solver_iters is None: 

58 raise ValueError("DISCONNECTED REGIONS IN NEIGHBORHOOD GRAPH. " 

59 "PLEASE SPECIFY MAX ITERATIONS FOR THE SOLVER") 

60 

61 # Declare some CVXPy variables 

62 # Gramian of the original data 

63 P = cp.Constant(data.dot(data.T)) 

64 # The projection of the Gramian 

65 Q = cp.Variable((n, n), PSD=True) 

66 # Initialized to zeros 

67 Q.value = np.zeros((n, n)) 

68 # A shorter way to call a vector of 1's 

69 ONES = cp.Constant(np.ones((n, 1))) 

70 # A variable to keep the notation consistent with the Berkley lecture 

71 T = cp.Constant(n) 

72 

73 # Declare placeholders to get rid of annoying warnings 

74 objective = None 

75 constraints = [] 

76 

77 # Wikipedia Solution 

78 if self.equation == "wikipedia": 

79 objective = cp.Maximize(cp.trace(Q)) 

80 

81 constraints = [Q >> 0, cp.sum(Q, axis=1) == 0] 

82 

83 for i in range(n): 

84 for j in range(n): 

85 if N[i, j] == 1: 

86 constraints.append((P[i, i] + P[j, j] - P[i, j] - P[j, i]) - 

87 (Q[i, i] + Q[j, j] - Q[i, j] - Q[j, i]) == 0) 

88 

89 # UC Berkley Solution 

90 if self.equation == "berkley": 

91 objective = cp.Maximize(cp.multiply((1 / T), cp.trace(Q)) - 

92 cp.multiply((1 / (T * T)), cp.trace(cp.matmul(cp.matmul(Q, ONES), ONES.T)))) 

93 

94 constraints = [Q >> 0, cp.sum(Q, axis=1) == 0] 

95 for i in range(n): 

96 for j in range(n): 

97 if N[i, j] == 1.: 

98 constraints.append(Q[i, i] - 2 * Q[i, j] + Q[j, j] - 

99 (P[i, i] - 2 * P[i, j] + P[j, j]) == 0) 

100 

101 # Solve the problem with the SCS Solver 

102 problem = cp.Problem(objective, constraints) 

103 # FIXME The solvertol syntax is unique to SCS 

104 problem.solve(solver=self.solver, 

105 eps=self.solver_tol, 

106 max_iters=self.solver_iters, 

107 warm_start=self.warm_start) 

108 

109 return Q.value 

110 

111 

112 def fit_transform(self, data, dim, k): 

113 """ 

114 The method to fit and transform an MVU model to the data. 

115 :param data: The data to which the model will be fitted. 

116 :param dim: The new dimension of the dataset. 

117 :param k: The number of neighbors to fix. 

118 :param dropout_rate: The number of neighbors to discount. 

119 :return: embedded_data: The embedded form of the data. 

120 """ 

121 

122 embedded_gramian = self.fit(data, k) 

123 

124 # Retrieve Q 

125 embedded_gramian = embedded_gramian 

126 

127 # Decompose gramian to recover the projection 

128 eigenvalues, eigenvectors = np.linalg.eig(embedded_gramian) 

129 

130 # Set the eigenvalues that are within +/- eig_tol to 0 

131 eigenvalues[np.logical_and(-self.eig_tol < eigenvalues, eigenvalues < self.eig_tol)] = 0. 

132 

133 # Assuming the eigenvalues and eigenvectors aren't sorted, 

134 # sort them and get the top "dim" ones 

135 sorted_indices = eigenvalues.argsort()[::-1] 

136 top_eigenvalue_indices = sorted_indices[:dim] 

137 

138 # Take the top eigenvalues and eigenvectors 

139 top_eigenvalues = eigenvalues[top_eigenvalue_indices] 

140 top_eigenvectors = eigenvectors[:, top_eigenvalue_indices] 

141 

142 # Some quick math to get the projection and return it 

143 lbda = np.diag(top_eigenvalues ** 0.5) 

144 embedded_data = lbda.dot(top_eigenvectors.T).T 

145 

146 return embedded_data