Coverage for intelligence_toolkit/detect_case_patterns/detection_functions.py: 11%

85 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3# 

4from collections import defaultdict 

5from itertools import combinations 

6 

7from intelligence_toolkit.graph.graph_fusion_encoder_embedding import is_converging_pair 

8 

9import numpy as np 

10import pandas as pd 

11 

12 

13def _create_period_to_close_nodes( 

14 used_periods, 

15 node_to_period_to_pos, 

16 sorted_nodes, 

17 min_pattern_count, 

18 rc, 

19 type_val_sep, 

20): 

21 period_to_close_nodes = {} 

22 all_pairs = 0 

23 close_pairs = 0 

24 for period in used_periods: 

25 period_to_close_nodes[period] = [] 

26 for ix, node1 in enumerate(sorted_nodes): 

27 for node2 in sorted_nodes[ix + 1 :]: 

28 all_pairs += 1 

29 n1a = node1.split(type_val_sep)[0] 

30 n2a = node2.split(type_val_sep)[0] 

31 if n1a != n2a: 

32 if is_converging_pair( 

33 period, node1, node2, node_to_period_to_pos, all_time=True 

34 ): 

35 period_count = rc.count_records([period, node1, node2]) 

36 if period_count >= min_pattern_count: 

37 close_pairs += 1 

38 period_to_close_nodes[period].append((node1, node2)) 

39 return all_pairs, close_pairs, period_to_close_nodes 

40 

41 

def create_close_node_rows(
    used_periods,
    node_to_period_to_pos,
    sorted_nodes,
    min_pattern_count,
    rc,
    type_val_sep,
):
    """Build a table of close node pairs with their per-period count statistics.

    The close pairs come from ``_create_period_to_close_nodes``. For each pair
    the per-period count is read from ``rc.count_records`` and the mean count
    from the first element returned by ``rc.compute_period_mean_sd_max``.

    Returns:
        A tuple ``(close_node_df, all_pairs, close_pairs)``. ``close_node_df``
        is a DataFrame with columns ``period``, ``node1``, ``node2``,
        ``period_count``, ``mean_count``, ``count_delta`` (count minus mean)
        and ``count_factor`` (count divided by mean). The two counters are
        returned unchanged from ``_create_period_to_close_nodes``.
    """
    column_names = [
        "period",
        "node1",
        "node2",
        "period_count",
        "mean_count",
        "count_delta",
        "count_factor",
    ]

    all_pairs, close_pairs, close_nodes_by_period = _create_period_to_close_nodes(
        used_periods,
        node_to_period_to_pos,
        sorted_nodes,
        min_pattern_count,
        rc,
        type_val_sep,
    )

    records = []
    for period, pairs in close_nodes_by_period.items():
        for first, second in pairs:
            in_period = rc.count_records([period, first, second])
            average, _sd, _max = rc.compute_period_mean_sd_max([first, second])
            if not (in_period >= min_pattern_count):
                continue
            # NOTE(review): assumes the mean is non-zero for any pair that
            # reaches this point -- confirm against the record counter.
            ratio = in_period / average
            difference = in_period - average
            records.append(
                [period, first, second, in_period, average, difference, ratio]
            )

    close_node_df = pd.DataFrame(records, columns=column_names)
    return close_node_df, all_pairs, close_pairs

88 

89 

def create_period_to_patterns(
    used_periods, close_node_df, max_pattern_length, min_pattern_count, rc
):
    """Grow multi-node patterns for each period from its close node pairs.

    For every period the search starts from the empty pattern and repeatedly
    extends known patterns with nodes taken from that period's close pairs.
    A candidate is accepted when:

    * it has at most ``max_pattern_length`` nodes;
    * it has not been accepted already for this period;
    * every 2-node combination of it is itself a close pair of the period;
    * ``rc.count_records([period, *candidate])`` is at least
      ``min_pattern_count``.

    Args:
        used_periods: Iterable of period identifiers.
        close_node_df: DataFrame with at least the columns ``period``,
            ``node1`` and ``node2``.
        max_pattern_length: Maximum number of nodes in a pattern.
        min_pattern_count: Inclusive lower bound on a pattern's record count,
            matching the ``>=`` test applied when close pairs are selected.
        rc: Object providing ``count_records(list)``.

    Returns:
        Dict mapping each period to a list of ``(pattern, count)`` tuples,
        where ``pattern`` is a sorted list of nodes. Each list begins with
        the seed entry ``([], 0)``.
    """
    period_to_patterns = {}
    for period in used_periods:
        period_rows = close_node_df[close_node_df["period"] == period]
        period_pairs = [
            tuple(sorted(pair))
            for pair in period_rows[["node1", "node2"]].values.tolist()
        ]
        # Set copy for O(1) membership tests; the list keeps row order for
        # deterministic iteration.
        close_pair_set = set(period_pairs)

        patterns = [([], 0)]
        period_to_patterns[period] = patterns
        accepted = set()

        # ``patterns`` is deliberately extended while it is being iterated:
        # it doubles as the work queue, so each newly accepted pattern is
        # later visited and extended in turn.
        for pattern, _ in patterns:
            for a, b in period_pairs:
                a_in_pattern = a in pattern
                b_in_pattern = b in pattern
                if pattern:
                    # A non-empty pattern can only grow through a pair that
                    # shares exactly one node with it.
                    if a_in_pattern == b_in_pattern:
                        continue
                    extension = [b] if a_in_pattern else [a]
                else:
                    # The empty seed pattern takes the pair as a whole.
                    extension = [a, b]

                candidate_pattern = sorted(pattern + extension)
                if len(candidate_pattern) > max_pattern_length:
                    continue
                candidate_key = tuple(candidate_pattern)
                if candidate_key in accepted:
                    continue
                # candidate_pattern is sorted, so its 2-combinations are
                # sorted tuples, directly comparable with period_pairs.
                if any(
                    pair not in close_pair_set
                    for pair in combinations(candidate_pattern, 2)
                ):
                    continue

                pcount = rc.count_records([period, *candidate_pattern])
                # Inclusive bound: a count equal to the minimum qualifies,
                # consistent with how close pairs are filtered.
                if pcount >= min_pattern_count:
                    patterns.append((candidate_pattern, pcount))
                    accepted.add(candidate_key)
    return period_to_patterns

148 

149 

def create_pattern_rows(period_to_patterns, rc):
    """Turn per-period patterns into scored output rows.

    For each pattern with a positive count, the score is
    ``(count - mean) / sd`` using the mean and standard deviation returned by
    ``rc.compute_period_mean_sd_max(pattern)``. Only patterns with a
    non-negative score produce a row.

    Args:
        period_to_patterns: Dict mapping a period to an iterable of
            ``(pattern, count)`` tuples, where ``pattern`` is a sequence of
            node strings.
        rc: Object providing ``compute_period_mean_sd_max(pattern)``, which
            returns a 3-tuple whose first two items are used as mean and
            standard deviation.

    Returns:
        List of rows ``[period, joined_pattern, pattern_length, count,
        rounded_mean, rounded_score]``, where the nodes are joined with
        ``" & "``, the mean is rounded to 0 decimals and the score to 2.
    """
    pattern_rows = []
    for period, patterns in period_to_patterns.items():
        for pattern, count in patterns:
            if not count > 0:
                # Skips the ([], 0) seed entry and any other empty counts.
                continue
            mean, sd, _ = rc.compute_period_mean_sd_max(pattern)
            if sd == 0:
                # With no spread the score is undefined. Skip the pattern
                # rather than raise ZeroDivisionError (plain floats) or
                # produce nan/inf (NumPy floats).
                continue
            score = (count - mean) / sd
            if score >= 0:
                pattern_rows.append(
                    [
                        period,
                        " & ".join(pattern),
                        len(pattern),
                        count,
                        round(mean, 0),
                        round(score, 2),
                    ]
                )
    return pattern_rows