Coverage for src/dataknobs_xization/lexicon.py: 31% (213 statements)
1"""Lexical matching and token alignment for text processing.
3Provides classes for lexical expansion, normalization, token alignment,
4and pattern matching in text with support for variations and fuzzy matching.
5"""
7from abc import abstractmethod
8from collections import defaultdict
9from collections.abc import Callable
10from typing import Any, Dict, List, Set, Union
12import more_itertools
13import numpy as np
14import pandas as pd
16import dataknobs_structures.document as dk_doc
17import dataknobs_xization.annotations as dk_anns
18import dataknobs_xization.authorities as dk_auth
19import dataknobs_xization.masking_tokenizer as dk_tok
20from dataknobs_utils import emoji_utils


class LexicalExpander:
    """A class to expand and/or normalize original lexical input terms, to
    keep back-references from generated data to corresponding original input,
    and to build consistent tokens for lexical matching.
    """

    def __init__(
        self,
        variations_fn: Callable[[str], Set[str]],
        normalize_fn: Callable[[str], str],
        split_input_camelcase: bool = True,
        detect_emojis: bool = False,
    ):
        """Initialize with the given functions.

        Args:
            variations_fn: A function, f(t), to expand a raw input term to
                all of its variations (including itself if desired). If None,
                the default is to expand each term to itself.
            normalize_fn: A function to normalize a raw input term or any
                of its variations. If None, then the identity function is used.
            split_input_camelcase: True to split input camelcase tokens.
            detect_emojis: True to detect emojis. If split_input_camelcase,
                then adjacent emojis will also be split; otherwise, adjacent
                emojis will appear as a single token.
        """
        self.variations_fn = variations_fn if variations_fn else lambda x: {x}
        self.normalize_fn = normalize_fn if normalize_fn else lambda x: x
        self.split_input_camelcase = split_input_camelcase
        self.emoji_data = emoji_utils.load_emoji_data() if detect_emojis else None
        self.v2t = defaultdict(set)

    def __call__(self, term: Any, normalize: bool = True) -> Set[str]:
        """Get all variations of the original term.

        Args:
            term: The term whose variations to compute.
            normalize: True to normalize the resulting variations.

        Returns:
            All variations.
        """
        variations = self.variations_fn(term)
        if normalize:
            variations = {self.normalize_fn(v) for v in variations}
        # Add a mapping from each variation back to its original term
        if variations is not None and len(variations) > 0:
            more_itertools.consume(self.v2t[v].add(term) for v in variations)
        return variations

    def normalize(self, input_term: str) -> str:
        """Normalize the given input term or variation.

        Args:
            input_term: An input term to normalize.

        Returns:
            The normalized string of the input_term.
        """
        return self.normalize_fn(input_term)

    def get_terms(self, variation: str) -> Set[Any]:
        """Get the original terms for which the given variation was generated.

        Args:
            variation: A variation whose reference term(s) to retrieve.

        Returns:
            The set of terms for the variation, or the empty set if none.
        """
        return self.v2t.get(variation, set())

    def build_first_token(
        self,
        doctext: Union[dk_doc.Text, str],
    ) -> dk_tok.Token:
        """Build the first token of the tokenized input text."""
        inputf = dk_tok.TextFeatures(
            doctext, split_camelcase=self.split_input_camelcase, emoji_data=self.emoji_data
        )
        return inputf.build_first_token(normalize_fn=self.normalize_fn)
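
# Illustrative usage sketch for LexicalExpander (the variation and
# normalization functions below are hypothetical, chosen only to show the
# back-reference mapping; they are not part of this module):
#
#   expander = LexicalExpander(
#       variations_fn=lambda t: {t, t.replace("-", " ")},
#       normalize_fn=str.lower,
#   )
#   expander("New-York")            # -> {"new-york", "new york"}
#   expander.get_terms("new york")  # -> {"New-York"}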


class TokenMatch:
    """Represents a match between tokens and a lexical authority variation.

    Matches a sequence of tokens against a lexical authority variation,
    tracking whether the match is complete and providing access to
    matched text and annotation generation.
    """

    def __init__(self, auth: dk_auth.LexicalAuthority, val_idx: int, var: str, token: dk_tok.Token):
        self.auth = auth
        self.val_idx = val_idx
        self.var = var
        self.token = token

        self.varparts = var.split()
        self.matches = True
        self.tokens = []
        t = token
        for v in self.varparts:
            if t is not None and v == t.norm_text:
                self.tokens.append(t)
                t = t.next_token
            else:
                self.matches = False
                break

    def __repr__(self):
        ttext = " ".join(t.token_text for t in self.tokens)
        return (
            f"Match_{self.tokens[0].token_num}-{self.tokens[-1].token_num}({ttext})[{self.val_idx}]"
        )

    @property
    def next_token(self):
        next_token = None
        if self.matches:
            next_token = self.tokens[-1].next_token
        return next_token

    @property
    def matched_text(self):
        """Get the matched original text."""
        return self.token.input_text[self.tokens[0].start_pos : self.tokens[-1].end_pos]

    def build_annotation(self):
        return self.auth.build_annotation(
            start_pos=self.tokens[0].start_pos,
            end_pos=self.tokens[-1].end_pos,
            entity_text=self.matched_text,
            auth_value_id=self.val_idx,
        )


class TokenAligner:
    """Aligns tokens with a lexical authority to generate annotations.

    Processes a token stream, matching tokens against lexical authority
    variations and generating annotations for matches. Handles overlapping
    matches and tracks processed tokens.
    """

    def __init__(self, first_token: dk_tok.Token, authority: dk_auth.LexicalAuthority):
        self.first_token = first_token
        self.auth = authority
        self.annotations = []  # List[Dict[str, Any]]
        self._processed_idx = set()
        self._process(self.first_token)

    def _process(self, token):
        if token is not None:
            if token.token_num not in self._processed_idx:
                token_matches = self._get_token_matches(token)
                for token_match in token_matches:
                    self.annotations.append(token_match.build_annotation())
                    self._process(token_match.next_token)
            self._process(token.next_token)

    def _get_token_matches(self, token):
        token_matches = []
        vs = self.auth.find_variations(token.norm_text, starts_with=True)
        if len(vs) > 0:
            for val_idx, var in vs.items():
                token_match = TokenMatch(self.auth, val_idx, var, token)
                if token_match.matches:
                    # Mark the matched token position(s) as processed
                    self._processed_idx.update({t.token_num for t in token_match.tokens})
                    token_matches.append(token_match)
        return token_matches
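
# Illustrative sketch of the alignment flow (assumes an existing
# dk_auth.LexicalAuthority instance `auth` and a LexicalExpander `expander`;
# both names are placeholders, not defined in this module):
#
#   first_token = expander.build_first_token("visited New York City")
#   aligner = TokenAligner(first_token, auth)
#   # Each authority variation whose whitespace-split parts match consecutive
#   # normalized tokens produces one annotation:
#   for ann in aligner.annotations:
#       print(ann)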


class DataframeAuthority(dk_auth.LexicalAuthority):
    """A pandas dataframe-based lexical authority."""

    def __init__(
        self,
        name: str,
        lexical_expander: LexicalExpander,
        authdata: dk_auth.AuthorityData,
        auth_anns_builder: dk_auth.AuthorityAnnotationsBuilder = None,
        field_groups: dk_auth.DerivedFieldGroups = None,
        anns_validator: Callable[[dk_auth.Authority, Dict[str, Any]], bool] = None,
        parent_auth: dk_auth.Authority = None,
    ):
        """Initialize with the name, values, and associated ids of the authority,
        and with the lexical expander for authoritative values.

        Args:
            name: The authority name, if different from df.columns[0].
            lexical_expander: The lexical expander for the values.
            authdata: The data for this authority.
            auth_anns_builder: The authority annotations row builder to use
                for building annotation rows.
            field_groups: The derived field groups to use.
            anns_validator: fn(auth, anns_dict_list) that returns True if
                the list of annotation row dicts is valid to be added as
                annotations for a single match or "entity".
            parent_auth: This authority's parent authority (if any).
        """
        super().__init__(
            name if name else authdata.df.columns[0],
            auth_anns_builder=auth_anns_builder,
            authdata=authdata,
            field_groups=field_groups,
            anns_validator=anns_validator,
            parent_auth=parent_auth,
        )
        self.lexical_expander = lexical_expander
        self._variations = None
        self._prev_aligner = None

    @property
    def prev_aligner(self) -> TokenAligner:
        """Get the token aligner created in the latest call to annotate_text."""
        return self._prev_aligner

    @property
    def variations(self) -> pd.Series:
        """Get all lexical variations as a series whose index carries the
        associated value IDs.

        Returns:
            A pandas series with index-identified variations.
        """
        if self._variations is None:
            self._variations = (
                self.authdata.df[self.name].apply(self.lexical_expander).explode().dropna()
            )
        return self._variations

    def get_id_by_variation(self, variation: str) -> Set[str]:
        """Get the IDs of the value(s) associated with the given variation.

        Args:
            variation: Variation text.

        Returns:
            The possibly empty set of associated value IDs.
        """
        ids = set()
        for value in self.lexical_expander.get_terms(variation):
            ids.update(self.get_value_ids(value))
        return ids

    def get_variations(self, value: Any, normalize: bool = True) -> Set[Any]:
        """Convenience method to compute variations for the value.

        Args:
            value: The authority value, or term, whose variations to compute.
            normalize: True to normalize the variations.

        Returns:
            The set of variations for the value.
        """
        return self.lexical_expander(value, normalize=normalize)

    def has_value(self, value: Any) -> bool:
        """Determine whether the given value is in this authority.

        Args:
            value: A possible authority value.

        Returns:
            True if the value is a valid entity value.
        """
        return np.any(self.authdata.df[self.name] == value)

    def get_value_ids(self, value: Any) -> Set[Any]:
        """Get all IDs associated with the given value. Note that typically
        there is a single ID for any value, but this allows for inherent
        ambiguities in the authority.

        Args:
            value: An authority value.

        Returns:
            The associated IDs or an empty set if the value is not valid.
        """
        return set(self.authdata.lookup_values(value).index.tolist())

    def get_values_by_id(self, value_id: Any) -> Set[Any]:
        """Get all values for the associated value ID. Note that typically
        there is a single value for an ID, but this allows for inherent
        ambiguities in the authority.

        Args:
            value_id: An authority value ID.

        Returns:
            The associated values or an empty set if the value ID is not valid.
        """
        return set(self.authdata.lookup_values(value_id, is_id=True)[self.name].tolist())

    def find_variations(
        self,
        variation: str,
        starts_with: bool = False,
        ends_with: bool = False,
        scope: str = "fullmatch",
    ) -> pd.Series:
        """Find all matches to the given variation.

        Note:
            starts_with takes precedence over ends_with; if neither is True,
            the scope determines how the pattern is matched (a full match by
            default).

        Args:
            variation: The text to find; treated as a regular expression
                unless either starts_with or ends_with is True.
            starts_with: When True, find all terms that start with the
                variation text.
            ends_with: When True, find all terms that end with the variation
                text.
            scope: 'fullmatch' (default), 'match', or 'contains' for
                strict, less strict, and least strict matching.

        Returns:
            The matching variations as a pd.Series.
        """
        vs = self.variations
        if starts_with:
            vs = vs[vs.str.startswith(variation)]
        elif ends_with:
            vs = vs[vs.str.endswith(variation)]
        else:
            if scope == "fullmatch":
                hits = vs.str.fullmatch(variation)
            elif scope == "match":
                hits = vs.str.match(variation)
            else:
                hits = vs.str.contains(variation)
            vs = vs[hits]
        vs = vs.drop_duplicates()
        return vs
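
    # For example (illustrative values), if the variations series holds
    # "new york" and "new york city", then find_variations("new york",
    # starts_with=True) returns both entries, while the default
    # scope="fullmatch" returns only "new york".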

    def get_variations_df(
        self,
        variations: pd.Series,
        variations_colname: str = "variation",
        ids_colname: str = None,
        lookup_values: bool = False,
    ) -> pd.DataFrame:
        """Create a DataFrame including associated ids for each variation.

        Args:
            variations: The variations to include in the dataframe.
            variations_colname: The name of the variations column.
            ids_colname: The column name for value ids.
            lookup_values: When True, include a self.name column
                with associated values.
        """
        if ids_colname is None:
            ids_colname = f"{self.name}_id"
        df = pd.DataFrame(
            {
                variations_colname: variations,
                ids_colname: variations.apply(self.get_id_by_variation),
            }
        ).explode(ids_colname)
        if lookup_values:
            df[self.name] = df[ids_colname].apply(self.get_values_by_id)
            df = df.explode(self.name)
        return df

    def add_annotations(
        self,
        doctext: dk_doc.Text,
        annotations: dk_anns.Annotations,
    ) -> dk_anns.Annotations:
        """Find, validate, and add annotations for the given text.

        Args:
            doctext: The text to process.
            annotations: The annotations object to add annotations to.

        Returns:
            The given or a new Annotations instance.
        """
        first_token = self.lexical_expander.build_first_token(doctext)
        token_aligner = TokenAligner(first_token, self)
        self._prev_aligner = token_aligner
        if self.validate_ann_dicts(token_aligner.annotations):
            annotations.add_dicts(token_aligner.annotations)
        return annotations
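
# Illustrative sketch of building and querying a DataframeAuthority (assumes
# dk_auth.AuthorityData wraps a dataframe whose named column holds the
# authority values; the data and names below are made up for illustration):
#
#   cities_df = pd.DataFrame({"city": ["New York", "Los Angeles"]})
#   authdata = dk_auth.AuthorityData(cities_df, "city")
#   auth = DataframeAuthority("city", LexicalExpander(None, str.lower), authdata)
#   auth.find_variations("new york")  # series of matching variation text(s)
#   auth.get_value_ids("New York")    # -> ids of the matching dataframe row(s)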


class CorrelatedAuthorityData(dk_auth.AuthorityData):
    """Container for authoritative data containing correlated data for multiple
    "sub" authorities.
    """

    def __init__(self, df: pd.DataFrame, name: str):
        super().__init__(df, name)
        self._authority_data = {}

    def sub_authority_names(self) -> List[str]:
        """Get the "sub" authority names."""
        return None

    @abstractmethod
    def auth_values_mask(self, name: str, value_id: int) -> pd.Series:
        """Identify full-authority data corresponding to this sub-value.

        Args:
            name: The sub-authority name.
            value_id: The sub-authority value_id.

        Returns:
            A series representing relevant full-authority data.
        """
        raise NotImplementedError

    @abstractmethod
    def auth_records_mask(
        self,
        record_value_ids: Dict[str, int],
        filter_mask: pd.Series = None,
    ) -> pd.Series:
        """Get a series identifying records in the full authority matching
        the given records of the form {<sub-name>: <sub-value-id>}.

        Args:
            record_value_ids: The dict of field names to value_ids.
            filter_mask: A pre-filter limiting records to consider and/or
                building records incrementally.

        Returns:
            A series identifying where all fields exist.
        """
        raise NotImplementedError

    @abstractmethod
    def get_auth_records(self, records_mask: pd.Series) -> pd.DataFrame:
        """Get the authority records identified by the mask.

        Args:
            records_mask: A series identifying records in the full data.

        Returns:
            The records for which the mask is True.
        """
        raise NotImplementedError

    @abstractmethod
    def combine_masks(self, mask1: pd.Series, mask2: pd.Series) -> pd.Series:
        """Combine the masks if possible, returning the valid combination or None.

        Args:
            mask1: An auth_records_mask consistent with this data.
            mask2: Another data auth_records_mask.

        Returns:
            The combined consistent records_mask or None.
        """
        raise NotImplementedError


class MultiAuthorityData(CorrelatedAuthorityData):
    """Container for authoritative data containing correlated data for multiple
    "sub" authorities composed of explicit data for each component.
    """

    def __init__(self, df: pd.DataFrame, name: str):
        super().__init__(df, name)
        self._authority_data = {}

    @abstractmethod
    def build_authority_data(self, name: str) -> dk_auth.AuthorityData:
        """Build the authority data for the named sub-authority.

        Args:
            name: The "sub" authority name.

        Returns:
            The "sub" authority data.
        """
        raise NotImplementedError

    def authority_data(self, name: str) -> dk_auth.AuthorityData:
        """Retrieve the named authority data without building it, or None."""
        return self._authority_data.get(name, None)

    def get_authority_data(self, name: str) -> dk_auth.AuthorityData:
        """Get AuthorityData for the named "sub" authority, building it if needed.

        Args:
            name: The "sub" authority name.

        Returns:
            The "sub" authority data.
        """
        if name not in self._authority_data:
            self._authority_data[name] = self.build_authority_data(name)
        return self._authority_data[name]

    @staticmethod
    def get_unique_vals_df(col: pd.Series, name: str) -> pd.DataFrame:
        """Get a dataframe holding the unique values from the column under the
        given column name.
        """
        data = np.sort(pd.unique(col.dropna()))
        if np.issubdtype(col.dtype, np.integer):
            # IDs for an integer column are the integers themselves
            col_df = pd.DataFrame({name: data}, index=data)
        else:
            # IDs for other columns are auto-generated from 0 to n-1
            col_df = pd.DataFrame({name: data})
        return col_df
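
    # For example (illustrative values), a string column ["b", "a", "a"] yields
    # rows ["a", "b"] with the default index 0..1, while an integer column
    # [7, 3, 7] yields rows [3, 7] indexed by the values 3 and 7, so the IDs
    # are the integers themselves.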

    def lookup_subauth_values(self, name: str, value: int, is_id: bool = False) -> pd.DataFrame:
        """Look up "sub" authority data for the named "sub" authority value.

        Args:
            name: The sub-authority name.
            value: The value for the sub-authority to look up.
            is_id: True if value is an ID.

        Returns:
            The applicable authority dataframe rows.
        """
        values_df = None
        authdata = self._authority_data.get(name, None)
        if authdata is not None:
            values_df = authdata.lookup_values(value, is_id=is_id)
        return values_df

    def lookup_auth_values(
        self,
        name: str,
        value: str,
    ) -> pd.DataFrame:
        """Look up original authority data for the named "sub" authority value.

        Args:
            name: The sub-authority name.
            value: The sub-authority value(s) (or dataframe row(s)).

        Returns:
            The original authority dataframe rows.
        """
        return self.df[self.df[name] == value]

    def auth_values_mask(self, name: str, value_id: int) -> pd.Series:
        """Identify the rows in the full authority corresponding to this sub-value.

        Args:
            name: The sub-authority name.
            value_id: The sub-authority value_id.

        Returns:
            A boolean series where the field exists.
        """
        field_values = self.lookup_subauth_values(name, value_id, is_id=True)
        return self.df[name].isin(field_values[name].tolist())

    def auth_records_mask(
        self,
        record_value_ids: Dict[str, int],
        filter_mask: pd.Series = None,
    ) -> pd.Series:
        """Get a boolean series identifying records in the full authority matching
        the given records of the form {<sub-name>: <sub-value-id>}.

        Args:
            record_value_ids: The dict of field names to value_ids.
            filter_mask: A pre-filter limiting records to consider and/or
                building records incrementally.

        Returns:
            A boolean series where all fields exist, or None.
        """
        has_fields = filter_mask
        for name, value_id in record_value_ids.items():
            has_field = self.auth_values_mask(name, value_id)
            if has_fields is None:
                has_fields = has_field
            else:
                has_fields &= has_field
        return has_fields

    def get_auth_records(self, records_mask: pd.Series) -> pd.DataFrame:
        """Get the authority records identified by the mask.

        Args:
            records_mask: A boolean series identifying records in the full df.

        Returns:
            The records/rows for which the mask is True.
        """
        return self.df[records_mask]

    def combine_masks(self, mask1: pd.Series, mask2: pd.Series) -> pd.Series:
        """Combine the masks if possible, returning the valid combination or None.

        Args:
            mask1: An auth_records_mask consistent with this data.
            mask2: Another data auth_records_mask.

        Returns:
            The combined consistent records_mask or None.
        """
        result = None
        if mask1 is not None and mask2 is not None:
            result = mask1 & mask2
        elif mask1 is not None:
            result = mask1
        elif mask2 is not None:
            result = mask2
        return result if np.any(result) else None
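
# Illustrative sketch of record masking (assumes a concrete MultiAuthorityData
# subclass instance `madata` whose columns include "city" and "state"; the
# names and ids below are placeholders):
#
#   mask = madata.auth_records_mask({"city": 0, "state": 3})
#   madata.get_auth_records(mask)           # rows where both sub-values match
#   madata.combine_masks(mask, other_mask)  # AND of two masks, or None if empty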


class SimpleMultiAuthorityData(MultiAuthorityData):
    """Data class for pulling a single column from the multi-authority data
    as a "sub" authority.
    """

    def build_authority_data(self, name: str) -> dk_auth.AuthorityData:
        """Build an authority for the named column holding authority data.

        Note:
            Only unique values are kept and the full dataframe's index
            will not be preserved.

        Args:
            name: The "sub" authority (and column) name.

        Returns:
            The "sub" authority data.
        """
        col = self.df[name]
        col_df = self.get_unique_vals_df(col, name)
        return dk_auth.AuthorityData(col_df, name)


class MultiAuthorityFactory(dk_auth.AuthorityFactory):
    """A factory for building a "sub" authority directly or indirectly
    from MultiAuthorityData.
    """

    def __init__(
        self,
        auth_name: str,
        lexical_expander: LexicalExpander = None,
    ):
        """Initialize the MultiAuthorityFactory.

        Args:
            auth_name: The name of the dataframe authority to build.
            lexical_expander: The lexical expander to use (default=identity).
        """
        self.auth_name = auth_name
        self._lexical_expander = lexical_expander

    def get_lexical_expander(self, name: str) -> LexicalExpander:
        """Get the lexical expander for the named (column) data.

        Args:
            name: The name of the column to expand.

        Returns:
            The appropriate lexical_expander.
        """
        if self._lexical_expander is None:
            self._lexical_expander = LexicalExpander(None, None)
        return self._lexical_expander

    def build_authority(
        self,
        name: str,
        auth_anns_builder: dk_auth.AuthorityAnnotationsBuilder,
        multiauthdata: MultiAuthorityData,
        parent_auth: dk_auth.Authority = None,
    ) -> DataframeAuthority:
        """Build a DataframeAuthority.

        Args:
            name: The name of the authority to build.
            auth_anns_builder: The authority annotations row builder to use
                for building annotation rows.
            multiauthdata: The multi-authority source data.
            parent_auth: The parent authority.

        Returns:
            The DataframeAuthority instance.
        """
        authdata = multiauthdata.get_authority_data(name)
        field_groups = None  # TODO: get from instance var set on construction?
        anns_validator = None  # TODO: get from multiauthdata?
        return DataframeAuthority(
            name,
            self.get_lexical_expander(name),
            authdata,
            field_groups=field_groups,
            anns_validator=anns_validator,
            parent_auth=parent_auth,
        )
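
# Illustrative end-to-end sketch (assumes a dataframe with a "city" column, a
# compatible annotations builder, and that the Authority base class exposes an
# annotate_text entry point; all names below are placeholders):
#
#   madata = SimpleMultiAuthorityData(records_df, "places")
#   factory = MultiAuthorityFactory("city")
#   city_auth = factory.build_authority("city", anns_builder, madata)
#   annotations = city_auth.annotate_text(doc_text)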