Coverage for intelligence_toolkit/detect_case_patterns/model.py: 70%
81 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
3#
4from collections import defaultdict
6import numpy as np
7import pandas as pd
9from intelligence_toolkit.AI.metaprompts import do_not_harm
10from intelligence_toolkit.AI.utils import generate_messages
11from intelligence_toolkit.helpers import df_functions
13from .detection_functions import (
14 create_close_node_rows,
15 create_pattern_rows,
16 create_period_to_patterns,
17)
18from .graph_functions import convert_edge_df_to_graph, create_edge_df_from_atts
19from .prompts import report_prompt, user_prompt
20from .record_counter import RecordCounter
def generate_graph_model(df, period_col, type_val_sep):
    """Reshape a wide case table into a long table of per-subject attributes.

    Every row of the prepared table is treated as one subject and is given a
    sequential ``"Subject ID"`` ("1", "2", ...), replacing any existing column
    of that name. Every column other than ``"Subject ID"`` and ``period_col``
    is melted into an (attribute type, attribute value) pair.

    Args:
        df: Input table with one row per case.
        period_col: Name of the column in ``df`` that holds the period. It is
            exposed as the string column ``"Period"`` in the result.
        type_val_sep: String placed between the attribute type and the
            attribute value when building ``"Full Attribute"``.

    Returns:
        A DataFrame with the columns ``"Subject ID"``, ``"Period"``,
        ``"Attribute Type"``, ``"Attribute Value"`` and ``"Full Attribute"``.
        Rows with a null period, an empty-string period or an empty-string
        attribute value are dropped.
    """
    att_cols = [col for col in df.columns if col not in ("Subject ID", period_col)]

    # NOTE(review): presumably normalises null/integer-like values; whether it
    # returns a new frame or ``df`` itself is not visible here -- confirm.
    model_df = df_functions.fix_null_ints(df)
    # The 1-based row position is the subject identifier; already strings.
    model_df["Subject ID"] = [str(i) for i in range(1, len(model_df) + 1)]

    pdf = model_df[[period_col, "Subject ID", *att_cols]]
    # Copy after filtering so the assignments below never target a slice of
    # another frame (avoids SettingWithCopyWarning).
    pdf = pdf[pdf[period_col].notna()].copy()
    pdf = pdf.rename(columns={period_col: "Period"})
    pdf["Period"] = pdf["Period"].astype(str)

    pdf = pd.melt(
        pdf,
        id_vars=["Subject ID", "Period"],
        value_vars=att_cols,
        var_name="Attribute Type",
        value_name="Attribute Value",
    )
    pdf = pdf[pdf["Attribute Value"] != ""].copy()
    # Single pass over the two columns instead of a row-wise DataFrame.apply.
    pdf["Full Attribute"] = [
        str(att_type) + type_val_sep + str(att_value)
        for att_type, att_value in zip(pdf["Attribute Type"], pdf["Attribute Value"])
    ]
    return pdf[pdf["Period"] != ""]
def compute_attribute_counts(df, pattern, period_col, period, type_val_sep):
    """Count attribute values among the records that match a pattern in a period.

    The table is restricted to rows whose ``period_col`` equals ``period`` and
    then, for every ``"<attribute><type_val_sep><value>"`` term of ``pattern``,
    to rows where that attribute column equals the value. Terms that do not
    contain ``type_val_sep`` are skipped. For the remaining rows, each
    attribute value is counted by the number of distinct records carrying it.

    Args:
        df: Wide input table, one row per record.
        pattern: Attribute terms joined by ``" & "``.
        period_col: Name of the period column in ``df``.
        period: Period value to keep.
        type_val_sep: Separator between attribute name and value, both in
            ``pattern`` and in the returned ``"AttributeValue"`` labels.

    Returns:
        A DataFrame with the columns ``"AttributeValue"`` and ``"Count"``,
        sorted by ``"Count"`` in descending order. Empty-string values are not
        counted.
    """
    print(
        f"Computing attribute counts for pattern: {pattern} with period: {period} for period column: {period_col}"
    )
    atts = pattern.split(" & ")
    # NOTE(review): presumably normalises null/integer-like values -- confirm
    # against the helper.
    fdf = df_functions.fix_null_ints(df)
    fdf = fdf[fdf[period_col] == period]
    # Only these columns are melted and counted.
    relevant_columns = [c for c in fdf.columns if c not in ["Subject ID", period_col]]

    for att in atts:
        if type_val_sep not in att:
            continue
        # Split on the first separator only: a value that itself contains the
        # separator would otherwise yield more than two parts and fail to unpack.
        attribute, value = att.split(type_val_sep, 1)
        fdf = fdf[fdf[attribute] == value]

    # Re-number the surviving rows so every row counts as one distinct record.
    # ``assign`` works on a new frame, so the filtered slice is never written to.
    fdf = fdf.assign(**{"Subject ID": range(len(fdf))})
    melted = pd.melt(
        fdf,
        id_vars=["Subject ID"],
        value_vars=relevant_columns,
        var_name="Attribute",
        value_name="Value",
    )
    melted = melted[melted["Value"] != ""].copy()
    melted["AttributeValue"] = melted["Attribute"] + type_val_sep + melted["Value"]
    count_df = (
        melted.groupby("AttributeValue")["Subject ID"]
        .nunique()
        .reset_index(name="Count")
        .sort_values(by="Count", ascending=False)
    )
    return count_df
def create_time_series_df(model, pattern_df):
    """Collect the time-series rows of every pattern into one DataFrame.

    Args:
        model: Data handed to ``RecordCounter``.
        pattern_df: Table whose ``"pattern"`` column holds attribute terms
            joined by ``" & "``.

    Returns:
        A DataFrame with the columns ``"period"``, ``"pattern"`` and
        ``"count"`` built from the rows produced by
        ``RecordCounter.create_time_series_rows`` for each pattern, in the
        order the patterns appear in ``pattern_df``.
    """
    counter = RecordCounter(model)
    series_rows = [
        series_row
        for _, record in pattern_df.iterrows()
        for series_row in counter.create_time_series_rows(
            record["pattern"].split(" & ")
        )
    ]
    return pd.DataFrame(series_rows, columns=["period", "pattern", "count"])
def prepare_graph(dynamic_df, min_edge_weight, missing_edge_prop):
    """Build one graph per period from the long attribute table.

    Args:
        dynamic_df: Long table with at least the columns ``"Subject ID"``,
            ``"Period"`` and ``"Full Attribute"``.
        min_edge_weight: Forwarded unchanged to ``create_edge_df_from_atts``.
        missing_edge_prop: Forwarded unchanged to ``create_edge_df_from_atts``.

    Returns:
        A tuple ``(pdf, time_to_graph)``. ``pdf`` is a copy of ``dynamic_df``
        with an added ``"Grouping ID"`` column of the form
        ``"<Subject ID>@<Period>"``. ``time_to_graph`` maps each distinct
        period to the graph returned by ``convert_edge_df_to_graph``.
    """
    pdf = dynamic_df.copy()
    # The same sorted attribute list is passed for every period.
    atts = sorted(pdf["Full Attribute"].unique())
    pdf["Grouping ID"] = pdf["Subject ID"].astype(str) + "@" + pdf["Period"].astype(str)

    time_to_graph = {}
    for period in sorted(pdf["Period"].unique()):
        # The per-period slice inherits "Grouping ID" from pdf, so there is no
        # need to rebuild it; collect each group's attributes into a list.
        grouped = (
            pdf[pdf["Period"] == period]
            .groupby("Grouping ID")["Full Attribute"]
            .agg(list)
            .reset_index()
        )
        edge_df = create_edge_df_from_atts(
            atts, grouped, min_edge_weight, missing_edge_prop
        )
        # Only the graph is kept; the second returned value is unused here.
        graph, _ = convert_edge_df_to_graph(edge_df)
        time_to_graph[period] = graph
    return pdf, time_to_graph
def detect_patterns(
    node_to_period_to_pos,
    dynamic_df,
    type_val_sep,
    min_pattern_count=5,
    max_pattern_length=100,
) -> tuple[pd.DataFrame, int, int]:
    """Detect attribute patterns per period and rank them by an overall score.

    Args:
        node_to_period_to_pos: Mapping keyed by node; its sorted keys and the
            mapping itself are passed to ``create_close_node_rows``.
        dynamic_df: Long attribute table with a ``"Period"`` column; also used
            to build the ``RecordCounter``.
        type_val_sep: Separator between attribute type and value.
        min_pattern_count: Forwarded to the close-node and pattern builders.
        max_pattern_length: Forwarded to ``create_period_to_patterns``.

    Returns:
        A tuple ``(pattern_df, close_pairs, all_pairs)``. ``pattern_df`` has
        the columns ``"period"``, ``"pattern"``, ``"length"``, ``"count"``,
        ``"mean"``, ``"z_score"``, ``"detections"`` and ``"overall_score"``,
        sorted by ``"overall_score"`` in descending order. ``close_pairs``
        and ``all_pairs`` are the values reported by
        ``create_close_node_rows``.
    """
    nodes = sorted(node_to_period_to_pos.keys())
    counter = RecordCounter(dynamic_df)
    periods = sorted(dynamic_df["Period"].unique())

    # Per period, find the node pairs that lie close to each other.
    close_node_df, all_pairs, close_pairs = create_close_node_rows(
        periods,
        node_to_period_to_pos,
        nodes,
        min_pattern_count,
        counter,
        type_val_sep,
    )
    period_to_patterns = create_period_to_patterns(
        periods,
        close_node_df,
        max_pattern_length,
        min_pattern_count,
        counter,
    )

    pattern_df = pd.DataFrame(
        create_pattern_rows(period_to_patterns, counter),
        columns=["period", "pattern", "length", "count", "mean", "z_score"],
    )

    # "detections" = number of period rows in which each pattern appears.
    per_pattern = pattern_df.groupby("pattern", as_index=False)["period"].count()
    per_pattern = per_pattern.rename(columns={"period": "detections"})
    pattern_df = pattern_df.merge(per_pattern, on="pattern")

    # Raw score: z_score * length * detections * log(count + 1).
    raw_score = (
        pattern_df["z_score"]
        * pattern_df["length"]
        * pattern_df["detections"]
        * np.log1p(pattern_df["count"])
    )
    pattern_df["overall_score"] = raw_score
    # Scale by the largest score, then keep two decimals.
    top_score = pattern_df["overall_score"].max()
    pattern_df["overall_score"] = (pattern_df["overall_score"] / top_score).round(2)

    ranked = pattern_df.sort_values("overall_score", ascending=False)
    return ranked, close_pairs, all_pairs
def prepare_for_ai_report(
    pattern: str,
    period: str,
    time_series: pd.DataFrame,
    attribute_counts: pd.DataFrame,
    u_prompt: str = user_prompt,
) -> list[dict[str, str]]:
    """Assemble the chat messages used to request an AI report on a pattern.

    Args:
        pattern: The pattern to report on.
        period: The period in which the pattern was detected.
        time_series: Table serialised to CSV (without the index) for the prompt.
        attribute_counts: Table serialised to CSV (without the index) for the
            prompt.
        u_prompt: User prompt passed to ``generate_messages``; defaults to the
            module's ``user_prompt``.

    Returns:
        The messages produced by ``generate_messages`` from ``u_prompt``,
        ``report_prompt``, the prompt variables and the ``do_not_harm``
        safety prompt.
    """
    time_series_csv = time_series.to_csv(index=False)
    attribute_counts_csv = attribute_counts.to_csv(index=False)
    prompt_variables = dict(
        pattern=pattern,
        period=period,
        time_series=time_series_csv,
        attribute_counts=attribute_counts_csv,
    )
    return generate_messages(u_prompt, report_prompt, prompt_variables, do_not_harm)