Coverage for intelligence_toolkit/tests/unit/match_entity_records/test_detect.py: 99%
265 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 13:41 -0300
1# Copyright (c) 2024 Microsoft Corporation. All rights reserved.
2# Licensed under the MIT license. See LICENSE file in the project.
3#
5import re
6from collections import defaultdict
8import numpy as np
9import polars as pl
10import pytest
12from intelligence_toolkit.match_entity_records.detect import (
13 _calculate_mean_score,
14 build_attributes_dataframe,
15 build_matches,
16 build_matches_dataset,
17 build_near_map,
18 build_nearest_neighbors,
19 build_sentence_pair_scores,
20 convert_to_sentences,
21)
class TestConvertToSentences:
    """Tests for ``convert_to_sentences``."""

    @pytest.fixture()
    def merged_df(self) -> pl.DataFrame:
        """Return a five-row frame with an ID, an entity name and three vehicle columns."""
        return pl.DataFrame(
            {
                "ID1": [10, 20, 30, 40, 50],
                "Entity name": ["A", "B", "C", "D", "E"],
                "VehicleType": [
                    "Hatch 1",
                    "Sedan 1",
                    "Truck 1",
                    "SUV 3",
                    "CyberTruck 3",
                ],
                "VehicleColor": ["Blue", "Red", "Blue", "Black", "Silver"],
                "VehicleYear": ["2021", "2022", "2022", "2023", "2024"],
            }
        )

    def test_df_empty(self) -> None:
        """An empty frame produces an empty result."""
        df_empty = pl.DataFrame()
        result = convert_to_sentences(df_empty, [])
        assert len(result) == 0

    def test_skip_empty(self, merged_df) -> None:
        """With an empty skip list every row is converted and ``ID1`` is kept."""
        result = convert_to_sentences(merged_df, [])

        assert len(result) == 5
        assert "ID1" in result[0]["text"]

    def test_skip(self, merged_df) -> None:
        """Columns named in the skip list do not appear in any sentence text."""
        result = convert_to_sentences(merged_df, ["ID1"])

        for res in result:
            assert "ID1" not in res["text"]

    def test_sentence(self, merged_df) -> None:
        """Each sentence text contains every column name, upper-cased and followed by a colon."""
        result = convert_to_sentences(merged_df)

        assert len(result) == 5
        # Named ``sentence`` rather than ``re`` so the imported ``re`` module is not shadowed.
        for sentence in result:
            text = sentence["text"]
            assert "ID1:" in text
            assert "ENTITY NAME:" in text
            assert "VEHICLETYPE:" in text
            assert "VEHICLECOLOR:" in text
            assert "VEHICLEYEAR:" in text

    def test_val_nan(self, merged_df) -> None:
        """A ``"NAN"`` string value is rendered as an empty attribute value."""
        # Append one row whose VehicleType is the literal string "NAN".
        merged_df = pl.concat(
            [
                merged_df,
                pl.DataFrame(
                    {
                        "ID1": [60],
                        "Entity name": ["F"],
                        "VehicleType": ["NAN"],
                        "VehicleColor": ["Blue"],
                        "VehicleYear": ["2021"],
                    }
                ),
            ]
        )
        result = convert_to_sentences(merged_df)

        last_sentence = result[-1]
        assert "VEHICLETYPE: ;" in last_sentence["text"]
class TestBuildNearestNeighbors:
    """Tests for ``build_nearest_neighbors``."""

    @pytest.fixture()
    def embeddings(self) -> np.ndarray:
        """Return a ``(1000, 10)`` array of pseudo-random floats in ``[0, 1)``."""
        # A seeded generator keeps runs reproducible; the tests below only
        # inspect result shapes, so the concrete values do not matter.
        return np.random.default_rng(0).random((1000, 10))

    def test_neighbors_greater_than_embeddings(self) -> None:
        """Requesting more neighbors than there are embeddings raises ``ValueError``."""
        embeddings = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])

        expected_msg = (
            "Number of neighbors (50) is greater than number of embeddings (3)"
        )
        # ``match`` is treated as a regex, so the parentheses must be escaped.
        escaped_expected_msg = re.escape(expected_msg)
        with pytest.raises(ValueError, match=escaped_expected_msg):
            build_nearest_neighbors(embeddings, 50)

    def test_neighbors_5(self, embeddings) -> None:
        """With 5 neighbors the result has two parts; the first has shape ``(1000, 5)``."""
        result = build_nearest_neighbors(embeddings, 5)

        assert len(result) == 2
        assert result[0].shape == (1000, 5)

    def test_neighbors_10(self, embeddings) -> None:
        """With 10 neighbors the result has two parts; the first has shape ``(1000, 10)``."""
        result = build_nearest_neighbors(embeddings, 10)

        assert len(result) == 2
        assert result[0].shape == (1000, 10)
class TestBuildNearMap:
    """Tests for ``build_near_map``."""

    @pytest.fixture()
    def all_sentences(self) -> list[str]:
        """Return three attribute sentences, one per record."""
        sentences = [
            "ID1: 10; ENTITY NAME: A; VEHICLETYPE: Hatch 1; VEHICLECOLOR: Blue; VEHICLEYEAR: 2021;",
            "ID1: 20; ENTITY NAME: B; VEHICLETYPE: Sedan 1; VEHICLECOLOR: Red; VEHICLEYEAR: 2022;",
            "ID1: 30; ENTITY NAME: C; VEHICLETYPE: Truck 1; VEHICLECOLOR: Blue; VEHICLEYEAR: 2022;",
        ]
        return sentences

    def test_result(self, all_sentences) -> None:
        """With the default distance limit, records 0 and 1 are the only keys."""
        distances = np.array(
            [
                [0.01, 0.02, 0.03],
                [0.04, 0.05, 0.06],
                [0.03, 0.8, 0.9],
            ]
        )
        indices = np.array([[0, 1, 2]] * 3)
        near_map = build_near_map(distances, indices, all_sentences)

        expected = defaultdict(list, {0: [1, 2], 1: [1]})

        assert len(near_map) == 2
        assert near_map == expected

    def test_result_max_record(self, all_sentences) -> None:
        """With an explicit fourth argument of 0.1, record 1 also maps to record 2."""
        distances = np.array(
            [
                [0.01, 0.02, 0.03],
                [0.04, 0.05, 0.06],
                [0.03, 0.8, 0.9],
            ]
        )
        indices = np.array([[0, 1, 2]] * 3)
        near_map = build_near_map(distances, indices, all_sentences, 0.1)

        expected = defaultdict(list, {0: [1, 2], 1: [1, 2]})

        assert len(near_map) == 2
        assert near_map == expected
class TestBuildSentencePairScores:
    """Tests for ``build_sentence_pair_scores``."""

    @pytest.fixture()
    def near_map(self) -> dict:
        """Return a near map with repeated neighbors for records 0 and 1."""
        return {0: [1, 1, 2, 2], 1: [1, 1]}

    @pytest.fixture()
    def merged_df(self) -> pl.DataFrame:
        """Return a five-row frame with an ID, an entity name and three vehicle columns."""
        return pl.DataFrame(
            {
                "ID1": [10, 20, 30, 40, 50],
                "Entity name": ["A", "B", "C", "D", "E"],
                "VehicleType": [
                    "Hatch 1",
                    "Sedan 1",
                    "Truck 1",
                    "SUV 3",
                    "CyberTruck 3",
                ],
                "VehicleColor": ["Blue", "Red", "Blue", "Black", "Silver"],
                "VehicleYear": ["2021", "2022", "2022", "2023", "2024"],
            }
        )

    def test_empty(self) -> None:
        """An empty near map and an empty frame give an empty list."""
        result = build_sentence_pair_scores({}, pl.DataFrame())
        assert result == []

    def test_build_sentence_pair_scores(self, near_map, merged_df) -> None:
        """Every (key, neighbor) entry yields one triple, duplicates included."""
        # Uses the ``near_map`` fixture instead of repeating the same literal inline.
        result = build_sentence_pair_scores(near_map, merged_df)
        expected = [(0, 1, 0), (0, 1, 0), (0, 2, 0), (0, 2, 0), (1, 1, 1), (1, 1, 1)]
        assert result == expected

    def test_single_pair(self, merged_df) -> None:
        """A single neighbor yields a single triple."""
        near_map = {0: [1]}

        result = build_sentence_pair_scores(near_map, merged_df)
        expected = [(0, 1, 0)]
        assert result == expected

    def test_multiple_pairs_different_keys(self, merged_df) -> None:
        """Pairs from different keys are returned in near-map order."""
        near_map = {0: [1, 2], 1: [2, 3]}

        result = build_sentence_pair_scores(near_map, merged_df)
        expected = [(0, 1, 0), (0, 2, 0), (1, 2, 0), (1, 3, 0)]
        assert result == expected

    def test_no_matches(self, merged_df) -> None:
        """An empty near map gives an empty list even when the frame has rows."""
        near_map = {}

        result = build_sentence_pair_scores(near_map, merged_df)
        expected = []
        assert result == expected
class TestBuildMatches:
    """Tests for ``build_matches``.

    Expected keys have the form ``"<Entity name>::<Dataset>"`` and values are
    group ids, as spelled out in each test's expected mapping.
    """

    @pytest.fixture()
    def merged_df(self) -> pl.DataFrame:
        """Return five entities spread over datasets X, Y and Z."""
        return pl.DataFrame(
            {
                "Entity name": ["A", "B", "C", "D", "E"],
                "Dataset": ["X", "X", "Y", "Y", "Z"],
            }
        )

    @pytest.fixture()
    def sentence_pair_scores(self) -> list[tuple[int, int, float]]:
        """Return three scored pairs as ``(row, row, score)`` triples."""
        return [(0, 1, 0.8), (0, 2, 0.6), (3, 4, 0.7)]

    def test_basic_grouping(self, merged_df, sentence_pair_scores) -> None:
        """Only the 0.8 pair from the fixture ends up grouped."""
        groups, _matches, _pairs = build_matches(sentence_pair_scores, merged_df)
        assert groups == {"A::X": 0, "B::X": 0}

    def test_empty_scores(self, merged_df) -> None:
        """No scores means no groups."""
        groups, _matches, _pairs = build_matches([], merged_df)
        assert groups == {}

    def test_all_below_threshold(self, merged_df) -> None:
        """Low-scoring pairs produce no groups."""
        scores = [(0, 1, 0.2), (0, 2, 0.3), (3, 4, 0.1)]
        groups, _matches, _pairs = build_matches(scores, merged_df)
        assert groups == {}

    def test_single_pair_above_threshold(self, merged_df) -> None:
        """One 0.8 pair forms one group."""
        scores = [(0, 1, 0.8)]
        groups, _matches, _pairs = build_matches(scores, merged_df)
        assert groups == {"A::X": 0, "B::X": 0}

    def test_overlapping_groups(self, merged_df) -> None:
        """Pairs chained through shared entities collapse into one group."""
        scores = [(0, 1, 0.8), (1, 2, 0.75), (2, 3, 0.85)]
        groups, _matches, _pairs = build_matches(scores, merged_df)
        assert groups == {"A::X": 0, "B::X": 0, "C::Y": 0, "D::Y": 0}

    def test_non_jaccard_change(self, merged_df) -> None:
        """Passing 0.5 as the third argument lets the 0.7 pair form a second group."""
        scores = [(0, 1, 0.8), (3, 4, 0.7)]
        groups, _matches, _pairs = build_matches(scores, merged_df, 0.5)
        assert groups == {"A::X": 0, "B::X": 0, "D::Y": 1, "E::Z": 1}

    def test_non_overlapping_groups(self, merged_df) -> None:
        """Two disjoint pairs form two separate groups."""
        scores = [(0, 1, 0.8), (3, 4, 0.75)]
        groups, _matches, _pairs = build_matches(scores, merged_df)
        assert groups == {"A::X": 0, "B::X": 0, "D::Y": 1, "E::Z": 1}

    def test_both_entities_in_same_group(self, merged_df) -> None:
        """A pair whose members already share a group leaves the grouping intact."""
        scores = [(0, 1, 0.8), (1, 2, 0.7), (2, 0, 0.9)]
        groups, _matches, _pairs = build_matches(scores, merged_df)
        assert groups == {"A::X": 0, "B::X": 0, "C::Y": 0}

    def test_both_entities_in_different_groups(self, merged_df) -> None:
        """A pair bridging two existing groups merges them into one."""
        scores = [(0, 1, 0.8), (2, 3, 0.9), (1, 2, 0.76)]
        groups, _matches, _pairs = build_matches(scores, merged_df)
        assert groups == {"A::X": 0, "B::X": 0, "C::Y": 0, "D::Y": 0}

    def test_one_entity_in_group_other_not(self, merged_df) -> None:
        """A self-pair scored 0.9 gets group 0; the 0.8 pair gets group 1."""
        scores = [(0, 1, 0.8), (4, 4, 0.9)]
        groups, _matches, _pairs = build_matches(scores, merged_df)
        assert groups == {"A::X": 1, "B::X": 1, "E::Z": 0}

    def test_similar_names_different_datasets(self, merged_df) -> None:
        """Entities from different datasets can share a group."""
        scores = [(0, 4, 0.8)]
        groups, _matches, _pairs = build_matches(scores, merged_df)
        assert groups == {"A::X": 0, "E::Z": 0}

    def test_matches(self, merged_df, sentence_pair_scores) -> None:
        """The second return value has length 2 for the fixture scores."""
        _groups, matched, _pairs = build_matches(sentence_pair_scores, merged_df)
        assert len(matched) == 2

    def test_pair_to_match(self, merged_df, sentence_pair_scores) -> None:
        """The third return value maps the matched pair to its score."""
        _groups, _matches, pair_scores = build_matches(sentence_pair_scores, merged_df)
        assert pair_scores == {("A::X", "B::X"): 0.8}
class TestCalculateMeanScore:
    """Tests for ``_calculate_mean_score``."""

    @pytest.fixture()
    def pair_to_match(self) -> dict:
        """Return two scored entity pairs, one per group."""
        scores = {}
        scores[("A::X", "B::X")] = 0.8
        scores[("C::Y", "D::Y")] = 0.7
        return scores

    @pytest.fixture()
    def entity_to_group(self) -> dict:
        """Return a mapping placing A and B in group 0 and C and D in group 1."""
        return dict.fromkeys(["A::X", "B::X"], 0) | dict.fromkeys(["C::Y", "D::Y"], 1)

    def test_empty(self) -> None:
        """Empty inputs give an empty result."""
        assert _calculate_mean_score({}, {}) == {}

    def test_no_matches(self, entity_to_group) -> None:
        """Groups without any scored pair give an empty result."""
        assert _calculate_mean_score({}, entity_to_group) == {}

    def test_single_group(self, pair_to_match, entity_to_group) -> None:
        """Each group's result is the score of its single pair."""
        means = _calculate_mean_score(pair_to_match, entity_to_group)
        assert means == {0: 0.8, 1: 0.7}

    def test_multiple_groups(self, pair_to_match, entity_to_group) -> None:
        """An extra pair spanning both groups leaves the expected result unchanged."""
        pair_to_match.update({("B::X", "C::Y"): 0.8})
        means = _calculate_mean_score(pair_to_match, entity_to_group)
        assert means == {0: 0.8, 1: 0.7}

    def test_no_group(self, pair_to_match, entity_to_group) -> None:
        """With C removed from the group mapping, only group 0 is reported."""
        pair_to_match.update({("B::X", "C::Y"): 0.8})
        entity_to_group.pop("C::Y")
        means = _calculate_mean_score(pair_to_match, entity_to_group)
        assert means == {0: 0.8}

    def test_no_pair(self, pair_to_match, entity_to_group) -> None:
        """With the A-B pair removed, only group 1 is reported."""
        pair_to_match.pop(("A::X", "B::X"))
        means = _calculate_mean_score(pair_to_match, entity_to_group)
        assert means == {1: 0.7}
class TestBuildMatchesDataset:
    """Tests for ``build_matches_dataset``."""

    @pytest.fixture()
    def merged_df(self) -> pl.DataFrame:
        """Return five entities with ids, group ids and source datasets."""
        return pl.DataFrame(
            {
                "Entity ID": ["10", "20", "30", "40", "50"],
                "Entity name": ["A", "B", "C", "D", "E"],
                "Group ID": [0, 0, 1, 1, 2],
                "Dataset": ["X", "X", "Y", "Y", "Z"],
            }
        )

    @pytest.fixture()
    def pair_to_match(self) -> dict[tuple[str, str], float]:
        """Return two scored entity pairs, one per group."""
        return {("A::X", "B::X"): 0.8, ("C::Y", "D::Y"): 0.7}

    @pytest.fixture()
    def entity_to_group(self) -> dict:
        """Return a mapping placing A and B in group 0 and C and D in group 1."""
        return {"A::X": 0, "B::X": 0, "C::Y": 1, "D::Y": 1}

    def test_empty_df(self, entity_to_group) -> None:
        """An empty input frame gives an empty result frame."""
        # NOTE(review): pair_to_match is passed as a list here but as a dict in
        # every other test of this class — confirm which type is intended.
        result = build_matches_dataset(pl.DataFrame(), [], entity_to_group)
        assert result.is_empty()

    def test_empty_pair(self, merged_df, entity_to_group) -> None:
        """No scored pairs gives a result with no rows."""
        result = build_matches_dataset(merged_df, {}, entity_to_group)
        assert len(result) == 0

    def test_empty_group(self, merged_df, pair_to_match) -> None:
        """No group assignments gives a result with no rows."""
        result = build_matches_dataset(merged_df, pair_to_match, {})
        assert len(result) == 0

    def test_basic_grouping(self, merged_df, pair_to_match, entity_to_group) -> None:
        """Two groups of two entities give four rows with per-row size and similarity."""
        result = build_matches_dataset(merged_df, pair_to_match, entity_to_group)
        assert len(result) == 4
        assert result["Group ID"].n_unique() == 2
        assert result["Group size"].sum() == 8
        # The column holds floats; compare with a tolerance so the check does
        # not depend on the summation order.
        assert result["Name similarity"].sum() == pytest.approx(3.0)

    def test_no_matches(self) -> None:
        """A pair whose second entity has no group gives a result with no rows."""
        pair_to_match = {("A::X", "B::X"): 0.8}
        entity_to_group = {"A::X": 0}
        merged_df = pl.DataFrame(
            {
                "Entity ID": ["10", "20"],
                "Entity name": ["A", "B"],
                "Group ID": [0, 1],
                "Dataset": ["X", "X"],
            }
        )
        result = build_matches_dataset(merged_df, pair_to_match, entity_to_group)
        assert result.height == 0

    # NOTE(review): the two tests below are disabled in the original and the
    # reason is not recorded. ``test_single_group`` requests a
    # ``sentence_pair_scores`` fixture that is not defined in this class.

    # def test_basic_grouping_ordering(
    #     self, merged_df, pair_to_match, entity_to_group
    # ) -> None:
    #     result = build_matches_dataset(merged_df, pair_to_match, entity_to_group)
    #     columns_ordered = [
    #         "Group ID",
    #         "Group size",
    #         "Entity name",
    #         "Dataset",
    #         "Name similarity",
    #     ]
    #     assert result.columns[0] == "Group ID"
    #     assert result.columns[1] == "Group name"

    # def test_single_group(
    #     self, merged_df, sentence_pair_scores, entity_to_group
    # ) -> None:
    #     entity_to_group["E::Z"] = 2
    #     result = build_matches_dataset(merged_df, sentence_pair_scores, entity_to_group)
    #     assert len(result) == 5
    #     assert result["Group ID"].n_unique() == 3
    #     assert result["Group size"].sum() == 3
    #     assert result["Name similarity"].sum() == 2.1
class TestBuildAttributesDataFrame:
    """Tests for ``build_attributes_dataframe``."""

    @staticmethod
    def _vehicle_frame(color_column: str) -> pl.DataFrame:
        """Return the shared five-row vehicle frame, naming the colour column as given."""
        return pl.DataFrame(
            {
                "Entity ID": ["10", "20", "30", "40", "50"],
                "Entity name": ["A", "B", "C", "D", "E"],
                "VehicleType": [
                    "Hatch 1",
                    "Sedan 1",
                    "Truck 1",
                    "SUV 3",
                    "CyberTruck 3",
                ],
                color_column: ["Blue", "Red", "Blue", "Black", "Silver"],
                "VehicleYear": ["2021", "2022", "2022", "2023", "2024"],
            }
        )

    def test_empty(self) -> None:
        """No input frames give an empty result."""
        result = build_attributes_dataframe({}, atts_to_datasets={})
        assert result.is_empty()

    def test_one_df(self) -> None:
        """Only the mapped attribute (renamed) and the entity columns are kept."""
        matching_dfs = {"X": self._vehicle_frame("VehicleColor")}
        atts_to_datasets = {"X": {"VehicleType": "VehicleType1"}}

        result = build_attributes_dataframe(
            matching_dfs, atts_to_datasets=atts_to_datasets
        )
        columns = result.columns

        assert "VehicleType1" in columns
        assert "Entity ID" in columns
        assert "Entity name" in columns
        assert "VehicleColor" not in columns
        assert "VehicleYear" not in columns

    def test_multiple_dfs(self) -> None:
        """Differently named source columns mapped to one target are stacked together."""
        matching_dfs = {
            "X": self._vehicle_frame("VehicleColor"),
            "Y": self._vehicle_frame("VehicleColor2"),
        }
        atts_to_datasets = {
            "X": {"VehicleType": "VehicleType1", "VehicleColor": "VehicleColor1"},
            "Y": {"VehicleType": "VehicleType1", "VehicleColor2": "VehicleColor1"},
        }

        result = build_attributes_dataframe(
            matching_dfs, atts_to_datasets=atts_to_datasets
        )

        assert "VehicleType1" in result.columns
        assert "VehicleColor1" in result.columns
        assert "VehicleColor" not in result.columns
        assert "Entity ID" in result.columns
        assert "Entity name" in result.columns
        assert "VehicleYear" not in result.columns
        assert "VehicleType" not in result.columns
        # Five rows from each of the two datasets.
        assert result.height == 10
        assert result["VehicleType1"].n_unique() == 5
        assert result["VehicleColor1"].n_unique() == 4