Coverage for intelligence_toolkit/detect_entity_networks/exposure_report.py: 75%

96 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3# 

4 

5import json 

6from collections import defaultdict 

7 

8import networkx as nx 

9import polars as pl 

10 

11from intelligence_toolkit.detect_entity_networks.config import ENTITY_LABEL 

12from intelligence_toolkit.helpers.constants import ATTRIBUTE_VALUE_SEPARATOR 

13 

14 

def build_exposure_data(
    integrated_flags: pl.DataFrame,
    c_nodes: list[str],
    selected_entity: str,
    graph: nx.Graph,
    inferred_links: dict[set] | None = None,
):
    """Summarise how flags in a network reach the selected entity.

    Args:
        integrated_flags: Frame with ``qualified_entity``, ``flag`` and
            ``count`` columns (the only columns read here).
        c_nodes: Nodes of the network to consider. Any iterable is accepted;
            the caller's object is never mutated.
        selected_entity: Unqualified entity name; it is qualified with
            ``ENTITY_LABEL`` and ``ATTRIBUTE_VALUE_SEPARATOR`` before lookup.
        graph: Graph searched for shortest paths between each flagged entity
            and the selected entity.
        inferred_links: Optional mapping of node -> linked nodes. Keys and
            values that exist in ``graph`` are added to the considered nodes.

    Returns:
        ``""`` when ``integrated_flags`` is empty (kept for backward
        compatibility). Otherwise a tuple ``(summary, paths, nodes)``:

        * ``summary``: dict with ``direct`` (flags on the selected entity),
          ``indirect`` (flags on the other considered entities), ``paths``
          (number of grouped paths) and ``entities`` (flagged entities other
          than the selected one).
        * ``paths``: list of paths, each a list of levels; the first level
          holds the sorted sources sharing the remainder of the path and
          every later level holds a single node.
        * ``nodes``: unique ``{"node", "flags"}`` dicts for flagged entity
          nodes and ``{"node", "entities"}`` dicts (graph degree) for every
          other node found on a path.

    Raises:
        Whatever ``networkx.all_shortest_paths`` raises when a flagged entity
        or the selected entity is missing from ``graph`` or unreachable.
    """
    if integrated_flags.is_empty():
        return ""

    qualified_selected = f"{ENTITY_LABEL}{ATTRIBUTE_VALUE_SEPARATOR}{selected_entity}"

    # Work on a private copy so the caller's collection is left untouched.
    network_nodes = list(c_nodes)
    if inferred_links:
        known = set(network_nodes)
        for key, values in inferred_links.items():
            for candidate in (key, *values):
                if candidate not in known and candidate in graph:
                    known.add(candidate)
                    network_nodes.append(candidate)

    rdf = (
        integrated_flags.filter(pl.col("qualified_entity").is_in(network_nodes))
        .group_by(["qualified_entity", "flag"])
        .agg(pl.col("count").sum())
    )

    # Row order after group_by/unique is not guaranteed, so sort to keep the
    # resulting paths and nodes in a reproducible order.
    all_flagged = sorted(
        rdf.filter(pl.col("count") > 0)
        .select("qualified_entity")
        .unique()
        .to_series()
        .to_list()
    )

    target_flags = (
        rdf.filter(pl.col("qualified_entity") == qualified_selected)
        .select(pl.col("count").sum())
        .item()
    )
    total_flags = rdf.select(pl.col("count").sum()).item()
    net_flags = total_flags - target_flags
    net_flagged = len(all_flagged)
    if qualified_selected in all_flagged:
        net_flagged -= 1

    # Total flags per entity, computed once instead of re-filtering the frame
    # for every step of every path.
    per_entity = rdf.group_by("qualified_entity").agg(pl.col("count").sum())
    flags_by_entity = dict(
        zip(
            per_entity["qualified_entity"].to_list(),
            per_entity["count"].to_list(),
        )
    )

    steps_list = []
    nodes = []
    for flagged in all_flagged:
        for found in nx.all_shortest_paths(graph, flagged, qualified_selected):
            path = list(found)
            if len(path) <= 1:
                # The flagged entity is the selected entity itself.
                continue

            for step in path:
                if ENTITY_LABEL in step:
                    step_risks = flags_by_entity.get(step, 0)
                    if step_risks == 0:
                        # Unflagged entities are not reported as nodes.
                        continue
                    node_flag = {"node": step, "flags": step_risks}
                else:
                    # Non-entity nodes are always recorded; their inclusion
                    # must not depend on the flag count of an earlier step.
                    node_flag = {"node": step, "entities": nx.degree(graph, step)}

                if node_flag not in nodes:
                    nodes.append(node_flag)

            steps_list.append(
                [
                    {"source": source, "target": target}
                    for source, target in zip(path, path[1:])
                ]
            )

    # Group paths that differ only in their starting node: the key is the
    # JSON encoding of everything after the first hop.
    path_items = defaultdict(list)
    for steps in steps_list:
        first_hop = steps[0]
        remainder = steps[1:] or [{"target": first_hop["target"]}]
        path_items[json.dumps(remainder)].append(first_hop["source"])

    paths = []
    for encoded, sources in path_items.items():
        sources.sort()
        levels = [sources]
        for position, hop in enumerate(json.loads(encoded)):
            if position == 0 and "source" in hop:
                levels.append([hop["source"]])
            levels.append([hop["target"]])
        paths.append(levels)

    flags_summary_count = {
        "direct": target_flags,
        "indirect": net_flags,
        "paths": len(paths),
        "entities": net_flagged,
    }
    return flags_summary_count, paths, nodes

125 

126 

def build_exposure_report(
    integrated_flags: pl.DataFrame,
    selected_entity: str,
    c_nodes: list[str],
    graph: nx.Graph,
    inferred_links: dict[set] | None = None,
) -> str:
    """Render the flag exposure of an entity as a Markdown report.

    Args:
        integrated_flags: Flag data forwarded to ``build_exposure_data``.
        selected_entity: Unqualified name of the entity being reported on.
        c_nodes: Nodes of the network to consider.
        graph: Graph used to find paths to the selected entity.
        inferred_links: Optional extra links forwarded to
            ``build_exposure_data``.

    Returns:
        A Markdown string with a summary sentence followed by one fenced
        block per exposure path, or ``""`` when no entity is selected or
        there is no flag data.
    """
    # build_exposure_data returns "" rather than a 3-tuple for empty flag
    # data, which would make the unpacking below raise; bail out first.
    if selected_entity == "" or integrated_flags.is_empty():
        return ""

    selected_data, all_paths, nodes = build_exposure_data(
        integrated_flags, c_nodes, selected_entity, graph, inferred_links
    )

    # Index node details once; the first entry for a node wins.
    node_info = {}
    for entry in nodes:
        node_info.setdefault(entry["node"], entry)

    parts = [
        "##### Flag Exposure Paths\n\n",
        f"The selected entity **{selected_entity}** has **{selected_data['direct']}** direct flags and is linked to **{selected_data['indirect']}** indirect flags via **{selected_data['paths']}** paths from **{selected_data['entities']}** related entities:\n\n",
    ]

    for i, path in enumerate(all_paths):
        parts.append(f"**Path {i + 1}**\n\n```\n")
        last_level = len(path) - 1
        for ix, level in enumerate(path):
            # Each level is indented one step further than the previous one.
            indent = " " * ix
            for step in level:
                info = node_info.get(step, {})
                if ENTITY_LABEL in step:
                    label = f"{step} [linked to {info.get('flags', 0)} flags]"
                else:
                    label = f"{step} [linked to {info.get('entities', 0)} entities]"
                parts.append(f"{indent}{label}\n")
            if ix < last_level:
                parts.append(f"{indent}--->\n")
        parts.append("```\n\n")

    return "".join(parts)