Coverage for anonymise/altermethod.py: 14%

190 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2026-02-05 06:46 -0600

1""" 

2crate_anon/anonymise/altermethod.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**The AlterMethod class.** 

27 

28""" 

29 

30import datetime 

31import html 

32import logging 

33import os 

34import traceback 

35from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING 

36 

37from cardinal_pythonlib.datetimefunc import ( 

38 coerce_to_date, 

39 truncate_date_to_first_of_month, 

40) 

41from cardinal_pythonlib.extract_text import ( 

42 document_to_text, 

43 TextProcessingConfig, 

44) 

45import regex 

46 

47# don't import config: circular dependency would have to be sorted out 

48from crate_anon.anonymise.constants import AlterMethodType 

49 

50if TYPE_CHECKING: 

51 from cardinal_pythonlib.hash import GenericHasher 

52 from crate_anon.anonymise.config import Config 

53 from crate_anon.anonymise.ddr import DataDictionaryRow 

54 

55 # import patient to avoid circular import when generating docs 

56 from crate_anon.anonymise import patient 

57 

58log = logging.getLogger(__name__) 

59 

60 

61# ============================================================================= 

62# Constants 

63# ============================================================================= 

64 

65HTML_TAG_RE = regex.compile("<[^>]*>") 

66 

67 

68# ============================================================================= 

69# AlterMethod 

70# ============================================================================= 

71 

72 

class AlterMethod:
    """
    Implements a SINGLE transformation of source data on its way to the
    destination database.

    Knows how to represent itself as a text element in the relevant column of
    a data dictionary row, and how to create itself from one of those text
    elements.

    A :class:`crate_anon.anonymise.ddr.DataDictionaryRow` may include multiple
    instances of :class:`crate_anon.anonymise.altermethod.AlterMethod` in a
    sequence.
    """

    def __init__(
        self,
        config: "Config",
        text_value: Optional[str] = None,
        scrub: bool = False,
        truncate_date: bool = False,
        extract_from_filename: bool = False,
        extract_from_file_format: bool = False,  # new in v0.18.18
        file_format_str: str = "",  # new in v0.18.18
        extract_from_blob: bool = False,
        skip_if_text_extract_fails: bool = False,
        extract_ext_field: str = "",
        hash_: bool = False,
        hash_config_section: str = "",
        # html_escape: bool = False,
        html_unescape: bool = False,
        html_untag: bool = False,
    ) -> None:
        """
        Args:
            config:
                a :class:`crate_anon.anonymise.config.Config`
            text_value:
                string (from the data dictionary) to parse via
                :func:`set_from_text`; may set many of the other attributes.
                If given, it takes precedence over the individual flags.
            scrub:
                Boolean; "the source field contains sensitive text; scrub it"
            truncate_date:
                Boolean; "the source is a date; truncate it to the first of the
                month"
            extract_from_filename:
                Boolean; "the source is a filename; extract the text from it"
            extract_from_file_format:
                Boolean; "the source is a partial filename; combine it with
                ``file_format_str`` to calculate the full filename, then
                extract the text from it"
            file_format_str:
                format string for use with ``extract_from_file_format``
            extract_from_blob:
                Boolean; "the source is binary (the database contains a BLOB);
                extract text from it". See also ``extract_ext_field``.
            skip_if_text_extract_fails:
                Boolean: "if text extraction fails, skip the record entirely"
            extract_ext_field:
                For when the database contains a BLOB: this parameter indicates
                a database column (field) name, in the same row, that contains
                the file's extension, to help identify the BLOB.
            hash_:
                Boolean. If true, transform the source by hashing it.
            hash_config_section:
                If ``hash_`` is true, this specifies the config section in
                which the hash is defined.
            html_unescape:
                Boolean: "transform the source by HTML-unescaping it". For
                example, this would convert ``&le;`` to ``≤``.
            html_untag:
                Boolean: "transform the source by removing HTML tags". For
                example, this would convert ``hello <b>bold</b> world`` to
                ``hello bold world``.

        Raises:
            ValueError:
                if the resulting object does not represent exactly one
                transformation, or if ``text_value`` cannot be parsed.
        """
        self.config = config
        self.scrub = scrub
        self.truncate_date = truncate_date
        self.extract_from_blob = extract_from_blob
        self.extract_from_filename = extract_from_filename
        self.extract_from_file_format = extract_from_file_format
        self.file_format_str = file_format_str
        self.skip_if_text_extract_fails = skip_if_text_extract_fails
        self.extract_ext_field = extract_ext_field
        self.hash = hash_
        self.hash_config_section = hash_config_section
        self.hasher = None  # type: Optional[GenericHasher]
        # self.html_escape = html_escape
        self.html_unescape = html_unescape
        self.html_untag = html_untag

        # Umbrella flag: any of the three text-extraction variants.
        self.extract_text = (
            extract_from_filename
            or extract_from_file_format
            or extract_from_blob
        )

        if text_value is not None:
            self.set_from_text(text_value)
        # Decide on the hasher from the *resulting* state, not from the raw
        # ``hash_`` argument: ``text_value`` may have switched hashing on
        # (in which case set_from_text() has already built the hasher) or
        # off (in which case no hasher must be looked up).
        if self.hash and self.hasher is None:
            self.hasher = self.config.get_extra_hasher(
                self.hash_config_section
            )

        self._assert_valid()

    # -------------------------------------------------------------------------
    # Text representations
    # -------------------------------------------------------------------------

    def set_from_text(self, value: str) -> None:
        """
        Take the string from the ``alter_method`` field of the data dictionary,
        and use it to set a bunch of internal attributes.

        All transformation flags are cleared first, so that the object ends up
        describing only the transformation named by ``value``.

        To get the configuration string back, see :func:`get_text`.

        Args:
            value: a single alter-method fragment, either a bare keyword or a
                ``keyword=parameter`` pair.

        Raises:
            ValueError: if ``value`` is not a recognized alter method, or if a
                required ``=parameter`` part is missing or empty.
        """
        # Reset every flag, including the HTML ones and any previous hasher,
        # so that no stale state survives a re-parse.
        self.scrub = False
        self.truncate_date = False
        self.extract_text = False
        self.extract_from_blob = False
        self.extract_from_file_format = False
        self.file_format_str = ""
        self.extract_from_filename = False
        self.skip_if_text_extract_fails = False
        self.extract_ext_field = ""
        self.hash = False
        self.hash_config_section = ""
        self.hasher = None
        self.html_unescape = False
        self.html_untag = False

        def get_second_part(missing_description: str) -> str:
            # Everything after the first "=" is the parameter.
            _, separator, secondhalf = value.partition("=")
            if not separator:
                raise ValueError(f"Bad format for alter method: {value}")
            if not secondhalf:
                raise ValueError(
                    f"Missing {missing_description} in alter method: {value}"
                )
            return secondhalf

        if value == AlterMethodType.TRUNCATEDATE.value:
            self.truncate_date = True
        elif value == AlterMethodType.SCRUBIN.value:
            self.scrub = True
        elif value.startswith(AlterMethodType.BINARY_TO_TEXT.value):
            self.extract_text = True
            self.extract_from_blob = True
            self.extract_ext_field = get_second_part(
                "filename/extension field"
            )
        elif value.startswith(AlterMethodType.FILENAME_FORMAT_TO_TEXT.value):
            self.extract_text = True
            self.extract_from_file_format = True
            self.file_format_str = get_second_part("filename format field")
        elif value == AlterMethodType.FILENAME_TO_TEXT.value:
            self.extract_text = True
            self.extract_from_filename = True
        elif value == AlterMethodType.SKIP_IF_TEXT_EXTRACT_FAILS.value:
            self.skip_if_text_extract_fails = True
        elif value.startswith(AlterMethodType.HASH.value):
            self.hash = True
            self.hash_config_section = get_second_part("hash config section")
            self.hasher = self.config.get_extra_hasher(
                self.hash_config_section
            )
        # elif value == ALTERMETHOD.HTML_ESCAPE:
        #     self.html_escape = True
        elif value == AlterMethodType.HTML_UNESCAPE.value:
            self.html_unescape = True
        elif value == AlterMethodType.HTML_UNTAG.value:
            self.html_untag = True
        else:
            raise ValueError(f"Bad alter_method part: {value}")

    @property
    def as_text(self) -> str:
        """
        Return the ``alter_method`` fragment from the working fields;
        effectively the reverse of :func:`set_from_text`.

        Returns an empty string if no transformation flag is set.
        """

        def two_part(altermethod: str, parameter: str) -> str:
            return altermethod + "=" + parameter

        if self.truncate_date:
            return AlterMethodType.TRUNCATEDATE.value
        if self.scrub:
            return AlterMethodType.SCRUBIN.value
        if self.extract_text:
            if self.extract_from_blob:
                return two_part(
                    AlterMethodType.BINARY_TO_TEXT.value,
                    self.extract_ext_field,
                )
            elif self.extract_from_file_format:
                return two_part(
                    AlterMethodType.FILENAME_FORMAT_TO_TEXT.value,
                    self.file_format_str,
                )
            else:  # plain filename
                return AlterMethodType.FILENAME_TO_TEXT.value
        if self.skip_if_text_extract_fails:
            return AlterMethodType.SKIP_IF_TEXT_EXTRACT_FAILS.value
        if self.hash:
            return two_part(
                AlterMethodType.HASH.value, self.hash_config_section
            )
        # if self.html_escape:
        #     return ALTERMETHOD.HTML_ESCAPE.value
        if self.html_unescape:
            return AlterMethodType.HTML_UNESCAPE.value
        if self.html_untag:
            return AlterMethodType.HTML_UNTAG.value
        return ""

    # -------------------------------------------------------------------------
    # Validation
    # -------------------------------------------------------------------------

    def _assert_valid(self) -> None:
        """
        Raises :exc:`ValueError` if the method is invalid (e.g. representing
        more than one transformation, or none at all).
        """
        methods_map = {
            "scrub": self.scrub,
            "truncate_date": self.truncate_date,
            "extract_text": self.extract_text,
            "hash": self.hash,
            "html_unescape": self.html_unescape,
            "html_untag": self.html_untag,
            "skip_if_text_extract_fails": self.skip_if_text_extract_fails,
        }
        n_methods = sum(int(v) for v in methods_map.values())
        if n_methods != 1:
            raise ValueError(
                f"AlterMethod: should be exactly one method, but "
                f"there are {n_methods}: {methods_map}"
            )

    # -------------------------------------------------------------------------
    # Perform the transformation: master method
    # -------------------------------------------------------------------------

    def alter(
        self,
        value: Any,
        ddr: "DataDictionaryRow",  # corresponding DataDictionaryRow
        row: List[Any],  # all values in row
        ddrows: List["DataDictionaryRow"],  # all of them
        patient: Optional["patient.Patient"] = None,
    ) -> Tuple[Any, bool]:
        """
        Performs the alteration.

        Args:
            value:
                source value of interest
            ddr:
                corresponding
                :class:`crate_anon.anonymise.ddr.DataDictionaryRow`
            row:
                all values in the same source row
            ddrows:
                all data dictionary rows
            patient:
                :class:`crate_anon.anonymise.patient.Patient` object

        Returns:
            tuple: ``newvalue, skiprow``

        Raises:
            ValueError: if no transformation flag is set on this object.

        If multiple transformations are specified within one
        :class:`AlterMethod`, only one is performed, and in the following
        order:

        #. scrub
        #. truncate_date
        #. extract_text
        #. hash
        #. html_unescape
        #. html_untag
        #. skip_if_text_extract_fails

        However, multiple alteration methods can be specified for one field.
        See :func:`crate_anon.anonymise.anonymise.process_table` and
        :class:`crate_anon.anonymise.ddr.DataDictionaryRow`.

        """

        if self.scrub:
            return self._scrub_func(value, patient), False

        if self.truncate_date:
            return self._truncate_date_func(value), False

        if self.extract_text:
            value, extracted = self._extract_text_func(value, row, ddrows)
            if not extracted and ddr.skip_row_if_extract_text_fails:
                log.debug("Skipping row as text extraction failed")
                return None, True
            return value, False

        if self.hash:
            assert self.hasher is not None
            return self.hasher.hash(value), False

        # if alter_method.html_escape:
        #     return html.escape(value), False

        if self.html_unescape:
            return html.unescape(value), False

        if self.html_untag:
            return self._html_untag_func(value), False

        if self.skip_if_text_extract_fails:
            # Pure marker: it modifies how the extract_text branch above
            # behaves (via ddr.skip_row_if_extract_text_fails) and does
            # nothing itself, so it must pass the value through and must NOT
            # request that the row be skipped.
            return value, False

        # Unreachable for a validated object; fail loudly rather than
        # implicitly returning None in place of the promised tuple.
        raise ValueError("AlterMethod.alter: no alteration method is set")

    # -------------------------------------------------------------------------
    # Transformation internals
    # -------------------------------------------------------------------------

    @staticmethod
    def _scrub_func(value: Any, patient: "patient.Patient") -> Optional[str]:
        """
        Takes a source value and scrubs it.

        **Main point of anonymisation within CRATE.**

        Args:
            value: source data; converted with ``str()`` before scrubbing
            patient: :class:`crate_anon.anonymise.patient.Patient` object

        Returns:
            scrubbed data, or ``None`` if ``value`` is ``None``

        """
        if value is None:
            return None
        return patient.scrub(str(value))

    @staticmethod
    def _truncate_date_func(value: Any) -> Optional[datetime.date]:
        """
        Truncates a date-like object to the first of the month.

        Returns ``None`` (and logs a warning) if the value raises
        :exc:`ValueError` or :exc:`OverflowError` during conversion.
        """
        try:
            value = coerce_to_date(value)
            return truncate_date_to_first_of_month(value)
        except (ValueError, OverflowError):
            log.warning(
                f"Invalid date received to "
                f"{AlterMethodType.TRUNCATEDATE} method: {value}"
            )
            return None

    @staticmethod
    def _html_untag_func(text: str) -> str:
        """
        Removes HTML tags, by deleting every match of ``HTML_TAG_RE``.
        """
        # Lots of ways...
        # -- xml.etree, for well-formed XML
        #    https://stackoverflow.com/questions/9662346
        #    return ''.join(xml.etree.ElementTree.fromstring(text).itertext())
        # -- html.parser
        #    https://stackoverflow.com/questions/753052
        # -- lxml (but needs source build on Windows):
        #    http://www.neuraladvance.com/removing-html-from-python-strings.html
        #    http://lxml.de/
        # -- regex/re
        #    https://stackoverflow.com/questions/3662142
        #    ... as below.
        return HTML_TAG_RE.sub("", text)

    def _extract_text_func(
        self, value: Any, row: List[Any], ddrows: List["DataDictionaryRow"]
    ) -> Tuple[Optional[str], bool]:
        """
        Take a field's value and return extracted text, for file-related
        fields, where the DD row indicated that this field contains a filename
        or a BLOB.

        Args:
            value: source field contents
            row: all values in the same source row
            ddrows: all data dictionary rows

        Returns:
            tuple: ``value, extracted``. On any failure (missing filename,
            file not found, impermissible extension, extraction error) this
            is ``None, False``.

        Raises:
            ValueError: in BLOB mode, if no data dictionary row matches
                ``extract_ext_field`` (a configuration error).

        """
        use_filename = False
        filename = None
        blob = None

        # Work out either a full filename, or a BLOB.
        # Set either use_filename + filename + extension, or blob + extension.
        if self.extract_from_filename:
            # The database contains a plain and full filename.
            use_filename = True
            filename = value
            if not filename:
                # A NULL/empty source value must be rejected *before*
                # os.path.splitext(), which raises TypeError on None.
                log.error("No filename; skipping")
                return None, False
            _, extension = os.path.splitext(filename)
            log.info(f"extract_text: disk file, filename={filename!r}")

        elif self.extract_from_file_format:
            # The database contains a filename. However, it may not be a full
            # path. For example, in RiO, we have fields like
            #   dbo.ClientDocument.Path, e.g. '1-1-20121023-1000001-LET.pdf'
            #   dbo.ClientDocument.ClientID, e.g. '1000001-LET.pdf'
            # and the disk file might be
            #   C:\some_base_directory\1000001\Docs\1-1-20121023-1000001-LET.pdf
            # We could specify this as a file spec:
            #   "C:\some_base_directory\{ClientID}\{Path}".
            # In principle, this might need to be field-specific, so it could
            # go in the data dictionary (rather than as a setting that's
            # constant across an entire anonymisation run).
            # Let's introduce ALTERMETHOD.FILENAME_FORMAT_TO_TEXT, in v0.18.18.
            #
            # Create a dictionary of column name -> value
            ffdict = {
                ddr.src_field: row[i] for i, ddr in enumerate(ddrows)
            }  # type: Dict[str, Any]
            # Use that dictionary with the format string to make the filename
            log.debug(
                f"extract_text: file_format_str={self.file_format_str!r}, "
                f"ffdict={ffdict!r}"
            )
            use_filename = True
            filename = self.file_format_str.format(**ffdict)
            _, extension = os.path.splitext(filename)
            log.info(f"extract_text: disk file, filename={filename!r}")

        else:
            # The database contains the BLOB itself. However, we'd also like to
            # know the file type, here from its extension. We look for another
            # field that contains the extension, marked as such using
            # alter_method.extract_ext_field in the data dictionary.
            blob = value
            extindex = next(
                (
                    i
                    for i, ddr in enumerate(ddrows)
                    if ddr.src_field == self.extract_ext_field
                ),
                None,
            )
            if extindex is None:
                # Configuration error
                raise ValueError(
                    f"Bug: missing extension field for "
                    f"alter_method={self.as_text}"
                )
            extension = row[extindex]
            log.info(f"extract_text: database BLOB, extension={extension}")

        # Is it a permissible file type?
        if not self.config.extract_text_extension_permissible(extension):
            log.info(f"Extension {extension!r} not permissible; skipping")
            return None, False

        if use_filename:
            # Still needed for the format-string route, which can produce an
            # empty filename.
            if not filename:
                log.error("No filename; skipping")
                return None, False

            if not os.path.isfile(filename):
                log.error(f"Filename {filename!r} is not a file; skipping")
                return None, False

        # Extract text from the file (given its filename), or from a BLOB.
        try:
            textconfig = TextProcessingConfig(
                plain=self.config.extract_text_plain,
                width=self.config.extract_text_width,
            )
            value = document_to_text(
                filename=filename,
                blob=blob,
                extension=extension,
                config=textconfig,
            )
        except Exception as e:
            # Runtime error: deliberately broad, so that one bad document
            # does not abort the whole run.
            traceback.print_exc()  # full details, please
            log.error(f"Caught exception from document_to_text: {e}")
            return None, False
        return value, True