Coverage for intelligence_toolkit/graph/graph_encoder_embed.py: 100%

62 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3# 

4# ruff: noqa 

5import numpy as np 

6from scipy import sparse 

7 

8# invalid divide results will be nan 

9np.seterr(divide="ignore", invalid="ignore") 

10 

11 

12############------------graph_encoder_embed_start----------------############### 

class GraphEncoderEmbed:
    """Graph encoder embedding of a graph given node class labels.

    The entry point is :meth:`run`, which optionally converts an edge list to
    a sparse adjacency matrix, optionally adds self-loops and/or applies a
    symmetric degree normalization, multiplies the adjacency matrix by a
    label-derived encoder matrix, and optionally normalizes every embedding
    row to unit Euclidean length.
    """

    def run(self, X, Y, n, **kwargs):
        """Compute the embedding ``Z`` and the encoder matrix ``W``.

        Args:
            X: Sparse adjacency matrix of shape ``(n, n)``, or an edge list
                (rows of ``node_i, node_j`` or ``node_i, node_j, weight``)
                when ``EdgeList=True``.
            Y: Label array of shape ``(n, 1)``; negative values mark
                unlabeled nodes, values ``>= 0`` are class indices.
            n: Number of nodes.
            **kwargs: Optional boolean switches (unknown keys are ignored):
                ``EdgeList`` (default ``False``) -- ``X`` is an edge list.
                ``DiagA`` (default ``True``) -- add the identity to ``X``.
                ``Laplacian`` (default ``False``) -- apply
                ``D^-0.5 * X * D^-0.5``.
                ``Correlation`` (default ``True``) -- scale every row of
                ``Z`` to unit 2-norm.

        Returns:
            Tuple ``(Z, W)``: the sparse ``(n, k)`` embedding and the sparse
            ``(n, k)`` encoder matrix, where ``k`` is the largest label + 1.
        """
        options = {
            "EdgeList": False,
            "DiagA": True,
            "Laplacian": False,
            "Correlation": True,
            **kwargs,
        }

        if options["EdgeList"]:
            # The size flag must be computed from X. Passing the bound method
            # itself never equals "S2", which made two-column edge lists fail
            # with an IndexError in Edge_to_Sparse.
            size_flag = self.edge_list_size(X)
            X = self.Edge_to_Sparse(X, n, size_flag)

        if options["DiagA"]:
            X = self.Diagonal(X, n)

        if options["Laplacian"]:
            X = self.Laplacian(X, n)

        Z, W = self.Basic(X, Y, n)

        if options["Correlation"]:
            Z = self.Correlation(Z)

        return Z, W

    def Basic(self, X, Y, n):
        """Build the encoder matrix ``W`` and the embedding ``Z = X . W``.

        Args:
            X: Sparse adjacency matrix of shape ``(n, n)``. ``X[i, j]`` holds
                1 (unweighted) or the edge weight when nodes ``i`` and ``j``
                are connected; missing entries are 0.
            Y: Label array of shape ``(n, 1)``. Negative values mean "no
                label"; values ``>= 0`` are class indices starting at 0.
            n: Number of nodes (rows of ``W``).

        Returns:
            Tuple ``(Z, W)`` where ``W[i, c] = 1 / nk[c]`` if node ``i`` has
            label ``c`` (``nk[c]`` = number of rows of ``Y`` labeled ``c``)
            and 0 otherwise, stored as float32 CSR, and ``Z = X.dot(W)``.

        Raises:
            IndexError: If ``Y`` has fewer than ``n`` rows.
        """
        labels = np.asarray(Y)[:, 0]
        if labels.shape[0] < n:
            raise IndexError("Y must provide a label row for each of the n nodes")

        # Labels start at 0, so the number of classes is the largest label + 1.
        k = int(labels.max()) + 1

        # nk[c]: number of observations carrying class label c.
        nk = np.bincount(labels[labels >= 0].astype(np.intp), minlength=k)

        # Only labeled nodes among the first n rows contribute a non-zero
        # entry to the encoder matrix.
        labeled = np.flatnonzero(labels[:n] >= 0)
        classes = labels[labeled].astype(np.intp)
        weights = (1.0 / nk[classes]).astype(np.float32)

        W = sparse.csr_matrix(
            (weights, (labeled, classes)), shape=(n, k), dtype=np.float32
        )
        Z = X.dot(W)

        return Z, W

    def Diagonal(self, X, n):
        """Return ``X`` with the ``n x n`` identity added (1 on each diagonal).

        Args:
            X: Sparse adjacency matrix of shape ``(n, n)``.
            n: Number of nodes.
        """
        return X + sparse.identity(n)

    def Laplacian(self, X, n):
        """Return the degree-normalized matrix ``D^-0.5 * X * D^-0.5``.

        ``D`` is the diagonal matrix of column sums of ``X``. A zero column
        sum produces an infinite diagonal entry in ``D^-0.5`` (no guarding is
        applied), which can propagate inf/nan into the result.

        Args:
            X: Adjacency matrix (converted to CSR internally).
            n: Number of nodes; unused, kept for interface compatibility.
        """
        adjacency = sparse.csr_matrix(X)

        # Column sums as a flat 1-D array; np.asarray(...).ravel() works for
        # both the np.matrix and the ndarray flavours of sparse ``sum``.
        degrees = np.asarray(adjacency.sum(axis=0)).ravel()

        # Suppress warnings locally so the result does not depend on the
        # process-wide NumPy error state.
        with np.errstate(divide="ignore", invalid="ignore"):
            inv_sqrt_degree = sparse.diags(np.power(degrees, -0.5), 0)

        # D^-0.5 x A x D^-0.5
        return inv_sqrt_degree.dot(adjacency.dot(inv_sqrt_degree))

    def Correlation(self, Z):
        """Scale every row of ``Z`` to unit Euclidean (2-) norm.

        Each element is divided by the 2-norm of its row, e.g. a row
        ``[a, b, c]`` becomes ``[a, b, c] / sqrt(a^2 + b^2 + c^2)``.
        Rows whose norm is zero (or not a positive number) are scaled by 0.

        Args:
            Z: Sparse embedding matrix as produced by :meth:`Basic`.

        Returns:
            Sparse matrix with the same shape as ``Z``.
        """
        # Import locally: ``from scipy import sparse`` alone does not
        # guarantee that the ``sparse.linalg`` submodule has been loaded.
        from scipy.sparse.linalg import norm as sparse_norm

        row_norm = sparse_norm(Z, axis=1)

        # Reciprocal of each row norm, with 0 where the norm is not positive,
        # so no division warning is raised and no huge stand-in for inf is
        # introduced.
        inv_norm = np.zeros_like(row_norm, dtype=float)
        np.divide(1.0, row_norm, out=inv_norm, where=row_norm > 0)

        # Left-multiplying by a diagonal matrix scales the rows of Z.
        return sparse.diags(inv_norm, 0).dot(Z)

    def edge_list_size(self, X):
        """Return ``"S2"`` for a two-column edge list, otherwise ``"S3"``.

        Args:
            X: Edge list; anything ``numpy.shape`` understands.
        """
        shape = np.shape(X)
        if len(shape) > 1 and shape[1] == 2:
            return "S2"

        return "S3"

    def Edge_to_Sparse(self, X, n, size_flag):
        """Convert an edge list into a float32 CSR adjacency matrix.

        Args:
            X: Iterable of edges. With ``size_flag == "S2"`` each row is
                ``(node_i, node_j)`` and the edge gets weight 1; otherwise
                each row is ``(node_i, node_j, weight)``.
            n: Number of nodes; the result has shape ``(n, n)``.
            size_flag: ``"S2"`` or ``"S3"`` as returned by
                :meth:`edge_list_size`.

        Returns:
            Sparse CSR matrix. If an edge appears more than once, the last
            occurrence wins.
        """
        adjacency = sparse.dok_matrix((n, n), dtype=np.float32)
        unweighted = size_flag == "S2"

        for row in X:
            weight = 1 if unweighted else float(row[2])
            adjacency[int(row[0]), int(row[1])] = weight

        return sparse.csr_matrix(adjacency)

157 

158 return X_new