Coverage for intelligence_toolkit/detect_entity_networks/exposure_report.py: 75%
96 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
3#
5import json
6from collections import defaultdict
8import networkx as nx
9import polars as pl
11from intelligence_toolkit.detect_entity_networks.config import ENTITY_LABEL
12from intelligence_toolkit.helpers.constants import ATTRIBUTE_VALUE_SEPARATOR
def build_exposure_data(
    integrated_flags: pl.DataFrame,
    c_nodes: list[str],
    selected_entity: str,
    graph: nx.Graph,
    inferred_links: dict[str, set[str]] | None = None,
) -> tuple[dict[str, int], list[list[list[str]]], list[dict]] | str:
    """Summarise how flags in a network reach the selected entity.

    Flags are restricted to the nodes in ``c_nodes`` (plus any inferred-link
    nodes that exist in ``graph``). For every entity with a positive flag
    count, all shortest paths to the selected entity are collected, and paths
    that share the same tail are merged into one entry with several sources.

    Args:
        integrated_flags: Frame with ``qualified_entity``, ``flag`` and
            ``count`` columns.
        c_nodes: Node names whose flags are considered. A set is accepted
            too; the caller's collection is never mutated.
        selected_entity: Unqualified entity name; it is prefixed with
            ``ENTITY_LABEL`` and ``ATTRIBUTE_VALUE_SEPARATOR`` before lookup.
        graph: Graph used for shortest-path and degree queries.
        inferred_links: Optional mapping of node -> linked nodes. Keys and
            values are added to the considered nodes when present in
            ``graph``.

    Returns:
        ``""`` when ``integrated_flags`` is empty. Otherwise a tuple
        ``(summary, paths, nodes)`` where ``summary`` has the keys
        ``direct``, ``indirect``, ``paths`` and ``entities``; ``paths`` is a
        list of ``[sorted_sources, [hop], [hop], ...]`` lists; and ``nodes``
        is a list of ``{"node", "flags"}`` / ``{"node", "entities"}`` dicts.

    Note:
        Exceptions raised by ``nx.all_shortest_paths`` (for example when no
        path exists between a flagged entity and the selected entity) are
        not caught here and propagate to the caller.
    """
    if integrated_flags.is_empty():
        return ""

    qualified_selected = f"{ENTITY_LABEL}{ATTRIBUTE_VALUE_SEPARATOR}{selected_entity}"

    # Copy (also normalises a set to a list) so the caller's data is untouched.
    network_nodes = list(c_nodes)
    if inferred_links:
        known = set(network_nodes)
        for key, values in inferred_links.items():
            # Key first, then its values, matching the original append order.
            for candidate in (key, *values):
                if candidate not in known and candidate in graph:
                    known.add(candidate)
                    network_nodes.append(candidate)

    rdf = integrated_flags.filter(pl.col("qualified_entity").is_in(network_nodes))
    rdf = rdf.group_by(["qualified_entity", "flag"]).agg(pl.col("count").sum())
    all_flagged = (
        rdf.filter(pl.col("count") > 0)
        .select("qualified_entity")
        .unique()
        .to_series()
        .to_list()
    )

    # Per-entity totals computed once, instead of filtering the frame for
    # every step of every path.
    flags_by_entity = dict(
        rdf.group_by("qualified_entity").agg(pl.col("count").sum()).iter_rows()
    )
    target_flags = flags_by_entity.get(qualified_selected, 0)
    total_flags = sum(flags_by_entity.values())
    net_flags = total_flags - target_flags
    net_flagged = len(all_flagged)
    if qualified_selected in all_flagged:
        net_flagged -= 1

    steps_list = []
    nodes = []
    seen_nodes = set()
    for flagged in all_flagged:
        for path in nx.all_shortest_paths(graph, flagged, qualified_selected):
            # A one-node path means the flagged entity is the selected one.
            if len(path) <= 1:
                continue

            for step in path:
                if step in seen_nodes:
                    continue
                if ENTITY_LABEL in step:
                    step_risks = flags_by_entity.get(step, 0)
                    if step_risks == 0:
                        continue
                    node_flag = {"node": step, "flags": step_risks}
                else:
                    step_entities = nx.degree(graph, step)
                    # Test the attribute's own degree. The previous code
                    # tested the flag count left over from the last entity
                    # step, which wrongly dropped attributes that followed
                    # an unflagged entity.
                    if step_entities == 0:
                        continue
                    node_flag = {"node": step, "entities": step_entities}
                seen_nodes.add(step)
                nodes.append(node_flag)

            steps_list.append(
                [
                    {"source": source, "target": target}
                    for source, target in zip(path, path[1:])
                ]
            )

    # Group paths by everything after their first edge, so that sources
    # sharing the same route are reported together.
    sources_by_tail = defaultdict(list)
    for steps in steps_list:
        first_edge = steps[0]
        tail = steps[1:] or [{"target": first_edge["target"]}]
        sources_by_tail[json.dumps(tail)].append(first_edge["source"])

    paths = []
    for tail_json, sources in sources_by_tail.items():
        sources.sort()
        tail = json.loads(tail_json)
        path_list = [sources]
        # Multi-edge tails start with a full edge whose source is the first
        # intermediate hop; single-edge paths only carry a target.
        if "source" in tail[0]:
            path_list.append([tail[0]["source"]])
        path_list.extend([hop["target"]] for hop in tail)
        paths.append(path_list)

    flags_summary_count = {
        "direct": target_flags,
        "indirect": net_flags,
        "paths": len(paths),
        "entities": net_flagged,
    }
    return flags_summary_count, paths, nodes
def build_exposure_report(
    integrated_flags: pl.DataFrame,
    selected_entity: str,
    c_nodes: list[str],
    graph: nx.Graph,
    inferred_links: dict[set] | None = None,
) -> str:
    """Render the flag-exposure paths of an entity as a Markdown report.

    Args:
        integrated_flags: Flags frame forwarded to ``build_exposure_data``.
        selected_entity: Unqualified name of the entity to report on.
        c_nodes: Node names forwarded to ``build_exposure_data``.
        graph: Graph forwarded to ``build_exposure_data``.
        inferred_links: Optional inferred links forwarded to
            ``build_exposure_data``.

    Returns:
        The Markdown report, or ``""`` when ``selected_entity`` is empty or
        ``build_exposure_data`` yields no data.
    """
    if selected_entity == "":
        return ""

    exposure = build_exposure_data(
        integrated_flags, c_nodes, selected_entity, graph, inferred_links
    )
    # build_exposure_data returns "" for an empty flags frame; unpacking that
    # into three names would raise ValueError.
    if not exposure:
        return ""
    selected_data, all_paths, nodes = exposure

    # Index node details once (first occurrence wins) instead of scanning the
    # list for every step.
    node_info = {}
    for entry in nodes:
        node_info.setdefault(entry["node"], entry)

    parts = [
        "##### Flag Exposure Paths\n\n",
        f"The selected entity **{selected_entity}** has **{selected_data['direct']}** direct flags and is linked to **{selected_data['indirect']}** indirect flags via **{selected_data['paths']}** paths from **{selected_data['entities']}** related entities:\n\n",
    ]

    for i, path in enumerate(all_paths):
        parts.append(f"**Path {i + 1}**\n\n```\n")
        last_level = len(path) - 1
        for ix, node in enumerate(path):
            # Each level of the path is indented one step further.
            indent = " " * ix
            for step in node:
                info = node_info.get(step, {})
                if ENTITY_LABEL in step:
                    label = f"{step} [linked to {info.get('flags', 0)} flags]"
                else:
                    label = f"{step} [linked to {info.get('entities', 0)} entities]"
                parts.append(f"{indent}{label}\n")
            if ix < last_level:
                parts.append(f"{indent}--->\n")
        parts.append("```\n\n")

    return "".join(parts)