Coverage for intelligence_toolkit/detect_case_patterns/graph_functions.py: 100%
37 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
3#
4from collections import Counter
5from itertools import combinations
7import networkx as nx
8import numpy as np
9import pandas as pd
def convert_edge_df_to_graph(edge_df):
    """Turn an edge table into a graph and find its largest component.

    Args:
        edge_df: Edge table with "source", "target" and "weight" columns;
            "weight" is attached to each edge as an edge attribute.

    Returns:
        A ``(graph, largest_component)`` tuple, where ``largest_component``
        is the set of nodes of the connected component with the most nodes.
    """
    graph = nx.from_pandas_edgelist(
        edge_df, source="source", target="target", edge_attr="weight"
    )
    # Components are yielded as node sets, so ``len`` ranks them by node count.
    components = nx.connected_components(graph)
    largest_component = max(components, key=len)
    return graph, largest_component
def create_edge_df_from_atts(
    all_atts, pdf, min_edge_weight, missing_edge_prop
) -> pd.DataFrame:
    """Build a weighted edge table from attribute co-occurrence counts.

    Every value in the "Full Attribute" column of ``pdf`` is treated as a
    collection of attributes. Each unordered pair of attributes found in the
    same row counts as one co-occurrence. Counts are min-max scaled into
    ``[min_edge_weight, 1]``. Pairs of ``all_atts`` that never co-occur are
    added with the weight ``missing_edge_prop * min_edge_weight``.

    Args:
        all_atts: Attributes for which every pair must appear in the result.
        pdf: DataFrame with a "Full Attribute" column of attribute
            collections (one collection per row).
        min_edge_weight: Weight given to the least frequent observed pair.
        missing_edge_prop: Factor applied to ``min_edge_weight`` to obtain
            the weight of pairs that were never observed together.

    Returns:
        DataFrame with columns "edge", "count", "source", "target" and
        "weight", sorted by descending weight. "edge" and "count" are NaN
        for the never-observed pairs. Index labels are not reset after the
        observed and unobserved rows are combined.
    """
    # Pairs are stored in sorted order so (a, b) and (b, a) share one key.
    edge_counter = Counter()
    for atts in pdf["Full Attribute"]:
        edge_counter.update(
            (a, b) if a < b else (b, a) for a, b in combinations(atts, 2)
        )

    observed = list(edge_counter)
    edge_df = pd.DataFrame(
        {
            # Explicit object dtype keeps each pair as a single tuple cell.
            "edge": pd.Series(observed, dtype=object),
            "count": pd.Series(list(edge_counter.values()), dtype="int64"),
            "source": [pair[0] for pair in observed],
            "target": [pair[1] for pair in observed],
        }
    )

    counts = edge_df["count"]
    span = counts.max() - counts.min() if len(counts) else 0
    if span > 0:
        relative = (counts - counts.min()) / span
    else:
        # All counts equal (or no pairs at all): the scaling range is zero,
        # so dividing would yield NaN. Place every observed pair at the
        # bottom of the range, i.e. exactly ``min_edge_weight``.
        relative = pd.Series(0.0, index=edge_df.index)
    edge_df["weight"] = relative * (1 - min_edge_weight) + min_edge_weight

    # Pairs of known attributes that were never seen together.
    missing_w = missing_edge_prop * min_edge_weight
    null_rows = []
    for att1, att2 in combinations(all_atts, 2):
        edge = (att1, att2) if att1 < att2 else (att2, att1)
        if edge not in edge_counter:
            null_rows.append({"source": att1, "target": att2, "weight": missing_w})
    null_df = pd.DataFrame(null_rows)

    # Empty frames are left out of the concat so they cannot affect dtypes;
    # the reindex guarantees the full column set even when one side is absent.
    frames = [frame for frame in (edge_df, null_df) if not frame.empty]
    combined = pd.concat(frames) if frames else edge_df
    combined = combined.reindex(
        columns=["edge", "count", "source", "target", "weight"]
    )
    return combined.sort_values("weight", ascending=False)