Coverage for intelligence_toolkit/graph/graph_encoder_embed.py: 100%
62 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
3#
4# ruff: noqa
5import numpy as np
6from scipy import sparse
# Invalid and divide-by-zero floating-point results are left as NaN/inf
# instead of triggering NumPy warnings.
# NOTE: np.seterr runs at import time and changes NumPy's floating-point error
# handling for code outside this module as well.
np.seterr(divide="ignore", invalid="ignore")
12############------------graph_encoder_embed_start----------------###############
class GraphEncoderEmbed:
    """Graph encoder embedding of an adjacency matrix onto class labels.

    ``run`` is the entry point. The remaining methods are the individual
    pipeline stages: edge list conversion, identity augmentation, Laplacian
    normalization, label encoding and row normalization.
    """

    def run(self, X, Y, n, **kwargs):
        """
        compute the embedding Z and the encoder matrix W
        input X is a sparse adjacency matrix (n x n), or an edge list when
        EdgeList=True (rows of node_i, node_j[, weight])
        input Y is an array with size (n,1):
        -- value -1 indicate no label
        -- value >=0 indicate real label
        input n is the number of nodes
        keyword options (default in parentheses):
        -- EdgeList (False): convert X from an edge list first
        -- DiagA (True): add the identity matrix to X
        -- Laplacian (False): apply D^-0.5 x A x D^-0.5 normalization
        -- Correlation (True): scale each row of Z to unit 2-norm
        return the tuple (Z, W)
        """
        defaults = {
            "EdgeList": False,
            "DiagA": True,
            "Laplacian": False,
            "Correlation": True,
        }
        options = {**defaults, **kwargs}

        if options["EdgeList"]:
            # The size helper must be *called*: passing the bound method itself
            # made the "S2" comparison always false, so two-column edge lists
            # crashed when the missing weight column was read.
            size_flag = self.edge_list_size(X)
            X = self.Edge_to_Sparse(X, n, size_flag)

        if options["DiagA"]:
            X = self.Diagonal(X, n)

        if options["Laplacian"]:
            X = self.Laplacian(X, n)

        Z, W = self.Basic(X, Y, n)

        if options["Correlation"]:
            Z = self.Correlation(Z)

        return Z, W

    def Basic(self, X, Y, n):
        """
        graph embedding basic function
        input X is sparse csr matrix of adjacency matrix
        -- if there is a connection between node i and node j:
        ---- X(i,j) = 1, no edge weight
        ---- X(i,j) = edge weight.
        -- if there is no connection between node i and node j:
        ---- X(i,j) = 0,
        ---- note there is no storage for this in sparse matrix.
        ---- No storage means 0 in sparse matrix.
        input Y is numpy array with size (n,1):
        -- value -1 indicate no label
        -- value >=0 indicate real label
        input n is the number of nodes (rows of X and of W)
        return (Z, W):
        -- W is the (n, k) csr encoder matrix, W[i,c] = 1/n_c if Y[i]==c else 0,
           where n_c is the number of observations labelled c
        -- Z = X.dot(W)
        """
        # Labels are used as column indices, so force an integer dtype
        # (float-typed label arrays would otherwise break sizing and indexing).
        labels = np.asarray(Y)[:, 0].astype(int)

        # Labels start from 0, thus the number of classes is max + 1.
        k = int(labels.max()) + 1

        # Number of observations in each class; negative labels are skipped.
        class_counts = np.bincount(labels[labels >= 0], minlength=k)

        # Only the first n labels take part in the encoder matrix.
        head = labels[:n]
        rows = np.flatnonzero(head >= 0)
        cols = head[rows]
        # Every referenced class has at least one member, so no division by 0.
        # Values are stored as float32, matching the encoder matrix dtype.
        weights = (1.0 / class_counts[cols]).astype(np.float32)

        W = sparse.csr_matrix((weights, (rows, cols)), shape=(n, k), dtype=np.float32)
        Z = X.dot(W)

        return Z, W

    def Diagonal(self, X, n):
        """
        input X is sparse csr matrix of adjacency matrix
        return X plus the n x n identity matrix (1 is added to every
        diagonal entry)
        """
        identity = sparse.identity(n)
        return X + identity

    def Laplacian(self, X, n):
        """
        input X is sparse csr matrix of adjacency matrix
        return a sparse matrix of Laplacian normalization of X matrix,
        i.e. D^-0.5 x A x D^-0.5 where D is the diagonal matrix of column sums
        note: n is not used; it is kept so the signature stays unchanged
        note: a zero column sum yields an infinite entry in D^-0.5
        """
        X_sparse = sparse.csr_matrix(X)
        # get an array of degrees (column sums)
        degrees = np.asarray(X_sparse.sum(axis=0)).ravel()
        # diagonal sparse matrix of D, raised to the power -0.5
        D = sparse.diags(degrees, 0)
        D_inv_sqrt = D.power(-0.5)
        # D^-0.5 x A x D^-0.5
        return D_inv_sqrt.dot(X_sparse.dot(D_inv_sqrt))

    def Correlation(self, Z):
        """
        input Z is sparse csr matrix of embedding matrix from the basic function
        return normalized Z sparse matrix
        Calculation:
        Calculate each row's 2-norm (Euclidean length).
        e.g. row_x: [ele_i,ele_j,ele_k]. norm2 = sqrt(ele_i^2+ele_j^2+ele_k^2)
        then divide each element by their row norm
        e.g. [ele_i/norm2,ele_j/norm2,ele_k/norm2]
        """
        # Imported here so the submodule is guaranteed to be loaded; a bare
        # "from scipy import sparse" only exposes it as a side effect.
        from scipy.sparse.linalg import norm as sparse_norm

        # 2-norm of every row
        row_norm = sparse_norm(Z, axis=1)

        # Reciprocal of the norms; rows with zero norm divide by zero, which is
        # expected here, so the warnings are silenced locally and the
        # non-finite results are replaced by finite numbers.
        with np.errstate(divide="ignore", invalid="ignore"):
            inverse_norm = np.nan_to_num(1 / row_norm)

        # row division to get the normalized Z
        scaling = sparse.diags(inverse_norm, 0)
        return scaling.dot(Z)

    def edge_list_size(self, X):
        """
        set default edge list size as S3.
        If find X only has 2 columns,
        return a flag "S2" indicating this is S2 edge list
        """
        # np.shape also handles plain nested lists, not only arrays.
        shape = np.shape(X)
        if len(shape) > 1 and shape[1] == 2:
            return "S2"

        return "S3"

    def Edge_to_Sparse(self, X, n, size_flag):
        """
        input X is an edge list.
        For S2 edge list (e.g. node_i, node_j per row), every listed
        connection gets weight 1
        For any other flag each row is node_i, node_j, weight
        return a sparse csr matrix (n x n, float32) of the edge list
        note: when an edge is listed more than once the last row wins
        """
        unweighted = size_flag == "S2"

        # Build an empty sparse matrix and fill it edge by edge.
        adjacency = sparse.dok_matrix((n, n), dtype=np.float32)
        for row in X:
            node_i, node_j = int(row[0]), int(row[1])
            adjacency[node_i, node_j] = 1 if unweighted else float(row[2])

        return sparse.csr_matrix(adjacency)