Coverage for intelligence_toolkit/detect_case_patterns/detection_functions.py: 11%
85 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
3#
4from collections import defaultdict
5from itertools import combinations
7from intelligence_toolkit.graph.graph_fusion_encoder_embedding import is_converging_pair
9import numpy as np
10import pandas as pd
def _create_period_to_close_nodes(
    used_periods,
    node_to_period_to_pos,
    sorted_nodes,
    min_pattern_count,
    rc,
    type_val_sep,
):
    """Collect, for every period, the node pairs that pass all closeness checks.

    Every unordered pair drawn from ``sorted_nodes`` (in list order) is
    examined once per period. A pair is kept for a period when:

    * the text before ``type_val_sep`` differs between the two nodes,
    * ``is_converging_pair(..., all_time=True)`` is truthy for the pair, and
    * ``rc.count_records([period, node1, node2])`` is at least
      ``min_pattern_count``.

    Args:
        used_periods: Iterable of period labels to examine.
        node_to_period_to_pos: Passed through unchanged to
            ``is_converging_pair``.
        sorted_nodes: Sequence of node strings; pair order follows this
            sequence.
        min_pattern_count: Inclusive lower bound on the record count of a
            kept pair.
        rc: Object exposing ``count_records(list)``.
        type_val_sep: Separator between the leading part of a node string
            and the rest.

    Returns:
        Tuple ``(all_pairs, close_pairs, period_to_close_nodes)``: the number
        of pairs examined (summed over periods), the number of pairs kept,
        and a dict mapping each period to its list of ``(node1, node2)``
        tuples.
    """
    examined = 0
    kept_total = 0
    close_by_period = {}
    for period in used_periods:
        kept = []
        close_by_period[period] = kept
        # combinations() yields (i, j) with i < j in the same order as a
        # nested "enumerate + slice" scan over sorted_nodes.
        for left, right in combinations(sorted_nodes, 2):
            examined += 1
            left_prefix = left.split(type_val_sep)[0]
            right_prefix = right.split(type_val_sep)[0]
            if left_prefix == right_prefix:
                # Same leading part: never treated as a candidate pair.
                continue
            converging = is_converging_pair(
                period, left, right, node_to_period_to_pos, all_time=True
            )
            if not converging:
                continue
            if rc.count_records([period, left, right]) >= min_pattern_count:
                kept_total += 1
                kept.append((left, right))
    return examined, kept_total, close_by_period
def create_close_node_rows(
    used_periods,
    node_to_period_to_pos,
    sorted_nodes,
    min_pattern_count,
    rc,
    type_val_sep,
):
    """Build a DataFrame describing every close node pair in every period.

    Delegates pair discovery to ``_create_period_to_close_nodes`` and then,
    for each returned pair, records its count in the period alongside the
    mean returned by ``rc.compute_period_mean_sd_max([node1, node2])``.

    Args:
        used_periods: Iterable of period labels to examine.
        node_to_period_to_pos: Forwarded to the pair-discovery helper.
        sorted_nodes: Sequence of node strings.
        min_pattern_count: Inclusive lower bound on a pair's record count.
        rc: Object exposing ``count_records(list)`` and
            ``compute_period_mean_sd_max(list)``.
        type_val_sep: Separator forwarded to the pair-discovery helper.

    Returns:
        Tuple ``(close_node_df, all_pairs, close_pairs)`` where
        ``close_node_df`` has the columns ``period``, ``node1``, ``node2``,
        ``period_count``, ``mean_count``, ``count_delta`` (count minus mean)
        and ``count_factor`` (count divided by mean); the two integers are
        the totals reported by the helper.
    """
    all_pairs, close_pairs, pairs_by_period = _create_period_to_close_nodes(
        used_periods,
        node_to_period_to_pos,
        sorted_nodes,
        min_pattern_count,
        rc,
        type_val_sep,
    )

    column_names = [
        "period",
        "node1",
        "node2",
        "period_count",
        "mean_count",
        "count_delta",
        "count_factor",
    ]
    records = []
    for period, node_pairs in pairs_by_period.items():
        for first, second in node_pairs:
            observed = rc.count_records([period, first, second])
            baseline, _sd, _max = rc.compute_period_mean_sd_max([first, second])
            if observed >= min_pattern_count:
                ratio = observed / baseline
                difference = observed - baseline
                records.append(
                    [period, first, second, observed, baseline, difference, ratio]
                )

    frame = pd.DataFrame(records, columns=column_names)
    return frame, all_pairs, close_pairs
def create_period_to_patterns(
    used_periods, close_node_df, max_pattern_length, min_pattern_count, rc
):
    """Grow multi-node patterns for each period out of its close node pairs.

    For every period, the pairs listed in ``close_node_df`` are used as
    building blocks. Starting from the empty pattern, each accepted pattern
    is extended by one pair at a time: an empty pattern takes both nodes of a
    pair, a non-empty pattern takes the single missing node of any pair that
    shares exactly one node with it. A candidate is accepted when

    * its length does not exceed ``max_pattern_length``,
    * it has not been accepted already for this period,
    * every 2-combination of its nodes is itself a close pair of the period,
    * ``rc.count_records([period, *candidate])`` is strictly greater than
      ``min_pattern_count``.

    Args:
        used_periods: Iterable of period labels.
        close_node_df: DataFrame with at least the columns ``period``,
            ``node1`` and ``node2``.
        max_pattern_length: Maximum number of nodes in a pattern.
        min_pattern_count: Exclusive lower bound on a pattern's record count.
        rc: Object exposing ``count_records(list)``.

    Returns:
        Dict mapping each period to a list of ``(pattern, count)`` tuples in
        discovery order. The list always starts with the seed ``([], 0)``;
        every other pattern is a sorted list of node strings.
    """
    period_to_patterns = {}
    for period in used_periods:
        period_rows = close_node_df[close_node_df["period"] == period]
        # Normalise each pair to a sorted tuple so it can be compared with the
        # output of combinations() over a sorted candidate.
        period_pairs = [
            tuple(sorted(pair))
            for pair in period_rows[["node1", "node2"]].values.tolist()
        ]
        # The list keeps iteration order; the set gives O(1) membership tests.
        known_pairs = set(period_pairs)

        patterns = [([], 0)]
        accepted = set()
        # Worklist iteration: patterns appended inside the loop are visited
        # later by this same loop, which is what grows longer patterns.
        for pattern, _ in patterns:
            for a, b in period_pairs:
                if pattern:
                    a_in_pattern = a in pattern
                    b_in_pattern = b in pattern
                    if a_in_pattern == b_in_pattern:
                        # Pair is fully contained or fully disjoint: it cannot
                        # extend a non-empty pattern by exactly one node.
                        continue
                    extension = [b] if a_in_pattern else [a]
                else:
                    extension = [a, b]

                candidate = sorted(pattern + extension)
                if len(candidate) > max_pattern_length:
                    continue
                candidate_key = tuple(candidate)
                if candidate_key in accepted:
                    continue
                if not all(
                    pair in known_pairs for pair in combinations(candidate, 2)
                ):
                    continue

                pcount = rc.count_records([period, *candidate])
                # NOTE(review): strict ">" here, whereas close pairs are
                # filtered with ">=" against the same threshold elsewhere in
                # this module - confirm which bound is intended.
                if pcount > min_pattern_count:
                    patterns.append((candidate, pcount))
                    accepted.add(candidate_key)
        period_to_patterns[period] = patterns
    return period_to_patterns
def create_pattern_rows(period_to_patterns, rc):
    """Turn detected patterns into scored output rows.

    For every ``(pattern, count)`` with a positive count, the mean and
    standard deviation returned by ``rc.compute_period_mean_sd_max(pattern)``
    are used to compute ``score = (count - mean) / sd``. Only patterns with a
    non-negative score produce a row.

    Patterns whose standard deviation is zero are skipped: the score is
    undefined there, and dividing would raise ``ZeroDivisionError`` for plain
    Python numbers (or yield NaN/inf for NumPy scalars).

    Args:
        period_to_patterns: Dict mapping a period to a list of
            ``(pattern, count)`` tuples, where ``pattern`` is a list of
            strings.
        rc: Object exposing ``compute_period_mean_sd_max(pattern)`` returning
            a 3-tuple whose first two items are the mean and the standard
            deviation.

    Returns:
        List of rows ``[period, joined_pattern, pattern_length, count,
        rounded_mean, rounded_score]`` where the pattern nodes are joined
        with ``" & "``, the mean is rounded to 0 decimals and the score to 2.
    """
    pattern_rows = []
    for period, patterns in period_to_patterns.items():
        for pattern, count in patterns:
            if not count > 0:
                # Skips the empty seed pattern (count 0) and anything unseen.
                continue
            mean, sd, _ = rc.compute_period_mean_sd_max(pattern)
            if sd == 0:
                # No variation across periods: the score is undefined.
                continue
            score = (count - mean) / sd
            if score >= 0:
                pattern_rows.append(
                    [
                        period,
                        " & ".join(pattern),
                        len(pattern),
                        count,
                        round(mean, 0),
                        round(score, 2),
                    ]
                )
    return pattern_rows