Coverage for src / dataknobs_xization / lexicon.py: 31%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-26 15:46 -0700

1"""Lexical matching and token alignment for text processing. 

2 

3Provides classes for lexical expansion, normalization, token alignment, 

4and pattern matching in text with support for variations and fuzzy matching. 

5""" 

6 

7from abc import abstractmethod 

8from collections import defaultdict 

9from collections.abc import Callable 

10from typing import Any, Dict, List, Set, Union 

11 

12import more_itertools 

13import numpy as np 

14import pandas as pd 

15 

16import dataknobs_structures.document as dk_doc 

17import dataknobs_xization.annotations as dk_anns 

18import dataknobs_xization.authorities as dk_auth 

19import dataknobs_xization.masking_tokenizer as dk_tok 

20from dataknobs_utils import emoji_utils 

21 

22 

class LexicalExpander:
    """Expands and normalizes lexical input terms for matching.

    Keeps a reverse mapping from each generated variation back to the
    original term(s) that produced it, and builds normalized token
    streams suitable for lexical matching.
    """

    def __init__(
        self,
        variations_fn: Callable[[str], Set[str]],
        normalize_fn: Callable[[str], str],
        split_input_camelcase: bool = True,
        detect_emojis: bool = False,
    ):
        """Initialize with the given functions.

        Args:
            variations_fn: Maps a raw input term to the set of its
                variations (including the term itself, if desired). When
                None, each term expands only to itself.
            normalize_fn: Normalizes a raw input term or any of its
                variations. When None, the identity function is used.
            split_input_camelcase: True to split input camelcase tokens.
            detect_emojis: True to detect emojis. If split_input_camelcase,
                then adjacent emojis will also be split; otherwise, adjacent
                emojis will appear as a single token.
        """
        self.variations_fn = variations_fn or (lambda term: {term})
        self.normalize_fn = normalize_fn or (lambda term: term)
        self.split_input_camelcase = split_input_camelcase
        self.emoji_data = emoji_utils.load_emoji_data() if detect_emojis else None
        # Reverse map: variation -> set of original terms that produced it
        self.v2t = defaultdict(set)

    def __call__(self, term: Any, normalize: bool = True) -> Set[str]:
        """Compute all variations of the original term.

        Args:
            term: The term whose variations to compute.
            normalize: True to normalize the resulting variations.

        Returns:
            All variations.
        """
        variations = self.variations_fn(term)
        if normalize:
            variations = {self.normalize_fn(v) for v in variations}
        if variations:
            # Remember which original term produced each variation
            for variation in variations:
                self.v2t[variation].add(term)
        return variations

    def normalize(self, input_term: str) -> str:
        """Normalize the given input term or variation.

        Args:
            input_term: An input term to normalize.

        Returns:
            The normalized string of the input_term.
        """
        return self.normalize_fn(input_term)

    def get_terms(self, variation: str) -> Set[Any]:
        """Get the term ids for which the given variation was generated.

        Args:
            variation: A variation whose reference term(s) to retrieve.

        Returns:
            The (possibly empty) set of terms recorded for the variation.
        """
        return self.v2t.get(variation, set())

    def build_first_token(
        self,
        doctext: Union[dk_doc.Text, str],
    ) -> dk_tok.Token:
        """Build the first token of the normalized token stream for the text.

        Args:
            doctext: The text (or Text document) to tokenize.

        Returns:
            The first token, linked onward to the remaining tokens.
        """
        features = dk_tok.TextFeatures(
            doctext,
            split_camelcase=self.split_input_camelcase,
            emoji_data=self.emoji_data,
        )
        return features.build_first_token(normalize_fn=self.normalize_fn)

103 

104 

class TokenMatch:
    """Represents a match between tokens and a lexical authority variation.

    Aligns a (whitespace-split) authority variation against a linked
    sequence of tokens starting at the given token, tracking whether the
    match is complete and providing access to the matched text and
    annotation generation.
    """

    def __init__(self, auth: dk_auth.LexicalAuthority, val_idx: int, var: str, token: dk_tok.Token):
        """Initialize and immediately attempt the alignment.

        Args:
            auth: The lexical authority the variation came from.
            val_idx: The authority value ID for the variation.
            var: The variation text; parts are split on whitespace.
            token: The first token to align against.
        """
        self.auth = auth
        self.val_idx = val_idx
        self.var = var
        self.token = token

        self.varparts = var.split()
        # Fix: an empty variation cannot match anything. Previously,
        # matches stayed True with no tokens, so next_token/matched_text
        # would crash on self.tokens[0] / self.tokens[-1].
        self.matches = len(self.varparts) > 0
        self.tokens = []
        t = token
        for v in self.varparts:
            if t is not None and v == t.norm_text:
                self.tokens.append(t)
                t = t.next_token
            else:
                self.matches = False
                break

    def __repr__(self):
        # Fix: a failed match can have no aligned tokens at all (the very
        # first part mismatched); guard against IndexError on tokens[0].
        if not self.tokens:
            return f"Match_<none>({self.var})[{self.val_idx}]"
        ttext = " ".join(t.token_text for t in self.tokens)
        return (
            f"Match_{self.tokens[0].token_num}-{self.tokens[-1].token_num}({ttext})[{self.val_idx}]"
        )

    @property
    def next_token(self):
        """The token following the match, or None if the match failed."""
        next_token = None
        if self.matches:
            next_token = self.tokens[-1].next_token
        return next_token

    @property
    def matched_text(self):
        """Get the matched original text.

        Only meaningful when self.matches is True (tokens is non-empty).
        """
        return self.token.input_text[self.tokens[0].start_pos : self.tokens[-1].end_pos]

    def build_annotation(self):
        """Build an annotation row (dict) for this match via the authority.

        Only meaningful when self.matches is True (tokens is non-empty).
        """
        return self.auth.build_annotation(
            start_pos=self.tokens[0].start_pos,
            end_pos=self.tokens[-1].end_pos,
            entity_text=self.matched_text,
            auth_value_id=self.val_idx,
        )

156 

157 

class TokenAligner:
    """Aligns tokens with a lexical authority to generate annotations.

    Processes a token stream, matching tokens against lexical authority
    variations and generating annotations for matches. Handles overlapping
    matches and tracks processed tokens.
    """

    def __init__(self, first_token: dk_tok.Token, authority: dk_auth.LexicalAuthority):
        # Head of the linked token stream to align
        self.first_token = first_token
        self.auth = authority
        # Annotation row dicts built from successful matches
        self.annotations = []  # List[Dict[str, Any]]
        # Token numbers already consumed by a successful match
        self._processed_idx = set()
        # Eagerly process the whole stream at construction time
        self._process(self.first_token)

    def _process(self, token):
        # Recursively walk the token chain: emit an annotation for each
        # authority match starting at this token, resume matching just past
        # each match, then continue with the next token in the chain.
        # NOTE(review): recursion depth grows with the number of tokens, so
        # very long inputs could hit Python's recursion limit -- confirm
        # expected input sizes before using this on large documents.
        if token is not None:
            if token.token_num not in self._processed_idx:
                token_matches = self._get_token_matches(token)
                for token_match in token_matches:
                    self.annotations.append(token_match.build_annotation())
                    # Continue matching immediately after this match's span
                    self._process(token_match.next_token)
            self._process(token.next_token)

    def _get_token_matches(self, token):
        # Collect all authority variations that fully align starting at
        # this token (candidate variations share the token's normalized
        # text as a prefix).
        token_matches = []
        vs = self.auth.find_variations(token.norm_text, starts_with=True)
        if len(vs) > 0:
            for val_idx, var in vs.items():
                token_match = TokenMatch(self.auth, val_idx, var, token)
                if token_match.matches:
                    # mark token position(s) as matched
                    self._processed_idx.update({t.token_num for t in token_match.tokens})
                    token_matches.append(token_match)
        return token_matches

193 

194 

class DataframeAuthority(dk_auth.LexicalAuthority):
    """A pandas dataframe-based lexical authority."""

    def __init__(
        self,
        name: str,
        lexical_expander: LexicalExpander,
        authdata: dk_auth.AuthorityData,
        auth_anns_builder: dk_auth.AuthorityAnnotationsBuilder = None,
        field_groups: dk_auth.DerivedFieldGroups = None,
        anns_validator: Callable[[dk_auth.Authority, Dict[str, Any]], bool] = None,
        parent_auth: dk_auth.Authority = None,
    ):
        """Initialize with the name, values, and associated ids of the authority;
        and with the lexical expander for authoritative values.

        Args:
            name: The authority name, if different from df.columns[0].
            lexical_expander: The lexical expander for the values.
            authdata: The data for this authority.
            auth_anns_builder: The authority annotations row builder to use
                for building annotation rows.
            field_groups: The derived field groups to use.
            anns_validator: fn(auth, anns_dict_list) that returns True if
                the list of annotation row dicts are valid to be added as
                annotations for a single match or "entity".
            parent_auth: This authority's parent authority (if any).
        """
        super().__init__(
            name if name else authdata.df.columns[0],
            auth_anns_builder=auth_anns_builder,
            authdata=authdata,
            field_groups=field_groups,
            anns_validator=anns_validator,
            parent_auth=parent_auth,
        )
        self.lexical_expander = lexical_expander
        self._variations = None  # Lazily built variations series (see property)
        self._prev_aligner = None  # Aligner from the latest annotate call

    @property
    def prev_aligner(self) -> TokenAligner:
        """Get the token aligner created in the latest call to annotate_text."""
        return self._prev_aligner

    @property
    def variations(self) -> pd.Series:
        """Get all lexical variations in a series whose index has associated
        value IDs.

        Returns:
            A pandas series with index-identified variations.
        """
        if self._variations is None:
            # Expand each authority value to its variations; explode so
            # each variation gets its own row (keeping the value-id index)
            self._variations = (
                self.authdata.df[self.name].apply(self.lexical_expander).explode().dropna()
            )
        return self._variations

    def get_id_by_variation(self, variation: str) -> Set[str]:
        """Get the IDs of the value(s) associated with the given variation.

        Args:
            variation: Variation text.

        Returns:
            The possibly empty set of associated value IDs.
        """
        ids = set()
        for value in self.lexical_expander.get_terms(variation):
            ids.update(self.get_value_ids(value))
        return ids

    def get_variations(self, value: Any, normalize: bool = True) -> Set[Any]:
        """Convenience method to compute variations for the value.

        Args:
            value: The authority value, or term, whose variations to compute.
            normalize: True to normalize the variations.

        Returns:
            The set of variations for the value.
        """
        return self.lexical_expander(value, normalize=normalize)

    def has_value(self, value: Any) -> bool:
        """Determine whether the given value is in this authority.

        Args:
            value: A possible authority value.

        Returns:
            True if the value is a valid entity value.
        """
        return np.any(self.authdata.df[self.name] == value)

    def get_value_ids(self, value: Any) -> Set[Any]:
        """Get all IDs associated with the given value. Note that typically
        there is a single ID for any value, but this allows for inherent
        ambiguities in the authority.

        Args:
            value: An authority value.

        Returns:
            The associated IDs or an empty set if the value is not valid.
        """
        return set(self.authdata.lookup_values(value).index.tolist())

    def get_values_by_id(self, value_id: Any) -> Set[Any]:
        """Get all values for the associated value ID. Note that typically
        there is a single value for an ID, but this allows for inherent
        ambiguities in the authority.

        Args:
            value_id: An authority value ID.

        Returns:
            The associated values or an empty set if the value ID is not valid.
        """
        return set(self.authdata.lookup_values(value_id, is_id=True)[self.name].tolist())

    def find_variations(
        self,
        variation: str,
        starts_with: bool = False,
        ends_with: bool = False,
        scope: str = "fullmatch",
    ) -> pd.Series:
        """Find all matches to the given variation.

        Note:
            Only the first true of starts_with, ends_with, and scope will
            be applied. If none of these are true, a full match on the pattern
            is performed.

        Args:
            variation: The text to find; treated as a regular expression
                unless either starts_with or ends_with is True.
            starts_with: When True, find all terms that start with the
                variation text.
            ends_with: When True, find all terms that end with the variation
                text.
            scope: 'fullmatch' (default), 'match', or 'contains' for
                strict, less strict, and least strict matching.

        Returns:
            The matching variations as a pd.Series.
        """
        vs = self.variations
        if starts_with:
            vs = vs[vs.str.startswith(variation)]
        elif ends_with:
            vs = vs[vs.str.endswith(variation)]
        else:
            if scope == "fullmatch":
                hits = vs.str.fullmatch(variation)
            elif scope == "match":
                hits = vs.str.match(variation)
            else:
                hits = vs.str.contains(variation)
            vs = vs[hits]
        vs = vs.drop_duplicates()
        return vs

    def get_variations_df(
        self,
        variations: pd.Series,
        variations_colname: str = "variation",
        ids_colname: str = None,
        lookup_values: bool = False,
    ) -> pd.DataFrame:
        """Create a DataFrame including associated ids for each variation.

        Args:
            variations: The variations to include in the dataframe.
            variations_colname: The name of the variations column.
            ids_colname: The column name for value ids (defaults to
                "<authority-name>_id").
            lookup_values: When True, include a self.name column
                with associated values.
        """
        if ids_colname is None:
            ids_colname = f"{self.name}_id"
        df = pd.DataFrame(
            {
                variations_colname: variations,
                ids_colname: variations.apply(self.get_id_by_variation),
            }
        ).explode(ids_colname)
        if lookup_values:
            df[self.name] = df[ids_colname].apply(self.get_values_by_id)
            df = df.explode(self.name)
        return df

    def add_annotations(
        self,
        doctext: dk_doc.Text,
        annotations: dk_anns.Annotations,
    ) -> dk_anns.Annotations:
        """Method to do the work of finding, validating, and adding annotations.

        Args:
            doctext: The text to process.
            annotations: The annotations object to add annotations to.

        Returns:
            The given or a new Annotations instance.
        """
        # Fix: pass the Text object directly -- build_first_token accepts
        # Union[dk_doc.Text, str] and no "input_id" keyword. The previous
        # call (doctext.text, input_id=doctext.text_id) raised a TypeError.
        first_token = self.lexical_expander.build_first_token(doctext)
        token_aligner = TokenAligner(first_token, self)
        self._prev_aligner = token_aligner
        if self.validate_ann_dicts(token_aligner.annotations):
            annotations.add_dicts(token_aligner.annotations)
        return annotations

411 

412 

class CorrelatedAuthorityData(dk_auth.AuthorityData):
    """Container for authoritative data containing correlated data for multiple
    "sub" authorities.

    Abstract base: subclasses implement the mask-building and record-lookup
    methods below to relate sub-authority values back to the full data.
    """

    def __init__(self, df: pd.DataFrame, name: str):
        super().__init__(df, name)
        # Cache of built sub-authority data, keyed by sub-authority name
        self._authority_data = {}

    def sub_authority_names(self) -> List[str]:
        """Get the "sub" authority names.

        NOTE(review): this base implementation returns None despite the
        List[str] annotation -- subclasses presumably override it; callers
        must handle a None result. Confirm intended contract.
        """
        return None

    @abstractmethod
    def auth_values_mask(self, name: str, value_id: int) -> pd.Series:
        """Identify full-authority data corresponding to this sub-value.

        Args:
            name: The sub-authority name.
            value_id: The sub-authority value_id.

        Returns:
            A series representing relevant full-authority data.
        """
        raise NotImplementedError

    @abstractmethod
    def auth_records_mask(
        self,
        record_value_ids: Dict[str, int],
        filter_mask: pd.Series = None,
    ) -> pd.Series:
        """Get a series identifying records in the full authority matching
        the given records of the form {<sub-name>: <sub-value-id>}.

        Args:
            record_value_ids: The dict of field names to value_ids.
            filter_mask: A pre-filter limiting records to consider and/or
                building records incrementally.

        Returns:
            A series identifying where all fields exist.
        """
        raise NotImplementedError

    @abstractmethod
    def get_auth_records(self, records_mask: pd.Series) -> pd.DataFrame:
        """Get the authority records identified by the mask.

        Args:
            records_mask: A series identifying records in the full data.

        Returns:
            The records for which the mask is True.
        """
        raise NotImplementedError

    @abstractmethod
    def combine_masks(self, mask1: pd.Series, mask2: pd.Series) -> pd.Series:
        """Combine the masks if possible, returning the valid combination or None.

        Args:
            mask1: An auth_records_mask consistent with this data.
            mask2: Another data auth_records_mask.

        Returns:
            The combined consistent records_mask or None.
        """
        raise NotImplementedError

482 

483 

class MultiAuthorityData(CorrelatedAuthorityData):
    """Container for authoritative data containing correlated data for multiple
    "sub" authorities composed of explicit data for each component.
    """

    def __init__(self, df: pd.DataFrame, name: str):
        super().__init__(df, name)
        # Cache of built sub-authority data, keyed by sub-authority name
        self._authority_data = {}

    @abstractmethod
    def build_authority_data(self, name: str) -> dk_auth.AuthorityData:
        """Build an authority for the named sub-authority.

        Args:
            name: The "sub" authority name.

        Returns:
            The "sub" authority data.
        """
        raise NotImplementedError

    def authority_data(self, name: str) -> dk_auth.AuthorityData:
        """Retrieve without building the named authority data, or None.

        Fix: this was previously decorated as a @property, but a property
        getter cannot take a "name" argument, so every attribute access
        raised a TypeError. It is now a regular method.

        Args:
            name: The "sub" authority name.

        Returns:
            The cached AuthorityData, or None if not yet built.
        """
        return self._authority_data.get(name, None)

    def get_authority_data(self, name: str) -> dk_auth.AuthorityData:
        """Get AuthorityData for the named "sub" authority, building if needed.

        Args:
            name: The "sub" authority name.

        Returns:
            The "sub" authority data.
        """
        if name not in self._authority_data:
            self._authority_data[name] = self.build_authority_data(name)
        return self._authority_data[name]

    @staticmethod
    def get_unique_vals_df(col: pd.Series, name: str) -> pd.DataFrame:
        """Get a dataframe with the unique values from the column and the given
        column name.

        Args:
            col: The source column.
            name: The name for the resulting column.

        Returns:
            A single-column dataframe of sorted unique non-NaN values.
        """
        data = np.sort(pd.unique(col.dropna()))
        if np.issubdtype(col.dtype, np.integer):
            # IDs for an integer column are the integers themselves
            col_df = pd.DataFrame({name: data}, index=data)
        else:
            # IDs for other columns are auto-generated from 0 to n-1
            col_df = pd.DataFrame({name: data})
        return col_df

    def lookup_subauth_values(self, name: str, value: int, is_id: bool = False) -> pd.DataFrame:
        """Lookup "sub" authority data for the named "sub" authority value.

        Args:
            name: The sub-authority name.
            value: The value for the sub-authority to lookup.
            is_id: True if value is an ID.

        Returns:
            The applicable authority dataframe rows, or None when the named
            sub-authority has not been built yet.
        """
        values_df = None
        authdata = self._authority_data.get(name, None)
        if authdata is not None:
            values_df = authdata.lookup_values(value, is_id=is_id)
        return values_df

    def lookup_auth_values(
        self,
        name: str,
        value: str,
    ) -> pd.DataFrame:
        """Lookup original authority data for the named "sub" authority value.

        Args:
            name: The sub-authority name.
            value: The sub-authority value. (NOTE(review): despite the
                original doc mentioning value(s)/row(s), the comparison
                below handles a single scalar value.)

        Returns:
            The original authority dataframe rows.
        """
        return self.df[self.df[name] == value]

    def auth_values_mask(self, name: str, value_id: int) -> pd.Series:
        """Identify the rows in the full authority corresponding to this sub-value.

        Args:
            name: The sub-authority name.
            value_id: The sub-authority value_id.

        Returns:
            A boolean series where the field exists.
        """
        field_values = self.lookup_subauth_values(name, value_id, is_id=True)
        return self.df[name].isin(field_values[name].tolist())

    def auth_records_mask(
        self,
        record_value_ids: Dict[str, int],
        filter_mask: pd.Series = None,
    ) -> pd.Series:
        """Get a boolean series identifying records in the full authority matching
        the given records of the form {<sub-name>: <sub-value-id>}.

        Args:
            record_value_ids: The dict of field names to value_ids.
            filter_mask: A pre-filter limiting records to consider and/or
                building records incrementally.

        Returns:
            A boolean series where all fields exist or None.
        """
        has_fields = filter_mask
        for name, value_id in record_value_ids.items():
            has_field = self.auth_values_mask(name, value_id)
            # AND each field's mask into the accumulated result
            if has_fields is None:
                has_fields = has_field
            else:
                has_fields &= has_field
        return has_fields

    def get_auth_records(self, records_mask: pd.Series) -> pd.DataFrame:
        """Get the authority records identified by the mask.

        Args:
            records_mask: A boolean series identifying records in the full df.

        Returns:
            The records/rows for which the mask is True.
        """
        return self.df[records_mask]

    def combine_masks(self, mask1: pd.Series, mask2: pd.Series) -> pd.Series:
        """Combine the masks if possible, returning the valid combination or None.

        Args:
            mask1: An auth_records_mask consistent with this data.
            mask2: Another data auth_records_mask.

        Returns:
            The combined consistent records_mask or None (also None when
            the combination selects no records).
        """
        result = None
        if mask1 is not None and mask2 is not None:
            result = mask1 & mask2
        elif mask1 is not None:
            result = mask1
        elif mask2 is not None:
            result = mask2
        return result if np.any(result) else None

637 

638 

class SimpleMultiAuthorityData(MultiAuthorityData):
    """Data class for pulling a single column from the multi-authority data
    as a "sub" authority.
    """

    def build_authority_data(self, name: str) -> dk_auth.AuthorityData:
        """Build an authority from the named column's unique values.

        Note:
            Only unique values are kept and the full dataframe's index
            will not be preserved.

        Args:
            name: The "sub" authority (and column) name.

        Returns:
            The "sub" authority data.
        """
        unique_df = self.get_unique_vals_df(self.df[name], name)
        return dk_auth.AuthorityData(unique_df, name)

660 

661 

class MultiAuthorityFactory(dk_auth.AuthorityFactory):
    """A factory for building a "sub" authority directly or indirectly
    from MultiAuthorityData.
    """

    def __init__(
        self,
        auth_name: str,
        lexical_expander: LexicalExpander = None,
    ):
        """Initialize the MultiAuthorityFactory.

        Args:
            auth_name: The name of the dataframe authority to build.
            lexical_expander: The lexical expander to use (default=identity).
        """
        self.auth_name = auth_name
        self._lexical_expander = lexical_expander

    def get_lexical_expander(self, name: str) -> LexicalExpander:
        """Get the lexical expander for the named (column) data.

        Args:
            name: The name of the column to expand.

        Returns:
            The appropriate lexical_expander; when none was supplied at
            construction, an identity expander is lazily created and cached.
        """
        if self._lexical_expander is None:
            # LexicalExpander(None, None) defaults to identity expansion
            # and identity normalization
            self._lexical_expander = LexicalExpander(None, None)
        return self._lexical_expander

    def build_authority(
        self,
        name: str,
        auth_anns_builder: dk_auth.AuthorityAnnotationsBuilder,
        multiauthdata: MultiAuthorityData,
        parent_auth: dk_auth.Authority = None,
    ) -> DataframeAuthority:
        """Build a DataframeAuthority.

        Args:
            name: The name of the authority to build.
            auth_anns_builder: The authority annotations row builder to use
                for building annotation rows.
            multiauthdata: The multi-authority source data.
            parent_auth: The parent authority.

        Returns:
            The DataframeAuthority instance.
        """
        authdata = multiauthdata.get_authority_data(name)
        field_groups = None  # TODO: get from instance var set on construction?
        anns_validator = None  # TODO: get from multiauthdata?
        return DataframeAuthority(
            name,
            self.get_lexical_expander(name),
            authdata,
            # Fix: forward the builder; it was previously accepted by this
            # method but never passed on, so built authorities silently
            # ignored the caller's annotations builder.
            auth_anns_builder=auth_anns_builder,
            field_groups=field_groups,
            anns_validator=anns_validator,
            parent_auth=parent_auth,
        )