Coverage for intelligence_toolkit/compare_case_groups/temporal_process.py: 100%
49 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
3#
4import polars as pl
def create_window_df(
    groups: list[str], temporal: str, aggregates: list[str], wdf: pl.DataFrame
) -> pl.DataFrame:
    """Count "Attribute:Value" occurrences per group and temporal window.

    The wide frame ``wdf`` is melted so that each column listed in
    ``aggregates`` becomes an ``Attribute``/``Value`` pair, both rendered as
    strings and joined with ``":"`` into ``attribute_value``. Rows with a null
    value or a null in any of the ``groups`` columns are discarded before
    counting.

    Args:
        groups: Names of the grouping columns.
        temporal: Name of the temporal column.
        aggregates: Names of the columns to unpivot into attribute/value rows.
        wdf: Wide input frame containing all of the columns above.

    Returns:
        One row per ``(*groups, temporal, attribute_value)`` combination with
        its row count in ``f"{temporal}_window_count"``, sorted by those keys.
    """
    key_cols = [*groups, temporal, "attribute_value"]
    attribute_text = pl.col("Attribute").cast(pl.String)
    value_text = pl.col("Value").cast(pl.String)

    long_df = wdf.melt(
        id_vars=[*groups, temporal],
        value_vars=aggregates,
        variable_name="Attribute",
        value_name="Value",
    ).with_columns(
        attribute_text,
        value_text,
        # A null on either side makes the concatenation null as well.
        (attribute_text + ":" + value_text).alias("attribute_value"),
    )

    # Keep only rows that carry a value and a complete set of group keys.
    complete_rows = long_df.drop_nulls(subset=["Value", *groups])

    window_counts = complete_rows.group_by(key_cols).agg(
        pl.len().alias(f"{temporal}_window_count")
    )
    return window_counts.sort(key_cols)
# Calculate deltas in counts within each group and attribute value
def calculate_window_delta(
    groups: list[str], temporal_df: pl.DataFrame, temporal
) -> pl.DataFrame:
    """Append the change in window count between consecutive rows.

    The frame is sorted by ``[*groups, temporal, "attribute_value"]`` and the
    difference of ``f"{temporal}_window_count"`` is taken separately within
    each ``(*groups, attribute_value)`` partition. The first row of every
    partition has no predecessor and gets a delta of ``0``.

    Args:
        groups: Names of the grouping columns.
        temporal_df: Frame holding the group columns, the temporal column,
            ``attribute_value`` and ``f"{temporal}_window_count"``.
        temporal: Name of the temporal column; also the prefix of the count
            and delta column names.

    Returns:
        The sorted frame with an added ``f"{temporal}_window_delta"`` column.
    """
    count_col = f"{temporal}_window_count"
    delta_col = f"{temporal}_window_delta"

    ordered = temporal_df.sort([*groups, temporal, "attribute_value"])
    delta_expr = (
        pl.col(count_col)
        .diff()
        .over([*groups, "attribute_value"])
        .fill_null(0)
        .alias(delta_col)
    )
    return ordered.with_columns(delta_expr)
def build_temporal_count(
    ldf: pl.DataFrame, groups: list[str], temporal: str
) -> pl.DataFrame:
    """Zero-fill missing temporal windows and append per-window count deltas.

    For every ``(*groups, attribute_value)`` pair present in ``ldf`` and every
    distinct value of the temporal column, a row with a count of ``0`` is
    added when ``ldf`` has no row for that combination. Pairs whose counts sum
    to zero across all windows are then removed, and the result is passed to
    ``calculate_window_delta``.

    Args:
        ldf: Frame with the group columns, the temporal column,
            ``attribute_value`` and ``f"{temporal}_window_count"``.
        groups: Names of the grouping columns.
        temporal: Name of the temporal column.

    Returns:
        The zero-filled frame, sorted and extended with
        ``f"{temporal}_window_delta"`` by ``calculate_window_delta``.
    """
    count_col = f"{temporal}_window_count"
    pair_cols = [*groups, "attribute_value"]

    # Build the expected (pair x window) grid with joins instead of filtering
    # the frame once per combination. Pairs that never occur in a group are
    # not generated at all: they would only receive zero rows and be removed
    # again by the all-zero check below.
    observed_pairs = ldf.select(pair_cols).unique()
    windows = ldf.select(temporal).unique()
    expected = observed_pairs.join(windows, how="cross")

    missing = expected.join(ldf, on=[*pair_cols, temporal], how="anti")
    if missing.height > 0:
        # Name and order the columns explicitly so the stacked rows cannot be
        # misread as column-oriented data or misaligned with ldf's layout.
        zero_rows = missing.with_columns(
            pl.lit(0).cast(ldf.schema[count_col]).alias(count_col)
        ).select(ldf.columns)
        ldf = ldf.vstack(zero_rows)

    # Remove attribute values for groups whose counts are 0 in every window.
    totals = ldf.group_by(pair_cols).agg(
        pl.col(count_col).sum().alias("total_window_count")
    )
    all_zero_pairs = totals.filter(pl.col("total_window_count") == 0)
    kept = ldf.join(all_zero_pairs, on=pair_cols, how="anti")

    return calculate_window_delta(groups, kept, temporal)
def build_temporal_data(
    ldf: pl.DataFrame, groups: list[str], temporal_atts: list[str], temporal: str
) -> pl.DataFrame:
    """Build zero-filled window counts, deltas and ranks for selected windows.

    ``ldf`` is first expanded by ``build_temporal_count``. Then, for each
    value in ``temporal_atts``, the rows of that window are ranked by
    ``f"{temporal}_window_count"`` (dense, highest count = rank 1) separately
    for every ``attribute_value``.

    Args:
        ldf: Frame with the group columns, the temporal column,
            ``attribute_value`` and ``f"{temporal}_window_count"``.
        groups: Names of the grouping columns.
        temporal_atts: Temporal values (windows) to include in the output.
        temporal: Name of the temporal column.

    Returns:
        The per-window frames concatenated and sorted by the temporal column,
        with an Int64 ``f"{temporal}_window_rank"`` column. If ``ldf`` is
        empty before or after zero-filling it is returned as is, without a
        rank column. If ``temporal_atts`` is empty an empty frame with the
        output columns is returned.
    """
    if ldf.shape[0] == 0:
        return ldf
    ldf = build_temporal_count(ldf, groups, temporal)

    if ldf.is_empty():
        return ldf

    count_col = f"{temporal}_window_count"
    rank_col = f"{temporal}_window_rank"

    # Within a single window, rank the counts for each attribute value. One
    # windowed expression replaces a filter/join/drop round trip per value
    # and yields the same Int64 dtype even when the window has no rows.
    rank_expr = (
        pl.col(count_col)
        .rank(method="dense", descending=True)
        .over("attribute_value")
        .cast(pl.Int64)
        .alias(rank_col)
    )

    window_frames = [
        ldf.filter(pl.col(temporal) == tatt).with_columns(rank_expr)
        for tatt in temporal_atts
    ]

    if not window_frames:
        # pl.concat rejects an empty list; return an empty frame that still
        # has the output columns.
        return ldf.head(0).with_columns(pl.lit(0).cast(pl.Int64).alias(rank_col))

    return pl.concat(window_frames).sort(by=temporal)