Coverage for src/driada/network/randomization.py: 3.88%
232 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-25 15:40 +0300
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-25 15:40 +0300
1from .graph_utils import *
2from .matrix_utils import *
3import warnings
4import random
6def adj_random_rewiring_iom_preserving(a, is_weighted, r=10):
7 # print ('Rewiring double connections...')
9 s = symmetric_component(a, is_weighted)
10 rs = turn_to_partially_directed(s, directed=1.0, weighted=is_weighted)
11 rows, cols = rs.todense().nonzero()
12 edgeset = set(zip(rows, cols))
13 upper = [l for l in edgeset] # if l[0]<l[1]]
14 source_nodes = [e[0] for e in upper]
15 target_nodes = [e[1] for e in upper]
17 double_edges = len(upper)
19 i = 0
21 while i < double_edges * r:
22 # print('i=',i)
23 good_choice = 0
24 while not good_choice:
25 ind1, ind2 = np.random.choice(double_edges, 2)
26 n1, n3 = source_nodes[ind1], source_nodes[ind2]
27 n2, n4 = target_nodes[ind1], target_nodes[ind2]
29 if len(set([n1, n2, n3, n4])) == 4:
30 good_choice = 1
32 w1 = s[n1, n2]
33 w2 = s[n2, n1]
34 w3 = s[n3, n4]
35 w4 = s[n4, n3]
37 '''
38 if w1*w2*w3*w4 == 0:
39 print(i, w1, w2, w3, w4)
40 '''
41 if s[n1, n3] + s[n1, n4] + s[n2, n3] + s[n2, n4] == 0:
42 s[n1, n4] = w1
43 s[n4, n1] = w2
44 s[n2, n3] = w3
45 s[n3, n2] = w4
47 s[n1, n2] = 0
48 s[n2, n1] = 0
49 s[n3, n4] = 0
50 s[n4, n3] = 0
52 target_nodes[ind1], target_nodes[ind2] = n4, n2
54 i += 1
55 # print(get_symmetry_index(sp.csr_array(A)))
57 # plt.matshow(s)
58 # print ('Rewiring single connections...')
60 ns = non_symmetric_component(a, is_weighted)
62 # plt.matshow(ns)
63 rows, cols = ns.nonzero()
64 edges = list((set(zip(rows, cols))))
65 source_nodes = [e[0] for e in edges]
66 target_nodes = [e[1] for e in edges]
67 single_edges = len(edges)
69 i = 0
71 while i < single_edges * r:
72 # while i < 10:
73 good_choice = 0
74 while not good_choice:
75 ind1, ind2 = np.random.choice(single_edges, 2)
76 n1, n3 = source_nodes[ind1], source_nodes[ind2]
77 n2, n4 = target_nodes[ind1], target_nodes[ind2]
79 if len(set([n1, n2, n3, n4])) == 4:
80 good_choice = 1
82 w1 = ns[n1, n2]
83 w2 = ns[n3, n4]
85 checklist = [ns[n1, n3], ns[n1, n4], ns[n2, n3], ns[n2, n4],
86 ns[n3, n1], ns[n4, n1], ns[n3, n2], ns[n4, n2],
87 s[n3, n1], s[n4, n1], s[n3, n2], s[n4, n2]]
89 if checklist.count(0) == 12:
90 ns[n1, n4] = w1
91 ns[n3, n2] = w2
93 ns[n1, n2] = 0
94 ns[n3, n4] = 0
96 i += 1
98 target_nodes[ind1], target_nodes[ind2] = n4, n2
99 # print(get_symmetry_index(sp.csr_array(A)))
101 res = s + ns
102 if not is_weighted:
103 res = res.astype(bool)
105 return sp.csr_array(res)
108def random_rewiring_complete_graph(a, p=1.0):
109 # p ranges from 0 to 1 and defines the degree of reshuffling (percent of edges affected)
110 n = a.shape[0]
111 if len(np.nonzero(a)[0]) < n ** 2 - n:
112 raise Exception('Graph is not complete')
114 symmetric = np.allclose(a, a.T)
116 all_x_inds, all_y_inds = np.nonzero(a)
117 vals = a[all_x_inds, all_y_inds]
118 shuffled_positions = np.random.choice(np.arange((n ** 2 - n)),
119 replace=False,
120 size=int(p * (n ** 2 - n)))
122 shuffled_x_inds = all_x_inds[shuffled_positions]
123 shuffled_y_inds = all_y_inds[shuffled_positions]
124 shuffled_vals = vals[shuffled_positions]
125 np.random.shuffle(shuffled_vals) # shuffling in-place
127 stable_part = a.copy()
128 if symmetric:
129 stable_part[shuffled_x_inds, shuffled_y_inds] = 0
130 stable_part[shuffled_y_inds, shuffled_x_inds] = 0
131 else:
132 stable_part[shuffled_x_inds, shuffled_y_inds] = 0
134 if symmetric:
135 shuffled_half = np.zeros(a.shape)
136 shuffled_half[shuffled_x_inds, shuffled_y_inds] = shuffled_vals
137 shuffled_part = (shuffled_half + shuffled_half.T) / 2.0
138 else:
139 shuffled_part = np.zeros(a.shape)
140 shuffled_part[shuffled_x_inds, shuffled_y_inds] = shuffled_vals
142 rewired = stable_part + shuffled_part
144 return rewired
147def random_rewiring_dense_graph(a):
148 if isinstance(a, np.ndarray):
149 afull = a
150 nelem = len(np.nonzero(a)[0])
151 else:
152 afull = a.todense()
153 nelem = a.nnz
155 if nelem != a.shape[0] ** 2 - a.shape[0]:
156 # warnings.warn('Graph is not complete, proceeding with gap filling trick')
157 temp = 0.0001
158 else:
159 temp = 0
161 psA = afull + np.full(a.shape, temp) - np.eye(a.shape[0]) * temp
162 vals = psA[np.nonzero(np.triu(psA))]
163 np.random.shuffle(vals)
165 tri = np.zeros(a.shape)
166 tri[np.triu_indices(a.shape[0], 1)] = vals
167 rewired = tri + tri.T
168 res = rewired - np.full(a.shape, temp) + np.eye(a.shape[0]) * temp
170 return res
173def get_single_double_edges_lists(g):
174 L1 = []
175 L2 = []
176 h = nx.to_undirected(g).copy()
177 for e in h.edges():
178 if g.has_edge(e[1], e[0]):
179 if g.has_edge(e[0], e[1]):
180 L2.append((e[0], e[1]))
181 else:
182 L1.append((e[1], e[0]))
183 else:
184 L1.append((e[0], e[1]))
186 return [L1, L2]
189# warning: the code below is legacy and may be inefficient
190def random_rewiring_IOM_preserving(G, r=10):
192 [L1, L2] = get_single_double_edges_lists(G)
193 Number_of_single_edges = len(L1)
194 Number_of_double_edges = len(L2)
195 Number_of_rewired_1_edge_pairs = Number_of_single_edges * r
196 Number_of_rewired_2_edge_pairs = Number_of_double_edges * r
197 # Number_of_rewired_2_edge_pairs = 20
198 i = 0
199 count = 0
200 previous_text = ''
202 # print len(set(L).intersection(List_of_edges))
203 print('Rewiring double connections...')
204 while i < Number_of_rewired_2_edge_pairs:
205 Edge_index_1 = random.randint(0, Number_of_double_edges - 1)
206 Edge_index_2 = random.randint(0, Number_of_double_edges - 1)
207 Edge_1 = L2[Edge_index_1]
208 Edge_2 = L2[Edge_index_2]
209 [Node_A, Node_B] = Edge_1
210 [Node_C, Node_D] = Edge_2
211 while (Node_A == Node_C) or (Node_A == Node_D) or (Node_B == Node_C) or (Node_B == Node_D):
212 Edge_index_1 = random.randint(0, Number_of_double_edges - 1)
213 Edge_index_2 = random.randint(0, Number_of_double_edges - 1)
214 Edge_1 = L2[Edge_index_1]
215 Edge_2 = L2[Edge_index_2]
216 [Node_A, Node_B] = Edge_1
217 [Node_C, Node_D] = Edge_2
219 # print ('Edges:',Node_A, Node_B, ';',Node_C, Node_D)
220 # print G.has_edge(Node_A, Node_B), G.has_edge(Node_B, Node_A), G.has_edge(Node_C, Node_D), G.has_edge(Node_D, Node_C)
221 if G.has_edge(Node_A, Node_D) == 0 and G.has_edge(Node_D, Node_A) == 0 and G.has_edge(Node_C,
222 Node_B) == 0 and G.has_edge(
223 Node_B, Node_C) == 0:
224 # try:
225 try:
226 w_ab = G.get_edge_data(Node_A, Node_B)['weight']
227 except:
228 pass
229 G.remove_edge(Node_A, Node_B)
230 G.remove_edge(Node_B, Node_A)
231 '''
232 except nx.NetworkXError:
233 pass
234 #print('fuck')
235 '''
236 try:
237 try:
238 w_cd = G.get_edge_data(Node_C, Node_D)['weight']
239 except:
240 pass
241 G.remove_edge(Node_C, Node_D)
242 G.remove_edge(Node_D, Node_C)
243 except nx.NetworkXError:
244 pass
245 # print('fuck')
247 try:
248 G.add_edge(Node_A, Node_D, weight=w_ab)
249 G.add_edge(Node_D, Node_A, weight=w_ab)
250 except:
251 G.add_edge(Node_A, Node_D)
252 G.add_edge(Node_D, Node_A)
254 try:
255 G.add_edge(Node_C, Node_B, weight=w_cd)
256 G.add_edge(Node_B, Node_C, weight=w_cd)
257 except:
258 G.add_edge(Node_C, Node_B)
259 G.add_edge(Node_B, Node_C)
261 # print L2[Edge_index_1]
262 L2[Edge_index_1] = (Node_A, Node_D)
263 # print L2[Edge_index_1]
264 # L2[Edge_index_1+1] = (Node_D, Node_A)
265 L2[Edge_index_2] = (Node_C, Node_B)
266 # L2[Edge_index_2+1] = (Node_B, Node_C)
267 i += 1
269 if (i != 0) and (i % (Number_of_double_edges // 1)) == 0:
270 text = str(round(100.0 * i / Number_of_rewired_2_edge_pairs, 0)) + "%"
271 if text != previous_text:
272 # print text
273 pass
274 previous_text = text
276 i = 0
277 print('Rewiring single connections...')
278 while i < Number_of_rewired_1_edge_pairs:
279 Edge_index_1 = random.randint(0, Number_of_single_edges - 1)
280 Edge_index_2 = random.randint(0, Number_of_single_edges - 1)
281 Edge_1 = L1[Edge_index_1]
282 Edge_2 = L1[Edge_index_2]
283 [Node_A, Node_B] = Edge_1
284 [Node_C, Node_D] = Edge_2
285 while (Node_A == Node_C) or (Node_A == Node_D) or (Node_B == Node_C) or (Node_B == Node_D):
286 Edge_index_1 = random.randint(0, Number_of_single_edges - 1)
287 Edge_index_2 = random.randint(0, Number_of_single_edges - 1)
288 Edge_1 = L1[Edge_index_1]
289 Edge_2 = L1[Edge_index_2]
290 [Node_A, Node_B] = Edge_1
291 [Node_C, Node_D] = Edge_2
293 if G.has_edge(Node_A, Node_D) == 0 and G.has_edge(Node_D, Node_A) == 0 and G.has_edge(Node_C,
294 Node_B) == 0 and G.has_edge(
295 Node_B, Node_C) == 0:
296 try:
297 try:
298 w_ab = G.get_edge_data(Node_A, Node_B)['weight']
299 except:
300 pass
301 G.remove_edge(Node_A, Node_B)
304 except nx.NetworkXError:
305 print('fuck')
307 try:
308 try:
309 w_cd = G.get_edge_data(Node_C, Node_D)['weight']
310 except:
311 pass
312 G.remove_edge(Node_C, Node_D)
314 except nx.NetworkXError:
315 print('fuck')
317 try:
318 G.add_edge(Node_A, Node_D, weight=w_ab)
319 except:
320 G.add_edge(Node_A, Node_D)
322 try:
323 G.add_edge(Node_C, Node_B, weight=w_cd)
324 except:
325 G.add_edge(Node_C, Node_B)
327 L1[Edge_index_1] = (Node_A, Node_D)
328 L1[Edge_index_2] = (Node_C, Node_B)
329 i += 1
331 if (i != 0) and (i % (Number_of_single_edges // 1)) == 0:
332 text = str(round(100.0 * i / Number_of_rewired_1_edge_pairs, 0)) + "%"
333 if text != previous_text:
334 # print text
335 pass
336 previous_text = text
338 G_rewired = copy.deepcopy(G)
340 return G_rewired