Coverage for anonymise/scrub.py: 21%

351 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2026-02-05 06:46 -0600

1""" 

2crate_anon/anonymise/scrub.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Scrubber classes for CRATE anonymiser.** 

27 

28""" 

29 

30from abc import ABC, abstractmethod 

31from collections import OrderedDict 

32import datetime 

33import logging 

34import re 

35import string 

36from typing import ( 

37 Any, 

38 Dict, 

39 Iterable, 

40 Generator, 

41 List, 

42 Optional, 

43 Pattern, 

44 Set, 

45 Tuple, 

46 TYPE_CHECKING, 

47 Union, 

48) 

49 

50if TYPE_CHECKING: 

51 from re import Match 

52 

53from cardinal_pythonlib.datetimefunc import coerce_to_datetime 

54from cardinal_pythonlib.file_io import gen_lines_without_comments 

55from cardinal_pythonlib.hash import GenericHasher 

56from cardinal_pythonlib.sql.validation import ( 

57 is_sqltype_date, 

58 is_sqltype_text_over_one_char, 

59) 

60from cardinal_pythonlib.text import get_unicode_characters 

61 

62# from flashtext import KeywordProcessor 

63from crate_anon.common.bugfix_flashtext import KeywordProcessorFixed 

64 

65# ... temp bugfix 

66 

67# noinspection PyPep8Naming 

68from crate_anon.anonymise.constants import ( 

69 AnonymiseConfigDefaults as DA, 

70 DATE_BLURRING_DIRECTIVES, 

71 DATE_BLURRING_DIRECTIVES_CSV, 

72 MONTH_3_LETTER_INDEX, 

73 ScrubMethod, 

74) 

75from crate_anon.anonymise.anonregex import ( 

76 EMAIL_REGEX_STR, 

77 DateRegexNames, 

78 get_anon_fragments_from_string, 

79 get_code_regex_elements, 

80 get_date_regex_elements, 

81 get_generic_date_regex_elements, 

82 get_number_of_length_n_regex_elements, 

83 get_phrase_regex_elements, 

84 get_regex_from_elements, 

85 get_regex_string_from_elements, 

86 get_string_regex_elements, 

87 get_uk_postcode_regex_elements, 

88) 

89from crate_anon.common.stringfunc import ( 

90 get_digit_string_from_vaguely_numeric_string, 

91 reduce_to_alphanumeric, 

92) 

93 

94log = logging.getLogger(__name__) 

95 

96 

97# ============================================================================= 

98# Generic scrubber base class 

99# ============================================================================= 

100 

101 

class ScrubberBase(ABC):
    """
    Abstract parent for every scrubber in this module.

    A concrete scrubber must be able to (a) de-identify text via
    :meth:`scrub`, and (b) describe its own configuration as a hash via
    :meth:`get_hash`, so that changes to the scrubber can be detected.
    """

    def __init__(self, hasher: GenericHasher) -> None:
        """
        Args:
            hasher:
                :class:`GenericHasher` used to hash this scrubber (for
                change-detection purposes); should be a secure hasher.
        """
        self.hasher = hasher

    @abstractmethod
    def get_hash(self) -> str:
        """
        Returns a hash of this scrubber. The hash can be stored and compared
        later: in an incremental update, a changed scrubber hash means all
        data for the patient should be re-anonymised.
        """
        raise NotImplementedError("Implement in derived class")

    @abstractmethod
    def scrub(self, text: str) -> str:
        """
        Returns a scrubbed version of the text.

        Args:
            text: the raw text, potentially containing sensitive information

        Returns:
            the de-identified text
        """
        raise NotImplementedError("Implement in derived class")

137 

138 

139# ============================================================================= 

140# WordList 

141# ============================================================================= 

142 

143 

def lower_case_words_from_file(filename: str) -> Generator[str, None, None]:
    """
    Yields each whitespace-delimited word found in a file, in lower case.

    Lines are obtained from ``gen_lines_without_comments()`` (called with
    ``comment_at_start_only=True``) and then split on whitespace.

    Args:
        filename: name of the file to read
    """
    lines = gen_lines_without_comments(filename, comment_at_start_only=True)
    for text in lines:
        # str.split() with no arguments never produces empty strings, so no
        # further filtering of the tokens is required.
        yield from (token.lower() for token in text.split())

154 

155 

def lower_case_phrase_lines_from_file(
    filename: str,
) -> Generator[str, None, None]:
    """
    Yields lower-case phrases from a file, treating each line as one phrase.

    Lines are obtained from ``gen_lines_without_comments()`` (called with
    ``comment_at_start_only=True``); they are expected to arrive already
    stripped of leading/trailing whitespace and to be non-empty.

    Args:
        filename: name of the file to read
    """
    phrases = gen_lines_without_comments(filename, comment_at_start_only=True)
    for phrase in phrases:
        yield phrase.lower()

167 

168 

# Characters that FlashText is told to treat as "part of a word" (see
# WordList.build(), which passes this set to set_non_word_boundaries()).
# Digits, ASCII letters and the underscore are described in the original
# comments as FlashText defaults; the Latin alphabetic Unicode characters
# appear to be the extension made here -- NOTE(review): confirm against
# FlashText's own defaults.
FLASHTEXT_WORD_CHARACTERS = set(
    "".join(
        (
            string.digits,
            string.ascii_letters,
            "_",
            get_unicode_characters("Latin_Alphabetic"),
        )
    )
)

175# Why do we do this? So e.g. "naïve" isn't truncated to "naï[~~~]". 

176# Check: FLASHTEXT_WORDCHAR_STR = "".join(sorted(FLASHTEXT_WORD_CHARACTERS)) 

177 

178 

class WordList(ScrubberBase):
    """
    A case-insensitive collection of words (or phrases) that can also scrub.

    It has two roles: an allowlist (ask whether a word is present, via
    :meth:`contains`) and a denylist (remove every listed word from some
    text, via :meth:`scrub`).
    """

    def __init__(
        self,
        filenames: Iterable[str] = None,
        words: Iterable[str] = None,
        as_phrases: bool = False,
        replacement_text: str = "[---]",
        hasher: GenericHasher = None,
        suffixes: List[str] = None,
        at_word_boundaries_only: bool = True,
        max_errors: int = 0,
        regex_method: bool = False,
    ) -> None:
        """
        Args:
            filenames:
                Files from which to read words.
            words:
                Extra words to add, beyond those in the files.
            as_phrases:
                Treat each line of a source file as a single phrase (rather
                than splitting it into words) and, if ``regex_method`` is
                True, scrub as phrases.
            replacement_text:
                Text that replaces anything scrubbed.
            hasher:
                :class:`GenericHasher` used to hash this scrubber (for
                change-detection purposes); should be a secure hasher.
            suffixes:
                Suffixes; each word is also scrubbed with each suffix
                appended.
            at_word_boundaries_only:
                Boolean. If set, ensure that the regex begins and ends with a
                word boundary requirement. (If false: will scrub ``ANN`` from
                ``bANNed``, for example.)
            max_errors:
                Maximum number of typographical insertion / deletion /
                substitution errors to tolerate. Applies only when
                ``regex_method`` is True.
            regex_method:
                Use regular expressions? If True: slower, but phrase
                scrubbing copes with variable whitespace. If False: much
                faster (uses FlashText), but whitespace is inflexible.

        Raises:
            ValueError:
                if FlashText is selected (``regex_method=False``) but
                ``at_word_boundaries_only`` is False
        """
        using_flashtext = not regex_method
        if using_flashtext and at_word_boundaries_only is False:
            raise ValueError(
                "FlashText (chosen by regex_method=False) will only work at "
                "word boundaries, but at_word_boundaries_only is False"
            )

        super().__init__(hasher)

        # Configuration:
        self.replacement_text = replacement_text
        self.as_phrases = as_phrases
        self.suffixes = suffixes or []  # type: List[str]
        self.at_word_boundaries_only = at_word_boundaries_only
        self.max_errors = max_errors
        self.regex_method = regex_method

        # Derived/cached state, all starting out empty:
        self._regex = None  # type: Optional[Pattern[str]]
        self._processor = None  # type: Optional[KeywordProcessorFixed]
        self._cached_hash = None  # type: Optional[str]
        self._built = False

        # A set gives fast membership tests for contains().
        self.words = set()  # type: Set[str]
        # noinspection PyTypeChecker
        for source_file in filenames or []:
            self.add_file(source_file, clear_cache=False)
        # noinspection PyTypeChecker
        for extra_word in words or []:
            self.add_word(extra_word, clear_cache=False)

    def clear_cache(self) -> None:
        """
        Discard everything derived from the wordlist: the compiled regex,
        the FlashText processor, and the cached hash. They are rebuilt on
        demand.
        """
        self._cached_hash = None  # type: Optional[str]
        self._processor = None  # type: Optional[KeywordProcessorFixed]
        self._regex = None  # type: Optional[Pattern[str]]
        self._built = False

    def add_word(self, word: str, clear_cache: bool = True) -> None:
        """
        Add a single word (stored in lower case). Empty/false-like words are
        ignored entirely.

        Args:
            word: word to add
            clear_cache: also clear our cache?
        """
        if word:
            self.words.add(word.lower())
            if clear_cache:
                self.clear_cache()

    def add_file(self, filename: str, clear_cache: bool = True) -> None:
        """
        Add the contents of a file: whole lines if ``as_phrases`` is set,
        otherwise individual words.

        Args:
            filename:
                File to read.
            clear_cache:
                Also clear our cache?
        """
        reader = (
            lower_case_phrase_lines_from_file
            if self.as_phrases
            else lower_case_words_from_file
        )
        for entry in reader(filename):
            self.words.add(entry)
        if clear_cache:
            self.clear_cache()

    def contains(self, word: str) -> bool:
        """
        Is this word (compared in lower case) in our wordlist?
        """
        lowered = word.lower()
        return lowered in self.words

    def get_hash(self) -> str:
        # docstring in parent class

        if self._cached_hash:
            return self._cached_hash
        # The words live in an unordered set; sorting makes the hash depend
        # only on which words are present, not on the order they were added.
        self._cached_hash = self.hasher.hash(sorted(self.words))
        return self._cached_hash

    def scrub(self, text: str) -> str:
        # docstring in parent class
        if not self._built:
            self.build()
        if self.regex_method:
            compiled = self._regex
            if compiled:
                return compiled.sub(self.replacement_text, text)
            return text
        processor = self._processor
        if processor:
            return processor.replace_keywords(text)
        return text

    def _gen_word_and_suffixed(self, w: str) -> Iterable[str]:
        """
        Yields the word itself, then the word with each suffix appended.
        """
        yield w
        yield from (w + suffix for suffix in self.suffixes)

    def build(self) -> None:
        """
        Builds the fast scrubbing machinery -- a compiled regex if
        ``regex_method`` is set, otherwise a FlashText processor. Intended to
        be called once all words have been collected.
        """
        if self.regex_method:
            # Phrases and plain strings use different regex element builders
            # but take the same options.
            make_elements = (
                get_phrase_regex_elements
                if self.as_phrases
                else get_string_regex_elements
            )
            elements = []  # type: List[str]
            for w in self.words:
                elements.extend(
                    make_elements(
                        w,
                        suffixes=self.suffixes,
                        at_word_boundaries_only=self.at_word_boundaries_only,
                        max_errors=self.max_errors,
                    )
                )
            log.debug(f"Building regex with {len(elements)} elements")
            self._regex = get_regex_from_elements(elements)
        elif self.words:
            self._processor = KeywordProcessorFixed(case_sensitive=False)
            processor = self._processor
            processor.set_non_word_boundaries(FLASHTEXT_WORD_CHARACTERS)
            log.debug(
                f"Building FlashText processor with "
                f"{len(self.words)} keywords"
            )
            for w in self.words:
                for keyword in self._gen_word_and_suffixed(w):
                    processor.add_keyword(keyword, self.replacement_text)
        else:
            # Nothing to scrub with; scrub() will return text unchanged.
            self._processor = None  # type: Optional[KeywordProcessorFixed]
        self._built = True

386 

387 

388# ============================================================================= 

389# NonspecificScrubber 

390# ============================================================================= 

391 

392 

class Replacer:
    """
    Minimal replacement provider for use with ``regex.sub()``.

    Its :meth:`replace` method can be passed as the "repl" argument; it
    ignores the match and always yields the same fixed text, so it is
    equivalent to passing that text to ``regex.sub()`` directly.
    """

    def __init__(self, replacement_text: str) -> None:
        """
        Args:
            replacement_text: the fixed text that replaces every match
        """
        self.replacement_text = replacement_text

    def replace(self, match: "Match") -> str:
        """
        Callback for ``re.sub()`` / ``regex.sub()``: those functions accept a
        function as "repl", which receives a :class:`re.Match` and returns
        the replacement text. Here the match is not inspected.
        """
        return self.replacement_text

410 

411 

class NonspecificReplacer(Replacer):
    """
    Custom regex replacement for the Nonspecific scrubber. Currently this
    will "blur" dates if replacement_text_all_dates contains any formatting
    directives.
    """

    def __init__(self, replacement_text: str, replacement_text_all_dates: str):
        """
        Args:
            replacement_text:
                Generic text to use.
            replacement_text_all_dates:
                Replacement text to use if the matched text is a date. Can
                include format specifiers to blur the date rather than
                scrubbing it out entirely.
        """
        super().__init__(replacement_text)

        self.replacement_text_all_dates = replacement_text_all_dates
        # Only if the date replacement contains "%" do we need to parse the
        # matched date and format it; otherwise it is used verbatim.
        self.slow_date_replacement = "%" in replacement_text_all_dates

    def replace(self, match: "Match") -> str:
        """
        Returns the replacement for a regex match.

        - Non-date matches get the generic replacement text.
        - Date matches get ``replacement_text_all_dates``, passed through
          ``strftime`` (using the matched date) if it contains "%".
        - If the matched text looks like a date but cannot be turned into a
          real one (e.g. "31/02/2020", or an unrecognized month name), the
          generic replacement text is used instead, so the text is still
          scrubbed rather than the whole scrub operation failing.
        """
        groupdict = match.groupdict()
        if not self.is_a_date(groupdict):
            return super().replace(match)

        if not self.slow_date_replacement:
            return self.replacement_text_all_dates

        try:
            date = self.parse_date(match, groupdict)
            return date.strftime(self.replacement_text_all_dates)
        except (TypeError, ValueError):
            # ValueError: impossible calendar date, or one strftime cannot
            # format. TypeError: month name not found (int(None)).
            # We can't blur what we can't parse; remove it entirely. (The
            # matched text is deliberately not logged: it may be sensitive.)
            return super().replace(match)

    @staticmethod
    def is_a_date(groupdict: Dict[str, Any]) -> bool:
        """
        Is the match result a date? We detect this via our named regex groups.
        """
        return any(
            groupdict.get(groupname) is not None
            for groupname in (
                DateRegexNames.DAY_MONTH_YEAR,
                DateRegexNames.MONTH_DAY_YEAR,
                DateRegexNames.YEAR_MONTH_DAY,
                DateRegexNames.ISODATE_NO_SEP,
            )
        )

    @staticmethod
    def parse_date(
        match: "Match", groupdict: Dict[str, Any]
    ) -> datetime.datetime:
        """
        Retrieve a valid date from the Match object for blurring.

        Valid regex group name combinations, where D == DateRegexNames:

        D.ISODATE_NO_SEP: D.FOUR_DIGIT_YEAR,

        D.DAY_MONTH_YEAR: D.NUMERIC_DAY, D.NUMERIC_MONTH, D.TWO_DIGIT_YEAR,
        D.DAY_MONTH_YEAR: D.NUMERIC_DAY, D.NUMERIC_MONTH, D.FOUR_DIGIT_YEAR,
        D.DAY_MONTH_YEAR: D.NUMERIC_DAY, D.ALPHABETICAL_MONTH, D.TWO_DIGIT_YEAR,
        D.DAY_MONTH_YEAR: D.NUMERIC_DAY, D.ALPHABETICAL_MONTH, D.FOUR_DIGIT_YEAR,

        D.MONTH_DAY_YEAR: D.NUMERIC_DAY, D.NUMERIC_MONTH, D.TWO_DIGIT_YEAR,
        D.MONTH_DAY_YEAR: D.NUMERIC_DAY, D.NUMERIC_MONTH, D.FOUR_DIGIT_YEAR,
        D.MONTH_DAY_YEAR: D.NUMERIC_DAY, D.ALPHABETICAL_MONTH, D.TWO_DIGIT_YEAR,
        D.MONTH_DAY_YEAR: D.NUMERIC_DAY, D.ALPHABETICAL_MONTH, D.FOUR_DIGIT_YEAR,

        D.YEAR_MONTH_DAY: D.NUMERIC_DAY, D.NUMERIC_MONTH, D.TWO_DIGIT_YEAR,
        D.YEAR_MONTH_DAY: D.NUMERIC_DAY, D.NUMERIC_MONTH, D.FOUR_DIGIT_YEAR,
        D.YEAR_MONTH_DAY: D.NUMERIC_DAY, D.ALPHABETICAL_MONTH, D.TWO_DIGIT_YEAR,
        D.YEAR_MONTH_DAY: D.NUMERIC_DAY, D.ALPHABETICAL_MONTH, D.FOUR_DIGIT_YEAR,

        Raises:
            ValueError:
                if the components do not form a real calendar date
            TypeError:
                if an alphabetical month is not found in
                ``MONTH_3_LETTER_INDEX``
        """  # noqa: E501

        # Simple special handling for ISO date format without separators.
        isodate_no_sep = groupdict.get(DateRegexNames.ISODATE_NO_SEP)
        if isodate_no_sep is not None:
            return datetime.datetime.strptime(isodate_no_sep, "%Y%m%d")

        # For all others, extract D/M/Y information.

        year = groupdict.get(DateRegexNames.FOUR_DIGIT_YEAR)
        if year is None:
            two_digit_year = match.group(DateRegexNames.TWO_DIGIT_YEAR)

            # Will convert:
            # 00-68 -> 2000-2068
            # 69-99 -> 1969-1999
            year = datetime.datetime.strptime(two_digit_year, "%y").year

        numeric_day = match.group(DateRegexNames.NUMERIC_DAY)

        numeric_month = groupdict.get(DateRegexNames.NUMERIC_MONTH)
        if numeric_month is None:
            # Month names are looked up by their first three letters.
            three_letter_month = match.group(
                DateRegexNames.ALPHABETICAL_MONTH
            )[:3]
            numeric_month = MONTH_3_LETTER_INDEX.get(three_letter_month)

        return datetime.datetime(
            int(year), int(numeric_month), int(numeric_day)
        )

515 

516 

class NonspecificScrubber(ScrubberBase):
    """
    Scrubs a bunch of things that are independent of any patient-specific data,
    such as removing all UK postcodes, or numbers of a certain length.
    """

    def __init__(
        self,
        hasher: GenericHasher,
        replacement_text: str = DA.REPLACE_NONSPECIFIC_INFO_WITH,
        anonymise_codes_at_word_boundaries_only: bool = DA.ANONYMISE_CODES_AT_WORD_BOUNDARIES_ONLY,  # noqa: E501
        anonymise_dates_at_word_boundaries_only: bool = DA.ANONYMISE_DATES_AT_WORD_BOUNDARIES_ONLY,  # noqa: E501
        anonymise_numbers_at_word_boundaries_only: bool = DA.ANONYMISE_NUMBERS_AT_WORD_BOUNDARIES_ONLY,  # noqa: E501
        denylist: WordList = None,
        scrub_all_numbers_of_n_digits: List[int] = None,
        scrub_all_uk_postcodes: bool = DA.SCRUB_ALL_UK_POSTCODES,
        scrub_all_dates: bool = DA.SCRUB_ALL_DATES,
        replacement_text_all_dates: str = DA.REPLACE_ALL_DATES_WITH,
        scrub_all_email_addresses: bool = DA.SCRUB_ALL_EMAIL_ADDRESSES,
        extra_regexes: Optional[List[str]] = None,
    ) -> None:
        """
        Args:
            replacement_text:
                Replace sensitive content with this string.
            hasher:
                :class:`GenericHasher` to use to hash this scrubber (for
                change-detection purposes); should be a secure hasher
            anonymise_codes_at_word_boundaries_only:
                For codes: Boolean. Ensure that the regex begins and ends with
                a word boundary requirement.
            anonymise_dates_at_word_boundaries_only:
                Scrub dates only if they occur at word boundaries. (Even if you
                say no, there are *some* restrictions or very odd things would
                happen; see
                :func:`crate_anon.anonymise.anonregex.get_generic_date_regex_elements`.)
            anonymise_numbers_at_word_boundaries_only:
                For numbers: Boolean. If set, ensure that the regex begins and
                ends with a word boundary requirement. If not set, the regex
                must be surrounded by non-digits. (If it were surrounded by
                more digits, it wouldn't be an n-digit number!)
            denylist:
                Words to scrub.
            scrub_all_numbers_of_n_digits:
                List of values of n; number lengths to scrub.
            scrub_all_uk_postcodes:
                Scrub all UK postcodes?
            scrub_all_dates:
                Scrub all dates? (Currently assumes the default locale for
                month names and ordinal suffixes.)
            replacement_text_all_dates:
                When scrub_all_dates is True, replace with this text.
                Supports limited datetime.strftime directives for "blurring" of
                dates. Example: "%b %Y" for abbreviated month and year.
            scrub_all_email_addresses:
                Scrub all e-mail addresses?
            extra_regexes:
                List of user-defined extra regexes to scrub.

        Raises:
            ValueError:
                if ``replacement_text_all_dates`` contains a directive that
                is not permitted for date blurring
        """
        scrub_all_numbers_of_n_digits = scrub_all_numbers_of_n_digits or []

        super().__init__(hasher)
        self.replacement_text = replacement_text
        self.anonymise_codes_at_word_boundaries_only = (
            anonymise_codes_at_word_boundaries_only
        )
        self.anonymise_dates_at_word_boundaries_only = (
            anonymise_dates_at_word_boundaries_only
        )
        self.anonymise_numbers_at_word_boundaries_only = (
            anonymise_numbers_at_word_boundaries_only
        )
        self.denylist = denylist
        self.scrub_all_numbers_of_n_digits = scrub_all_numbers_of_n_digits
        self.scrub_all_uk_postcodes = scrub_all_uk_postcodes
        self.scrub_all_dates = scrub_all_dates

        # Validate the date replacement text before building a replacer
        # that will rely on it.
        self.replacement_text_all_dates = replacement_text_all_dates
        self.check_replacement_text_all_dates()
        self.replacer = self.get_replacer()

        self.scrub_all_email_addresses = scrub_all_email_addresses
        self.extra_regexes = extra_regexes

        self._cached_hash = None  # type: Optional[str]
        self._regex = None  # type: Optional[Pattern[str]]
        self._regex_built = False
        self.build_regex()

    def get_replacer(self) -> Replacer:
        """
        Return an object whose ``replace`` method can be used as the "repl"
        (replacer) argument to a re.sub() or regex.sub() call.
        """
        if (
            self.replacement_text == self.replacement_text_all_dates
            and "%" not in self.replacement_text_all_dates
        ):
            # Fast, simple: dates and non-dates get the same fixed text.
            return Replacer(self.replacement_text)

        # Handle dates in a more complex way, e.g. blurring them:
        return NonspecificReplacer(
            self.replacement_text, self.replacement_text_all_dates
        )

    def check_replacement_text_all_dates(self) -> None:
        """
        Ensure our date-replacement text is legitimate in terms of e.g.
        "%Y"-style directives.

        Raises:
            ValueError:
                if a "%" is followed by a character that is not one of
                ``DATE_BLURRING_DIRECTIVES``, or if ``strftime`` rejects the
                format
        """
        bad = False
        possible_percent_chars = "".join(DATE_BLURRING_DIRECTIVES)
        if re.search(
            rf"%[^{possible_percent_chars}]", self.replacement_text_all_dates
        ):
            bad = True
        else:
            # Double-check by actually formatting a date:
            test_date = datetime.date(2000, 12, 31)
            try:
                test_date.strftime(self.replacement_text_all_dates)
            except ValueError:
                bad = True
        if bad:
            raise ValueError(
                f"Bad format {self.replacement_text_all_dates!r} for date "
                "scrubbing. Allowed directives are: "
                f"{DATE_BLURRING_DIRECTIVES_CSV}"
            )

    def get_hash(self) -> str:
        # docstring in parent class
        if not self._cached_hash:
            self._cached_hash = self.hasher.hash(
                [
                    # signature, used for hashing:
                    self.anonymise_codes_at_word_boundaries_only,
                    self.anonymise_numbers_at_word_boundaries_only,
                    self.denylist.get_hash() if self.denylist else None,
                    self.scrub_all_numbers_of_n_digits,
                    self.scrub_all_uk_postcodes,
                    # The remaining settings also alter what scrub()
                    # produces, so a change to any of them must change the
                    # hash; otherwise an incremental update would not
                    # re-anonymise. (Including them changes previously
                    # stored hashes once.)
                    self.replacement_text,
                    self.anonymise_dates_at_word_boundaries_only,
                    self.scrub_all_dates,
                    self.replacement_text_all_dates,
                    self.scrub_all_email_addresses,
                    self.extra_regexes,
                ]
            )
        return self._cached_hash

    def scrub(self, text: str) -> str:
        # docstring in parent class
        if not self._regex_built:
            self.build_regex()
        # The denylist (if any) is applied first, then our own regex.
        if self.denylist:
            text = self.denylist.scrub(text)
        if not self._regex:  # possible; may be blank
            return text
        return self._regex.sub(self.replacer.replace, text)

    def build_regex(self) -> None:
        """
        Compile our high-speed regex from whichever of the postcode, n-digit
        number, date, e-mail and user-defined elements are enabled.
        """
        elements = []  # type: List[str]
        if self.scrub_all_uk_postcodes:
            elements.extend(
                get_uk_postcode_regex_elements(
                    at_word_boundaries_only=(
                        self.anonymise_codes_at_word_boundaries_only
                    )
                )
            )
        # noinspection PyTypeChecker
        for n in self.scrub_all_numbers_of_n_digits:
            elements.extend(
                get_number_of_length_n_regex_elements(
                    n,
                    at_word_boundaries_only=(
                        self.anonymise_numbers_at_word_boundaries_only
                    ),
                )
            )
        if self.scrub_all_dates:
            elements.extend(
                get_generic_date_regex_elements(
                    at_word_boundaries_only=self.anonymise_dates_at_word_boundaries_only  # noqa: E501
                )
            )
        if self.scrub_all_email_addresses:
            elements.append(EMAIL_REGEX_STR)
        if self.extra_regexes:
            elements.extend(self.extra_regexes)
        self._regex = get_regex_from_elements(elements)
        self._regex_built = True

708 

709 

710# ============================================================================= 

711# PersonalizedScrubber 

712# ============================================================================= 

713 

714 

715class PersonalizedScrubber(ScrubberBase): 

716 """ 

717 Accepts patient-specific (patient and third-party) information, and uses 

718 that to scrub text. 

719 """ 

720 

721 def __init__( 

722 self, 

723 hasher: GenericHasher, 

724 replacement_text_patient: str = DA.REPLACE_PATIENT_INFO_WITH, 

725 replacement_text_third_party: str = DA.REPLACE_THIRD_PARTY_INFO_WITH, # noqa: E501 

726 anonymise_codes_at_word_boundaries_only: bool = DA.ANONYMISE_CODES_AT_WORD_BOUNDARIES_ONLY, # noqa: E501 

727 anonymise_codes_at_numeric_boundaries_only: bool = DA.ANONYMISE_CODES_AT_NUMERIC_BOUNDARIES_ONLY, # noqa: E501 

728 anonymise_dates_at_word_boundaries_only: bool = DA.ANONYMISE_DATES_AT_WORD_BOUNDARIES_ONLY, # noqa: E501 

729 anonymise_numbers_at_word_boundaries_only: bool = DA.ANONYMISE_NUMBERS_AT_WORD_BOUNDARIES_ONLY, # noqa: E501 

730 anonymise_numbers_at_numeric_boundaries_only: bool = DA.ANONYMISE_NUMBERS_AT_NUMERIC_BOUNDARIES_ONLY, # noqa: E501 

731 anonymise_strings_at_word_boundaries_only: bool = DA.ANONYMISE_STRINGS_AT_WORD_BOUNDARIES_ONLY, # noqa: E501 

732 min_string_length_for_errors: int = DA.MIN_STRING_LENGTH_FOR_ERRORS, 

733 min_string_length_to_scrub_with: int = DA.MIN_STRING_LENGTH_TO_SCRUB_WITH, # noqa: E501 

734 scrub_string_suffixes: List[str] = None, 

735 string_max_regex_errors: int = DA.STRING_MAX_REGEX_ERRORS, 

736 allowlist: WordList = None, 

737 alternatives: List[List[str]] = None, 

738 nonspecific_scrubber: NonspecificScrubber = None, 

739 nonspecific_scrubber_first: bool = DA.NONSPECIFIC_SCRUBBER_FIRST, 

740 debug: bool = False, 

741 ) -> None: 

742 """ 

743 Args: 

744 hasher: 

745 :class:`GenericHasher` to use to hash this scrubber (for 

746 change-detection purposes); should be a secure hasher. 

747 replacement_text_patient: 

748 Replace sensitive "patient" content with this string. 

749 replacement_text_third_party: 

750 Replace sensitive "third party" content with this string. 

751 anonymise_codes_at_word_boundaries_only: 

752 For codes: Boolean. Ensure that the regex begins and ends with 

753 a word boundary requirement. 

754 anonymise_codes_at_numeric_boundaries_only: 

755 For codes: Boolean. Only applicable if 

756 anonymise_codes_at_word_boundaries_only is False. Ensure that 

757 the code is only recognized when surrounded by non-numbers; 

758 that is, only at the boundaries of numbers (at numeric 

759 boundaries). See 

760 :func:`crate_anon.anonymise.anonregex.get_code_regex_elements`. 

761 anonymise_dates_at_word_boundaries_only: 

762 For dates: Boolean. Ensure that the regex begins and ends with 

763 a word boundary requirement. 

764 anonymise_numbers_at_word_boundaries_only: 

765 For numbers: Boolean. Ensure that the regex begins and ends 

766 with a word boundary requirement. See 

767 :func:`crate_anon.anonymise.anonregex.get_code_regex_elements`. 

768 anonymise_numbers_at_numeric_boundaries_only: 

769 For numbers: Boolean. Only applicable if 

770 anonymise_numbers_at_word_boundaries_only is False. Ensure that 

771 the number is only recognized when surrounded by 

772 non-numbers; that is, only at the boundaries of numbers (at 

773 numeric boundaries). See 

774 :func:`crate_anon.anonymise.anonregex.get_code_regex_elements`. 

775 anonymise_strings_at_word_boundaries_only: 

776 For strings: Boolean. Ensure that the regex begins and ends 

777 with a word boundary requirement. 

778 min_string_length_for_errors: 

779 For strings: minimum string length at which typographical 

780 errors will be permitted. 

781 min_string_length_to_scrub_with: 

782 For strings: minimum string length at which the string will be 

783 permitted to be scrubbed with. 

784 scrub_string_suffixes: 

785 A list of suffixes to permit on strings. 

786 string_max_regex_errors: 

787 The maximum number of typographical insertion / deletion / 

788 substitution errors to permit. 

789 allowlist: 

790 :class:`WordList` of words to allow (not to scrub). 

791 alternatives: 

792 This allows words to be substituted by equivalents; such as 

793 ``St`` for ``Street`` or ``Rd`` for ``Road``. The parameter is 

794 a list of lists of equivalents; see 

795 :func:`crate_anon.anonymise.config.get_word_alternatives`. 

796 nonspecific_scrubber: 

797 :class:`NonspecificScrubber` to apply to remove information 

798 that is generic. 

799 nonspecific_scrubber_first: 

800 If one is provided, run the nonspecific scrubber first (rather 

801 than last)? 

802 debug: 

803 Show the final scrubber regex text as we compile our regexes. 

804 """ 

805 scrub_string_suffixes = scrub_string_suffixes or [] 

806 

807 super().__init__(hasher) 

808 self.replacement_text_patient = replacement_text_patient 

809 self.replacement_text_third_party = replacement_text_third_party 

810 self.anonymise_codes_at_word_boundaries_only = ( 

811 anonymise_codes_at_word_boundaries_only 

812 ) 

813 self.anonymise_codes_at_numeric_boundaries_only = ( 

814 anonymise_codes_at_numeric_boundaries_only 

815 ) 

816 self.anonymise_dates_at_word_boundaries_only = ( 

817 anonymise_dates_at_word_boundaries_only 

818 ) 

819 self.anonymise_numbers_at_word_boundaries_only = ( 

820 anonymise_numbers_at_word_boundaries_only 

821 ) 

822 self.anonymise_numbers_at_numeric_boundaries_only = ( 

823 anonymise_numbers_at_numeric_boundaries_only 

824 ) 

825 self.anonymise_strings_at_word_boundaries_only = ( 

826 anonymise_strings_at_word_boundaries_only 

827 ) 

828 self.min_string_length_for_errors = min_string_length_for_errors 

829 self.min_string_length_to_scrub_with = min_string_length_to_scrub_with 

830 self.scrub_string_suffixes = scrub_string_suffixes 

831 self.string_max_regex_errors = string_max_regex_errors 

832 self.allowlist = allowlist 

833 self.alternatives = alternatives 

834 self.nonspecific_scrubber = nonspecific_scrubber 

835 self.nonspecific_scrubber_first = nonspecific_scrubber_first 

836 self.debug = debug 

837 

838 # Regex information 

839 self.re_patient = None # type: Optional[Pattern[str]] 

840 self.re_tp = None # type: Optional[Pattern[str]] 

841 self.regexes_built = False 

842 self.re_patient_elements = [] # type: List[str] 

843 self.re_tp_elements = [] # type: List[str] 

844 # ... both changed from set to list to reflect referee's point re 

845 # potential importance of scrubber order 

846 self.elements_tuplelist = ( 

847 [] 

848 ) # type: List[Tuple[bool, ScrubMethod, str]] 

849 # ... list of tuples: (patient?, type, value) 

850 # ... used for get_raw_info(); since we've made the order important, 

851 # we should detect changes in order here as well 

852 self.clear_cache() 

853 

854 def clear_cache(self) -> None: 

855 """ 

856 Clear the internal cache (the compiled regex). 

857 """ 

858 self.regexes_built = False 

859 

@staticmethod
def get_scrub_method(
    datatype_long: str, scrub_method: Optional[ScrubMethod]
) -> ScrubMethod:
    """
    Choose the scrub method for a column.

    An explicitly supplied method always wins. Otherwise the method is
    derived from the SQL datatype: date types map to the date method, text
    types longer than one character map to the words method, and anything
    else is treated as numeric.

    Args:
        datatype_long: SQL datatype as a string
        scrub_method: optional method to enforce

    Returns:
        the :class:`ScrubMethod` value to use
    """
    # Explicit override takes priority over datatype-based defaults.
    if scrub_method is not None:
        return scrub_method
    if is_sqltype_date(datatype_long):
        return ScrubMethod.DATE
    if is_sqltype_text_over_one_char(datatype_long):
        return ScrubMethod.WORDS
    # Fallback for everything that is neither a date nor multi-char text.
    return ScrubMethod.NUMERIC

884 

def add_value(
    self,
    value: Any,
    scrub_method: ScrubMethod,
    patient: bool = True,
    clear_cache: bool = True,
) -> None:
    """
    Add a specific value via a specific scrub_method.

    The value is recorded (as ``repr(value)``) in ``elements_tuplelist``
    for change detection, converted to regex elements by the method-specific
    ``get_elements_*`` function, and those elements are appended to the
    patient or third-party element list.

    Args:
        value:
            value to add to the scrubber; ``None`` is ignored
        scrub_method:
            :class:`ScrubMethod` value selecting how the value is turned
            into regex elements
        patient:
            Boolean; controls whether it's treated as a patient value or a
            third-party value.
        clear_cache:
            also clear our cache?

    Raises:
        ValueError: if ``scrub_method`` is not a known method
    """
    if value is None:
        return
    new_tuple = (patient, scrub_method, repr(value))
    if new_tuple not in self.elements_tuplelist:
        self.elements_tuplelist.append(new_tuple)
    # Note: object reference
    r = self.re_patient_elements if patient else self.re_tp_elements

    if scrub_method is ScrubMethod.DATE:
        elements = self.get_elements_date(value)
    elif scrub_method is ScrubMethod.WORDS:
        elements = self.get_elements_words(value)
    elif scrub_method is ScrubMethod.PHRASE:
        elements = self.get_elements_phrase(value)
    elif scrub_method is ScrubMethod.PHRASE_UNLESS_NUMERIC:
        elements = self.get_elements_phrase_unless_numeric(value)
    elif scrub_method is ScrubMethod.NUMERIC:
        elements = self.get_elements_numeric(value)
    elif scrub_method is ScrubMethod.CODE:
        elements = self.get_elements_code(value)
    else:
        raise ValueError(
            f"Bug: unknown scrub_method to add_value: {scrub_method}"
        )
    # get_elements_date() returns None (after logging a warning) when the
    # value cannot be coerced to a date; extending a list with None would
    # raise TypeError, so only extend when there is something to add.
    if elements:
        r.extend(elements)
    if clear_cache:
        self.clear_cache()

933 

def get_elements_date(
    self, value: Union[datetime.datetime, datetime.date]
) -> Optional[List[str]]:
    """
    Build the regex elements that match a given date value.

    The value is first coerced to a datetime. If coercion raises, a warning
    is logged and ``None`` is returned instead of a list.
    """
    try:
        as_datetime = coerce_to_datetime(value)
    except Exception as e:
        # Best effort: a bad date is reported but does not abort the caller.
        log.warning(
            f"Invalid date received to PersonalizedScrubber. "
            f"get_elements_date(): value={value}, exception={e}"
        )
        return None
    word_boundaries_only = self.anonymise_dates_at_word_boundaries_only
    return get_date_regex_elements(
        as_datetime,
        at_word_boundaries_only=word_boundaries_only,
    )

954 

def get_elements_words(self, value: str) -> List[str]:
    """
    Build regex elements for a string made up of textual words.

    The string is split into fragments; each fragment that is long enough
    and not allowlisted contributes its own regex elements. Typographical
    errors are only permitted for fragments that reach
    ``min_string_length_for_errors``.
    """
    regex_parts = []  # type: List[str]
    for fragment in get_anon_fragments_from_string(str(value)):
        n_chars = len(fragment)
        # The length limit is applied to numbers too. Without it, numeric
        # parts of addresses stay visible (e.g. 4 Drury Lane becomes
        # 4 [___] [___]); but exempting numbers would wreck quantitative
        # text ("the last 4-5 years" -> "the last [___]-5 years"). So the
        # limit is applied consistently.
        if n_chars < self.min_string_length_to_scrub_with:
            continue
        if self.allowlist and self.allowlist.contains(fragment):
            continue
        permitted_errors = (
            self.string_max_regex_errors
            if n_chars >= self.min_string_length_for_errors
            else 0
        )
        regex_parts.extend(
            get_string_regex_elements(
                fragment,
                self.scrub_string_suffixes,
                max_errors=permitted_errors,
                at_word_boundaries_only=(
                    self.anonymise_strings_at_word_boundaries_only
                ),
            )
        )
    return regex_parts

989 

def get_elements_phrase(self, value: Any) -> List[str]:
    """
    Build regex elements for a whole phrase.

    The value is converted to a string and stripped. An empty list is
    returned if the result is empty, shorter than
    ``min_string_length_to_scrub_with``, or present in the allowlist.
    """
    phrase = str(value).strip()
    n_chars = len(phrase)
    # Nothing to scrub with: blank or too short.
    if not phrase or n_chars < self.min_string_length_to_scrub_with:
        return []
    if self.allowlist and self.allowlist.contains(phrase):
        return []
    # Typographical errors are only tolerated for long-enough phrases.
    permitted_errors = (
        self.string_max_regex_errors
        if n_chars >= self.min_string_length_for_errors
        else 0
    )
    return get_phrase_regex_elements(
        phrase,
        max_errors=permitted_errors,
        at_word_boundaries_only=(
            self.anonymise_strings_at_word_boundaries_only
        ),
        alternatives=self.alternatives,
    )

1014 

def get_elements_phrase_unless_numeric(self, value: Any) -> List[str]:
    """
    If the value is numeric, return an empty list. Otherwise, returns a
    list of regex elements for the given phrase.

    "Numeric" means that ``float(value)`` succeeds, or that it fails only
    because the number is too large to represent as a float.

    Args:
        value: the candidate phrase (any type)

    Returns:
        an empty list for numeric values; otherwise the result of
        :meth:`get_elements_phrase`
    """
    try:
        float(value)
    except OverflowError:
        # e.g. an int too large for a float: still a number, so it is
        # treated like any other numeric value rather than crashing.
        return []
    except (TypeError, ValueError):
        # Not interpretable as a number: scrub it as a phrase.
        return self.get_elements_phrase(value)
    return []

1025 

def get_elements_numeric(self, value: Any) -> List[str]:
    """
    Build regex elements that scrub a number.

    The value is converted to a string and reduced to a digit string by
    ``get_digit_string_from_vaguely_numeric_string``; the regex is built
    from that.

    Particular examples: phone numbers, e.g. ``"(01223) 123456"``.

    Args:
        value: a string containing a number, or an actual number.

    Returns:
        a list of regex elements
    """
    digits = get_digit_string_from_vaguely_numeric_string(str(value))
    word_boundaries_only = self.anonymise_numbers_at_word_boundaries_only
    numeric_boundaries_only = (
        self.anonymise_numbers_at_numeric_boundaries_only
    )
    return get_code_regex_elements(
        digits,
        at_word_boundaries_only=word_boundaries_only,
        at_numeric_boundaries_only=numeric_boundaries_only,
    )

1048 

def get_elements_code(self, value: Any) -> List[str]:
    """
    Build regex elements that scrub an alphanumeric code.

    The value is converted to a string and passed through
    ``reduce_to_alphanumeric``; the regex is built from that.

    Particular examples: postcodes, e.g. ``"PE12 3AB"``.

    Args:
        value: a string containing an alphanumeric code

    Returns:
        a list of regex elements
    """
    code = reduce_to_alphanumeric(str(value))
    word_boundaries_only = self.anonymise_codes_at_word_boundaries_only
    numeric_boundaries_only = self.anonymise_codes_at_numeric_boundaries_only
    return get_code_regex_elements(
        code,
        at_word_boundaries_only=word_boundaries_only,
        at_numeric_boundaries_only=numeric_boundaries_only,
    )

1071 

def get_patient_regex_string(self) -> str:
    """
    Return the string version of the patient regex, as produced by
    ``get_regex_string_from_elements`` from the accumulated patient
    regex elements.
    """
    patient_elements = self.re_patient_elements
    return get_regex_string_from_elements(patient_elements)

1077 

def get_tp_regex_string(self) -> str:
    """
    Return the string version of the third-party regex, as produced by
    ``get_regex_string_from_elements`` from the accumulated third-party
    regex elements.
    """
    third_party_elements = self.re_tp_elements
    return get_regex_string_from_elements(third_party_elements)

1083 

def build_regexes(self) -> None:
    """
    Compile the patient and third-party regexes from their element lists
    and mark them as built.
    """
    self.re_patient = get_regex_from_elements(self.re_patient_elements)
    self.re_tp = get_regex_from_elements(self.re_tp_elements)
    # "Built" does not imply "present": either regex may still be None.
    self.regexes_built = True
    if not self.debug:
        return
    log.debug(f"Patient scrubber: {self.get_patient_regex_string()}")
    log.debug(f"Third party scrubber: {self.get_tp_regex_string()}")

1096 

def scrub(self, text: str) -> Optional[str]:
    # docstring in parent class
    if text is None:
        return None
    # Lazily (re)compile the regexes if the cache has been invalidated.
    if not self.regexes_built:
        self.build_regexes()

    # Order of operations:
    #   nonspecific first: nonspecific -> patient -> third party
    #   otherwise:         patient -> third party -> nonspecific
    generic = self.nonspecific_scrubber
    generic_goes_first = generic and self.nonspecific_scrubber_first

    if generic_goes_first:
        text = generic.scrub(text)
    if self.re_patient:
        text = self.re_patient.sub(self.replacement_text_patient, text)
    if self.re_tp:
        text = self.re_tp.sub(self.replacement_text_third_party, text)
    if generic and not generic_goes_first:
        text = generic.scrub(text)
    return text

1117 

def get_hash(self) -> str:
    # docstring in parent class
    # The hash covers everything reported by get_raw_info(), so any change
    # to that information changes the hash.
    raw_info = self.get_raw_info()
    return self.hasher.hash(raw_info)

1121 

def get_raw_info(self) -> Dict[str, Any]:
    """
    Summarize the settings and (sensitive) data of this scrubber.

    The result serves two purposes: a debugging summary, and the input to
    the change-detection hash. For the latter, key order and content must
    be consistent from run to run; anything included here will trigger
    re-scrubbing when it changes.

    Because the content is sensitive, the hasher applied to it should be a
    secure one.

    Returns:
        an ordered dictionary of setting names to values, plus the hashes
        of the allowlist and nonspecific scrubber (``None`` if absent) and
        the list of added elements
    """
    info = OrderedDict()
    # Plain settings whose dictionary key equals the attribute name; the
    # order here is part of the hashed content.
    for setting in (
        "anonymise_codes_at_word_boundaries_only",
        "anonymise_codes_at_numeric_boundaries_only",
        "anonymise_dates_at_word_boundaries_only",
        "anonymise_numbers_at_word_boundaries_only",
        "anonymise_numbers_at_numeric_boundaries_only",
        "anonymise_strings_at_word_boundaries_only",
        "min_string_length_for_errors",
        "min_string_length_to_scrub_with",
    ):
        info[setting] = getattr(self, setting)
    # Suffixes are sorted so that their original ordering does not matter.
    info["scrub_string_suffixes"] = sorted(self.scrub_string_suffixes)
    info["string_max_regex_errors"] = self.string_max_regex_errors
    if self.allowlist:
        info["allowlist_hash"] = self.allowlist.get_hash()
    else:
        info["allowlist_hash"] = None
    if self.nonspecific_scrubber:
        info["nonspecific_scrubber_hash"] = (
            self.nonspecific_scrubber.get_hash()
        )
    else:
        info["nonspecific_scrubber_hash"] = None
    info["elements"] = self.elements_tuplelist
    return info