Coverage for src/driada/network/randomization.py: 3.88%

232 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-25 15:40 +0300

1from .graph_utils import * 

2from .matrix_utils import * 

3import warnings 

4import random 

5 

def _adj_has_disjoint_edge_pair(sources, targets):
    """Tell whether two of the listed edges together touch four distinct nodes.

    ``sources[k]`` and ``targets[k]`` are the end points of edge ``k``.
    A family of two-node edges in which every two edges share a node is
    either a star (one node common to all edges) or lies on a triangle
    (three nodes in total), so a linear scan is enough.
    """
    pairs = {frozenset((u, v)) for u, v in zip(sources, targets) if u != v}
    if len(pairs) < 2:
        return False
    if frozenset.intersection(*pairs):
        # Every edge contains the same node: a star.
        return False
    return len(frozenset.union(*pairs)) > 3


def adj_random_rewiring_iom_preserving(a, is_weighted, r=10):
    """Randomise an adjacency matrix by repeated pairwise edge swaps.

    The input is split with ``symmetric_component`` and
    ``non_symmetric_component`` (presumably the reciprocal and the one-way
    connections -- confirm against those helpers). Each part is rewired
    on its own and the two parts are summed at the end.

    A swap picks two edges ``n1-n2`` and ``n3-n4`` on four distinct nodes
    and replaces them with ``n1-n4`` and ``n3-n2``, provided the
    collision checks below pass.

    Parameters
    ----------
    a
        Adjacency matrix, in whatever form the two component helpers accept.
    is_weighted
        Forwarded to the helpers; when falsy the result is cast to ``bool``.
    r
        Each phase performs ``r`` successful swaps per edge of that phase.

    Returns
    -------
    ``sp.csr_array`` built from the sum of the two rewired parts.

    Notes
    -----
    A phase whose edge list contains no two edges on four distinct nodes
    (fewer than two edges, a star, a triangle) is skipped: no swap is
    possible there and the pair-sampling loop would never finish.
    NOTE(review): a phase can still run forever if node-disjoint pairs exist
    but every candidate swap fails the collision check.
    """
    # ---- Phase 1: symmetric component -------------------------------------
    s = symmetric_component(a, is_weighted)
    rs = turn_to_partially_directed(s, directed=1.0, weighted=is_weighted)
    rows, cols = rs.todense().nonzero()
    upper = list(set(zip(rows, cols)))
    source_nodes = [e[0] for e in upper]
    target_nodes = [e[1] for e in upper]

    double_edges = len(upper)

    # A swap keeps the two swapped edges node-disjoint, so checking once
    # before the loop is sufficient.
    if _adj_has_disjoint_edge_pair(source_nodes, target_nodes):
        i = 0
        while i < double_edges * r:
            good_choice = 0
            while not good_choice:
                # Sampling is with replacement, so ind1 may equal ind2;
                # such draws are rejected by the distinct-node test.
                ind1, ind2 = np.random.choice(double_edges, 2)
                n1, n3 = source_nodes[ind1], source_nodes[ind2]
                n2, n4 = target_nodes[ind1], target_nodes[ind2]

                if len(set([n1, n2, n3, n4])) == 4:
                    good_choice = 1

            w1 = s[n1, n2]
            w2 = s[n2, n1]
            w3 = s[n3, n4]
            w4 = s[n4, n3]

            # NOTE(review): only ``s`` is consulted here, not the
            # non-symmetric part -- confirm the two cannot overlap.
            if s[n1, n3] + s[n1, n4] + s[n2, n3] + s[n2, n4] == 0:
                s[n1, n4] = w1
                s[n4, n1] = w2
                s[n2, n3] = w3
                s[n3, n2] = w4

                s[n1, n2] = 0
                s[n2, n1] = 0
                s[n3, n4] = 0
                s[n4, n3] = 0

                # Keep the edge lists in step with the matrix.
                target_nodes[ind1], target_nodes[ind2] = n4, n2

                # Only successful swaps are counted.
                i += 1

    # ---- Phase 2: non-symmetric component ---------------------------------
    ns = non_symmetric_component(a, is_weighted)

    rows, cols = ns.nonzero()
    edges = list(set(zip(rows, cols)))
    source_nodes = [e[0] for e in edges]
    target_nodes = [e[1] for e in edges]
    single_edges = len(edges)

    if _adj_has_disjoint_edge_pair(source_nodes, target_nodes):
        i = 0
        while i < single_edges * r:
            good_choice = 0
            while not good_choice:
                ind1, ind2 = np.random.choice(single_edges, 2)
                n1, n3 = source_nodes[ind1], source_nodes[ind2]
                n2, n4 = target_nodes[ind1], target_nodes[ind2]

                if len(set([n1, n2, n3, n4])) == 4:
                    good_choice = 1

            w1 = ns[n1, n2]
            w2 = ns[n3, n4]

            # Every entry must be empty for the swap to be accepted: eight
            # entries of ``ns`` and four entries of the rewired ``s``.
            checklist = [ns[n1, n3], ns[n1, n4], ns[n2, n3], ns[n2, n4],
                         ns[n3, n1], ns[n4, n1], ns[n3, n2], ns[n4, n2],
                         s[n3, n1], s[n4, n1], s[n3, n2], s[n4, n2]]

            if checklist.count(0) == 12:
                ns[n1, n4] = w1
                ns[n3, n2] = w2

                ns[n1, n2] = 0
                ns[n3, n4] = 0

                i += 1

                target_nodes[ind1], target_nodes[ind2] = n4, n2

    res = s + ns
    if not is_weighted:
        res = res.astype(bool)

    return sp.csr_array(res)

106 

107 

def random_rewiring_complete_graph(a, p=1.0):
    """Shuffle edge weights of a complete graph given as a dense matrix.

    A fraction ``p`` of the edges is selected at random and the weights of
    the selected edges are permuted among themselves. All other entries,
    including the diagonal, are copied unchanged.

    When ``a`` equals its transpose (``np.allclose``) an edge is an
    unordered node pair: slots are drawn from the strict upper triangle and
    each shuffled weight is written to both ``(i, j)`` and ``(j, i)``.
    The result is then symmetric and carries exactly the original weights.
    Otherwise every ordered off-diagonal position is a separate edge.

    Parameters
    ----------
    a : numpy.ndarray
        Square matrix with every off-diagonal entry nonzero.
    p : float
        Fraction of edges to reshuffle, from 0 to 1. ``int(p * n_edges)``
        edges are affected.

    Returns
    -------
    numpy.ndarray
        Rewired copy of ``a`` in a floating-point dtype; ``a`` itself is
        not modified.

    Raises
    ------
    ValueError
        If some off-diagonal entry of ``a`` is zero.
    """
    n = a.shape[0]
    off_diagonal = ~np.eye(n, dtype=bool)
    # Count off-diagonal entries only, so self-loops cannot mask missing edges.
    if np.count_nonzero(a[off_diagonal]) < n ** 2 - n:
        raise ValueError('Graph is not complete')

    symmetric = np.allclose(a, a.T)

    if symmetric:
        slot_x, slot_y = np.triu_indices(n, 1)
    else:
        slot_x, slot_y = np.nonzero(off_diagonal)

    n_slots = len(slot_x)
    chosen = np.random.choice(n_slots, replace=False, size=int(p * n_slots))
    shuffled_x_inds = slot_x[chosen]
    shuffled_y_inds = slot_y[chosen]

    # Fancy indexing copies, so the in-place shuffle leaves ``a`` untouched.
    shuffled_vals = a[shuffled_x_inds, shuffled_y_inds]
    np.random.shuffle(shuffled_vals)

    rewired = a.astype(np.result_type(a.dtype, np.float64))
    rewired[shuffled_x_inds, shuffled_y_inds] = shuffled_vals
    if symmetric:
        # Mirror each weight instead of averaging with the transpose, which
        # would alter the weights themselves.
        rewired[shuffled_y_inds, shuffled_x_inds] = shuffled_vals

    return rewired

145 

146 

def random_rewiring_dense_graph(a):
    """Shuffle the upper-triangle entries of a graph and symmetrise.

    The entries strictly above the diagonal, zeros (missing edges)
    included, are permuted at random and mirrored below the diagonal.
    The lower triangle and the diagonal of the input are not used, and
    the diagonal of the result is zero.

    Parameters
    ----------
    a : numpy.ndarray or object with ``todense()`` and ``shape``
        Square adjacency matrix. Anything that is not an ``ndarray`` is
        converted with ``a.todense()``.

    Returns
    -------
    numpy.ndarray
        Symmetric float matrix with the shape of ``a``; ``a`` is not
        modified.
    """
    # np.asarray drops the np.matrix wrapper that todense() may return;
    # shuffling a 1 x k matrix along its first axis would permute nothing.
    dense = np.asarray(a if isinstance(a, np.ndarray) else a.todense())

    upper = np.triu_indices(a.shape[0], 1)
    # Indexing the triangle directly keeps zero entries as shuffle slots
    # without adding and later subtracting an offset, which cost precision.
    vals = dense[upper]  # fancy indexing returns a copy
    np.random.shuffle(vals)

    tri = np.zeros(a.shape)
    tri[upper] = vals

    return tri + tri.T

171 

172 

def get_single_double_edges_lists(g):
    """Split the edges of ``g`` into one-way and reciprocal connections.

    Every edge ``(u, v)`` of the undirected copy of ``g`` is looked up in
    ``g`` in both directions:

    * found both ways -> ``(u, v)`` is recorded as a double edge;
    * found only as ``v -> u`` -> ``(v, u)`` is recorded as a single edge;
    * otherwise -> ``(u, v)`` is recorded as a single edge.

    Returns ``[single_edges, double_edges]``, two lists of node pairs.
    """
    single_edges = []
    double_edges = []
    undirected = nx.to_undirected(g).copy()

    for u, v in undirected.edges():
        if not g.has_edge(v, u):
            single_edges.append((u, v))
            continue

        if g.has_edge(u, v):
            double_edges.append((u, v))
        else:
            # Only the reverse direction exists: store it as it is oriented.
            single_edges.append((v, u))

    return [single_edges, double_edges]

187 

188 

189# warning: the code below is legacy and may be inefficient 

def _edge_list_has_disjoint_pair(edges):
    """Tell whether two edges of ``edges`` together touch four distinct nodes.

    A family of two-node edges in which every two edges share a node is
    either a star (one node common to all) or lies on a triangle (three
    nodes in total), so a linear scan is enough.
    """
    pairs = {frozenset(e) for e in edges if e[0] != e[1]}
    if len(pairs) < 2:
        return False
    if frozenset.intersection(*pairs):
        return False
    return len(frozenset.union(*pairs)) > 3


def _edge_weight_kwargs(G, u, v):
    """Return ``{'weight': w}`` for edge ``u -> v`` if it has a weight, else ``{}``."""
    data = G.get_edge_data(u, v)
    if data is not None and 'weight' in data:
        return {'weight': data['weight']}
    return {}


def _remove_edge_or_warn(G, u, v):
    """Remove edge ``u -> v``; warn instead of raising when it is absent."""
    try:
        G.remove_edge(u, v)
    except nx.NetworkXError:
        warnings.warn(f'Edge {u!r} -> {v!r} was expected in the graph but is missing')


def random_rewiring_IOM_preserving(G, r=10):
    """Rewire a networkx graph by pairwise edge swaps (legacy implementation).

    Edges are split by ``get_single_double_edges_lists`` into one-way and
    reciprocal connections. Reciprocal connections are rewired first, then
    one-way ones. A swap takes edges ``A-B`` and ``C-D`` on four distinct
    nodes and replaces them with ``A-D`` and ``C-B``, but only if none of
    ``A->D``, ``D->A``, ``C->B``, ``B->C`` exists yet. Each phase performs
    ``r`` successful swaps per edge of that phase.

    Only the ``'weight'`` attribute is carried over to the new edges. An
    edge without a weight yields an unweighted new edge.
    NOTE(review): for a reciprocal pair both new directions receive the
    weight of the stored direction; the reverse weight is dropped -- confirm
    this is intended.

    Parameters
    ----------
    G
        Graph to rewire. It is modified in place.
    r
        Number of successful swaps per edge in each phase.

    Returns
    -------
    A deep copy of the rewired ``G``.

    Notes
    -----
    A phase whose edge list holds no two edges on four distinct nodes is
    skipped, since no swap is possible and sampling would never finish.
    NOTE(review): a phase can still run forever if such pairs exist but
    every candidate swap collides with an existing edge.
    """
    [L1, L2] = get_single_double_edges_lists(G)
    Number_of_single_edges = len(L1)
    Number_of_double_edges = len(L2)
    Number_of_rewired_1_edge_pairs = Number_of_single_edges * r
    Number_of_rewired_2_edge_pairs = Number_of_double_edges * r

    print('Rewiring double connections...')
    # A swap keeps the two swapped edges node-disjoint, so one check before
    # the loop is sufficient.
    if _edge_list_has_disjoint_pair(L2):
        i = 0
        while i < Number_of_rewired_2_edge_pairs:
            Edge_index_1 = random.randint(0, Number_of_double_edges - 1)
            Edge_index_2 = random.randint(0, Number_of_double_edges - 1)
            Node_A, Node_B = L2[Edge_index_1]
            Node_C, Node_D = L2[Edge_index_2]
            if len({Node_A, Node_B, Node_C, Node_D}) < 4:
                continue  # edges share a node: draw again

            if (G.has_edge(Node_A, Node_D) or G.has_edge(Node_D, Node_A)
                    or G.has_edge(Node_C, Node_B) or G.has_edge(Node_B, Node_C)):
                continue  # swap would duplicate an existing connection

            # Read weights from the current edges on every swap, so that a
            # value left over from an earlier iteration is never reused.
            attrs_ab = _edge_weight_kwargs(G, Node_A, Node_B)
            attrs_cd = _edge_weight_kwargs(G, Node_C, Node_D)

            _remove_edge_or_warn(G, Node_A, Node_B)
            _remove_edge_or_warn(G, Node_B, Node_A)
            _remove_edge_or_warn(G, Node_C, Node_D)
            _remove_edge_or_warn(G, Node_D, Node_C)

            G.add_edge(Node_A, Node_D, **attrs_ab)
            G.add_edge(Node_D, Node_A, **attrs_ab)
            G.add_edge(Node_C, Node_B, **attrs_cd)
            G.add_edge(Node_B, Node_C, **attrs_cd)

            # Keep the edge list in step with the graph.
            L2[Edge_index_1] = (Node_A, Node_D)
            L2[Edge_index_2] = (Node_C, Node_B)
            i += 1

    print('Rewiring single connections...')
    if _edge_list_has_disjoint_pair(L1):
        i = 0
        while i < Number_of_rewired_1_edge_pairs:
            Edge_index_1 = random.randint(0, Number_of_single_edges - 1)
            Edge_index_2 = random.randint(0, Number_of_single_edges - 1)
            Node_A, Node_B = L1[Edge_index_1]
            Node_C, Node_D = L1[Edge_index_2]
            if len({Node_A, Node_B, Node_C, Node_D}) < 4:
                continue

            if (G.has_edge(Node_A, Node_D) or G.has_edge(Node_D, Node_A)
                    or G.has_edge(Node_C, Node_B) or G.has_edge(Node_B, Node_C)):
                continue

            attrs_ab = _edge_weight_kwargs(G, Node_A, Node_B)
            attrs_cd = _edge_weight_kwargs(G, Node_C, Node_D)

            _remove_edge_or_warn(G, Node_A, Node_B)
            _remove_edge_or_warn(G, Node_C, Node_D)

            G.add_edge(Node_A, Node_D, **attrs_ab)
            G.add_edge(Node_C, Node_B, **attrs_cd)

            L1[Edge_index_1] = (Node_A, Node_D)
            L1[Edge_index_2] = (Node_C, Node_B)
            i += 1

    G_rewired = copy.deepcopy(G)

    return G_rewired