Coverage for src / dataknobs_xization / masking_tokenizer.py: 34%
293 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:46 -0700
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:46 -0700
1"""Character-level text feature extraction and tokenization.
3Provides abstract classes for extracting character-level features from text,
4building DataFrames with character features for masking and tokenization.
5"""
7from abc import ABC, abstractmethod
8from collections.abc import Callable
9from typing import Any, List, Tuple, Union
11import numpy as np
12import pandas as pd
14import dataknobs_structures.document as dk_doc
15from dataknobs_utils import emoji_utils
class CharacterFeatures(ABC):
    """Base class exposing a text as a per-character dataframe.

    Each character of the (padded) text is a row of the dataframe and each
    column holds one character feature.
    """

    def __init__(self, doctext: Union[dk_doc.Text, str], roll_padding: int = 0):
        """Store the text to tokenize and its padding width.

        Args:
            doctext: The text to tokenize, either as a plain string or as a
                dk_doc.Text carrying its metadata.
            roll_padding: The number of pad characters added to each end of
                the text.
        """
        self._doctext = doctext
        self._roll_padding = roll_padding
        # Built lazily by the padded_text property.
        self._padded_text = None

    @property
    def cdf(self) -> pd.DataFrame:
        """The character dataframe with each padded text character as a row."""
        raise NotImplementedError

    @property
    def doctext(self) -> dk_doc.Text:
        """The text object; a plain string is wrapped in dk_doc.Text on first access."""
        current = self._doctext
        if isinstance(current, str):
            current = dk_doc.Text(current, None)
            self._doctext = current
        return current

    @property
    def text_col(self) -> str:
        """The name of the cdf column holding the text characters."""
        return self.doctext.text_label

    @property
    def text(self) -> str:
        """The text string."""
        return self.doctext.text

    @property
    def text_id(self) -> Any:
        """The ID of the text."""
        return self.doctext.text_id

    @abstractmethod
    def build_first_token(
        self,
        normalize_fn: Callable[[str], str],
    ) -> "Token":
        """Build the first token as the start of tokenization.

        Args:
            normalize_fn: A function to normalize a raw text term or any
                of its variations. If None, then the identity function is used.

        Returns:
            The first text token.
        """
        raise NotImplementedError

    @property
    def roll_padding(self) -> int:
        """The number of pad characters added to each end of the text."""
        return self._roll_padding

    @property
    def padded_text(self) -> str:
        """The text with roll_padding spaces on each side (computed once)."""
        if self._padded_text is None:
            pad = " " * self.roll_padding
            self._padded_text = "".join((pad, self.text, pad))
        return self._padded_text

    def get_tokens(
        self,
        normalize_fn: Callable[[str], str] = lambda x: x,
    ) -> List["Token"]:
        """Collect every token by walking the next_token chain.

        Args:
            normalize_fn: The normalization function (default=identity fn).

        Returns:
            A list of token instances, starting with the first token.
        """
        collected = []
        current = self.build_first_token(normalize_fn)
        while current is not None:
            collected.append(current)
            current = current.next_token
        return collected
class TextFeatures(CharacterFeatures):
    """Character features for tokenizing ordinary text.

    Builds a character dataframe with alnum/space/symbol features, optional
    alpha/digit/upper/lower features, camelCase split markers and optional
    emoji BIO labels, and exposes token start/end masks derived from them.
    """

    def __init__(
        self,
        doctext: Union[dk_doc.Text, str],
        split_camelcase: bool = True,
        mark_alpha: bool = False,
        mark_digit: bool = False,
        mark_upper: bool = False,
        mark_lower: bool = False,
        emoji_data: emoji_utils.EmojiData = None,
    ):
        """Initialize with text tokenization parameters.

        Note:
            If emoji_data is non-null:
                * Then emojis will be treated as text (instead of as non-text)
                * If split_camelcase is True,
                    * then each emoji will be in its own token
                    * otherwise, each sequence of (adjacent) emojis will be
                      treated as a single token.

        Args:
            doctext: The text to tokenize with its metadata.
            split_camelcase: True to mark camel-case features.
            mark_alpha: True to mark alpha features (separate from alnum).
            mark_digit: True to mark digit features (separate from alnum).
            mark_upper: True to mark upper features (auto-included for
                camel-case).
            mark_lower: True to mark lower features (auto-included for
                camel-case).
            emoji_data: An EmojiData instance to mark emoji BIO features.
        """
        # NOTE: roll_padding is determined by "roll" feature needs. Currently 1.
        super().__init__(doctext, roll_padding=1)
        self.split_camelcase = split_camelcase
        self._cdf = self._build_character_dataframe(
            split_camelcase,
            mark_alpha,
            mark_digit,
            mark_upper,
            mark_lower,
            emoji_data,
        )

    @property
    def cdf(self) -> pd.DataFrame:
        """The character dataframe with each padded text character as a row."""
        return self._cdf

    def build_first_token(
        self,
        normalize_fn: Callable[[str], str],
    ) -> "Token":
        """Build the first token as the start of tokenization.

        Uses the start/end masks when splitting camelCase and the plain
        "alnum" mask otherwise.

        Args:
            normalize_fn: A function to normalize a raw text term or any
                of its variations. If None, then the identity function is used.

        Returns:
            The first text token.
        """
        features = self.cdf
        if self.split_camelcase:
            mask = DualTokenMask(self, features["tok_start"], features["tok_end"])
        else:
            mask = SimpleTokenMask(self, features["alnum"])
        return Token(mask, normalize_fn=normalize_fn)

    def _build_character_dataframe(
        self,
        split_camelcase,
        mark_alpha,
        mark_digit,
        mark_upper,
        mark_lower,
        emoji_data,
    ):
        """Build the per-character feature dataframe for the padded text."""
        # Camel-case detection needs both case features.
        want_upper = mark_upper or split_camelcase
        want_lower = mark_lower or split_camelcase

        text_col = self.text_col
        cdf = pd.DataFrame({text_col: list(self.padded_text)})

        # (column, str predicate, enabled) in column creation order.
        char_tests = (
            ("alpha", "isalpha", mark_alpha),
            ("digit", "isdigit", mark_digit),
            ("alnum", "isalnum", True),
            ("space", "isspace", True),
            ("upper", "isupper", want_upper),
            ("lower", "islower", want_lower),
        )
        for column, predicate, enabled in char_tests:
            if enabled:
                cdf[column] = getattr(cdf[text_col].str, predicate)()

        # A symbol is anything that is neither alphanumeric nor whitespace.
        cdf["sym"] = ~(cdf["alnum"] | cdf["space"])

        if split_camelcase:
            upper = cdf["upper"]
            lower = cdf["lower"]
            follows_lower = np.roll(lower, 1)
            follows_upper = np.roll(upper, 1)
            precedes_lower = np.roll(lower, -1)
            # An upper char right after a lower char.
            cdf["cc1"] = follows_lower & upper
            # Mark 2nd U of UUl
            cdf["cc2"] = follows_upper & upper & precedes_lower

        # NOTE: tok_start and tok_end are both INCLUSIVE
        alnum = cdf["alnum"]
        # mark a char following a non-char
        cdf["tok_start"] = alnum & ~np.roll(alnum, 1)
        # mark a char followed by a non-char
        cdf["tok_end"] = alnum & ~np.roll(alnum, -1)

        if split_camelcase:
            camel_break = cdf["cc1"] | cdf["cc2"]
            cdf["tok_start"] = cdf["tok_start"] | camel_break
            # The char before a camel-case break ends the prior token.
            cdf["tok_end"] = cdf["tok_end"] | np.roll(camel_break, -1)

        if emoji_data is not None:
            cdf["emoji"] = pd.Series(list(emoji_data.emoji_bio(self.padded_text)))
            if split_camelcase:
                # Splitting camelcase includes splitting distinct emojis
                bio = cdf["emoji"]
                next_not_inside = np.roll(bio != "I", -1)
                cdf["tok_start"] |= bio == "B"
                # mark an 'I' followed by not 'I'
                cdf["tok_end"] |= (bio == "I") & next_not_inside
                # mark a 'B' followed by not 'I'
                cdf["tok_end"] |= (bio == "B") & next_not_inside
            else:
                # Not splitting camelcase keeps consecutive emojis together
                cdf["alnum"] |= cdf["emoji"] != "O"
        return cdf
class CharacterInputFeatures(CharacterFeatures):
    """Character features backed by an already-built dataframe and token mask."""

    def __init__(
        self,
        cdf: pd.DataFrame,
        token_mask: "TokenMask",
        doctext: Union[dk_doc.Text, str],
        roll_padding: int = 0,
    ):
        """Wrap a pre-built character dataframe.

        Args:
            cdf: The pre-built character features dataframe.
            token_mask: The token mask used to build tokens.
            doctext: The text (or dk_doc.Text with its metadata).
            roll_padding: The number of pad characters added to each end of
                the text.
        """
        super().__init__(doctext, roll_padding=roll_padding)
        self._token_mask = token_mask
        self._cdf = cdf

    @property
    def cdf(self) -> pd.DataFrame:
        """The character dataframe with each padded text character as a row."""
        return self._cdf

    def build_first_token(
        self,
        normalize_fn: Callable[[str], str] = None,
    ) -> "Token":
        """Build the first token as the start of tokenization.

        Args:
            normalize_fn: A function to normalize a raw text term or any
                of its variations. If None, then the identity function is used.

        Returns:
            The first text token, built from the stored token mask.
        """
        return Token(self._token_mask, normalize_fn=normalize_fn)
class TokenLoc:
    """Simple structure holding information about a token's location."""

    def __init__(
        self,
        start_loc: int,
        end_loc: int,
        token_num: int = None,
        start_incl: bool = True,
        end_incl: bool = False,
    ):
        """Initialize with the available information.

        Args:
            start_loc: The starting location of the token.
            end_loc: The ending location of the token.
            token_num: The position of the token within its text string,
                or None if unknown.
            start_incl: True if start_loc is part of the token; otherwise
                start_loc+1 is part of the token.
            end_incl: True if end_loc is part of the token; otherwise
                end_loc-1 is part of the token.
        """
        self._start_loc = start_loc
        self._end_loc = end_loc
        self._token_num = token_num
        # Stored as 0/1 so they can be used directly in offset arithmetic.
        self._start_incl = int(start_incl)
        self._end_incl = int(end_incl)

    def __repr__(self) -> str:
        """Render as e.g. "#2[1:4)"; the "#N" prefix is omitted when unknown."""
        # Use the property so an unset (None) token number reads as -1
        # instead of raising on the comparison.
        num = self.token_num
        prefix = f"#{num}" if num >= 0 else ""
        left = "[" if self._start_incl else "("
        right = "]" if self._end_incl else ")"
        return f"{prefix}{left}{self._start_loc}:{self._end_loc}{right}"

    def _incl_offset(self, wanted_incl: bool, current_incl: int) -> int:
        """Get the inclusivity offset based on what is wanted versus what is."""
        return int(wanted_incl) - current_incl

    @property
    def len(self) -> int:
        """Get the length of the token at this location."""
        return self.end_loc_excl - self.start_loc_incl

    @property
    def start_loc_incl(self) -> int:
        """Get the inclusive start location."""
        return self._start_loc + self._incl_offset(True, self._start_incl)

    @property
    def start_loc_excl(self) -> int:
        """Get the exclusive start location."""
        return self._start_loc + self._incl_offset(False, self._start_incl)

    @property
    def end_loc_incl(self) -> int:
        """Get the inclusive end location."""
        return self._end_loc - self._incl_offset(True, self._end_incl)

    @property
    def end_loc_excl(self) -> int:
        """Get the exclusive end location."""
        return self._end_loc - self._incl_offset(False, self._end_incl)

    @property
    def token_num(self) -> int:
        """Get the token's position within its text string, or -1 if unknown."""
        return self._token_num if self._token_num is not None else -1
class TokenMask(ABC):
    """Base class for reading token text through boolean feature masks."""

    def __init__(self, text_features: CharacterFeatures):
        """Keep the text features plus their padding width and last index.

        Args:
            text_features: The character features whose cdf is masked.
        """
        self.text_features = text_features
        self.pad = text_features.roll_padding
        # Largest (padded) index label in the character dataframe.
        self.max_ploc = max(text_features.cdf.index)

    def _get_next_start(self, ref_ploc: int, token_mask: pd.Series) -> int:
        """Find the ploc where the "next" token starts.

        Given the end of a prior token or possible start of the next, get
        the "next" start token's starting ploc. If there is no subsequent
        token, then return None.

        Args:
            ref_ploc: The end ploc of the prior token or start of string.
            token_mask: The token mask to use.

        Returns:
            The ploc of the start of the next token or None.
        """
        if ref_ploc > self.max_ploc:
            # Past the end of the string.
            return None
        if token_mask.loc[ref_ploc]:
            # Already sitting on a marked position.
            return ref_ploc
        candidate = increment(ref_ploc, token_mask)
        # increment hands back its input when the mask never flips.
        return candidate if candidate > ref_ploc else None

    def get_padded_text(self, start_loc_incl: int, end_loc_excl: int) -> str:
        """Slice the padded text from start_loc_incl up to end_loc_excl."""
        padded = self.text_features.padded_text
        return padded[start_loc_incl:end_loc_excl]

    def get_text(self, token_loc: TokenLoc) -> str:
        """Get the text at the (padded) token location.

        Args:
            token_loc: The token location.

        Returns:
            The token text.
        """
        first = token_loc.start_loc_incl
        stop = token_loc.end_loc_excl
        return self.get_padded_text(first, stop)

    @abstractmethod
    def get_next_token_loc(self, ref_ploc: int, token_num: int = -1) -> TokenLoc:
        """Get the location of the "next" token.

        Given the end of a prior token or possible start of the next, get
        the "next" token's location.
        If there is no subsequent token, then return None.

        Args:
            ref_ploc: The end ploc of the prior token or start of string.
            token_num: The token position within its text string.

        Returns:
            The TokenLoc of the next token or None.
        """
        raise NotImplementedError

    @abstractmethod
    def get_prev_token_loc(self, from_token_loc: TokenLoc) -> TokenLoc:
        """Get the previous token bounds before the given token start ploc.

        If there is no prior token, then return None.

        Args:
            from_token_loc: The token location after the result.

        Returns:
            The TokenLoc of the prior token or None.
        """
        raise NotImplementedError
def increment(start_loc: int, mask: pd.Series) -> int:
    """Move from start_loc to the first location where the mask value flips.

    If the mask value at index label start_loc is False, find the first
    following label where the mask is True; the mask values from start_loc
    (inclusive) to the result (exclusive) are then all False. And vice-versa
    when the mask value at start_loc is True.

    "Following" is in the mask's own row order, so passing a reversed mask
    searches backward.

    Args:
        start_loc: The start index location (a label of mask.index).
        mask: The boolean feature mask.

    Returns:
        The index label where the mask value is opposite that at start_loc.
        If unable to increment (start_loc is not in the index, or the mask
        never flips after start_loc), start_loc itself is returned.
    """
    end_loc = start_loc
    if start_loc in mask.index:
        # Label-based slice: everything from start_loc onward in row order.
        tail = mask.loc[start_loc:]
        # argmin/argmax give the position of the first False/True; 0 means
        # no flip was found (position 0 is start_loc itself).
        flip_pos = tail.argmin() if tail.iloc[0] else tail.argmax()
        if flip_pos > 0:
            end_loc = tail.index[flip_pos]
    return end_loc
class SimpleTokenMask(TokenMask):
    """A single mask that is True inside tokens and False between them."""

    def __init__(self, text_features: CharacterFeatures, token_mask: pd.Series):
        """Initialize with the text_features and token mask.

        Args:
            text_features: The text features to tokenize.
            token_mask: The token mask identifying token characters as True
                and characters between tokens as False.
        """
        super().__init__(text_features)
        self.token_mask = token_mask
        # Reversed copy used for scanning backward.
        self.revmask = token_mask[::-1]

    def get_next_token_loc(self, ref_ploc: int, token_num: int = -1) -> TokenLoc:
        """Get the location of the "next" token.

        Given the end of a prior token or possible start of the next, get
        the "next" token's location.
        If there is no subsequent token, then return None.

        Args:
            ref_ploc: The end ploc of the prior token or start of string.
            token_num: The token position within its text string.

        Returns:
            The TokenLoc of the next token or None.
        """
        first = self._get_next_start(ref_ploc, self.token_mask)
        if first is None:
            return None
        # Scan forward to the first False: the exclusive end of the token.
        stop = increment(first, self.token_mask)
        return TokenLoc(first, stop, token_num=token_num)

    def get_prev_token_loc(self, from_token_loc: TokenLoc) -> TokenLoc:
        """Get the previous token bounds before the given token start ploc.

        If there is no prior token, then return None.

        Args:
            from_token_loc: The token location after the result.

        Returns:
            The TokenLoc of the prior token or None.
        """
        anchor = from_token_loc.start_loc_excl
        # Two backward scans over the reversed mask: the first flip and then
        # the flip after it.
        first_flip = increment(anchor, self.revmask)
        second_flip = increment(first_flip, self.revmask)
        if second_flip == anchor:
            # Neither scan moved, so nothing precedes the given token.
            return None
        first = second_flip + 1
        stop = increment(first, self.token_mask)
        return TokenLoc(first, stop, token_num=from_token_loc.token_num - 1)
class DualTokenMask(TokenMask):
    """A mask comprised of a mask for token starts and a mask for token ends."""

    def __init__(
        self,
        text_features: CharacterFeatures,
        start_mask: pd.Series,
        end_mask: pd.Series,
    ):
        """Initialize with the text features and the start/end masks.

        Args:
            text_features: The text features to tokenize.
            start_mask: Boolean mask, True at each token's first character.
            end_mask: Boolean mask, True at each token's last character
                (inclusive).
        """
        super().__init__(text_features)
        self.start_mask = start_mask
        self.end_mask = end_mask
        self.tok_starts = start_mask
        self.tok_ends = end_mask
        self.rev_starts = self.tok_starts[::-1]
        # Reversed END mask (was mistakenly built from tok_starts).
        self.rev_ends = self.tok_ends[::-1]

    def _get_token_end(self, start_ploc: int) -> int:
        """Get the exclusive end ploc of the token starting at start_ploc.

        If no end marker exists at or after start_ploc, the token is taken
        to run to the end of the padded text rather than failing on None.
        """
        end_incl = self._get_next_start(start_ploc, self.tok_ends)
        if end_incl is None:
            return self.max_ploc + 1
        return end_incl + 1

    def get_next_token_loc(self, ref_ploc: int, token_num: int = -1) -> TokenLoc:
        """Get the location of the "next" token.

        Given the end of a prior token or possible start of the next, get
        the "next" token's location.
        If there is no subsequent token, then return None.

        Args:
            ref_ploc: The end ploc of the prior token or start of string.
            token_num: The token position within its text string.

        Returns:
            The TokenLoc of the next token or None.
        """
        result = None
        start_ploc = self._get_next_start(ref_ploc, self.tok_starts)
        if start_ploc is not None:
            end_ploc = self._get_token_end(start_ploc)
            result = TokenLoc(start_ploc, end_ploc, token_num=token_num)
        return result

    def get_prev_token_loc(self, from_token_loc: TokenLoc) -> TokenLoc:
        """Get the previous token bounds before the given token start ploc.

        If there is no prior token, then return None.

        Args:
            from_token_loc: The token location after the result.

        Returns:
            The TokenLoc of the prior token or None.
        """
        from_loc = from_token_loc.start_loc_excl
        if from_loc < self.pad:
            # Nothing but padding (or nothing at all) precedes the token.
            return None
        # The previous token begins at the nearest start marker at or before
        # from_loc. Tokens may be adjacent (e.g. camelCase splits), so the
        # marker can sit exactly on from_loc.
        # NOTE: assumes the start mask's index is in increasing order, as a
        # label slice requires.
        earlier = self.tok_starts.loc[:from_loc]
        start_labels = earlier.index[earlier.to_numpy(dtype=bool)]
        if len(start_labels) == 0:
            return None
        start_loc = int(start_labels[-1])
        return TokenLoc(
            start_loc,
            self._get_token_end(start_loc),
            # The previous token is one position earlier in the text.
            token_num=from_token_loc.token_num - 1,
        )
class Token:
    """A structure identifying the token start (inclusive) and end (exclusive)
    index positions according to text features mask.

    NOTE: The masks in CharacterFeatures include padding, which displaces indices
    relative to positions in the original text. In this class, padded indices
    are referred to with a "p".
    """

    def __init__(
        self,
        token_mask: TokenMask,
        token_loc: TokenLoc = None,
        start_ploc: int = 0,
        prev_token: "Token" = None,
        next_token: "Token" = None,
        normalize_fn: Callable[[str], str] = None,
    ):
        """Initialize the token pointer with text features and the token_mask.

        Args:
            token_mask: The token mask to use.
            token_loc: The (padded) token location, if known or None.
                If token_loc is None and start_ploc is 0, then this will be the
                first token of the text.
            start_ploc: The padded character index for the start of this
                token as an alternate to specifying token_loc. If start_ploc is not
                at a token character according to the token mask, then it will be
                auto-incremented to the next token.
            prev_token: The token prior to this token.
            next_token: The token following this token.
            normalize_fn: A function to normalize token text.
        """
        self.token_mask = token_mask
        self._next = next_token
        self._prev = prev_token
        self.normalize_fn = normalize_fn
        # Lazily computed caches.
        self._text = None
        self._norm_text = None
        self._pre_delims = None
        self._post_delims = None
        if token_loc is not None:
            self.token_loc = token_loc
        else:
            # Never start searching inside the leading padding.
            self.token_loc = self.token_mask.get_next_token_loc(
                max(start_ploc, token_mask.pad),
                token_num=0,
            )
        # If token_loc is None, the text is empty
        if self.token_loc is None:
            self._text = ""
            # Zero-length location just past the last padded index.
            self.token_loc = TokenLoc(
                self.token_mask.max_ploc + 1,
                self.token_mask.max_ploc + 1,
                token_num=0,
            )
            self._pre_delims = ""
            self._post_delims = ""

    def __repr__(self) -> str:
        return f"Token({self.token_text}){self.token_loc}"

    @property
    def doctext(self) -> dk_doc.Text:
        """Get the text object with metadata."""
        return self.token_mask.text_features.doctext

    @property
    def full_text(self) -> str:
        """Get the full original text of which this token is a part."""
        return self.token_mask.text_features.text

    @property
    def text_id(self) -> Any:
        """Get the full text ID."""
        return self.token_mask.text_features.text_id

    @property
    def token_num(self) -> int:
        """Get the position of this token within its text string."""
        return self.token_loc.token_num

    @property
    def len(self) -> int:
        """Get the length of this token."""
        return self.token_loc.len

    @property
    def token_text(self) -> str:
        """Get this token's original text."""
        if self._text is None:
            self._text = self.token_mask.get_text(self.token_loc)
        return self._text

    @property
    def norm_text(self) -> str:
        """Get this token's normalized text."""
        if self._norm_text is None:
            self._norm_text = (
                self.normalize_fn(self.token_text)
                if self.normalize_fn is not None
                else self.token_text
            )
        return self._norm_text

    @property
    def start_pos(self) -> int:
        """Get this token's start (incl) position in the original text."""
        return self.token_loc.start_loc_incl - self.token_mask.pad

    @property
    def end_pos(self) -> int:
        """Get this token's end (excl) position in the original text."""
        return self.token_loc.end_loc_excl - self.token_mask.pad

    @property
    def token_pos(self) -> Tuple[int, int]:
        """Get the token start (incl) and end (excl) indexes in the original text."""
        return (self.start_pos, self.end_pos)

    @property
    def pre_delims(self) -> str:
        """Get the text between the previous token and this one.

        Returns the empty string when the mask reports no previous token.
        """
        if self._pre_delims is None:
            delims = ""
            prev_loc = self.token_mask.get_prev_token_loc(self.token_loc)
            if prev_loc is not None:
                delims = self.token_mask.get_padded_text(
                    prev_loc.end_loc_excl, self.token_loc.start_loc_incl
                )
            self._pre_delims = delims
        return self._pre_delims

    @property
    def post_delims(self) -> str:
        """Get the text between this token and the next one.

        When there is no next token, this is the rest of the text after this
        token, excluding the trailing padding.
        """
        if self._post_delims is None:
            mask = self.token_mask
            end_excl = self.token_loc.end_loc_excl
            next_loc = mask.get_next_token_loc(end_excl)
            if next_loc is not None:
                delims = mask.get_padded_text(end_excl, next_loc.start_loc_incl)
            else:
                # There isn't a next token. Get remainder of text after tok.
                # The padded text has max_ploc + 1 characters, the last `pad`
                # of which are padding; stop just before them so this holds
                # for any padding width, not only 1.
                delims = mask.get_padded_text(
                    end_excl,
                    mask.max_ploc + 1 - mask.pad,
                )
            self._post_delims = delims
        return self._post_delims

    @property
    def next_token(self) -> "Token":
        """Get the token after this one (built on first access), or None."""
        if self._next is None:
            next_token_loc = self.token_mask.get_next_token_loc(
                self.token_loc.end_loc_excl,
                token_num=self.token_loc.token_num + 1,
            )
            if next_token_loc is not None:
                self._next = Token(
                    self.token_mask,
                    token_loc=next_token_loc,
                    prev_token=self,
                    normalize_fn=self.normalize_fn,
                )
        return self._next

    @property
    def prev_token(self) -> "Token":
        """Get the token before this one (built on first access), or None."""
        if self._prev is None:
            prev_token_loc = self.token_mask.get_prev_token_loc(self.token_loc)
            if prev_token_loc is not None:
                self._prev = Token(
                    self.token_mask,
                    token_loc=prev_token_loc,
                    next_token=self,
                    normalize_fn=self.normalize_fn,
                )
        return self._prev

    @property
    def first_token(self) -> "Token":
        """Get the first token for this token's input."""
        first = self
        while first.prev_token is not None:
            first = first.prev_token
        return first

    @property
    def last_token(self) -> "Token":
        """Get the last token for this token's input."""
        last = self
        while last.next_token is not None:
            last = last.next_token
        return last