Coverage for anonymise/scrub.py: 21%
351 statements
« prev ^ index » next coverage.py v7.8.0, created at 2026-02-05 06:46 -0600
« prev ^ index » next coverage.py v7.8.0, created at 2026-02-05 06:46 -0600
1"""
2crate_anon/anonymise/scrub.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Scrubber classes for CRATE anonymiser.**
28"""
30from abc import ABC, abstractmethod
31from collections import OrderedDict
32import datetime
33import logging
34import re
35import string
36from typing import (
37 Any,
38 Dict,
39 Iterable,
40 Generator,
41 List,
42 Optional,
43 Pattern,
44 Set,
45 Tuple,
46 TYPE_CHECKING,
47 Union,
48)
50if TYPE_CHECKING:
51 from re import Match
53from cardinal_pythonlib.datetimefunc import coerce_to_datetime
54from cardinal_pythonlib.file_io import gen_lines_without_comments
55from cardinal_pythonlib.hash import GenericHasher
56from cardinal_pythonlib.sql.validation import (
57 is_sqltype_date,
58 is_sqltype_text_over_one_char,
59)
60from cardinal_pythonlib.text import get_unicode_characters
62# from flashtext import KeywordProcessor
63from crate_anon.common.bugfix_flashtext import KeywordProcessorFixed
65# ... temp bugfix
67# noinspection PyPep8Naming
68from crate_anon.anonymise.constants import (
69 AnonymiseConfigDefaults as DA,
70 DATE_BLURRING_DIRECTIVES,
71 DATE_BLURRING_DIRECTIVES_CSV,
72 MONTH_3_LETTER_INDEX,
73 ScrubMethod,
74)
75from crate_anon.anonymise.anonregex import (
76 EMAIL_REGEX_STR,
77 DateRegexNames,
78 get_anon_fragments_from_string,
79 get_code_regex_elements,
80 get_date_regex_elements,
81 get_generic_date_regex_elements,
82 get_number_of_length_n_regex_elements,
83 get_phrase_regex_elements,
84 get_regex_from_elements,
85 get_regex_string_from_elements,
86 get_string_regex_elements,
87 get_uk_postcode_regex_elements,
88)
89from crate_anon.common.stringfunc import (
90 get_digit_string_from_vaguely_numeric_string,
91 reduce_to_alphanumeric,
92)
94log = logging.getLogger(__name__)
97# =============================================================================
98# Generic scrubber base class
99# =============================================================================
class ScrubberBase(ABC):
    """
    Abstract parent class for scrubbers.

    Subclasses must provide :meth:`scrub` (to de-identify text) and
    :meth:`get_hash` (to fingerprint the scrubber for change detection).
    """

    def __init__(self, hasher: GenericHasher) -> None:
        """
        Args:
            hasher:
                :class:`GenericHasher` with which this scrubber is hashed
                (for change-detection purposes); should be a secure hasher
        """
        self.hasher = hasher

    @abstractmethod
    def scrub(self, text: str) -> str:
        """
        Scrub the supplied text.

        Args:
            text: raw text, which may contain sensitive information

        Returns:
            the de-identified version of the text
        """
        raise NotImplementedError("Implement in derived class")

    @abstractmethod
    def get_hash(self) -> str:
        """
        Return a hash representing this scrubber.

        The hash can be stored and compared later to see whether the
        scrubber has changed. During an incremental update, a changed
        scrubber means that all data for the patient should be
        re-anonymised.
        """
        raise NotImplementedError("Implement in derived class")
139# =============================================================================
140# WordList
141# =============================================================================
def lower_case_words_from_file(filename: str) -> Generator[str, None, None]:
    """
    Yield every whitespace-separated word in a file, converted to lower
    case. Lines are read via ``gen_lines_without_comments`` (with
    ``comment_at_start_only=True``).
    """
    lines = gen_lines_without_comments(filename, comment_at_start_only=True)
    for text_line in lines:
        # str.split() with no arguments never produces empty strings, so no
        # further filtering of the tokens is required.
        yield from (token.lower() for token in text_line.split())
def lower_case_phrase_lines_from_file(
    filename: str,
) -> Generator[str, None, None]:
    """
    Yield each line of a file as a lower-case phrase (one phrase per line).
    Lines are read via ``gen_lines_without_comments`` (with
    ``comment_at_start_only=True``).
    """
    # Per the original author's note, the lines arrive already stripped
    # (left/right) and are never empty, so they are only lower-cased here.
    yield from (
        phrase.lower()
        for phrase in gen_lines_without_comments(
            filename, comment_at_start_only=True
        )
    )
# Characters that FlashText should treat as being *inside* a word (i.e. not a
# word boundary). Digits, ASCII letters and the underscore make up FlashText's
# default set; the Latin alphabetic Unicode characters are our addition.
# Why do we do this? So e.g. "naïve" isn't truncated to "naï[~~~]".
# Check: FLASHTEXT_WORDCHAR_STR = "".join(sorted(FLASHTEXT_WORD_CHARACTERS))
FLASHTEXT_WORD_CHARACTERS = set(
    "".join(
        (
            string.digits,  # part of flashtext default
            string.ascii_letters,  # part of flashtext default
            "_",  # part of flashtext default
            get_unicode_characters("Latin_Alphabetic"),  # our extension
        )
    )
)
class WordList(ScrubberBase):
    """
    Case-insensitive wordlist scrubber.

    It plays two roles: an allowlist (membership test, via
    :meth:`contains`) and a denylist (removal of the listed words from
    text, via :meth:`scrub`).
    """

    def __init__(
        self,
        filenames: Iterable[str] = None,
        words: Iterable[str] = None,
        as_phrases: bool = False,
        replacement_text: str = "[---]",
        hasher: GenericHasher = None,
        suffixes: List[str] = None,
        at_word_boundaries_only: bool = True,
        max_errors: int = 0,
        regex_method: bool = False,
    ) -> None:
        """
        Args:
            filenames:
                Files from which to read words.
            words:
                Extra words to add, beyond those read from files.
            as_phrases:
                Treat each line of a source file as one phrase (rather than
                splitting it into words) and, when ``regex_method`` is
                True, scrub using phrase regexes.
            replacement_text:
                Text that replaces sensitive content.
            hasher:
                :class:`GenericHasher` used to hash this scrubber (for
                change-detection purposes); should be a secure hasher.
            suffixes:
                Suffixes, each of which is appended to each word.
            at_word_boundaries_only:
                Boolean. If set, the regex must begin and end at a word
                boundary. (If false: will scrub ``ANN`` from ``bANNed``,
                for example.)
            max_errors:
                Maximum number of typographical insertion / deletion /
                substitution errors to tolerate. Only relevant when
                ``regex_method`` is True.
            regex_method:
                Use regular expressions? True: slower, but phrase scrubbing
                copes with variable whitespace. False: much faster (uses
                FlashText), but whitespace is inflexible.

        Raises:
            ValueError:
                if FlashText is selected (``regex_method=False``) while
                ``at_word_boundaries_only`` is False.
        """
        using_flashtext = not regex_method
        if using_flashtext and at_word_boundaries_only is False:
            raise ValueError(
                "FlashText (chosen by regex_method=False) will only work at "
                "word boundaries, but at_word_boundaries_only is False"
            )

        super().__init__(hasher)

        # Configuration
        self.replacement_text = replacement_text
        self.as_phrases = as_phrases
        self.suffixes = suffixes or []  # type: List[str]
        self.at_word_boundaries_only = at_word_boundaries_only
        self.max_errors = max_errors
        self.regex_method = regex_method

        # Lazily built/cached state
        self._regex = None  # type: Optional[Pattern[str]]
        self._processor = None  # type: Optional[KeywordProcessorFixed]
        self._cached_hash = None  # type: Optional[str]
        self._built = False

        # A set, since "is x in s" is faster for sets than for lists:
        # https://stackoverflow.com/questions/2831212/python-sets-vs-lists
        self.words = set()  # type: Set[str]
        # Nothing has been cached yet, so there is no need to clear the
        # cache while loading the initial content.
        # noinspection PyTypeChecker
        for source_filename in filenames or []:
            self.add_file(source_filename, clear_cache=False)
        # noinspection PyTypeChecker
        for extra_word in words or []:
            self.add_word(extra_word, clear_cache=False)
        # log.debug(f"Created wordlist with {len(self.words)} words")

    def clear_cache(self) -> None:
        """
        Discard everything derived from the wordlist: the compiled regex,
        the FlashText processor, and the cached hash.
        """
        self._cached_hash = None  # type: Optional[str]
        self._processor = None  # type: Optional[KeywordProcessorFixed]
        self._regex = None  # type: Optional[Pattern[str]]
        self._built = False

    def add_word(self, word: str, clear_cache: bool = True) -> None:
        """
        Add one word (stored in lower case). Empty/falsy words are ignored,
        in which case the cache is left alone too.

        Args:
            word: word to add
            clear_cache: also clear our cache?
        """
        if word:
            self.words.add(word.lower())
            if clear_cache:
                self.clear_cache()

    def add_file(self, filename: str, clear_cache: bool = True) -> None:
        """
        Add the contents of a file: whole lines if ``as_phrases`` is set,
        otherwise individual words.

        Args:
            filename:
                File to read.
            clear_cache:
                Also clear our cache?
        """
        reader = (
            lower_case_phrase_lines_from_file
            if self.as_phrases
            else lower_case_words_from_file
        )
        self.words.update(reader(filename))
        if clear_cache:
            self.clear_cache()

    def contains(self, word: str) -> bool:
        """
        Is this word (compared case-insensitively) in our wordlist?
        """
        return word.lower() in self.words

    def get_hash(self) -> str:
        # docstring in parent class

        # Sets have no order. Sorting makes the hash depend only on which
        # words are present, not on the order in which they were added.
        if not self._cached_hash:
            ordered_words = sorted(self.words)
            self._cached_hash = self.hasher.hash(ordered_words)
        return self._cached_hash

    def scrub(self, text: str) -> str:
        # docstring in parent class
        if not self._built:
            self.build()
        if self.regex_method:
            # No regex (e.g. nothing to scrub): return the text untouched.
            if self._regex:
                return self._regex.sub(self.replacement_text, text)
            return text
        # FlashText method; no processor exists if there are no words.
        if self._processor:
            return self._processor.replace_keywords(text)
        return text

    def _gen_word_and_suffixed(self, w: str) -> Iterable[str]:
        """
        Yield the word itself, then the word with each suffix appended.
        """
        yield w
        yield from (w + suffix for suffix in self.suffixes)

    def build(self) -> None:
        """
        Build the fast scrubbing device: a compiled regex (if
        ``regex_method`` is set) or a FlashText processor (otherwise).
        Called once all our words have been collected.
        """
        if self.regex_method:
            # Phrases and single words use different element generators,
            # but both take the same arguments.
            make_elements = (
                get_phrase_regex_elements
                if self.as_phrases
                else get_string_regex_elements
            )
            elements = []  # type: List[str]
            for w in self.words:
                elements.extend(
                    make_elements(
                        w,
                        suffixes=self.suffixes,
                        at_word_boundaries_only=self.at_word_boundaries_only,
                        max_errors=self.max_errors,
                    )
                )
            log.debug(f"Building regex with {len(elements)} elements")
            self._regex = get_regex_from_elements(elements)
        elif not self.words:
            # Nothing to scrub; scrub() then returns its input unchanged.
            self._processor = None  # type: Optional[KeywordProcessorFixed]
        else:
            processor = KeywordProcessorFixed(case_sensitive=False)
            processor.set_non_word_boundaries(FLASHTEXT_WORD_CHARACTERS)
            self._processor = processor
            log.debug(
                f"Building FlashText processor with {len(self.words)} keywords"
            )
            for w in self.words:
                for keyword in self._gen_word_and_suffixed(w):
                    processor.add_keyword(keyword, self.replacement_text)
        self._built = True
388# =============================================================================
389# NonspecificScrubber
390# =============================================================================
class Replacer:
    """
    Replacement provider for regex.sub() / re.sub().

    This base class always supplies the same fixed text, so using its
    :meth:`replace` method as the "repl" argument is equivalent to passing
    the replacement text itself.
    """

    def __init__(self, replacement_text: str) -> None:
        """
        Args:
            replacement_text: the text substituted for every match
        """
        self.replacement_text = replacement_text

    def replace(self, match: "Match") -> str:
        """
        Callback suitable for the "repl" argument of re.sub() or
        regex.sub(): it receives the :class:`re.Match` object and returns
        the replacement text. Here, the match itself is ignored.
        """
        return self.replacement_text
class NonspecificReplacer(Replacer):
    """
    Custom regex replacement for the Nonspecific scrubber. Currently this
    will "blur" dates if replacement_text_all_dates contains any formatting
    directives.
    """

    def __init__(self, replacement_text: str, replacement_text_all_dates: str):
        """
        Args:
            replacement_text:
                Generic text to use.
            replacement_text_all_dates:
                Replacement text to use if the matched text is a date. Can
                include format specifiers to blur the date rather than
                scrubbing it out entirely.
        """
        super().__init__(replacement_text)

        self.replacement_text_all_dates = replacement_text_all_dates
        # Only if the date replacement contains a "%" do we need to parse
        # the matched date and format it (which is slower).
        self.slow_date_replacement = "%" in replacement_text_all_dates

    def replace(self, match: "Match") -> str:
        """
        Return the replacement for a regex match.

        - Non-date matches get the generic replacement text.
        - Date matches get ``replacement_text_all_dates``, passed through
          ``strftime`` if it contains "%" directives (blurring).
        - If blurring is requested but the matched text cannot be turned
          into a date (e.g. it has the shape of a date but is not a valid
          calendar date, or the month name is not recognised), the match is
          replaced by the generic replacement text. This removes the text
          entirely rather than letting the exception abort the whole scrub.
        """
        groupdict = match.groupdict()
        if not self.is_a_date(groupdict):
            return super().replace(match)

        if not self.slow_date_replacement:
            return self.replacement_text_all_dates

        try:
            date = self.parse_date(match, groupdict)
            return date.strftime(self.replacement_text_all_dates)
        except (TypeError, ValueError):
            # TypeError: e.g. int(None) after a failed month-name lookup.
            # ValueError: e.g. day out of range for the month.
            # We cannot blur what we cannot parse, so scrub it completely.
            return super().replace(match)

    @staticmethod
    def is_a_date(groupdict: Dict[str, Any]) -> bool:
        """
        Is the match result a date? We detect this via our named regex groups.
        """
        return any(
            groupdict.get(groupname) is not None
            for groupname in (
                DateRegexNames.DAY_MONTH_YEAR,
                DateRegexNames.MONTH_DAY_YEAR,
                DateRegexNames.YEAR_MONTH_DAY,
                DateRegexNames.ISODATE_NO_SEP,
            )
        )

    @staticmethod
    def parse_date(
        match: "Match", groupdict: Dict[str, Any]
    ) -> datetime.datetime:
        """
        Retrieve a valid date from the Match object for blurring.

        Where D == DateRegexNames, the handling is:

        - D.ISODATE_NO_SEP present: its text is parsed as ``%Y%m%d``.
        - Otherwise (D.DAY_MONTH_YEAR, D.MONTH_DAY_YEAR or
          D.YEAR_MONTH_DAY), the date is assembled from:

          - the day: D.NUMERIC_DAY;
          - the month: D.NUMERIC_MONTH, or else D.ALPHABETICAL_MONTH;
          - the year: D.FOUR_DIGIT_YEAR, or else D.TWO_DIGIT_YEAR.

        Raises:
            ValueError: if the components do not form a valid date
            TypeError: if the alphabetical month is not found in
                MONTH_3_LETTER_INDEX (so the month is None)
        """
        # Simple special handling for ISO date format without separators.
        isodate_no_sep = groupdict.get(DateRegexNames.ISODATE_NO_SEP)
        if isodate_no_sep is not None:
            return datetime.datetime.strptime(isodate_no_sep, "%Y%m%d")

        # For all others, extract D/M/Y information.

        year = groupdict.get(DateRegexNames.FOUR_DIGIT_YEAR)
        if year is None:
            two_digit_year = match.group(DateRegexNames.TWO_DIGIT_YEAR)

            # Will convert:
            # 00-68 -> 2000-2068
            # 69-99 -> 1969-1999
            year = datetime.datetime.strptime(two_digit_year, "%y").year

        numeric_day = match.group(DateRegexNames.NUMERIC_DAY)

        numeric_month = groupdict.get(DateRegexNames.NUMERIC_MONTH)
        if numeric_month is None:
            three_letter_month = match.group(
                DateRegexNames.ALPHABETICAL_MONTH
            )[:3]
            # NOTE(review): this lookup is case-sensitive as written;
            # confirm that the key casing of MONTH_3_LETTER_INDEX matches
            # everything the date regex can capture.
            numeric_month = MONTH_3_LETTER_INDEX.get(three_letter_month)

        return datetime.datetime(
            int(year), int(numeric_month), int(numeric_day)
        )
class NonspecificScrubber(ScrubberBase):
    """
    Scrubs a bunch of things that are independent of any patient-specific data,
    such as removing all UK postcodes, or numbers of a certain length.
    """

    def __init__(
        self,
        hasher: GenericHasher,
        replacement_text: str = DA.REPLACE_NONSPECIFIC_INFO_WITH,
        anonymise_codes_at_word_boundaries_only: bool = DA.ANONYMISE_CODES_AT_WORD_BOUNDARIES_ONLY,  # noqa: E501
        anonymise_dates_at_word_boundaries_only: bool = DA.ANONYMISE_DATES_AT_WORD_BOUNDARIES_ONLY,  # noqa: E501
        anonymise_numbers_at_word_boundaries_only: bool = DA.ANONYMISE_NUMBERS_AT_WORD_BOUNDARIES_ONLY,  # noqa: E501
        denylist: WordList = None,
        scrub_all_numbers_of_n_digits: List[int] = None,
        scrub_all_uk_postcodes: bool = DA.SCRUB_ALL_UK_POSTCODES,
        scrub_all_dates: bool = DA.SCRUB_ALL_DATES,
        replacement_text_all_dates: str = DA.REPLACE_ALL_DATES_WITH,
        scrub_all_email_addresses: bool = DA.SCRUB_ALL_EMAIL_ADDRESSES,
        extra_regexes: Optional[List[str]] = None,
    ) -> None:
        """
        Args:
            replacement_text:
                Replace sensitive content with this string.
            hasher:
                :class:`GenericHasher` to use to hash this scrubber (for
                change-detection purposes); should be a secure hasher
            anonymise_codes_at_word_boundaries_only:
                For codes: Boolean. Ensure that the regex begins and ends with
                a word boundary requirement.
            anonymise_dates_at_word_boundaries_only:
                Scrub dates only if they occur at word boundaries. (Even if you
                say no, there are *some* restrictions or very odd things would
                happen; see
                :func:`crate_anon.anonymise.anonregex.get_generic_date_regex_elements`.)
            anonymise_numbers_at_word_boundaries_only:
                For numbers: Boolean. If set, ensure that the regex begins and
                ends with a word boundary requirement. If not set, the regex
                must be surrounded by non-digits. (If it were surrounded by
                more digits, it wouldn't be an n-digit number!)
            denylist:
                Words to scrub.
            scrub_all_numbers_of_n_digits:
                List of values of n; number lengths to scrub.
            scrub_all_uk_postcodes:
                Scrub all UK postcodes?
            scrub_all_dates:
                Scrub all dates? (Currently assumes the default locale for
                month names and ordinal suffixes.)
            replacement_text_all_dates:
                When scrub_all_dates is True, replace with this text.
                Supports limited datetime.strftime directives for "blurring" of
                dates. Example: "%b %Y" for abbreviated month and year.
            scrub_all_email_addresses:
                Scrub all e-mail addresses?
            extra_regexes:
                List of user-defined extra regexes to scrub.

        Raises:
            ValueError: if ``replacement_text_all_dates`` is not a
                legitimate date-blurring format
        """
        scrub_all_numbers_of_n_digits = scrub_all_numbers_of_n_digits or []

        super().__init__(hasher)
        self.replacement_text = replacement_text
        self.anonymise_codes_at_word_boundaries_only = (
            anonymise_codes_at_word_boundaries_only
        )
        self.anonymise_dates_at_word_boundaries_only = (
            anonymise_dates_at_word_boundaries_only
        )
        self.anonymise_numbers_at_word_boundaries_only = (
            anonymise_numbers_at_word_boundaries_only
        )
        self.denylist = denylist
        self.scrub_all_numbers_of_n_digits = scrub_all_numbers_of_n_digits
        self.scrub_all_uk_postcodes = scrub_all_uk_postcodes
        self.scrub_all_dates = scrub_all_dates

        self.replacement_text_all_dates = replacement_text_all_dates
        # Validate the date format before creating a replacer that uses it.
        self.check_replacement_text_all_dates()
        self.replacer = self.get_replacer()

        self.scrub_all_email_addresses = scrub_all_email_addresses
        self.extra_regexes = extra_regexes

        self._cached_hash = None  # type: Optional[str]
        self._regex = None  # type: Optional[Pattern[str]]
        self._regex_built = False
        self.build_regex()

    def get_replacer(self) -> Replacer:
        """
        Return a :class:`Replacer` object whose ``replace`` method can be
        used as the "repl" (replacer) argument to a re.sub() or regex.sub()
        call.
        """
        if (
            self.replacement_text == self.replacement_text_all_dates
            and "%" not in self.replacement_text_all_dates
        ):
            # Fast, simple: dates and everything else get the same fixed
            # text.
            return Replacer(self.replacement_text)

        # Handle dates in a more complex way, e.g. blurring them:
        return NonspecificReplacer(
            self.replacement_text, self.replacement_text_all_dates
        )

    def check_replacement_text_all_dates(self) -> None:
        """
        Ensure our date-replacement text is legitimate in terms of e.g.
        "%Y"-style directives.

        Raises:
            ValueError: if a "%" is followed by a character that is not an
                allowed directive, or if ``strftime`` rejects the format
        """
        fmt = self.replacement_text_all_dates
        possible_percent_chars = "".join(DATE_BLURRING_DIRECTIVES)
        bad = re.search(rf"%[^{possible_percent_chars}]", fmt) is not None
        if not bad:
            # Double-check, by formatting a test date:
            try:
                datetime.date(2000, 12, 31).strftime(fmt)
            except ValueError:
                bad = True
        if bad:
            raise ValueError(
                f"Bad format {self.replacement_text_all_dates!r} for date "
                "scrubbing. Allowed directives are: "
                f"{DATE_BLURRING_DIRECTIVES_CSV}"
            )

    def get_hash(self) -> str:
        # docstring in parent class

        # The hash exists so that a change to the scrubber can be detected.
        # It must therefore cover every setting that alters the output of
        # scrub(). The first five entries are the historical signature, in
        # their original order. The rest were previously missing, so that
        # (for example) switching date scrubbing on or off, or altering the
        # replacement text, went undetected.
        # NOTE: extending the signature alters the hash value, so hashes
        # stored by earlier versions will compare as "changed" once.
        if not self._cached_hash:
            self._cached_hash = self.hasher.hash(
                [
                    # signature, used for hashing:
                    self.anonymise_codes_at_word_boundaries_only,
                    self.anonymise_numbers_at_word_boundaries_only,
                    self.denylist.get_hash() if self.denylist else None,
                    self.scrub_all_numbers_of_n_digits,
                    self.scrub_all_uk_postcodes,
                    # ... settings that also affect the scrubbed output:
                    self.replacement_text,
                    self.anonymise_dates_at_word_boundaries_only,
                    self.scrub_all_dates,
                    self.replacement_text_all_dates,
                    self.scrub_all_email_addresses,
                    # None and [] both mean "no extra regexes":
                    list(self.extra_regexes or []),
                ]
            )
        return self._cached_hash

    def scrub(self, text: str) -> str:
        # docstring in parent class
        if not self._regex_built:
            self.build_regex()
        # The denylist (if any) is applied first, then our own regex.
        if self.denylist:
            text = self.denylist.scrub(text)
        if not self._regex:  # possible; may be blank
            return text
        return self._regex.sub(self.replacer.replace, text)

    def build_regex(self) -> None:
        """
        Compile our high-speed regex from all enabled elements: UK
        postcodes, n-digit numbers, dates, e-mail addresses, and any extra
        user-defined regexes.
        """
        elements = []  # type: List[str]
        if self.scrub_all_uk_postcodes:
            elements.extend(
                get_uk_postcode_regex_elements(
                    at_word_boundaries_only=(
                        self.anonymise_codes_at_word_boundaries_only
                    )
                )
            )
        # noinspection PyTypeChecker
        for n in self.scrub_all_numbers_of_n_digits:
            elements.extend(
                get_number_of_length_n_regex_elements(
                    n,
                    at_word_boundaries_only=(
                        self.anonymise_numbers_at_word_boundaries_only
                    ),
                )
            )
        if self.scrub_all_dates:
            elements.extend(
                get_generic_date_regex_elements(
                    at_word_boundaries_only=(
                        self.anonymise_dates_at_word_boundaries_only
                    )
                )
            )
        if self.scrub_all_email_addresses:
            elements.append(EMAIL_REGEX_STR)
        if self.extra_regexes:
            elements.extend(self.extra_regexes)
        self._regex = get_regex_from_elements(elements)
        self._regex_built = True
710# =============================================================================
711# PersonalizedScrubber
712# =============================================================================
715class PersonalizedScrubber(ScrubberBase):
716 """
717 Accepts patient-specific (patient and third-party) information, and uses
718 that to scrub text.
719 """
721 def __init__(
722 self,
723 hasher: GenericHasher,
724 replacement_text_patient: str = DA.REPLACE_PATIENT_INFO_WITH,
725 replacement_text_third_party: str = DA.REPLACE_THIRD_PARTY_INFO_WITH, # noqa: E501
726 anonymise_codes_at_word_boundaries_only: bool = DA.ANONYMISE_CODES_AT_WORD_BOUNDARIES_ONLY, # noqa: E501
727 anonymise_codes_at_numeric_boundaries_only: bool = DA.ANONYMISE_CODES_AT_NUMERIC_BOUNDARIES_ONLY, # noqa: E501
728 anonymise_dates_at_word_boundaries_only: bool = DA.ANONYMISE_DATES_AT_WORD_BOUNDARIES_ONLY, # noqa: E501
729 anonymise_numbers_at_word_boundaries_only: bool = DA.ANONYMISE_NUMBERS_AT_WORD_BOUNDARIES_ONLY, # noqa: E501
730 anonymise_numbers_at_numeric_boundaries_only: bool = DA.ANONYMISE_NUMBERS_AT_NUMERIC_BOUNDARIES_ONLY, # noqa: E501
731 anonymise_strings_at_word_boundaries_only: bool = DA.ANONYMISE_STRINGS_AT_WORD_BOUNDARIES_ONLY, # noqa: E501
732 min_string_length_for_errors: int = DA.MIN_STRING_LENGTH_FOR_ERRORS,
733 min_string_length_to_scrub_with: int = DA.MIN_STRING_LENGTH_TO_SCRUB_WITH, # noqa: E501
734 scrub_string_suffixes: List[str] = None,
735 string_max_regex_errors: int = DA.STRING_MAX_REGEX_ERRORS,
736 allowlist: WordList = None,
737 alternatives: List[List[str]] = None,
738 nonspecific_scrubber: NonspecificScrubber = None,
739 nonspecific_scrubber_first: bool = DA.NONSPECIFIC_SCRUBBER_FIRST,
740 debug: bool = False,
741 ) -> None:
742 """
743 Args:
744 hasher:
745 :class:`GenericHasher` to use to hash this scrubber (for
746 change-detection purposes); should be a secure hasher.
747 replacement_text_patient:
748 Replace sensitive "patient" content with this string.
749 replacement_text_third_party:
750 Replace sensitive "third party" content with this string.
751 anonymise_codes_at_word_boundaries_only:
752 For codes: Boolean. Ensure that the regex begins and ends with
753 a word boundary requirement.
754 anonymise_codes_at_numeric_boundaries_only:
755 For codes: Boolean. Only applicable if
756 anonymise_codes_at_word_boundaries_only is False. Ensure that
757 the code is only recognized when surrounded by non-numbers;
758 that is, only at the boundaries of numbers (at numeric
759 boundaries). See
760 :func:`crate_anon.anonymise.anonregex.get_code_regex_elements`.
761 anonymise_dates_at_word_boundaries_only:
762 For dates: Boolean. Ensure that the regex begins and ends with
763 a word boundary requirement.
764 anonymise_numbers_at_word_boundaries_only:
765 For numbers: Boolean. Ensure that the regex begins and ends
766 with a word boundary requirement. See
767 :func:`crate_anon.anonymise.anonregex.get_code_regex_elements`.
768 anonymise_numbers_at_numeric_boundaries_only:
769 For numbers: Boolean. Only applicable if
770 anonymise_numbers_at_word_boundaries_only is False. Ensure that
771 the number is only recognized when surrounded by
772 non-numbers; that is, only at the boundaries of numbers (at
773 numeric boundaries). See
774 :func:`crate_anon.anonymise.anonregex.get_code_regex_elements`.
775 anonymise_strings_at_word_boundaries_only:
776 For strings: Boolean. Ensure that the regex begins and ends
777 with a word boundary requirement.
778 min_string_length_for_errors:
779 For strings: minimum string length at which typographical
780 errors will be permitted.
781 min_string_length_to_scrub_with:
782 For strings: minimum string length at which the string will be
783 permitted to be scrubbed with.
784 scrub_string_suffixes:
785 A list of suffixes to permit on strings.
786 string_max_regex_errors:
787 The maximum number of typographical insertion / deletion /
788 substitution errors to permit.
789 allowlist:
790 :class:`WordList` of words to allow (not to scrub).
791 alternatives:
792 This allows words to be substituted by equivalents; such as
793 ``St`` for ``Street`` or ``Rd`` for ``Road``. The parameter is
794 a list of lists of equivalents; see
795 :func:`crate_anon.anonymise.config.get_word_alternatives`.
796 nonspecific_scrubber:
797 :class:`NonspecificScrubber` to apply to remove information
798 that is generic.
799 nonspecific_scrubber_first:
800 If one is provided, run the nonspecific scrubber first (rather
801 than last)?
802 debug:
803 Show the final scrubber regex text as we compile our regexes.
804 """
805 scrub_string_suffixes = scrub_string_suffixes or []
807 super().__init__(hasher)
808 self.replacement_text_patient = replacement_text_patient
809 self.replacement_text_third_party = replacement_text_third_party
810 self.anonymise_codes_at_word_boundaries_only = (
811 anonymise_codes_at_word_boundaries_only
812 )
813 self.anonymise_codes_at_numeric_boundaries_only = (
814 anonymise_codes_at_numeric_boundaries_only
815 )
816 self.anonymise_dates_at_word_boundaries_only = (
817 anonymise_dates_at_word_boundaries_only
818 )
819 self.anonymise_numbers_at_word_boundaries_only = (
820 anonymise_numbers_at_word_boundaries_only
821 )
822 self.anonymise_numbers_at_numeric_boundaries_only = (
823 anonymise_numbers_at_numeric_boundaries_only
824 )
825 self.anonymise_strings_at_word_boundaries_only = (
826 anonymise_strings_at_word_boundaries_only
827 )
828 self.min_string_length_for_errors = min_string_length_for_errors
829 self.min_string_length_to_scrub_with = min_string_length_to_scrub_with
830 self.scrub_string_suffixes = scrub_string_suffixes
831 self.string_max_regex_errors = string_max_regex_errors
832 self.allowlist = allowlist
833 self.alternatives = alternatives
834 self.nonspecific_scrubber = nonspecific_scrubber
835 self.nonspecific_scrubber_first = nonspecific_scrubber_first
836 self.debug = debug
838 # Regex information
839 self.re_patient = None # type: Optional[Pattern[str]]
840 self.re_tp = None # type: Optional[Pattern[str]]
841 self.regexes_built = False
842 self.re_patient_elements = [] # type: List[str]
843 self.re_tp_elements = [] # type: List[str]
844 # ... both changed from set to list to reflect referee's point re
845 # potential importance of scrubber order
846 self.elements_tuplelist = (
847 []
848 ) # type: List[Tuple[bool, ScrubMethod, str]]
849 # ... list of tuples: (patient?, type, value)
850 # ... used for get_raw_info(); since we've made the order important,
851 # we should detect changes in order here as well
852 self.clear_cache()
854 def clear_cache(self) -> None:
855 """
856 Clear the internal cache (the compiled regex).
857 """
858 self.regexes_built = False
860 @staticmethod
861 def get_scrub_method(
862 datatype_long: str, scrub_method: Optional[ScrubMethod]
863 ) -> ScrubMethod:
864 """
865 Return the default scrub method for a given SQL datatype, unless
866 overridden. For example, dates are scrubbed via a date method; numbers
867 by a numeric method.
869 Args:
870 datatype_long: SQL datatype as a string
871 scrub_method: optional method to enforce
873 Returns:
874 :class:`crate_anon.anonymise.constants.SCRUBMETHOD` value
875 """
876 if scrub_method is not None:
877 return scrub_method
878 elif is_sqltype_date(datatype_long):
879 return ScrubMethod.DATE
880 elif is_sqltype_text_over_one_char(datatype_long):
881 return ScrubMethod.WORDS
882 else:
883 return ScrubMethod.NUMERIC
885 def add_value(
886 self,
887 value: Any,
888 scrub_method: ScrubMethod,
889 patient: bool = True,
890 clear_cache: bool = True,
891 ) -> None:
892 """
893 Add a specific value via a specific scrub_method.
895 Args:
896 value:
897 value to add to the scrubber
898 scrub_method:
899 :class:`crate_anon.anonymise.constants.SCRUBMETHOD` value
900 patient:
901 Boolean; controls whether it's treated as a patient value or a
902 third-party value.
903 clear_cache:
904 also clear our cache?
905 """
906 if value is None:
907 return
908 new_tuple = (patient, scrub_method, repr(value))
909 if new_tuple not in self.elements_tuplelist:
910 self.elements_tuplelist.append(new_tuple)
911 # Note: object reference
912 r = self.re_patient_elements if patient else self.re_tp_elements
914 if scrub_method is ScrubMethod.DATE:
915 elements = self.get_elements_date(value)
916 elif scrub_method is ScrubMethod.WORDS:
917 elements = self.get_elements_words(value)
918 elif scrub_method is ScrubMethod.PHRASE:
919 elements = self.get_elements_phrase(value)
920 elif scrub_method is ScrubMethod.PHRASE_UNLESS_NUMERIC:
921 elements = self.get_elements_phrase_unless_numeric(value)
922 elif scrub_method is ScrubMethod.NUMERIC:
923 elements = self.get_elements_numeric(value)
924 elif scrub_method is ScrubMethod.CODE:
925 elements = self.get_elements_code(value)
926 else:
927 raise ValueError(
928 f"Bug: unknown scrub_method to add_value: " f"{scrub_method}"
929 )
930 r.extend(elements)
931 if clear_cache:
932 self.clear_cache()
def get_elements_date(
    self, value: Union[datetime.datetime, datetime.date]
) -> Optional[List[str]]:
    """
    Returns a list of regex elements for a given date value.

    The value is first passed through ``coerce_to_datetime()``. If that
    raises, a warning is logged and ``None`` is returned instead of a list.
    """
    try:
        as_datetime = coerce_to_datetime(value)
    except Exception as e:
        # Best effort: report the unusable value and give up on it.
        log.warning(
            f"Invalid date received to PersonalizedScrubber. "
            f"get_elements_date(): value={value}, exception={e}"
        )
        return None
    word_boundaries_only = self.anonymise_dates_at_word_boundaries_only
    return get_date_regex_elements(
        as_datetime, at_word_boundaries_only=word_boundaries_only
    )
def get_elements_words(self, value: str) -> List[str]:
    """
    Returns a list of regex elements for a given string that contains
    textual words.

    The string is split by ``get_anon_fragments_from_string()``; each
    fragment that is long enough and not in the allowlist contributes the
    elements produced by ``get_string_regex_elements()``.
    """
    collected = []  # type: List[str]
    for fragment in get_anon_fragments_from_string(str(value)):
        n_chars = len(fragment)
        # The length limit is applied to every fragment, numeric ones
        # included. Using it means numeric parts of addresses can survive
        # (4 Drury Lane -> 4 [___] [___]), but exempting numbers would wreck
        # quantitative text ("the last 4-5 years" -> "the last [___]-5
        # years"). So the limit is applied consistently.
        if n_chars < self.min_string_length_to_scrub_with:
            continue
        if self.allowlist and self.allowlist.contains(fragment):
            continue
        # Typographical errors are only tolerated for longer fragments.
        permitted_errors = (
            self.string_max_regex_errors
            if n_chars >= self.min_string_length_for_errors
            else 0
        )
        collected += get_string_regex_elements(
            fragment,
            self.scrub_string_suffixes,
            max_errors=permitted_errors,
            at_word_boundaries_only=(
                self.anonymise_strings_at_word_boundaries_only
            ),
        )
    return collected
def get_elements_phrase(self, value: Any) -> List[str]:
    """
    Returns a list of regex elements for a given phrase.

    The value is converted to a string and stripped of surrounding
    whitespace. An empty list is returned if the result is empty, shorter
    than ``min_string_length_to_scrub_with``, or present in the allowlist.
    """
    phrase = str(value).strip()
    n_chars = len(phrase)
    if n_chars == 0 or n_chars < self.min_string_length_to_scrub_with:
        return []
    if self.allowlist and self.allowlist.contains(phrase):
        return []
    # Typographical errors are only tolerated for longer phrases.
    permitted_errors = (
        self.string_max_regex_errors
        if n_chars >= self.min_string_length_for_errors
        else 0
    )
    return get_phrase_regex_elements(
        phrase,
        max_errors=permitted_errors,
        at_word_boundaries_only=(
            self.anonymise_strings_at_word_boundaries_only
        ),
        alternatives=self.alternatives,
    )
def get_elements_phrase_unless_numeric(self, value: Any) -> List[str]:
    """
    If the value is numeric, return an empty list. Otherwise, returns a
    list of regex elements for the given phrase.

    "Numeric" means that ``float(value)`` succeeds, with one exception:
    text that only parses as a non-finite float (e.g. ``"nan"``, ``"Nan"``,
    ``"inf"``, ``"Infinity"``) is treated as a phrase, because such strings
    can be real words or names and must not escape scrubbing.

    Args:
        value: the value to examine

    Returns:
        a list of regex elements (empty for numeric values)
    """
    import math  # local import; only needed for the finiteness check

    try:
        number = float(value)
    except OverflowError:
        # E.g. an integer too large to represent as a float: it is still a
        # number, so there is nothing to scrub as a phrase.
        return []
    except (TypeError, ValueError):
        return self.get_elements_phrase(value)
    if isinstance(value, str) and not math.isfinite(number):
        # float() accepts "nan"/"inf"/"infinity" in any case, optionally
        # signed; as text, those are phrases rather than numbers.
        return self.get_elements_phrase(value)
    return []
def get_elements_numeric(self, value: Any) -> List[str]:
    """
    Build regex elements that scrub a number.

    The value is converted to a string, reduced to a digit string by
    ``get_digit_string_from_vaguely_numeric_string()``, and passed to
    ``get_code_regex_elements()`` with this scrubber's boundary settings
    for numbers.

    Particular examples: phone numbers, e.g. ``"(01223) 123456"``.

    Args:
        value: a string containing a number, or an actual number.

    Returns:
        a list of regex elements
    """
    digits = get_digit_string_from_vaguely_numeric_string(str(value))
    word_boundaries_only = self.anonymise_numbers_at_word_boundaries_only
    numeric_boundaries_only = (
        self.anonymise_numbers_at_numeric_boundaries_only
    )
    return get_code_regex_elements(
        digits,
        at_word_boundaries_only=word_boundaries_only,
        at_numeric_boundaries_only=numeric_boundaries_only,
    )
def get_elements_code(self, value: Any) -> List[str]:
    """
    Build regex elements that scrub an alphanumeric code.

    The value is converted to a string, passed through
    ``reduce_to_alphanumeric()``, and handed to
    ``get_code_regex_elements()`` with this scrubber's boundary settings
    for codes.

    Particular examples: postcodes, e.g. ``"PE12 3AB"``.

    Args:
        value: a string containing an alphanumeric code

    Returns:
        a list of regex elements
    """
    code = reduce_to_alphanumeric(str(value))
    word_boundaries_only = self.anonymise_codes_at_word_boundaries_only
    numeric_boundaries_only = (
        self.anonymise_codes_at_numeric_boundaries_only
    )
    return get_code_regex_elements(
        code,
        at_word_boundaries_only=word_boundaries_only,
        at_numeric_boundaries_only=numeric_boundaries_only,
    )
def get_patient_regex_string(self) -> str:
    """
    Return the string version of the patient regex, as produced by
    ``get_regex_string_from_elements()`` from the patient regex elements.
    """
    patient_elements = self.re_patient_elements
    return get_regex_string_from_elements(patient_elements)
def get_tp_regex_string(self) -> str:
    """
    Return the string version of the third-party regex, as produced by
    ``get_regex_string_from_elements()`` from the third-party regex
    elements.
    """
    third_party_elements = self.re_tp_elements
    return get_regex_string_from_elements(third_party_elements)
def build_regexes(self) -> None:
    """
    Compile the patient and third-party regexes from their element lists,
    and mark them as built. If ``self.debug`` is set, log both regex
    strings at debug level.
    """
    self.re_patient = get_regex_from_elements(self.re_patient_elements)
    self.re_tp = get_regex_from_elements(self.re_tp_elements)
    # "Built" does not imply "present": either regex may still be None.
    self.regexes_built = True
    if not self.debug:
        return
    log.debug(f"Patient scrubber: {self.get_patient_regex_string()}")
    log.debug(f"Third party scrubber: {self.get_tp_regex_string()}")
def scrub(self, text: str) -> Optional[str]:
    """
    Scrub text (full docstring in parent class).

    Returns ``None`` for ``None`` input. Regexes are built on first use.
    The order of operations is:

    - if ``nonspecific_scrubber_first``: (1) nonspecific, (2) patient,
      (3) third party;
    - otherwise: (1) patient, (2) third party, (3) nonspecific.
    """
    if text is None:
        return None
    if not self.regexes_built:
        self.build_regexes()

    nonspecific_goes_first = bool(
        self.nonspecific_scrubber and self.nonspecific_scrubber_first
    )
    if nonspecific_goes_first:
        text = self.nonspecific_scrubber.scrub(text)
    # Either regex may be absent even after building.
    if self.re_patient:
        text = self.re_patient.sub(self.replacement_text_patient, text)
    if self.re_tp:
        text = self.re_tp.sub(self.replacement_text_third_party, text)
    if self.nonspecific_scrubber and not nonspecific_goes_first:
        text = self.nonspecific_scrubber.scrub(text)
    return text
def get_hash(self) -> str:
    """
    Return the hash of this scrubber's raw information, i.e. the result of
    ``get_raw_info()`` passed through ``self.hasher`` (full docstring in
    parent class).
    """
    raw_info = self.get_raw_info()
    return self.hasher.hash(raw_info)
def get_raw_info(self) -> Dict[str, Any]:
    """
    Summarizes settings and (sensitive) data for this scrubber.

    This is both a summary for debugging and the basis for our
    change-detection hash (and for the latter reason we need order etc. to
    be consistent). For any information we put in here, changes will cause
    data to be re-scrubbed.

    Note that the hasher should be a secure one, because this is sensitive
    information.

    Returns:
        an :class:`OrderedDict` mapping setting names to their values, in a
        fixed order
    """
    info = OrderedDict()  # type: Dict[str, Any]

    # Plain settings are copied verbatim, in this fixed order.
    for setting in (
        "anonymise_codes_at_word_boundaries_only",
        "anonymise_codes_at_numeric_boundaries_only",
        "anonymise_dates_at_word_boundaries_only",
        "anonymise_numbers_at_word_boundaries_only",
        "anonymise_numbers_at_numeric_boundaries_only",
        "anonymise_strings_at_word_boundaries_only",
        "min_string_length_for_errors",
        "min_string_length_to_scrub_with",
    ):
        info[setting] = getattr(self, setting)

    # Suffixes are sorted so that their order cannot affect the result.
    info["scrub_string_suffixes"] = sorted(self.scrub_string_suffixes)
    info["string_max_regex_errors"] = self.string_max_regex_errors

    # Optional collaborators are represented by their own hashes.
    if self.allowlist:
        info["allowlist_hash"] = self.allowlist.get_hash()
    else:
        info["allowlist_hash"] = None
    if self.nonspecific_scrubber:
        info["nonspecific_scrubber_hash"] = (
            self.nonspecific_scrubber.get_hash()
        )
    else:
        info["nonspecific_scrubber_hash"] = None

    info["elements"] = self.elements_tuplelist
    return info