Coverage for intelligence_toolkit/detect_case_patterns/model.py: 70%

81 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3# 

4from collections import defaultdict 

5 

6import numpy as np 

7import pandas as pd 

8 

9from intelligence_toolkit.AI.metaprompts import do_not_harm 

10from intelligence_toolkit.AI.utils import generate_messages 

11from intelligence_toolkit.helpers import df_functions 

12 

13from .detection_functions import ( 

14 create_close_node_rows, 

15 create_pattern_rows, 

16 create_period_to_patterns, 

17) 

18from .graph_functions import convert_edge_df_to_graph, create_edge_df_from_atts 

19from .prompts import report_prompt, user_prompt 

20from .record_counter import RecordCounter 

21 

22 

def generate_graph_model(df, period_col, type_val_sep):
    """Reshape a wide case table into a long table of per-subject attributes.

    Each input row is treated as one subject and is assigned a fresh sequential
    "Subject ID" ("1", "2", ...), overwriting any existing "Subject ID" column.
    Every column other than "Subject ID" and ``period_col`` is treated as an
    attribute and unpivoted into one row per (subject, period, attribute).

    Args:
        df: Wide DataFrame with one row per case; must contain ``period_col``.
        period_col: Name of the column holding each row's period.
        type_val_sep: String placed between the attribute type and its value
            when building the "Full Attribute" label.

    Returns:
        A long DataFrame with columns "Subject ID", "Period",
        "Attribute Type", "Attribute Value" and "Full Attribute". Rows with a
        null period, an empty-string period or an empty-string attribute value
        are excluded.
    """
    att_cols = [col for col in df.columns if col not in ("Subject ID", period_col)]

    # NOTE(review): presumably fix_null_ints normalises null/integer-like
    # values to strings; if it returns its argument rather than a copy, the
    # assignment below also mutates the caller's frame - confirm.
    model_df = df_functions.fix_null_ints(df)
    model_df["Subject ID"] = [str(i) for i in range(1, len(model_df) + 1)]

    # Filter and select in one step, then copy so the later column assignment
    # operates on an independent frame instead of a slice.
    pdf = model_df.loc[
        model_df[period_col].notna(), [period_col, "Subject ID", *att_cols]
    ].copy()
    pdf = pdf.rename(columns={period_col: "Period"})
    pdf["Period"] = pdf["Period"].astype(str)

    long_df = pd.melt(
        pdf,
        id_vars=["Subject ID", "Period"],
        value_vars=att_cols,
        var_name="Attribute Type",
        value_name="Attribute Value",
    )
    long_df = long_df[long_df["Attribute Value"] != ""].copy()

    # A plain zip avoids the per-row Series construction of apply(axis=1) and
    # always yields a flat list, including for an empty frame.
    long_df["Full Attribute"] = [
        str(att_type) + type_val_sep + str(att_value)
        for att_type, att_value in zip(
            long_df["Attribute Type"], long_df["Attribute Value"]
        )
    ]
    return long_df[long_df["Period"] != ""]

50 

51 

def compute_attribute_counts(df, pattern, period_col, period, type_val_sep):
    """Count attribute values among the records matching a pattern in a period.

    The frame is restricted to rows whose ``period_col`` equals ``period`` and
    whose columns match every ``attribute<sep>value`` term of ``pattern``.
    Each remaining row is given a unique "Subject ID", so the resulting count
    is the number of matching rows that carry each attribute value.

    Args:
        df: Wide DataFrame of cases; must contain ``period_col`` and every
            attribute column named in ``pattern``.
        pattern: Pattern terms joined by " & ", each of the form
            ``attribute + type_val_sep + value``.
        period_col: Name of the column holding each row's period.
        period: Period value to filter on.
        type_val_sep: Separator between attribute name and value.

    Returns:
        DataFrame with columns "AttributeValue" and "Count", sorted by
        "Count" in descending order.
    """
    print(
        f"Computing attribute counts for pattern: {pattern} with period: {period} for period column: {period_col}"
    )
    # NOTE(review): presumably fix_null_ints normalises values to strings so
    # the equality filters and string concatenation below work - confirm.
    fdf = df_functions.fix_null_ints(df)
    fdf = fdf[fdf[period_col] == period]
    relevant_columns = [c for c in fdf.columns if c not in ("Subject ID", period_col)]

    for att in pattern.split(" & "):
        # Split on the first separator only: a value that itself contains the
        # separator previously made the two-name unpacking raise ValueError.
        attribute, found_sep, value = att.partition(type_val_sep)
        if not found_sep:
            continue
        fdf = fdf[fdf[attribute] == value]

    # Work on an independent copy so assigning the synthetic id does not write
    # into a slice of the filtered frame.
    fdf = fdf.copy()
    fdf["Subject ID"] = range(len(fdf))

    melted = pd.melt(
        fdf,
        id_vars=["Subject ID"],
        value_vars=relevant_columns,
        var_name="Attribute",
        value_name="Value",
    )
    melted = melted[melted["Value"] != ""].copy()
    melted["AttributeValue"] = melted["Attribute"] + type_val_sep + melted["Value"]

    return (
        melted.groupby("AttributeValue")["Subject ID"]
        .nunique()
        .reset_index(name="Count")
        .sort_values(by="Count", ascending=False)
    )

89 

90 

def create_time_series_df(model, pattern_df):
    """Build a per-period count table for every pattern in ``pattern_df``.

    Args:
        model: Data handed to ``RecordCounter`` to back the counting.
        pattern_df: DataFrame whose "pattern" column holds attribute terms
            joined by " & ".

    Returns:
        DataFrame with columns "period", "pattern" and "count", built from the
        rows that ``RecordCounter.create_time_series_rows`` yields for each
        pattern, in ``pattern_df`` row order.
    """
    counter = RecordCounter(model)
    series_rows = [
        ts_row
        for _, record in pattern_df.iterrows()
        for ts_row in counter.create_time_series_rows(record["pattern"].split(" & "))
    ]
    return pd.DataFrame(series_rows, columns=["period", "pattern", "count"])

99 

100 

def prepare_graph(dynamic_df, min_edge_weight, missing_edge_prop):
    """Build one attribute graph per period from the long attribute table.

    Args:
        dynamic_df: Long DataFrame with at least the columns "Subject ID",
            "Period" and "Full Attribute".
        min_edge_weight: Passed through to ``create_edge_df_from_atts``.
        missing_edge_prop: Passed through to ``create_edge_df_from_atts``.

    Returns:
        A tuple ``(pdf, time_to_graph)``: ``pdf`` is a copy of ``dynamic_df``
        with an added "Grouping ID" column ("<Subject ID>@<Period>"), and
        ``time_to_graph`` maps each period (in sorted order) to the graph
        returned by ``convert_edge_df_to_graph`` for that period.
    """
    pdf = dynamic_df.copy()
    atts = sorted(pdf["Full Attribute"].unique())
    pdf["Grouping ID"] = pdf["Subject ID"].astype(str) + "@" + pdf["Period"].astype(str)

    time_to_graph = {}
    for period in sorted(pdf["Period"].unique()):
        # "Grouping ID" is already present on pdf, so the per-period slice can
        # be grouped directly without recomputing it (and hence without a copy).
        grouped = (
            pdf.loc[pdf["Period"] == period]
            .groupby("Grouping ID")["Full Attribute"]
            .agg(list)
            .reset_index()
        )
        edge_df = create_edge_df_from_atts(
            atts, grouped, min_edge_weight, missing_edge_prop
        )
        # The second element of the returned pair is not needed here.
        graph, _ = convert_edge_df_to_graph(edge_df)
        time_to_graph[period] = graph
    return pdf, time_to_graph

121 

122 

def detect_patterns(
    node_to_period_to_pos,
    dynamic_df,
    type_val_sep,
    min_pattern_count=5,
    max_pattern_length=100,
) -> tuple[pd.DataFrame, int, int]:
    """Detect attribute patterns per period and rank them by an overall score.

    Args:
        node_to_period_to_pos: Mapping keyed by node; its keys are sorted and
            passed, together with the mapping, to ``create_close_node_rows``.
        dynamic_df: Long DataFrame with a "Period" column; also used to build
            the ``RecordCounter``.
        type_val_sep: Separator between attribute type and value.
        min_pattern_count: Passed through to the detection helpers.
        max_pattern_length: Passed through to ``create_period_to_patterns``.

    Returns:
        A tuple ``(pattern_df, close_pairs, all_pairs)``. ``pattern_df`` has
        the columns "period", "pattern", "length", "count", "mean", "z_score",
        "detections" and "overall_score", sorted by "overall_score" in
        descending order.
    """
    sorted_nodes = sorted(node_to_period_to_pos.keys())
    record_counter = RecordCounter(dynamic_df)
    used_periods = sorted(dynamic_df["Period"].unique())

    # For each period, find all pairs of nodes that are close.
    close_node_df, all_pairs, close_pairs = create_close_node_rows(
        used_periods,
        node_to_period_to_pos,
        sorted_nodes,
        min_pattern_count,
        record_counter,
        type_val_sep,
    )
    period_to_patterns = create_period_to_patterns(
        used_periods,
        close_node_df,
        max_pattern_length,
        min_pattern_count,
        record_counter,
    )
    pattern_rows = create_pattern_rows(period_to_patterns, record_counter)

    columns = ["period", "pattern", "length", "count", "mean", "z_score"]
    pattern_df = pd.DataFrame(pattern_rows, columns=columns)

    # Number of periods in which each pattern was detected.
    detections = (
        pattern_df.groupby("pattern", as_index=False)["period"]
        .count()
        .rename(columns={"period": "detections"})
    )
    pattern_df = pattern_df.merge(detections, on="pattern")

    pattern_df["overall_score"] = (
        pattern_df["z_score"]
        * pattern_df["length"]
        * pattern_df["detections"]
        * np.log1p(pattern_df["count"])  # np.log1p(x) is equivalent to np.log(x + 1)
    )

    # Normalise by the maximum score. Skip when the maximum is 0 or NaN (no
    # rows): dividing would otherwise turn every score into NaN or inf.
    max_score = pattern_df["overall_score"].max()
    if pd.notna(max_score) and max_score != 0:
        pattern_df["overall_score"] = pattern_df["overall_score"] / max_score
    pattern_df["overall_score"] = pattern_df["overall_score"].round(2)

    pattern_df = pattern_df.sort_values("overall_score", ascending=False)
    return pattern_df, close_pairs, all_pairs

180 

181 

def prepare_for_ai_report(
    pattern: str,
    period: str,
    time_series: pd.DataFrame,
    attribute_counts: pd.DataFrame,
    u_prompt: str = user_prompt,
) -> list[dict[str, str]]:
    """Assemble the chat messages used to request an AI report on a pattern.

    Args:
        pattern: The pattern being reported on.
        period: The period in which the pattern was detected.
        time_series: Time-series table, serialised to CSV without its index.
        attribute_counts: Attribute-count table, serialised to CSV without
            its index.
        u_prompt: User prompt; defaults to the module's ``user_prompt``.

    Returns:
        The result of ``generate_messages`` called with the user prompt, the
        report prompt, the template variables and the ``do_not_harm`` safety
        prompt.
    """
    time_series_csv = time_series.to_csv(index=False)
    attribute_counts_csv = attribute_counts.to_csv(index=False)
    template_values = dict(
        pattern=pattern,
        period=period,
        time_series=time_series_csv,
        attribute_counts=attribute_counts_csv,
    )
    return generate_messages(u_prompt, report_prompt, template_values, do_not_harm)