Coverage for intelligence_toolkit/tests/unit/detect_entity_networks/test_exposure_report.py: 100%

73 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3# 

4 

5from collections import defaultdict 

6 

7import networkx as nx 

8import polars as pl 

9import pytest 

10 

11from intelligence_toolkit.detect_entity_networks.exposure_report import ( 

12 build_exposure_data, 

13) 

14 

15 

16class TestExposureData: 

17 @pytest.fixture() 

18 def graph(self): 

19 G = nx.Graph() 

20 G.add_node("ENTITY==A") 

21 G.add_node("ENTITY==C") 

22 G.add_node("ENTITY==D") 

23 G.add_node("ENTITY==F") 

24 G.add_node("ENTITY==Z") 

25 

26 G.add_edge("ENTITY==A", "ENTITY==C") 

27 G.add_edge("ENTITY==A", "ENTITY==D") 

28 G.add_edge("ENTITY==C", "ENTITY==F") 

29 G.add_edge("ENTITY==D", "ENTITY==F") 

30 return G 

31 

32 @pytest.fixture() 

33 def flags(self): 

34 return pl.DataFrame( 

35 { 

36 "entity": ["A", "C", "D", "F", "Z", "X"], 

37 "type": [ 

38 "flags_numb", 

39 "flags_numb", 

40 "flags_numb", 

41 "flags_numb", 

42 "flags_numb", 

43 "flags_numb", 

44 ], 

45 "flag": [ 

46 "flags_numb", 

47 "flags_numb", 

48 "flags_numb", 

49 "flags_numb", 

50 "flags_numb", 

51 "flags_numb", 

52 ], 

53 "count": [8, 2, 3, 0, 3, 2], 

54 "qualified_entity": [ 

55 "ENTITY==A", 

56 "ENTITY==C", 

57 "ENTITY==D", 

58 "ENTITY==F", 

59 "ENTITY==Z", 

60 "ENTITY==X", 

61 ], 

62 } 

63 ) 

64 

65 @pytest.fixture() 

66 def c_nodes(self): 

67 return ["ENTITY==A", "ENTITY==C", "ENTITY==D"] 

68 

69 def test_generation_paths_summary(self, graph, flags, c_nodes): 

70 summary, paths, _ = build_exposure_data(flags, c_nodes, "C", graph) 

71 

72 expected_paths = [ 

73 [["ENTITY==D"], ["ENTITY==A"], ["ENTITY==C"]], 

74 [["ENTITY==D"], ["ENTITY==F"], ["ENTITY==C"]], 

75 [["ENTITY==A"], ["ENTITY==C"]], 

76 ] 

77 expected_summary = {"direct": 2, "indirect": 11, "paths": 3, "entities": 2} 

78 

79 assert len(paths) == len(expected_paths) 

80 assert summary == expected_summary 

81 for ex in expected_paths: 

82 assert ex in paths 

83 

84 def test_generation_nodes(self, graph, flags, c_nodes): 

85 _, _, nodes = build_exposure_data(flags, c_nodes, "C", graph) 

86 

87 expected_nodes = [ 

88 {"node": "ENTITY==A", "flags": 8}, 

89 {"node": "ENTITY==C", "flags": 2}, 

90 {"node": "ENTITY==D", "flags": 3}, 

91 ] 

92 assert len(nodes) == len(expected_nodes) 

93 for ex in expected_nodes: 

94 assert ex in nodes 

95 

96 def test_generation_nodes_inferred_not_passed(self, graph, flags, c_nodes): 

97 graph.add_edge("ENTITY==X", "ENTITY==F") 

98 _, _, nodes = build_exposure_data(flags, c_nodes, "X", graph) 

99 

100 expected_nodes = [ 

101 {"node": "ENTITY==C", "flags": 2}, 

102 {"node": "ENTITY==D", "flags": 3}, 

103 ] 

104 for ex in expected_nodes: 

105 assert ex in nodes 

106 

107 def test_generation_nodes_inferred_passed(self, graph, flags, c_nodes): 

108 graph.add_edge("ENTITY==X", "ENTITY==F") 

109 inferred_links = defaultdict(set) 

110 inferred_links["ENTITY==X"].add("ENTITY==F") 

111 _, _, nodes = build_exposure_data(flags, c_nodes, "X", graph, inferred_links) 

112 

113 expected_nodes = [ 

114 {"node": "ENTITY==C", "flags": 2}, 

115 {"node": "ENTITY==D", "flags": 3}, 

116 {"node": "ENTITY==X", "flags": 2}, 

117 ] 

118 for ex in expected_nodes: 

119 assert ex in nodes 

120 

121 def test_generation_nodes_inferred_c_nodes_set(self, graph, flags, c_nodes) -> None: 

122 # c_nodes as set 

123 c_nodes = {"ENTITY==A", "ENTITY==C", "ENTITY==D"} 

124 graph.add_edge("ENTITY==X", "ENTITY==F") 

125 inferred_links = defaultdict(set) 

126 inferred_links["ENTITY==X"].add("ENTITY==F") 

127 _, _, nodes = build_exposure_data(flags, c_nodes, "X", graph, inferred_links) 

128 

129 expected_nodes = [ 

130 {"node": "ENTITY==C", "flags": 2}, 

131 {"node": "ENTITY==D", "flags": 3}, 

132 {"node": "ENTITY==X", "flags": 2}, 

133 ] 

134 for ex in expected_nodes: 

135 assert ex in nodes 

136 

137 def test_generation_nodes_inferred_path(self, graph, flags, c_nodes) -> None: 

138 graph.add_edge("ENTITY==X", "ENTITY==F") 

139 inferred_links = defaultdict(set) 

140 inferred_links["ENTITY==X"].add("ENTITY==F") 

141 _, paths, nodes = build_exposure_data( 

142 flags, c_nodes, "F", graph, inferred_links 

143 ) 

144 

145 expected_nodes = [ 

146 {"node": "ENTITY==C", "flags": 2}, 

147 {"node": "ENTITY==D", "flags": 3}, 

148 {"node": "ENTITY==X", "flags": 2}, 

149 ] 

150 for ex in expected_nodes: 

151 assert ex in nodes 

152 

153 assert [["ENTITY==A"], ["ENTITY==C"], ["ENTITY==F"]] in paths 

154 assert [["ENTITY==A"], ["ENTITY==D"], ["ENTITY==F"]] in paths 

155 assert [["ENTITY==C", "ENTITY==D", "ENTITY==X"], ["ENTITY==F"]] in paths