Coverage for intelligence_toolkit/detect_case_patterns/graph_functions.py: 100%

37 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3# 

4from collections import Counter 

5from itertools import combinations 

6 

7import networkx as nx 

8import numpy as np 

9import pandas as pd 

10 

11 

def convert_edge_df_to_graph(edge_df):
    """Build a weighted graph from an edge table and find its largest component.

    Args:
        edge_df: DataFrame with "source", "target" and "weight" columns; each
            row becomes one edge carrying a "weight" attribute.

    Returns:
        A ``(G, lcc)`` tuple where ``G`` is the graph built from ``edge_df``
        and ``lcc`` is the set of nodes in its largest connected component.
        ``lcc`` is an empty set when the graph has no nodes.
    """
    G = nx.from_pandas_edgelist(edge_df, "source", "target", "weight")
    # A graph without nodes yields no components; without a default, max()
    # would raise ValueError on the empty iterator.
    lcc = max(nx.connected_components(G), key=len, default=set())
    return G, lcc

17 

def create_edge_df_from_atts(
    all_atts, pdf, min_edge_weight, missing_edge_prop
) -> pd.DataFrame:
    """Build a weighted edge table from attribute co-occurrence counts.

    Every unordered pair of attributes that appears together in a row of
    ``pdf["Full Attribute"]`` is an observed edge, counted once per row it
    appears in. Observed counts are min-max rescaled into
    ``[min_edge_weight, 1]``. Every pair drawn from ``all_atts`` that was
    never observed gets the fixed weight
    ``missing_edge_prop * min_edge_weight``.

    Args:
        all_atts: Attributes whose pairwise combinations must all be present
            in the result.
        pdf: DataFrame with a "Full Attribute" column holding, per row, an
            iterable of mutually comparable, hashable attributes.
        min_edge_weight: Weight given to the least frequent observed edge.
        missing_edge_prop: Factor applied to ``min_edge_weight`` to get the
            weight of unobserved pairs.

    Returns:
        DataFrame with columns "edge", "count", "source", "target" and
        "weight", sorted by "weight" in descending order. Rows for
        unobserved pairs have no "edge" or "count" value (NaN).
    """
    columns = ["edge", "count", "source", "target", "weight"]

    # Count co-occurrences, storing each pair in canonical (smaller, larger)
    # order so that (a, b) and (b, a) share one counter entry.
    edge_counter = Counter()
    for atts in pdf["Full Attribute"]:
        edge_counter.update(
            (a, b) if a < b else (b, a) for a, b in combinations(atts, 2)
        )

    if edge_counter:
        observed = list(edge_counter.items())
        edge_df = pd.DataFrame(
            {
                "edge": [edge for edge, _ in observed],
                "count": [count for _, count in observed],
                "source": [edge[0] for edge, _ in observed],
                "target": [edge[1] for edge, _ in observed],
            }
        )
        counts = edge_df["count"]
        min_count = counts.min()
        span = counts.max() - min_count
        if span == 0:
            # All observed edges are equally frequent: min-max scaling would
            # divide by zero, so give them all the top weight instead.
            edge_df["weight"] = 1.0
        else:
            edge_df["weight"] = (counts - min_count) / span * (
                1 - min_edge_weight
            ) + min_edge_weight
    else:
        # Nothing co-occurred; keep the usual schema with no observed rows.
        edge_df = pd.DataFrame(columns=columns)

    # Add a low-weight placeholder edge for every pair never seen together.
    missing_w = missing_edge_prop * min_edge_weight
    null_rows = [
        {"source": att1, "target": att2, "weight": missing_w}
        for att1, att2 in combinations(all_atts, 2)
        if ((att1, att2) if att1 < att2 else (att2, att1)) not in edge_counter
    ]
    if null_rows:
        null_df = pd.DataFrame(null_rows)
        if edge_counter:
            edge_df = pd.concat([edge_df, null_df])
        else:
            edge_df = null_df.reindex(columns=columns)

    return edge_df.sort_values("weight", ascending=False)