Coverage for intelligence_toolkit/tests/unit/compare_case_groups/test_temporal_process.py: 100%
159 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
3#
5import polars as pl
6import pytest
8from intelligence_toolkit.compare_case_groups.temporal_process import (
9 build_temporal_count,
10 build_temporal_data,
11 calculate_window_delta,
12 create_window_df,
13)
class TestCreateWindowDf:
    """Tests for ``create_window_df``."""

    def test_basic(self) -> None:
        """Compare the output for a 4-row, 2-aggregate frame with a hand-built one.

        The expectation holds one ``"<aggregate>:<value>"`` row per aggregate
        column and input row, each with a ``time_window_count`` of 1, sorted
        by the group columns, the temporal column and ``attribute_value``.
        """
        group_cols = ["group1", "group2"]
        time_col = "time"
        aggregate_cols = ["agg1", "agg2"]

        source = pl.DataFrame(
            {
                "group1": ["A", "A", "B", "B"],
                "group2": ["X", "X", "Y", "Y"],
                "time": [1, 2, 1, 2],
                "agg1": [10, 20, 30, 40],
                "agg2": [5, 15, 25, 35],
            }
        )

        unsorted_expected = pl.DataFrame(
            {
                "group1": ["A", "A", "A", "A", "B", "B", "B", "B"],
                "group2": ["X", "X", "X", "X", "Y", "Y", "Y", "Y"],
                "time": [1, 1, 2, 2, 1, 1, 2, 2],
                "attribute_value": [
                    "agg1:10",
                    "agg2:5",
                    "agg1:20",
                    "agg2:15",
                    "agg1:30",
                    "agg2:25",
                    "agg1:40",
                    "agg2:35",
                ],
                "time_window_count": [1, 1, 1, 1, 1, 1, 1, 1],
            }
        )
        sort_keys = [*group_cols, time_col, "attribute_value"]
        expected = unsorted_expected.sort(sort_keys)

        actual = create_window_df(group_cols, time_col, aggregate_cols, source)

        assert actual.equals(expected)
class TestCalculateWindowDelta:
    """Tests for ``calculate_window_delta``."""

    def test_basic(self) -> None:
        """The delta column contains no NaN for a simple two-group input."""
        sample_data = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "temporal": [1, 2, 1, 2],
                "attribute_value": ["X:10", "X:20", "Y:30", "Y:40"],
                "temporal_window_count": [5, 3, 8, 6],
            }
        )

        result = calculate_window_delta(["Group"], sample_data, "temporal")

        assert all(result["temporal_window_delta"].is_not_nan())

    def test_groups(self) -> None:
        """Full-frame comparison when one group has two attribute values in a period."""
        sample_data = pl.DataFrame(
            {
                "Group": [
                    "Bayview",
                    "Bayview",
                    "Bayview",
                    "Bayview",
                    "Bakeview",
                    "Bayview",
                ],
                "temporal": [1, 1, 2, 3, 3, 4],
                "attribute_value": ["X:15", "X:10", "X:10", "X:10", "X:9", "X:10"],
                "temporal_window_count": [9, 5, 3, 8, 7, 6],
            }
        )

        # Hand-built expectation: rows ordered by group, then period, with the
        # delta column appended.
        expected_df = pl.DataFrame(
            {
                "Group": [
                    "Bakeview",
                    "Bayview",
                    "Bayview",
                    "Bayview",
                    "Bayview",
                    "Bayview",
                ],
                "temporal": [3, 1, 1, 2, 3, 4],
                "attribute_value": [
                    "X:9",
                    "X:10",
                    "X:15",
                    "X:10",
                    "X:10",
                    "X:10",
                ],
                "temporal_window_count": [7, 5, 9, 3, 8, 6],
                "temporal_window_delta": [0, 0, 0, -2, 5, -2],
            }
        )

        result = calculate_window_delta(["Group"], sample_data, "temporal")

        assert result.equals(expected_df)

    def test_multiple_groups_no_temporal(self) -> None:
        """Full-frame comparison with three groups, two of which have a single row."""
        sample_data = pl.DataFrame(
            {
                "Group": [
                    "Bayview",
                    "Westview",
                    "Bayview",
                    "Bayview",
                    "Bakeview",
                    "Bayview",
                ],
                "temporal": [1, 2, 2, 3, 3, 4],
                "attribute_value": ["X:10", "X:10", "X:10", "X:10", "X:9", "X:10"],
                "temporal_window_count": [5, 2, 3, 8, 7, 6],
            }
        )

        # Single-row groups (Bakeview, Westview) are expected to carry delta 0.
        expected_df = pl.DataFrame(
            {
                "Group": [
                    "Bakeview",
                    "Bayview",
                    "Bayview",
                    "Bayview",
                    "Bayview",
                    "Westview",
                ],
                "temporal": [3, 1, 2, 3, 4, 2],
                "attribute_value": [
                    "X:9",
                    "X:10",
                    "X:10",
                    "X:10",
                    "X:10",
                    "X:10",
                ],
                "temporal_window_count": [7, 5, 3, 8, 6, 2],
                "temporal_window_delta": [0, 0, -2, 5, -2, 0],
            }
        )

        result = calculate_window_delta(["Group"], sample_data, "temporal")

        assert result.equals(expected_df)

    def test_missing_values(self) -> None:
        """Null counts in the input still produce a zero delta on all four rows."""
        sample_data = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "temporal": [1, 2, 1, 2],
                "attribute_value": ["X:10", "X:20", "Y:30", "Y:40"],
                "temporal_window_count": [5, None, 8, None],
            }
        )

        result = calculate_window_delta(["Group"], sample_data, "temporal")

        # NOTE(review): polars treats null and NaN as distinct, so is_nan()
        # does not flag nulls; this only rules out NaN. Confirm whether a
        # null_count() check was the real intent.
        assert result["temporal_window_count"].is_nan().sum() == 0
        assert result.filter(pl.col("temporal_window_delta") == 0).height == 4
class TestBuildtemporalCount:
    """Tests for ``build_temporal_count``."""

    @staticmethod
    def _deltas_for_group(result: pl.DataFrame, group: str) -> tuple[pl.DataFrame, list]:
        """Return the delta column for ``group`` and the list of its non-zero values."""
        deltas = result.filter(pl.col("Group") == group).select(
            "temporal_window_delta"
        )
        nonzero_values = (
            deltas.filter(pl.col("temporal_window_delta") != 0.0)
            .to_series()
            .to_list()
        )
        return deltas, nonzero_values

    def test_basic(self) -> None:
        """The result has a ``temporal_window_delta`` column with no NaN in it."""
        sample_data = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "temporal": [1, 2, 1, 2],
                "attribute_value": ["X:10", "X:20", "Y:30", "Y:40"],
                "temporal_window_count": [5, 3, 8, 6],
            }
        )

        result = build_temporal_count(sample_data, ["Group"], "temporal")

        assert "temporal_window_delta" in result.columns
        assert all(result["temporal_window_delta"].is_not_nan())

    def test_missing_values(self) -> None:
        """Null counts in the input do not surface as NaN counts in the result."""
        sample_data = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "temporal": [1, 2, 1, 20],
                "attribute_value": ["X:10", "X:20", "Y:30", "Y:40"],
                "temporal_window_count": [5, None, 8, None],
            }
        )

        result = build_temporal_count(sample_data, ["Group"], "temporal")

        # NOTE(review): is_nan() does not flag nulls in polars, so this only
        # rules out NaN. Confirm whether null_count() was intended.
        assert result["temporal_window_count"].is_nan().sum() == 0

    def test_multiple_groups(self) -> None:
        """All four (Group, SubGroup) combinations are present in the result."""
        group_cols = ["Group", "SubGroup"]
        sample_data = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "SubGroup": ["X", "Y", "X", "Y"],
                "temporal": [1, 2, 1, 2],
                "attribute_value": ["V1:10", "V2:20", "V1:30", "V2:40"],
                "temporal_window_count": [5, 3, 8, 6],
            }
        )

        result = build_temporal_count(sample_data, group_cols, "temporal")

        # One aggregated row per distinct (Group, SubGroup) pair.
        n_groups = result.group_by(group_cols).agg(pl.len()).height
        assert n_groups == 4

    def test_delta_calculation(self) -> None:
        """Check row counts and the non-zero deltas expected for each group."""
        sample_data = pl.DataFrame(
            {
                "Group": ["A", "A", "A", "B", "B", "B"],
                "temporal": [1, 2, 3, 1, 2, 3],
                "attribute_value": ["X:10", "X:20", "X:30", "Y:40", "Y:50", "Y:60"],
                "temporal_window_count": [5, 3, 1, 8, 6, 4],
            }
        )

        result = build_temporal_count(sample_data, ["Group"], "temporal")

        group_a_deltas, group_a_values = self._deltas_for_group(result, "A")
        group_b_deltas, group_b_values = self._deltas_for_group(result, "B")

        # 3 attribute values x 3 periods per group.
        assert group_a_deltas.height == 9
        assert group_b_deltas.height == 9
        for v in [-5, 3, -3, 1]:
            assert v in group_a_values
        for v in [-8, 6, -6, 4]:
            assert v in group_b_values

        assert result["temporal_window_delta"].is_not_nan().all()

    def test_delta_calculation_temporal_zeroed(self) -> None:
        """Same as ``test_delta_calculation`` but with a zero count in period 2."""
        sample_data = pl.DataFrame(
            {
                "Group": ["A", "A", "A", "B", "B", "B"],
                "temporal": [1, 2, 3, 1, 2, 3],
                "attribute_value": ["X:10", "X:20", "X:30", "Y:40", "Y:50", "Y:60"],
                "temporal_window_count": [5, 0, 1, 8, 0, 4],
            }
        )

        result = build_temporal_count(sample_data, ["Group"], "temporal")

        group_a_deltas, group_a_values = self._deltas_for_group(result, "A")
        group_b_deltas, group_b_values = self._deltas_for_group(result, "B")

        assert group_a_deltas.height == 6
        assert group_b_deltas.height == 6
        for v in [-5, 1]:
            assert v in group_a_values
        for v in [-8, 4]:
            assert v in group_b_values

        assert result["temporal_window_delta"].is_not_nan().all()
class TestBuildtemporalData:
    """Tests for ``build_temporal_data``.

    Most tests replace ``build_temporal_count`` with a canned frame, so the
    assertions are made against what ``build_temporal_data`` returns for
    that frame.
    """

    # Dotted path patched by the tests below; kept in one place so it cannot
    # drift between tests.
    _BUILD_TEMPORAL_COUNT = (
        "intelligence_toolkit.compare_case_groups.temporal_process.build_temporal_count"
    )

    @pytest.fixture()
    def expected_df_mock(self) -> pl.DataFrame:
        """Canned frame used as the return value of ``build_temporal_count``."""
        return pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
                "attribute_value": ["V1:10", "V2:20", "V1:30", "V2:40"],
                "temporal_window_count": [5, 3, 8, 6],
            }
        )

    def test_empty_dataframe(self) -> None:
        """An empty input frame with empty arguments yields an empty result."""
        ldf = pl.DataFrame()
        result = build_temporal_data(ldf, groups=[], temporal_atts=[], temporal="")
        assert result.is_empty()

    def test_single_temporal_attribute(self, expected_df_mock, mocker) -> None:
        """Only rows for the requested period come back, with a rank column added."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "attribute_value": ["V1:10", "V2:20", "V1:30", "V2:40"],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )
        mocker.patch(self._BUILD_TEMPORAL_COUNT, return_value=expected_df_mock)

        result = build_temporal_data(
            ldf, groups=["Group"], temporal_atts=["2023-01-01"], temporal="temporal"
        )

        expected_df = pl.DataFrame(
            {
                "Group": ["A", "B"],
                "temporal": ["2023-01-01", "2023-01-01"],
                "attribute_value": ["V1:10", "V1:30"],
                "temporal_window_count": [5, 8],
                "temporal_window_rank": [1.0, 1.0],
            }
        )
        assert result.equals(expected_df)

    def test_multiple_temporal_attributes(self, expected_df_mock, mocker) -> None:
        """Rows for both requested periods come back, ordered period by period."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "attribute_value": [1, 2, 3, 4],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )
        mocker.patch(self._BUILD_TEMPORAL_COUNT, return_value=expected_df_mock)

        result = build_temporal_data(
            ldf,
            groups=["Group"],
            temporal_atts=["2023-01-01", "2023-01-02"],
            temporal="temporal",
        )

        expected_df = pl.DataFrame(
            {
                "Group": ["A", "B", "A", "B"],
                "temporal": ["2023-01-01", "2023-01-01", "2023-01-02", "2023-01-02"],
                "attribute_value": ["V1:10", "V1:30", "V2:20", "V2:40"],
                "temporal_window_count": [5, 8, 3, 6],
                "temporal_window_rank": [1.0, 1.0, 1.0, 1.0],
            }
        )
        assert result.equals(expected_df)

    def test_multiple_groups(self, mocker) -> None:
        """Two grouping columns are carried through to the ranked output."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "SubGroup": ["X", "Y", "X", "Y"],
                "attribute_value": [1, 2, 3, 4],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )
        df_mock = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "SubGroup": ["X", "Y", "X", "Y"],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
                "attribute_value": ["V1:10", "V2:20", "V1:30", "V2:40"],
                "temporal_window_count": [5, 3, 8, 6],
            }
        )
        mocker.patch(self._BUILD_TEMPORAL_COUNT, return_value=df_mock)

        result = build_temporal_data(
            ldf,
            groups=["Group", "SubGroup"],
            temporal_atts=["2023-01-01", "2023-01-02"],
            temporal="temporal",
        )

        expected_df = pl.DataFrame(
            {
                "Group": ["A", "B", "A", "B"],
                "SubGroup": ["X", "X", "Y", "Y"],
                "temporal": ["2023-01-01", "2023-01-01", "2023-01-02", "2023-01-02"],
                "attribute_value": ["V1:10", "V1:30", "V2:20", "V2:40"],
                "temporal_window_count": [5, 8, 3, 6],
                "temporal_window_rank": [1.0, 1.0, 1.0, 1.0],
            }
        )
        assert result.equals(expected_df)

    def test_missing_values(self, expected_df_mock, mocker) -> None:
        """The count column of the result contains no NaN."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "attribute_value": [1, 2, 3, 4],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )
        mocker.patch(self._BUILD_TEMPORAL_COUNT, return_value=expected_df_mock)

        result = build_temporal_data(
            ldf,
            groups=["Group"],
            temporal_atts=["2023-01-01", "2023-01-02"],
            temporal="temporal",
        )

        # NOTE(review): the mocked frame has no nulls or NaNs to begin with,
        # so this looks like a smoke check only. Confirm whether a fixture
        # with real missing values was intended.
        assert result["temporal_window_count"].is_nan().sum() == 0

    def test_non_existent_temporal_values(self, expected_df_mock, mocker) -> None:
        """Requesting a period absent from the data yields an empty result."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "attribute_value": [1, 2, 3, 4],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )
        mocker.patch(self._BUILD_TEMPORAL_COUNT, return_value=expected_df_mock)

        result = build_temporal_data(
            ldf,
            groups=["Group"],
            temporal_atts=["2023-01-03"],
            temporal="temporal",
        )

        assert result.is_empty()

    def test_incorrect_groups(self, mocker) -> None:
        """An empty frame from ``build_temporal_count`` yields an empty result."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "attribute_value": [1, 2, 3, 4],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )
        mocker.patch(self._BUILD_TEMPORAL_COUNT, return_value=pl.DataFrame())

        result = build_temporal_data(
            ldf,
            groups=["Group", "NonExistent"],
            temporal_atts=["2023-01-01", "2023-01-02"],
            temporal="temporal",
        )

        assert result.is_empty()