Coverage for intelligence_toolkit/compare_case_groups/temporal_process.py: 100%

49 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3# 

4import polars as pl 

5 

6 

def create_window_df(
    groups: list[str], temporal: str, aggregates: list[str], wdf: pl.DataFrame
) -> pl.DataFrame:
    """Count ``"<attribute>:<value>"`` occurrences per group and time window.

    The ``aggregates`` columns of ``wdf`` are unpivoted into long form (one row
    per original row and aggregate column). Both the column name and the cell
    value are cast to strings and joined as ``"<column>:<value>"``. Rows whose
    value is null, or whose group key in any ``groups`` column is null, are
    discarded. The remaining rows are counted per
    (``*groups``, ``temporal``, ``attribute_value``).

    Args:
        groups: Columns identifying a case group.
        temporal: Column holding the time-window label.
        aggregates: Columns whose values are counted.
        wdf: Wide input frame containing all of the columns above.

    Returns:
        One row per (``*groups``, ``temporal``, ``attribute_value``), with the
        number of occurrences in a ``f"{temporal}_window_count"`` column,
        sorted by those key columns.
    """
    key_columns = [*groups, temporal, "attribute_value"]
    count_column = f"{temporal}_window_count"

    long_df = wdf.melt(
        id_vars=[*groups, temporal],
        value_vars=aggregates,
        variable_name="Attribute",
        value_name="Value",
    ).with_columns(
        pl.col("Attribute").cast(pl.String),
        pl.col("Value").cast(pl.String),
    )

    # A null value, or a null in any group column, gives nothing to count.
    required_present = [pl.col(name).is_not_null() for name in ("Value", *groups)]
    long_df = long_df.filter(pl.all_horizontal(required_present))

    long_df = long_df.with_columns(
        (pl.col("Attribute") + ":" + pl.col("Value")).alias("attribute_value")
    )

    counted = long_df.group_by(key_columns).agg(pl.len().alias(count_column))
    return counted.sort(key_columns)

35 

def calculate_window_delta(
    groups: list[str], temporal_df: pl.DataFrame, temporal: str
) -> pl.DataFrame:
    """Add the change in window count relative to the previous time window.

    The frame is sorted by (``*groups``, ``temporal``, ``attribute_value``).
    Within each (``*groups``, ``attribute_value``) series, rows therefore
    appear in the sort order of the ``temporal`` column. Note that this order
    comes from the column's dtype, e.g. lexicographic for strings. Each row's
    delta is its count minus the count of the preceding row in the same
    series. The first row of a series has no predecessor, so ``diff`` yields
    null there, and it is filled with 0.

    Args:
        groups: Columns identifying a case group.
        temporal_df: Frame with the ``groups``, ``temporal``,
            ``"attribute_value"`` and ``f"{temporal}_window_count"`` columns.
        temporal: Column holding the time-window label.

    Returns:
        ``temporal_df``, sorted as described, with an added
        ``f"{temporal}_window_delta"`` column.
    """
    count_column = f"{temporal}_window_count"
    series_keys = [*groups, "attribute_value"]
    return temporal_df.sort([*groups, temporal, "attribute_value"]).with_columns(
        pl.col(count_column)
        .diff()
        # diff is evaluated separately per series, in the row order set by
        # the sort above.
        .over(series_keys)
        .fill_null(0)
        .alias(f"{temporal}_window_delta")
    )

49 

50 

def build_temporal_count(
    ldf: pl.DataFrame, groups: list[str], temporal: str
) -> pl.DataFrame:
    """Fill in zero counts for absent time windows, then add window deltas.

    For every (``*groups``, ``attribute_value``) pair present in ``ldf``, a
    row with a count of 0 is added for each distinct ``temporal`` value in
    ``ldf`` where that pair has no row. Consecutive-window deltas are then
    computed against explicit zeros. Pairs whose counts sum to 0 over all
    windows are dropped before the deltas are computed.

    Args:
        ldf: Frame with the ``groups``, ``temporal``, ``"attribute_value"``
            and ``f"{temporal}_window_count"`` columns (the shape produced by
            ``create_window_df``).
        groups: Columns identifying a case group.
        temporal: Column holding the time-window label.

    Returns:
        The frame with zero rows filled in and a
        ``f"{temporal}_window_delta"`` column added by
        ``calculate_window_delta``.
    """
    count_column = f"{temporal}_window_count"
    pair_columns = [*groups, "attribute_value"]
    key_columns = [*groups, temporal, "attribute_value"]

    # Every observed (group, attribute_value) pair crossed with every observed
    # time window. A pair that never occurs would only ever get zero rows,
    # and the zero-total filter below would remove them again, so such pairs
    # are not generated at all.
    full_grid = (
        ldf.select(pair_columns)
        .unique()
        .join(ldf.select(temporal).unique(), how="cross")
    )
    missing = full_grid.join(ldf.select(key_columns), on=key_columns, how="anti")

    if missing.height > 0:
        # Build the zero rows by column name, with ldf's own dtypes and column
        # order, so vstack accepts them. Constructing the frame from row
        # lists instead would depend on polars' row/column orientation
        # inference.
        zero_rows = missing.with_columns(
            pl.lit(0, dtype=ldf.schema[count_column]).alias(count_column)
        ).select(ldf.columns)
        ldf = ldf.vstack(zero_rows)

    # Drop (group, attribute_value) pairs whose count is 0 in every window.
    totals = ldf.group_by(pair_columns).agg(
        pl.col(count_column).sum().alias("total_window_count")
    )
    zero_total_pairs = totals.filter(pl.col("total_window_count") == 0)
    nonzero = ldf.join(zero_total_pairs, on=pair_columns, how="anti")

    return calculate_window_delta(groups, nonzero, temporal)

90 

91 

def build_temporal_data(
    ldf: pl.DataFrame, groups: list[str], temporal_atts: list[str], temporal: str
) -> pl.DataFrame:
    """Build window counts, deltas and ranks for the requested time windows.

    ``ldf`` is first passed through ``build_temporal_count``. Then, for each
    value in ``temporal_atts``, the rows of that window are selected. Within
    that window, each row is ranked against all groups that share its
    ``attribute_value``. The rank is a dense rank of
    ``f"{temporal}_window_count"`` in descending order: 1 is the highest
    count, and tied counts share a rank.

    Args:
        ldf: Frame with the ``groups``, ``temporal``, ``"attribute_value"``
            and ``f"{temporal}_window_count"`` columns (the shape produced by
            ``create_window_df``).
        groups: Columns identifying a case group.
        temporal_atts: Time-window values to keep. Each entry contributes its
            rows once, so a repeated entry contributes them again.
        temporal: Column holding the time-window label.

    Returns:
        ``ldf`` unchanged if it is empty. Otherwise, the (possibly empty)
        result of ``build_temporal_count`` if that is empty. Otherwise, the
        selected rows with an Int64 ``f"{temporal}_window_rank"`` column,
        sorted by ``temporal``. If ``temporal_atts`` is empty, a zero-row
        frame with the same columns is returned.
    """
    if ldf.is_empty():
        return ldf
    ldf = build_temporal_count(ldf, groups, temporal)
    if ldf.is_empty():
        return ldf

    rank_column = f"{temporal}_window_rank"
    # Applied to a single window's rows, so partitioning by attribute_value
    # ranks all groups that share that window and attribute value.
    rank_expr = (
        pl.col(f"{temporal}_window_count")
        .rank(method="dense", descending=True)
        .over("attribute_value")
        .cast(pl.Int64)
        .alias(rank_column)
    )
    tdfs = [
        ldf.filter(pl.col(temporal) == tatt).with_columns(rank_expr)
        for tatt in temporal_atts
    ]

    if not tdfs:
        # pl.concat rejects an empty list. Return an empty frame with the
        # columns the non-empty path would produce.
        return ldf.clear().with_columns(pl.Series(rank_column, [], dtype=pl.Int64))
    return pl.concat(tdfs).sort(by=temporal)