Coverage for intelligence_toolkit/tests/unit/compare_case_groups/test_temporal_process.py: 100%

159 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 13:41 -0300

1# Copyright (c) 2024 Microsoft Corporation. All rights reserved. 

2# Licensed under the MIT license. See LICENSE file in the project. 

3# 

4 

5import polars as pl 

6import pytest 

7 

8from intelligence_toolkit.compare_case_groups.temporal_process import ( 

9 build_temporal_count, 

10 build_temporal_data, 

11 calculate_window_delta, 

12 create_window_df, 

13) 

14 

15 

class TestCreateWindowDf:
    """Tests for ``create_window_df``."""

    def test_basic(self) -> None:
        """Check the long-format frame produced for two groups and two aggregates.

        The expected frame holds one row per (group1, group2, time, aggregate)
        combination, with ``attribute_value`` formatted as ``"<aggregate>:<value>"``
        and a ``time_window_count`` of 1 on every row.
        """
        group_cols = ["group1", "group2"]
        time_col = "time"
        agg_cols = ["agg1", "agg2"]

        source = pl.DataFrame(
            {
                "group1": ["A", "A", "B", "B"],
                "group2": ["X", "X", "Y", "Y"],
                "time": [1, 2, 1, 2],
                "agg1": [10, 20, 30, 40],
                "agg2": [5, 15, 25, 35],
            }
        )

        # Four input rows x two aggregate columns -> eight expected rows.
        expected = pl.DataFrame(
            {
                "group1": ["A", "A", "A", "A", "B", "B", "B", "B"],
                "group2": ["X", "X", "X", "X", "Y", "Y", "Y", "Y"],
                "time": [1, 1, 2, 2, 1, 1, 2, 2],
                "attribute_value": [
                    "agg1:10",
                    "agg2:5",
                    "agg1:20",
                    "agg2:15",
                    "agg1:30",
                    "agg2:25",
                    "agg1:40",
                    "agg2:35",
                ],
                "time_window_count": [1, 1, 1, 1, 1, 1, 1, 1],
            }
        )
        # The comparison is order-sensitive, so the expected frame is sorted by
        # the group columns, the temporal column and the attribute value.
        expected = expected.sort(group_cols + [time_col, "attribute_value"])

        actual = create_window_df(group_cols, time_col, agg_cols, source)

        assert actual.equals(expected)

58 

59 

class TestCalculateWindowDelta:
    """Tests for ``calculate_window_delta``."""

    def test_basic(self) -> None:
        """The computed ``temporal_window_delta`` column contains no NaN entries."""
        frame = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "temporal": [1, 2, 1, 2],
                "attribute_value": ["X:10", "X:20", "Y:30", "Y:40"],
                "temporal_window_count": [5, 3, 8, 6],
            }
        )

        result = calculate_window_delta(["Group"], frame, "temporal")

        not_nan_flags = result["temporal_window_delta"].is_not_nan()
        assert all(not_nan_flags)

    def test_groups(self) -> None:
        """Compare the full output for one group carrying two attribute values.

        In the expected frame each delta is the change in count relative to the
        previous row with the same group and attribute value, and the first
        occurrence of each (group, attribute value) pair has a delta of 0.
        """
        frame = pl.DataFrame(
            {
                "Group": ["Bayview"] * 4 + ["Bakeview", "Bayview"],
                "temporal": [1, 1, 2, 3, 3, 4],
                "attribute_value": ["X:15", "X:10", "X:10", "X:10", "X:9", "X:10"],
                "temporal_window_count": [9, 5, 3, 8, 7, 6],
            }
        )

        expected_df = pl.DataFrame(
            {
                "Group": ["Bakeview"] + ["Bayview"] * 5,
                "temporal": [3, 1, 1, 2, 3, 4],
                "attribute_value": ["X:9", "X:10", "X:15", "X:10", "X:10", "X:10"],
                "temporal_window_count": [7, 5, 9, 3, 8, 6],
                "temporal_window_delta": [0, 0, 0, -2, 5, -2],
            }
        )

        result = calculate_window_delta(["Group"], frame, "temporal")

        assert result.equals(expected_df)

    def test_multiple_groups_no_temporal(self) -> None:
        """Compare the full output when three distinct groups are present.

        Groups that appear only once ("Bakeview", "Westview") are expected to
        carry a delta of 0.
        """
        frame = pl.DataFrame(
            {
                "Group": [
                    "Bayview",
                    "Westview",
                    "Bayview",
                    "Bayview",
                    "Bakeview",
                    "Bayview",
                ],
                "temporal": [1, 2, 2, 3, 3, 4],
                "attribute_value": ["X:10", "X:10", "X:10", "X:10", "X:9", "X:10"],
                "temporal_window_count": [5, 2, 3, 8, 7, 6],
            }
        )

        expected_df = pl.DataFrame(
            {
                "Group": ["Bakeview"] + ["Bayview"] * 4 + ["Westview"],
                "temporal": [3, 1, 2, 3, 4, 2],
                "attribute_value": ["X:9"] + ["X:10"] * 5,
                "temporal_window_count": [7, 5, 3, 8, 6, 2],
                "temporal_window_delta": [0, 0, -2, 5, -2, 0],
            }
        )

        result = calculate_window_delta(["Group"], frame, "temporal")

        assert result.equals(expected_df)

    def test_missing_values(self):
        """Missing counts must not surface as NaN, and all four deltas are 0."""
        frame = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "temporal": [1, 2, 1, 2],
                "attribute_value": ["X:10", "X:20", "Y:30", "Y:40"],
                "temporal_window_count": [5, None, 8, None],
            }
        )

        result = calculate_window_delta(["Group"], frame, "temporal")

        # NOTE(review): is_nan() flags floating-point NaN, not null. If the
        # intent is to prove the None inputs were filled, null_count() may be
        # the stronger check - confirm against the implementation.
        nan_total = result["temporal_window_count"].is_nan().sum()
        assert nan_total == 0

        zero_delta_rows = result.filter(pl.col("temporal_window_delta") == 0)
        assert zero_delta_rows.height == 4

184 

185 

class TestBuildtemporalCount:
    """Tests for ``build_temporal_count``."""

    @staticmethod
    def _group_deltas(result: pl.DataFrame, group: str) -> pl.DataFrame:
        """Return the ``temporal_window_delta`` column for rows whose ``Group`` equals *group*."""
        return result.filter(pl.col("Group") == group).select("temporal_window_delta")

    @staticmethod
    def _nonzero_values(deltas: pl.DataFrame) -> list:
        """Return the non-zero ``temporal_window_delta`` values of *deltas* as a list."""
        return (
            deltas.filter(pl.col("temporal_window_delta") != 0.0)
            .to_series()
            .to_list()
        )

    def test_basic(self) -> None:
        """The result exposes a ``temporal_window_delta`` column with no NaN entries."""
        data = {
            "Group": ["A", "A", "B", "B"],
            "temporal": [1, 2, 1, 2],
            "attribute_value": ["X:10", "X:20", "Y:30", "Y:40"],
            "temporal_window_count": [5, 3, 8, 6],
        }
        sample_data = pl.DataFrame(data)
        groups = ["Group"]
        temporal = "temporal"

        result = build_temporal_count(sample_data, groups, temporal)

        assert "temporal_window_delta" in result.columns
        assert all(result["temporal_window_delta"].is_not_nan())

    def test_missing_values(self) -> None:
        """Missing counts in the input must not surface as NaN in the output."""
        data = {
            "Group": ["A", "A", "B", "B"],
            "temporal": [1, 2, 1, 20],
            "attribute_value": ["X:10", "X:20", "Y:30", "Y:40"],
            "temporal_window_count": [5, None, 8, None],
        }
        sample_data = pl.DataFrame(data)
        groups = ["Group"]
        temporal = "temporal"

        result = build_temporal_count(sample_data, groups, temporal)

        # NOTE(review): is_nan() flags floating-point NaN, not null. If the
        # intent is to prove the None inputs were filled, null_count() may be
        # the stronger check - confirm against the implementation.
        assert result["temporal_window_count"].is_nan().sum() == 0

    def test_multiple_groups(self) -> None:
        """All four (Group, SubGroup) combinations from the input appear in the result."""
        data = {
            "Group": ["A", "A", "B", "B"],
            "SubGroup": ["X", "Y", "X", "Y"],
            "temporal": [1, 2, 1, 2],
            "attribute_value": ["V1:10", "V2:20", "V1:30", "V2:40"],
            "temporal_window_count": [5, 3, 8, 6],
        }
        sample_data = pl.DataFrame(data)
        groups = ["Group", "SubGroup"]
        temporal = "temporal"

        result = build_temporal_count(sample_data, groups, temporal)

        # Count the distinct (Group, SubGroup) pairs present in the result.
        n_groups = result.group_by(["Group", "SubGroup"]).agg(pl.len()).height

        assert n_groups == 4

    def test_delta_calculation(self) -> None:
        """Check row counts and the non-zero deltas for each group.

        Each group is expected to yield nine delta rows, and the listed values
        must all appear among that group's non-zero deltas.
        """
        data = {
            "Group": ["A", "A", "A", "B", "B", "B"],
            "temporal": [1, 2, 3, 1, 2, 3],
            "attribute_value": ["X:10", "X:20", "X:30", "Y:40", "Y:50", "Y:60"],
            "temporal_window_count": [5, 3, 1, 8, 6, 4],
        }
        sample_data = pl.DataFrame(data)
        groups = ["Group"]
        temporal = "temporal"
        result = build_temporal_count(sample_data, groups, temporal)

        group_a_deltas = self._group_deltas(result, "A")
        group_a_deltas_values = self._nonzero_values(group_a_deltas)

        group_b_deltas = self._group_deltas(result, "B")
        group_b_deltas_values = self._nonzero_values(group_b_deltas)

        assert group_a_deltas.height == 9
        assert group_b_deltas.height == 9
        for v in [-5, 3, -3, 1]:
            assert v in group_a_deltas_values
        for v in [-8, 6, -6, 4]:
            assert v in group_b_deltas_values

        assert (
            result.select(pl.col("temporal_window_delta").is_not_nan())
            .to_series()
            .all()
        )

    def test_delta_calculation_temporal_zeroed(self) -> None:
        """Check row counts and non-zero deltas when the middle period has a count of 0.

        Each group is expected to yield six delta rows, and the listed values
        must all appear among that group's non-zero deltas.
        """
        data = {
            "Group": ["A", "A", "A", "B", "B", "B"],
            "temporal": [1, 2, 3, 1, 2, 3],
            "attribute_value": ["X:10", "X:20", "X:30", "Y:40", "Y:50", "Y:60"],
            "temporal_window_count": [5, 0, 1, 8, 0, 4],
        }
        sample_data = pl.DataFrame(data)
        groups = ["Group"]
        temporal = "temporal"
        result = build_temporal_count(sample_data, groups, temporal)

        group_a_deltas = self._group_deltas(result, "A")
        group_a_deltas_values = self._nonzero_values(group_a_deltas)

        group_b_deltas = self._group_deltas(result, "B")
        group_b_deltas_values = self._nonzero_values(group_b_deltas)

        assert group_a_deltas.height == 6
        assert group_b_deltas.height == 6
        for v in [-5, 1]:
            assert v in group_a_deltas_values
        for v in [-8, 4]:
            assert v in group_b_deltas_values

        assert (
            result.select(pl.col("temporal_window_delta").is_not_nan())
            .to_series()
            .all()
        )

338 

339 

class TestBuildtemporalData:
    """Tests for ``build_temporal_data`` with ``build_temporal_count`` mocked out."""

    # Dotted path of the collaborator replaced by a mock in most tests below.
    _COUNT_TARGET = (
        "intelligence_toolkit.compare_case_groups.temporal_process.build_temporal_count"
    )

    @pytest.fixture()
    def expected_df_mock(self):
        """Frame used as the mocked return value of ``build_temporal_count``."""
        columns = {
            "Group": ["A", "A", "B", "B"],
            "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            "attribute_value": ["V1:10", "V2:20", "V1:30", "V2:40"],
            "temporal_window_count": [5, 3, 8, 6],
        }
        return pl.DataFrame(columns)

    def test_empty_dataframe(self):
        """An empty input frame with empty arguments yields an empty result."""
        empty_input = pl.DataFrame()
        result = build_temporal_data(
            empty_input, groups=[], temporal_atts=[], temporal=""
        )
        assert result.is_empty()

    def test_single_temporal_attribute(self, expected_df_mock, mocker):
        """Requesting one temporal value keeps only its rows and adds a rank of 1.0."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "attribute_value": ["V1:10", "V2:20", "V1:30", "V2:40"],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )
        mocker.patch(self._COUNT_TARGET, return_value=expected_df_mock)

        result = build_temporal_data(
            ldf, groups=["Group"], temporal_atts=["2023-01-01"], temporal="temporal"
        )

        expected_df = pl.DataFrame(
            {
                "Group": ["A", "B"],
                "temporal": ["2023-01-01", "2023-01-01"],
                "attribute_value": ["V1:10", "V1:30"],
                "temporal_window_count": [5, 8],
                "temporal_window_rank": [1.0, 1.0],
            }
        )
        assert result.equals(expected_df)

    def test_multiple_temporal_attributes(self, expected_df_mock, mocker):
        """Requesting both temporal values returns their rows in the requested order."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "attribute_value": [1, 2, 3, 4],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )
        mocker.patch(self._COUNT_TARGET, return_value=expected_df_mock)

        result = build_temporal_data(
            ldf,
            groups=["Group"],
            temporal_atts=["2023-01-01", "2023-01-02"],
            temporal="temporal",
        )

        expected_df = pl.DataFrame(
            {
                "Group": ["A", "B", "A", "B"],
                "temporal": ["2023-01-01", "2023-01-01", "2023-01-02", "2023-01-02"],
                "attribute_value": ["V1:10", "V1:30", "V2:20", "V2:40"],
                "temporal_window_count": [5, 8, 3, 6],
                "temporal_window_rank": [1.0, 1.0, 1.0, 1.0],
            }
        )
        assert result.equals(expected_df)

    def test_multiple_groups(self, mocker):
        """Two grouping columns are carried through to the ranked output."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "SubGroup": ["X", "Y", "X", "Y"],
                "attribute_value": [1, 2, 3, 4],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )

        # This test needs a SubGroup column, so it builds its own mocked frame
        # rather than using the shared fixture.
        counted = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "SubGroup": ["X", "Y", "X", "Y"],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
                "attribute_value": ["V1:10", "V2:20", "V1:30", "V2:40"],
                "temporal_window_count": [5, 3, 8, 6],
            }
        )
        mocker.patch(self._COUNT_TARGET, return_value=counted)

        result = build_temporal_data(
            ldf,
            groups=["Group", "SubGroup"],
            temporal_atts=["2023-01-01", "2023-01-02"],
            temporal="temporal",
        )

        expected_df = pl.DataFrame(
            {
                "Group": ["A", "B", "A", "B"],
                "SubGroup": ["X", "X", "Y", "Y"],
                "temporal": ["2023-01-01", "2023-01-01", "2023-01-02", "2023-01-02"],
                "attribute_value": ["V1:10", "V1:30", "V2:20", "V2:40"],
                "temporal_window_count": [5, 8, 3, 6],
                "temporal_window_rank": [1.0, 1.0, 1.0, 1.0],
            }
        )
        assert result.equals(expected_df)

    def test_missing_values(self, expected_df_mock, mocker):
        """The ``temporal_window_count`` column of the result contains no NaN values."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "attribute_value": [1, 2, 3, 4],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )
        mocker.patch(self._COUNT_TARGET, return_value=expected_df_mock)

        result = build_temporal_data(
            ldf,
            groups=["Group"],
            temporal_atts=["2023-01-01", "2023-01-02"],
            temporal="temporal",
        )

        nan_total = result["temporal_window_count"].is_nan().sum()
        assert nan_total == 0

    def test_non_existent_temporal_values(self, expected_df_mock, mocker):
        """A temporal value absent from the counted data yields an empty result."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "attribute_value": [1, 2, 3, 4],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )
        mocker.patch(self._COUNT_TARGET, return_value=expected_df_mock)

        result = build_temporal_data(
            ldf,
            groups=["Group"],
            temporal_atts=["2023-01-03"],
            temporal="temporal",
        )

        assert result.is_empty()

    def test_incorrect_groups(self, mocker):
        """When the mocked count step returns an empty frame, the result is empty."""
        ldf = pl.DataFrame(
            {
                "Group": ["A", "A", "B", "B"],
                "attribute_value": [1, 2, 3, 4],
                "temporal": ["2023-01-01", "2023-01-02", "2023-01-01", "2023-01-02"],
            }
        )
        mocker.patch(self._COUNT_TARGET, return_value=pl.DataFrame())

        result = build_temporal_data(
            ldf,
            groups=["Group", "NonExistent"],
            temporal_atts=["2023-01-01", "2023-01-02"],
            temporal="temporal",
        )

        assert result.is_empty()
500 temporal_atts=["2023-01-01", "2023-01-02"], 

501 temporal="temporal", 

502 ) 

503 

504 assert result.is_empty()