Coverage for anonymise/altermethod.py: 14%
190 statements
« prev ^ index » next coverage.py v7.8.0, created at 2026-02-05 06:46 -0600
« prev ^ index » next coverage.py v7.8.0, created at 2026-02-05 06:46 -0600
"""
crate_anon/anonymise/altermethod.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**The AlterMethod class.**

"""

import datetime
import html
import logging
import os
import traceback
from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING

from cardinal_pythonlib.datetimefunc import (
    coerce_to_date,
    truncate_date_to_first_of_month,
)
from cardinal_pythonlib.extract_text import (
    document_to_text,
    TextProcessingConfig,
)
import regex

# don't import config: circular dependency would have to be sorted out
from crate_anon.anonymise.constants import AlterMethodType

if TYPE_CHECKING:
    # These names are needed only for type annotations, so they are imported
    # for the type checker alone and never at runtime.
    from cardinal_pythonlib.hash import GenericHasher
    from crate_anon.anonymise.config import Config
    from crate_anon.anonymise.ddr import DataDictionaryRow

    # import patient to avoid circular import when generating docs
    from crate_anon.anonymise import patient

log = logging.getLogger(__name__)


# =============================================================================
# Constants
# =============================================================================

# Matches a "<", any run of characters other than ">", then ">". Used by
# AlterMethod._html_untag_func() to strip HTML tags.
HTML_TAG_RE = regex.compile("<[^>]*>")


# =============================================================================
# AlterMethod
# =============================================================================


class AlterMethod:
    """
    Implements a SINGLE transformation of source data on its way to the
    destination database.

    Knows how to represent itself as a text element in the relevant column of
    a data dictionary row, and how to create itself from one of those text
    elements.

    A :class:`crate_anon.anonymise.ddr.DataDictionaryRow` may include multiple
    instances of :class:`crate_anon.anonymise.altermethod.AlterMethod` in a
    sequence.
    """

    def __init__(
        self,
        config: "Config",
        text_value: Optional[str] = None,
        scrub: bool = False,
        truncate_date: bool = False,
        extract_from_filename: bool = False,
        extract_from_file_format: bool = False,  # new in v0.18.18
        file_format_str: str = "",  # new in v0.18.18
        extract_from_blob: bool = False,
        skip_if_text_extract_fails: bool = False,
        extract_ext_field: str = "",
        hash_: bool = False,
        hash_config_section: str = "",
        # html_escape: bool = False,
        html_unescape: bool = False,
        html_untag: bool = False,
    ) -> None:
        """
        Args:
            config:
                a :class:`crate_anon.anonymise.config.Config`
            text_value:
                string (from the data dictionary) to parse via
                :func:`set_from_text`; if given, it resets the method
                attributes and then sets them from the text, overriding the
                individual flag arguments below
            scrub:
                Boolean; "the source field contains sensitive text; scrub it"
            truncate_date:
                Boolean; "the source is a date; truncate it to the first of the
                month"
            extract_from_filename:
                Boolean; "the source is a filename; extract the text from it"
            extract_from_file_format:
                Boolean; "the source is a partial filename; combine it with
                ``file_format_str`` to calculate the full filename, then
                extract the text from it"
            file_format_str:
                format string for use with ``extract_from_file_format``
            extract_from_blob:
                Boolean; "the source is binary (the database contains a BLOB);
                extract text from it". See also ``extract_ext_field``.
            skip_if_text_extract_fails:
                Boolean: "if text extraction fails, skip the record entirely"
            extract_ext_field:
                For when the database contains a BLOB: this parameter indicates
                a database column (field) name, in the same row, that contains
                the file's extension, to help identify the BLOB.
            hash_:
                Boolean. If true, transform the source by hashing it.
            hash_config_section:
                If ``hash_`` is true, this specifies the config section in
                which the hash is defined.
            html_unescape:
                Boolean: "transform the source by HTML-unescaping it". For
                example, this would convert ``&lt;`` to ``<``.
            html_untag:
                Boolean: "transform the source by removing HTML tags". For
                example, this would convert ``hello <b>bold</b> world`` to
                ``hello bold world``.

        Raises:
            ValueError: if the resulting object does not represent exactly
                one transformation (see :func:`_assert_valid`), or if
                ``text_value`` cannot be parsed.
        """
        self.config = config
        self.scrub = scrub
        self.truncate_date = truncate_date
        self.extract_from_blob = extract_from_blob
        self.extract_from_filename = extract_from_filename
        self.extract_from_file_format = extract_from_file_format
        self.file_format_str = file_format_str
        self.skip_if_text_extract_fails = skip_if_text_extract_fails
        self.extract_ext_field = extract_ext_field
        self.hash = hash_
        self.hash_config_section = hash_config_section
        self.hasher = None  # type: Optional[GenericHasher]
        # self.html_escape = html_escape
        self.html_unescape = html_unescape
        self.html_untag = html_untag

        # Umbrella flag: true for any of the three text-extraction variants.
        self.extract_text = (
            extract_from_filename
            or extract_from_file_format
            or extract_from_blob
        )

        if text_value is not None:
            self.set_from_text(text_value)
        # Build the hasher from the FINAL state: set_from_text() may already
        # have created one (for "hash=..."), or may have switched hashing off
        # again, in which case the hash_ argument no longer applies.
        if self.hash and self.hasher is None:
            self.hasher = self.config.get_extra_hasher(
                self.hash_config_section
            )

        self._assert_valid()

    # -------------------------------------------------------------------------
    # Text representations
    # -------------------------------------------------------------------------

    def set_from_text(self, value: str) -> None:
        """
        Take the string from the ``alter_method`` field of the data dictionary,
        and use it to set a bunch of internal attributes.

        All method attributes are reset first, so afterwards the object
        represents only the method described by ``value``.

        To get the configuration string back, see :func:`as_text`.

        Args:
            value: a single alter-method fragment, either a bare keyword or
                ``keyword=parameter``

        Raises:
            ValueError: if ``value`` is not a recognized method, or a
                two-part method lacks its ``=parameter`` part
        """
        # Reset every method flag (including the HTML ones and any cached
        # hasher) so nothing stale survives a re-parse.
        self.scrub = False
        self.truncate_date = False
        self.extract_text = False
        self.extract_from_blob = False
        self.extract_from_file_format = False
        self.file_format_str = ""
        self.extract_from_filename = False
        self.skip_if_text_extract_fails = False
        self.extract_ext_field = ""
        self.hash = False
        self.hash_config_section = ""
        self.hasher = None
        self.html_unescape = False
        self.html_untag = False

        def get_second_part(missing_description: str) -> str:
            # Returns everything after the first "=" in value.
            if "=" not in value:
                raise ValueError(f"Bad format for alter method: {value}")
            secondhalf = value[value.index("=") + 1 :]
            if not secondhalf:
                raise ValueError(
                    f"Missing {missing_description} in alter method: {value}"
                )
            return secondhalf

        if value == AlterMethodType.TRUNCATEDATE.value:
            self.truncate_date = True
        elif value == AlterMethodType.SCRUBIN.value:
            self.scrub = True
        elif value.startswith(AlterMethodType.BINARY_TO_TEXT.value):
            self.extract_text = True
            self.extract_from_blob = True
            self.extract_ext_field = get_second_part(
                "filename/extension field"
            )
        elif value.startswith(AlterMethodType.FILENAME_FORMAT_TO_TEXT.value):
            self.extract_text = True
            self.extract_from_file_format = True
            self.file_format_str = get_second_part("filename format field")
        elif value == AlterMethodType.FILENAME_TO_TEXT.value:
            self.extract_text = True
            self.extract_from_filename = True
        elif value == AlterMethodType.SKIP_IF_TEXT_EXTRACT_FAILS.value:
            self.skip_if_text_extract_fails = True
        elif value.startswith(AlterMethodType.HASH.value):
            self.hash = True
            self.hash_config_section = get_second_part("hash config section")
            self.hasher = self.config.get_extra_hasher(
                self.hash_config_section
            )
        # elif value == ALTERMETHOD.HTML_ESCAPE:
        #     self.html_escape = True
        elif value == AlterMethodType.HTML_UNESCAPE.value:
            self.html_unescape = True
        elif value == AlterMethodType.HTML_UNTAG.value:
            self.html_untag = True
        else:
            raise ValueError(f"Bad alter_method part: {value}")

    @property
    def as_text(self) -> str:
        """
        Return the ``alter_method`` fragment from the working fields;
        effectively the reverse of :func:`set_from_text`.

        Returns an empty string if no method flag is set.
        """

        def two_part(altermethod: str, parameter: str) -> str:
            return altermethod + "=" + parameter

        if self.truncate_date:
            return AlterMethodType.TRUNCATEDATE.value
        if self.scrub:
            return AlterMethodType.SCRUBIN.value
        if self.extract_text:
            if self.extract_from_blob:
                return two_part(
                    AlterMethodType.BINARY_TO_TEXT.value,
                    self.extract_ext_field,
                )
            elif self.extract_from_file_format:
                return two_part(
                    AlterMethodType.FILENAME_FORMAT_TO_TEXT.value,
                    self.file_format_str,
                )
            else:  # plain filename
                return AlterMethodType.FILENAME_TO_TEXT.value
        if self.skip_if_text_extract_fails:
            return AlterMethodType.SKIP_IF_TEXT_EXTRACT_FAILS.value
        if self.hash:
            return two_part(
                AlterMethodType.HASH.value, self.hash_config_section
            )
        # if self.html_escape:
        #     return ALTERMETHOD.HTML_ESCAPE.value
        if self.html_unescape:
            return AlterMethodType.HTML_UNESCAPE.value
        if self.html_untag:
            return AlterMethodType.HTML_UNTAG.value
        return ""

    # -------------------------------------------------------------------------
    # Validation
    # -------------------------------------------------------------------------

    def _assert_valid(self) -> None:
        """
        Raises :exc:`ValueError` if the method is invalid (e.g. representing
        more than one transformation, or none at all).
        """
        methods_map = {
            "scrub": self.scrub,
            "truncate_date": self.truncate_date,
            "extract_text": self.extract_text,
            "hash": self.hash,
            "html_unescape": self.html_unescape,
            "html_untag": self.html_untag,
            "skip_if_text_extract_fails": self.skip_if_text_extract_fails,
        }
        n_methods = sum(int(v) for v in methods_map.values())
        if n_methods != 1:
            raise ValueError(
                f"AlterMethod: should be exactly one method, but "
                f"there are {n_methods}: {methods_map}"
            )

    # -------------------------------------------------------------------------
    # Perform the transformation: master method
    # -------------------------------------------------------------------------

    def alter(
        self,
        value: Any,
        ddr: "DataDictionaryRow",  # corresponding DataDictionaryRow
        row: List[Any],  # all values in row
        ddrows: List["DataDictionaryRow"],  # all of them
        patient: Optional["patient.Patient"] = None,
    ) -> Tuple[Any, bool]:
        """
        Performs the alteration.

        Args:
            value:
                source value of interest
            ddr:
                corresponding
                :class:`crate_anon.anonymise.ddr.DataDictionaryRow`
            row:
                all values in the same source row
            ddrows:
                all data dictionary rows
            patient:
                :class:`crate_anon.anonymise.patient.Patient` object

        Returns:
            tuple: ``newvalue, skiprow``. ``skiprow`` is true only when text
            extraction failed and ``ddr.skip_row_if_extract_text_fails`` is
            set.

        If multiple transformations are specified within one
        :class:`AlterMethod`, only one is performed, and in the following
        order:

        #. scrub
        #. truncate_date
        #. extract_text
        #. hash
        #. html_unescape
        #. html_untag
        #. skip_if_text_extract_fails

        However, multiple alteration methods can be specified for one field.
        See :func:`crate_anon.anonymise.anonymise.process_table` and
        :class:`crate_anon.anonymise.ddr.DataDictionaryRow`.

        """

        if self.scrub:
            return self._scrub_func(value, patient), False

        if self.truncate_date:
            return self._truncate_date_func(value), False

        if self.extract_text:
            value, extracted = self._extract_text_func(value, row, ddrows)
            if not extracted and ddr.skip_row_if_extract_text_fails:
                log.debug("Skipping row as text extraction failed")
                return None, True
            return value, False

        if self.hash:
            assert self.hasher is not None
            return self.hasher.hash(value), False

        # if alter_method.html_escape:
        #     return html.escape(value), False

        if self.html_unescape:
            return html.unescape(value), False

        if self.html_untag:
            return self._html_untag_func(value), False

        # Remaining case: skip_if_text_extract_fails. It modifies other alter
        # methods (via ddr.skip_row_if_extract_text_fails in the extract_text
        # branch above) and doesn't do anything itself, so the value passes
        # through and the row must NOT be skipped here. This is also the
        # pass-through for an object with no method set, which cannot arise
        # for an instance that passed _assert_valid().
        return value, False

    # -------------------------------------------------------------------------
    # Transformation internals
    # -------------------------------------------------------------------------

    @staticmethod
    def _scrub_func(value: Any, patient: "patient.Patient") -> Optional[str]:
        """
        Takes a source value and scrubs it.

        **Main point of anonymisation within CRATE.**

        Args:
            value: source data
            patient: :class:`crate_anon.anonymise.patient.Patient` object

        Returns:
            scrubbed data, or ``None`` if ``value`` is ``None``

        """
        if value is None:
            return None
        return patient.scrub(str(value))

    @staticmethod
    def _truncate_date_func(value: Any) -> Optional[datetime.date]:
        """
        Truncates a date-like object to the first of the month.

        Returns ``None`` (after logging a warning) if coercion or truncation
        raises :exc:`ValueError` or :exc:`OverflowError`.
        """
        try:
            value = coerce_to_date(value)
            return truncate_date_to_first_of_month(value)
        except (ValueError, OverflowError):
            log.warning(
                f"Invalid date received to "
                f"{AlterMethodType.TRUNCATEDATE} method: {value}"
            )
            return None

    @staticmethod
    def _html_untag_func(text: str) -> str:
        """
        Removes HTML tags.
        """
        # Lots of ways...
        # -- xml.etree, for well-formed XML
        #    https://stackoverflow.com/questions/9662346
        # return ''.join(xml.etree.ElementTree.fromstring(text).itertext())
        # -- html.parser
        #    https://stackoverflow.com/questions/753052
        # -- lxml (but needs source build on Windows):
        #    http://www.neuraladvance.com/removing-html-from-python-strings.html
        #    http://lxml.de/
        # -- regex/re
        #    https://stackoverflow.com/questions/3662142
        # ... as below.
        return HTML_TAG_RE.sub("", text)

    def _extract_text_func(
        self, value: Any, row: List[Any], ddrows: List["DataDictionaryRow"]
    ) -> Tuple[Optional[str], bool]:
        """
        Take a field's value and return extracted text, for file-related
        fields, where the DD row indicated that this field contains a filename
        or a BLOB.

        Args:
            value: source field contents
            row: all values in the same source row
            ddrows: all data dictionary rows

        Returns:
            tuple: ``value, extracted``. On any failure (no filename,
            impermissible extension, missing file, or an exception from
            ``document_to_text``) this is ``None, False``.

        Raises:
            ValueError: in BLOB mode, if no data dictionary row has
                ``src_field`` equal to ``self.extract_ext_field``

        """
        use_filename = False
        filename = None
        blob = None

        # Work out either a full filename, or a BLOB.
        # Set either use_filename + filename + extension, or blob + extension.
        if self.extract_from_filename:
            # The database contains a plain and full filename.
            use_filename = True
            filename = value
            if not filename:
                # A NULL source value would make os.path.splitext() raise
                # TypeError; treat it as "nothing to extract" instead.
                log.error("No filename; skipping")
                return None, False
            _, extension = os.path.splitext(filename)
            log.info(f"extract_text: disk file, filename={filename!r}")

        elif self.extract_from_file_format:
            # The database contains a filename. However, it may not be a full
            # path. For example, in RiO, we have fields like
            #   dbo.ClientDocument.Path, e.g. '1-1-20121023-1000001-LET.pdf'
            #   dbo.ClientDocument.ClientID, e.g. '1000001-LET.pdf'
            # and the disk file might be
            #   C:\some_base_directory\1000001\Docs\1-1-20121023-1000001-LET.pdf
            # We could specify this as a file spec:
            #   "C:\some_base_directory\{ClientID}\{Path}".
            # In principle, this might need to be field-specific, so it could
            # go in the data dictionary (rather than as a setting that's
            # constant across an entire anonymisation run).
            # Let's introduce ALTERMETHOD.FILENAME_FORMAT_TO_TEXT, in v0.18.18.
            #
            # Create a dictionary of column name -> value
            ffdict = {
                ddr.src_field: row[i] for i, ddr in enumerate(ddrows)
            }  # type: Dict[str, Any]
            # Use that dictionary with the format string to make the filename
            log.debug(
                f"extract_text: file_format_str={self.file_format_str!r}, "
                f"ffdict={ffdict!r}"
            )
            use_filename = True
            filename = self.file_format_str.format(**ffdict)
            _, extension = os.path.splitext(filename)
            log.info(f"extract_text: disk file, filename={filename!r}")

        else:
            # The database contains the BLOB itself. However, we'd also like to
            # know the file type, here from its extension. We look for another
            # field that contains the extension, marked as such using
            # alter_method.extract_ext_field in the data dictionary.
            blob = value
            extindex = next(
                (
                    i
                    for i, ddr in enumerate(ddrows)
                    if ddr.src_field == self.extract_ext_field
                ),
                None,
            )
            if extindex is None:
                # Configuration error
                raise ValueError(
                    f"Bug: missing extension field for "
                    f"alter_method={self.as_text}"
                )
            extension = row[extindex]
            log.info(f"extract_text: database BLOB, extension={extension}")

        # Is it a permissible file type?
        if not self.config.extract_text_extension_permissible(extension):
            log.info(f"Extension {extension!r} not permissible; skipping")
            return None, False

        if use_filename:
            if not filename:
                log.error("No filename; skipping")
                return None, False

            if not os.path.isfile(filename):
                log.error(f"Filename {filename!r} is not a file; skipping")
                return None, False

        # Extract text from the file (given its filename), or from a BLOB.
        try:
            textconfig = TextProcessingConfig(
                plain=self.config.extract_text_plain,
                width=self.config.extract_text_width,
            )
            value = document_to_text(
                filename=filename,
                blob=blob,
                extension=extension,
                config=textconfig,
            )
        except Exception as e:
            # Runtime error: deliberately best-effort, so one bad document
            # does not abort the run.
            traceback.print_exc()  # full details, please
            log.error(f"Caught exception from document_to_text: {e}")
            return None, False
        return value, True