Coverage for anonymise/ddr.py: 28%
615 statements
« prev ^ index » next coverage.py v7.8.0, created at 2026-02-05 06:46 -0600
« prev ^ index » next coverage.py v7.8.0, created at 2026-02-05 06:46 -0600
1"""
2crate_anon/anonymise/ddr.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Data dictionary rows.**
28"""
30# =============================================================================
31# Imports
32# =============================================================================
34import ast
35import logging
36import re
37from typing import Any, List, Dict, Iterable, Optional, TYPE_CHECKING, Union
39from cardinal_pythonlib.convert import convert_to_int
40from cardinal_pythonlib.lists import count_bool
41from cardinal_pythonlib.sql.validation import (
42 ensure_valid_field_name,
43 ensure_valid_table_name,
44 is_sqltype_valid,
45 SQLTYPE_DATE,
46)
47from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
48from cardinal_pythonlib.sqlalchemy.schema import (
49 convert_sqla_type_for_dialect,
50 does_sqlatype_merit_fulltext_index,
51 does_sqlatype_require_index_len,
52 giant_text_sqltype,
53 get_sqla_coltype_from_dialect_str,
54 is_sqlatype_binary,
55 is_sqlatype_date,
56 is_sqlatype_numeric,
57 is_sqlatype_text_of_length_at_least,
58 is_sqlatype_text_over_one_char,
59)
60from sqlalchemy import Column
61from sqlalchemy.engine.interfaces import Dialect
62from sqlalchemy.sql.sqltypes import TypeEngine
64from crate_anon.anonymise.altermethod import AlterMethod
65from crate_anon.anonymise.constants import (
66 AlterMethodType,
67 AnonymiseConfigKeys,
68 Decision,
69 DEFAULT_INDEX_LEN,
70 IndexType,
71 MYSQL_MAX_IDENTIFIER_LENGTH,
72 ScrubMethod,
73 ScrubSrc,
74 SQLSERVER_MAX_IDENTIFIER_LENGTH,
75 SrcFlag,
76)
77from crate_anon.common.sql import (
78 coltype_length_if_text,
79 is_sql_column_type_textual,
80 matches_fielddef,
81 matches_tabledef,
82)
84if TYPE_CHECKING:
85 from crate_anon.anonymise.config import Config, DatabaseSafeConfig
87log = logging.getLogger(__name__)
90# =============================================================================
91# Constants for reporting
92# =============================================================================
class DDRLabels:
    """
    Fixed text labels used when describing data dictionary rows in reports.
    """

    # Labels describing what a destination column holds:
    RID = "RID."
    MRID = "MRID."
    TRID = "TRID."
    THIRD_PARTY_RID = "Third-party RID (e.g. relative)."
    DEFINES_PRIMARY_PID = (
        "PRINCIPAL PATIENT-DEFINING COLUMN IN THE WHOLE DATABASE."
    )
    SOURCE_HASH = "Hash of source row to detect changes."
    BEING_SCRUBBED = "Scrubbed free text."

    # Placeholders:
    NOTHING = "–"  # en dash
    UNKNOWN = "?"
109# =============================================================================
110# Helper functions
111# =============================================================================
def warn_if_identifier_long(
    table: str, column: str, dest_dialect: Optional[str]
) -> None:
    """
    Log a warning for each table/column name that exceeds the maximum
    identifier length of a database engine.

    Args:
        table: table name
        column: column name
        dest_dialect:
            name of the destination dialect, if known. If it is ``None``,
            the names are checked against every engine listed below;
            otherwise only against the engine whose dialect name matches.
    """
    engine_limits = (
        ("MySQL", SqlaDialectName.MYSQL, MYSQL_MAX_IDENTIFIER_LENGTH),
        ("SQL Server", SqlaDialectName.MSSQL, SQLSERVER_MAX_IDENTIFIER_LENGTH),
    )
    named_identifiers = (
        ("Table", table),
        ("Column", column),
    )
    for engine_label, engine_dialect, limit in engine_limits:
        if dest_dialect is not None and dest_dialect != engine_dialect:
            # The destination dialect is known, and this isn't it.
            continue
        for kind, identifier in named_identifiers:
            if len(identifier) <= limit:
                continue
            log.warning(
                f"{kind} name in {table!r}.{column!r} "
                f"is too long for {engine_label} "
                f"({len(identifier)} characters > {limit} maximum)"
            )
142# =============================================================================
143# DataDictionaryRow
144# =============================================================================
147class DataDictionaryRow:
148 """
149 Class representing a single row of a data dictionary (a DDR).
150 """
152 # For attribute/config references:
153 SRC_DB = "src_db"
154 SRC_TABLE = "src_table"
155 SRC_FIELD = "src_field"
156 SRC_DATAFILE = "src_datatype"
157 SRC_FLAGS = "src_flags"
159 SCRUB_SRC = "scrub_src"
160 SCRUB_METHOD = "scrub_method"
162 DECISION = "decision"
163 INCLUSION_VALUES = "inclusion_values"
164 EXCLUSION_VALUES = "exclusion_values"
165 ALTER_METHOD = "alter_method"
167 DEST_TABLE = "dest_table"
168 DEST_FIELD = "dest_field"
169 DEST_DATATYPE = "dest_datatype"
170 INDEX = "index"
171 INDEXLEN = "indexlen"
172 COMMENT = "comment"
174 ROWNAMES = [
175 SRC_DB,
176 SRC_TABLE,
177 SRC_FIELD,
178 SRC_DATAFILE,
179 SRC_FLAGS,
180 SCRUB_SRC,
181 SCRUB_METHOD,
182 DECISION,
183 INCLUSION_VALUES,
184 EXCLUSION_VALUES,
185 ALTER_METHOD,
186 DEST_TABLE,
187 DEST_FIELD,
188 DEST_DATATYPE,
189 INDEX,
190 INDEXLEN,
191 COMMENT,
192 ]
193 ENUM_ROWNAMES = (INDEX, SCRUB_SRC, SCRUB_METHOD)
def __init__(self, config: "Config") -> None:
    """
    Set up basic defaults: every field starts blank/False, the index type
    starts as ``IndexType.NONE``, and the row is marked as not having been
    loaded from a file.

    Args:
        config: :class:`crate_anon.anonymise.config.Config`
    """
    self.config = config

    # In the order of ROWNAMES:
    self.src_db = None  # type: Optional[str]
    self.src_table = None  # type: Optional[str]
    self.src_field = None  # type: Optional[str]
    self.src_datatype = None  # type: Optional[str]  # in SQL string format
    # src_flags: a property; see below

    # NOTE(review): set_from_dict() stores the results of ScrubSrc.lookup()
    # and ScrubMethod.lookup() here, so these are presumably enum members
    # rather than plain strings -- confirm against those lookup functions.
    self.scrub_src = None  # type: Optional[ScrubSrc]
    self.scrub_method = None  # type: Optional[ScrubMethod]

    # decision: a property; see below
    # inclusion_values: a property; see below
    # exclusion_values: a property; see below
    # alter_method: a property; see below

    self.dest_table = None  # type: Optional[str]
    self.dest_field = None  # type: Optional[str]
    self.dest_datatype = None  # type: Optional[str]
    self.index = IndexType.NONE  # type: IndexType
    self.indexlen = None  # type: Optional[int]
    self.comment = ""

    # For src_flags (read/written as a group via the src_flags property):
    self._pk = False
    self._not_null = False
    self._add_src_hash = False
    self._primary_pid = False
    self._defines_primary_pids = False
    self._master_pid = False
    self._constant = False
    self._addition_only = False
    self._opt_out_info = False
    self._required_scrubber = False

    # Other:
    self.omit = False  # in the DD file, this corresponds to 'decision'

    self._from_file = False  # set to True by set_from_dict()
    self._src_override_dialect = None  # type: Optional[Dialect]
    # Set via set_src_sqla_coltype(), which takes a TypeEngine:
    self._src_sqla_coltype = None  # type: Optional[TypeEngine]
    self._inclusion_values = []  # type: List[Any]
    self._exclusion_values = []  # type: List[Any]
    self._alter_methods = []  # type: List[AlterMethod]
248 # -------------------------------------------------------------------------
249 # Properties: Relating to whole databases
250 # -------------------------------------------------------------------------
@property
def src_db_lowercase(self) -> str:
    """
    The source database name, converted to lower case.
    """
    db_name = self.src_db
    return db_name.lower()
@property
def src_dialect(self) -> Dialect:
    """
    The SQLAlchemy :class:`Dialect` (e.g. MySQL, SQL Server...) to use for
    the source database: the override dialect if one has been set,
    otherwise whatever the config reports for our source database.
    """
    override = self._src_override_dialect
    if override:
        return override
    return self.config.get_src_dialect(self.src_db)
@property
def dest_dialect(self) -> Dialect:
    """
    The SQLAlchemy :class:`Dialect` (e.g. MySQL, SQL Server...) of the
    destination database, as reported by the config.
    """
    cfg = self.config
    return cfg.dest_dialect
@property
def dest_dialect_name(self) -> str:
    """
    The SQLAlchemy dialect name of the destination database, as reported by
    the config.
    """
    cfg = self.config
    return cfg.dest_dialect_name
284 # -------------------------------------------------------------------------
285 # Properties: Relating to database columns
286 # -------------------------------------------------------------------------
@property
def src_table_lowercase(self) -> str:
    """
    The source table name, converted to lower case.
    """
    table_name = self.src_table
    return table_name.lower()
@property
def src_field_lowercase(self) -> str:
    """
    The source field (column) name, converted to lower case.
    """
    field_name = self.src_field
    return field_name.lower()
@property
def pk(self) -> bool:
    """
    Whether the source field is flagged as a primary key (PK). The same
    flag is used for the destination field.
    """
    return self._pk
@property
def not_null(self) -> bool:
    """
    Whether the field is flagged NOT NULL. False by default; True if the
    NOT NULL source flag has been set (e.g. when the DD row was created by
    database reflection from a NOT NULL source field).
    """
    return self._not_null
@property
def src_is_textual(self) -> bool:
    """
    Whether the source column's SQL datatype is a textual one.
    """
    sql_type = self.src_datatype
    return is_sql_column_type_textual(sql_type)
@property
def src_textlength(self) -> Optional[int]:
    """
    The length of the source column if it is textual. Returns ``None`` for
    a textual column of unlimited length, and also ``None`` when the source
    column is not textual at all.
    """
    if self.src_is_textual:
        # The length is interpreted according to the source dialect's name.
        # noinspection PyUnresolvedReferences
        return coltype_length_if_text(
            self.src_datatype, self.src_dialect.name
        )
    return None
338 # -------------------------------------------------------------------------
339 # Properties: CRATE
340 # -------------------------------------------------------------------------
@property
def add_src_hash(self) -> bool:
    """
    Whether the destination should gain an extra column holding a hash of
    the whole source row (all fields).
    """
    return self._add_src_hash
@property
def primary_pid(self) -> bool:
    """
    Whether the source field holds the primary patient ID (PID).

    (A typical example of a PID: "hospital number".)
    """
    return self._primary_pid
@property
def defines_primary_pids(self) -> bool:
    """
    Whether this is the field that *defines* primary PIDs. There is usually
    only one such field in the entire source database: typically the "ID"
    column of the master patient table.
    """
    return self._defines_primary_pids
@property
def master_pid(self) -> bool:
    """
    Whether this field holds the master patient ID (MPID).

    (A typical example of an MPID: "NHS number".)
    """
    return self._master_pid
@property
def contains_patient_scrub_src_info(self) -> bool:
    """
    Whether this field is a scrub source for the patient themselves
    (i.e. its scrub-source setting is ``ScrubSrc.PATIENT``).
    """
    source_kind = self.scrub_src
    return source_kind == ScrubSrc.PATIENT
@property
def contains_third_party_info_directly(self) -> bool:
    """
    Whether this field directly holds (identifiable) information about a
    third party (i.e. its scrub-source setting is ``ScrubSrc.THIRDPARTY``).
    """
    source_kind = self.scrub_src
    return source_kind == ScrubSrc.THIRDPARTY
@property
def third_party_pid(self) -> bool:
    """
    Whether this field holds the PID of a different (e.g. related) patient
    (i.e. its scrub-source setting is ``ScrubSrc.THIRDPARTY_XREF_PID``).
    """
    source_kind = self.scrub_src
    return source_kind == ScrubSrc.THIRDPARTY_XREF_PID
@property
def contains_third_party_info(self) -> bool:
    """
    Whether this field holds (identifiable) third-party information, be it
    via a third-party PID or directly.
    """
    via_pid = self.third_party_pid
    return via_pid or self.contains_third_party_info_directly
@property
def has_special_alter_method(self) -> bool:
    """
    Whether this is a field whose alter method is fixed: a primary PID, a
    master PID, or a third-party PID.
    """
    own_id = self.primary_pid or self.master_pid
    return own_id or self.third_party_pid
@property
def constant(self) -> bool:
    """
    Whether the source field is flagged as guaranteed not to change (for a
    given PK).
    """
    return self._constant
@property
def addition_only(self) -> bool:
    """
    Whether we may assume that records are only ever added to this table,
    never deleted.

    This is a flag that may be applied to a PK row only.
    """
    return self._addition_only
@property
def opt_out_info(self) -> bool:
    """
    Whether the field carries information about the patient's wish to opt
    out entirely from the anonymised database.

    (Which field contents mean "opt out" and which mean "don't opt out"
    depends on ``optout_col_values`` in the
    :class:`crate_anon.anonymise.config.Config`.)
    """
    return self._opt_out_info
@property
def src_flags(self) -> str:
    """
    The source flags as a single string: the string forms of the
    :class:`SrcFlag` members whose internal flag is set, concatenated in a
    fixed order.
    """
    flag_states = (
        (SrcFlag.PK, self._pk),
        (SrcFlag.NOT_NULL, self._not_null),
        (SrcFlag.ADD_SRC_HASH, self._add_src_hash),
        (SrcFlag.PRIMARY_PID, self._primary_pid),
        (SrcFlag.DEFINES_PRIMARY_PIDS, self._defines_primary_pids),
        (SrcFlag.MASTER_PID, self._master_pid),
        (SrcFlag.CONSTANT, self._constant),
        (SrcFlag.ADDITION_ONLY, self._addition_only),
        (SrcFlag.OPT_OUT, self._opt_out_info),
        (SrcFlag.REQUIRED_SCRUBBER, self._required_scrubber),
    )
    return "".join(str(flag) for flag, is_set in flag_states if is_set)

@src_flags.setter
def src_flags(self, flags: str) -> None:
    """
    Set every internal source flag from a string: each flag becomes True
    if its :class:`SrcFlag` value occurs within ``flags``, else False.
    """
    attribute_for_flag = (
        ("_pk", SrcFlag.PK),
        ("_not_null", SrcFlag.NOT_NULL),
        ("_add_src_hash", SrcFlag.ADD_SRC_HASH),
        ("_primary_pid", SrcFlag.PRIMARY_PID),
        ("_defines_primary_pids", SrcFlag.DEFINES_PRIMARY_PIDS),
        ("_master_pid", SrcFlag.MASTER_PID),
        ("_constant", SrcFlag.CONSTANT),
        ("_addition_only", SrcFlag.ADDITION_ONLY),
        ("_opt_out_info", SrcFlag.OPT_OUT),
        ("_required_scrubber", SrcFlag.REQUIRED_SCRUBBER),
    )
    for attr_name, flag in attribute_for_flag:
        setattr(self, attr_name, flag.value in flags)
@property
def inclusion_values(self) -> Union[List[Any], str]:
    """
    Returns a list of inclusion values (or an empty string if there are
    no such values).

    This slightly curious output format is used to create a TSV row (see
    :func:`get_tsv`) or to check in a "truthy" way whether we have
    inclusion values (see
    :func:`crate_anon.anonymise.anonymise.process_table`).
    """
    return self._inclusion_values or ""  # for TSV output

@inclusion_values.setter
def inclusion_values(self, value: str) -> None:
    """
    Set the inclusion values.

    Args:
        value:
            either something that is "falsy" (to set the inclusion values
            to an empty list) or something that evaluates to a Python
            list, tuple or set via :func:`ast.literal_eval`.

            For example: ``[None, 0]``, or ``[True, 1, 'yes', 'true',
            'Yes', 'True']``.

    Raises:
        :exc:`ValueError` if the text evaluates to something truthy that is
        not a list, tuple or set (e.g. a bare number or string). Such a
        value could not be used for the ``in`` tests that the inclusion
        values exist for. (:func:`ast.literal_eval` itself may also raise
        for text that is not a valid Python literal.)
    """
    if not value:
        self._inclusion_values = []  # type: List[Any]
        return
    parsed = ast.literal_eval(value)
    if not parsed:
        # An empty collection, or a falsy literal such as None or 0.
        self._inclusion_values = []
        return
    if not isinstance(parsed, (list, tuple, set, frozenset)):
        raise ValueError(
            f"inclusion_values must evaluate to a list, tuple or set; "
            f"got {value!r}"
        )
    # Always store a genuine list, whatever kind of collection was given.
    self._inclusion_values = list(parsed)
@property
def exclusion_values(self) -> Union[List[Any], str]:
    """
    Returns a list of exclusion values (or an empty string if there are
    no such values).

    This slightly curious output format is used to create a TSV row (see
    :func:`get_tsv`) or to check in a "truthy" way whether we have
    exclusion values (see
    :func:`crate_anon.anonymise.anonymise.process_table`).
    """
    return self._exclusion_values or ""  # for TSV output

@exclusion_values.setter
def exclusion_values(self, value: str) -> None:
    """
    Set the exclusion values.

    Args:
        value:
            either something that is "falsy" (to set the exclusion values
            to an empty list) or something that evaluates to a Python
            list, tuple or set via :func:`ast.literal_eval`.

            For example: ``[None, 0]``, or ``[True, 1, 'yes', 'true',
            'Yes', 'True']``.

    Raises:
        :exc:`ValueError` if the text evaluates to something truthy that is
        not a list, tuple or set (e.g. a bare number or string). Such a
        value could not be used for the ``in`` tests that the exclusion
        values exist for. (:func:`ast.literal_eval` itself may also raise
        for text that is not a valid Python literal.)
    """
    if not value:
        self._exclusion_values = []  # type: List[Any]
        return
    parsed = ast.literal_eval(value)
    if not parsed:
        # An empty collection, or a falsy literal such as None or 0.
        self._exclusion_values = []
        return
    if not isinstance(parsed, (list, tuple, set, frozenset)):
        raise ValueError(
            f"exclusion_values must evaluate to a list, tuple or set; "
            f"got {value!r}"
        )
    # Always store a genuine list, whatever kind of collection was given.
    self._exclusion_values = list(parsed)
@property
def required_scrubber(self) -> bool:
    """
    Whether this field is flagged as a "required scrubber".

    A "required scrubber" is a field that must provide at least one
    non-NULL value for each patient, or the patient won't get processed.
    (For example, you might want to omit a patient if you can't be certain
    about their surname for anonymisation.)
    """
    return self._required_scrubber
@property
def alter_method(self) -> str:
    """
    Return the ``alter_method`` string from the working fields: the text
    forms of our alter methods, comma-separated.
    """
    texts = [am.as_text for am in self._alter_methods]
    # Skip any AlterMethod objects that are doing nothing, because they
    # return blank strings.
    return ",".join(text for text in texts if text)

@alter_method.setter
def alter_method(self, value: str) -> None:
    """
    Convert the ``alter_method`` string (from the data dictionary) to a
    list of :class:`AlterMethod` objects internally.

    Args:
        value: comma-separated alter method specifications

    Raises:
        :exc:`ValueError` if more than one text extraction method is
        specified, or if date truncation is combined with any other kind
        of method (in any order).
    """
    # Get the list of elements in the user's order. Strip whitespace
    # BEFORE discarding blanks, so that an element consisting only of
    # whitespace (e.g. a trailing ", ") does not create an empty method.
    stripped = (x.strip() for x in value.split(","))
    elements = [e for e in stripped if e]
    methods = [
        AlterMethod(config=self.config, text_value=e) for e in elements
    ]  # type: List[AlterMethod]

    # Now establish order. Text extraction first; everything else in order.
    extractors = [am for am in methods if am.extract_text]
    others = [am for am in methods if not am.extract_text]
    self._alter_methods = extractors + others  # type: List[AlterMethod]

    # Now, checks:
    if len(extractors) > 1:
        raise ValueError(
            f"Can only have one text extraction method " f"in {value}"
        )
    n_truncating = sum(1 for am in self._alter_methods if am.truncate_date)
    if n_truncating and n_truncating != len(self._alter_methods):
        # Date truncation is mixed with something else. This is checked
        # irrespective of the order in which the methods were written.
        raise ValueError(
            f"Date truncation must stand alone in "
            f"alter_method: {value}"
        )
def set_alter_methods_directly(self, methods: List[AlterMethod]) -> None:
    """
    For internal use: set our alter methods from a list of
    :class:`AlterMethod` objects, by joining their text forms and passing
    the result through the ``alter_method`` setter.
    """
    as_text = ",".join(method.as_text for method in methods)
    self.alter_method = as_text  # calls the alter_method setter
@property
def from_file(self) -> bool:
    """
    Whether this DDR was loaded from a file (as opposed to, say, being
    autogenerated from a database).
    """
    return self._from_file
@property
def decision(self) -> str:
    """
    Should we include the field in the destination?

    Returns:
        the value of ``Decision.OMIT`` if our ``omit`` flag is set,
        otherwise the value of ``Decision.INCLUDE``.
    """
    if self.omit:
        return Decision.OMIT.value
    return Decision.INCLUDE.value

@decision.setter
def decision(self, value: Union[str, Decision]) -> None:
    """
    Sets the internal ``omit`` flag from the input (usually taken from the
    data dictionary file).

    Args:
        value:
            a :class:`Decision`, or a string to be looked up as one.

    Raises:
        :exc:`ValueError` if the lookup fails.
    """
    try:
        chosen = (
            value if isinstance(value, Decision) else Decision.lookup(value)
        )
        self.omit = chosen is Decision.OMIT
    except ValueError:
        raise ValueError(
            "decision was {}; must be one of {}".format(
                value, [Decision.OMIT.value, Decision.INCLUDE.value]
            )
        )
@property
def include(self) -> bool:
    """
    Whether this row is being included: the opposite of ``omit``.
    """
    omitted = self.omit
    return not omitted
@property
def is_table_comment(self) -> bool:
    """
    Whether this row is a table comment: it has a comment but no source
    field (column).
    """
    has_comment_only = self.comment and not self.src_field
    return bool(has_comment_only)
679 # -------------------------------------------------------------------------
680 # Comparisons
681 # -------------------------------------------------------------------------
683 def __lt__(self, other: "DataDictionaryRow") -> bool:
684 """
685 Defines an order of DDRs based on their source field's signature.
686 """
687 return self.src_signature < other.src_signature
def matches_tabledef(self, tabledef: Union[str, List[str]]) -> bool:
    """
    Tests our source table against a wildcard-based table definition.

    Args:
        tabledef: ``fnmatch``-style pattern (e.g.
            ``"patient_address_table_*"``), or list of them
    """
    table = self.src_table
    return matches_tabledef(table, tabledef)
def matches_fielddef(self, fielddef: Union[str, List[str]]) -> bool:
    """
    Tests our source table/field against a wildcard-based field definition.

    Args:
        fielddef: ``fnmatch``-style pattern (e.g. ``"system_table.*"`` or
            ``"*.nhs_number"``), or list of them
    """
    table = self.src_table
    field = self.src_field
    return matches_fielddef(table, field, fielddef)
709 # -------------------------------------------------------------------------
710 # Representations
711 # -------------------------------------------------------------------------
def __str__(self) -> str:
    """
    Describes the DDR as comma-separated ``name: value`` pairs, one for each
    entry in ``ROWNAMES``, with each value shown via ``repr``.
    """
    pairs = (
        f"{a}: {getattr(self, a)!r}" for a in DataDictionaryRow.ROWNAMES
    )
    return ", ".join(pairs)
@property
def src_signature(self) -> str:
    """
    A signature for the source of this row: ``db.table.column`` for an
    ordinary row, or just ``db.table`` if the row is a table comment.
    """
    if not self.is_table_comment:
        return f"{self.src_db}.{self.src_table}.{self.src_field}"
    return f"{self.src_db}.{self.src_table}"
@property
def dest_signature(self) -> str:
    """
    A signature for the destination of this row: ``table.column`` for an
    ordinary row, or just ``table`` if the row is a table comment.
    """
    if not self.is_table_comment:
        return f"{self.dest_table}.{self.dest_field}"
    return f"{self.dest_table}"
@property
def offender_description(self) -> str:
    """
    Get a string used to describe this DDR (in terms of its
    source/destination fields) if it does something wrong.

    The source signature is always shown. The destination signature is
    appended (as ``" -> dest"``) only for rows that are being included,
    since an omitted row is not written to any destination.
    """
    offenderdest = "" if self.omit else f" -> {self.dest_signature}"
    return f"{self.src_signature}{offenderdest}"
@classmethod
def header_row(cls) -> List[str]:
    """
    Returns a header row for use in spreadsheet formats: a fresh list
    containing the headings in ``ROWNAMES``.
    """
    return [*cls.ROWNAMES]
def as_row(self) -> List[Any]:
    """
    Returns a data row for use in spreadsheet formats: one value for each
    entry in ``ROWNAMES``, in that order (so matching :meth:`header_row`).
    """

    def cell(name: str) -> Any:
        content = getattr(self, name)
        if content is None:
            # some spreadsheet handlers (e.g. pyexcel_ods) choke on None
            return ""
        if name in self.ENUM_ROWNAMES:
            # convert enum to str
            return str(content)
        return content

    return [cell(name) for name in self.ROWNAMES]
def report_dest_annotation(self) -> str:
    """
    Returns information useful for a researcher looking at the destination
    database, in simple string form: the applicable labels separated by
    spaces, or a placeholder label if none apply.

    - Therefore: does not include fields like "constant",
      "addition_only", which are primarily for database managers; we're
      trying to keep this terse.
    - Relates to DESTINATION fields, e.g. a source PID becomes a
      destination RID.
    """
    label_if_applicable = (
        (self.primary_pid, DDRLabels.RID),
        (self.defines_primary_pids, DDRLabels.DEFINES_PRIMARY_PID),
        (self.master_pid, DDRLabels.MRID),
        (self.third_party_pid, DDRLabels.THIRD_PARTY_RID),
        (self.being_scrubbed, DDRLabels.BEING_SCRUBBED),
    )
    labels = [label for applies, label in label_if_applicable if applies]
    return " ".join(labels) if labels else DDRLabels.NOTHING
803 # -------------------------------------------------------------------------
804 # Setting
805 # -------------------------------------------------------------------------
def set_from_dict(
    self, valuedict: Dict[str, Any], override_dialect: Dialect = None
) -> None:
    """
    Populate this row from a dict of elements representing a row from the
    TSV data dictionary file.

    Also marks the row as "loaded from file", since that is the context
    in which we use this function.

    Args:
        valuedict:
            Dictionary mapping row headings (or attribute names) to values.
        override_dialect:
            SQLAlchemy SQL dialect to enforce (e.g. for interpreting
            textual column types in the source database). By default, the
            source database's own dialect is used.
    """
    row = valuedict

    # Source details. Datatypes are stored in upper case.
    self.src_db = row["src_db"]
    self.src_table = row["src_table"]
    self.src_field = row["src_field"]
    self.src_datatype = row["src_datatype"].upper()
    self._src_override_dialect = override_dialect
    # noinspection PyAttributeOutsideInit
    self.src_flags = row["src_flags"]  # a property

    # Scrubbing details.
    self.scrub_src = ScrubSrc.lookup(row["scrub_src"], allow_none=True)
    self.scrub_method = ScrubMethod.lookup(
        row["scrub_method"], allow_none=True
    )

    # Decisions. These all go via property setters.
    # noinspection PyAttributeOutsideInit
    self.decision = row["decision"]  # a property; sets self.omit
    # noinspection PyAttributeOutsideInit
    self.inclusion_values = row["inclusion_values"]  # a property
    # noinspection PyAttributeOutsideInit
    self.exclusion_values = row["exclusion_values"]  # a property
    # noinspection PyAttributeOutsideInit
    self.alter_method = row["alter_method"]  # a property

    # Destination details.
    self.dest_table = row["dest_table"]
    self.dest_field = row["dest_field"]
    self.dest_datatype = row["dest_datatype"].upper()
    self.index = IndexType.lookup(row["index"], allow_none=True)
    self.indexlen = convert_to_int(row["indexlen"])
    self.comment = row["comment"]

    self._from_file = True
854 # -------------------------------------------------------------------------
855 # Anonymisation decisions
856 # -------------------------------------------------------------------------
@property
def being_scrubbed(self) -> bool:
    """
    Whether the field is being scrubbed on its way from source to
    destination: true only if the field is included (not omitted) and at
    least one of its alter methods scrubs.
    """
    if self.omit:
        return False
    return any(method.scrub for method in self._alter_methods)
@property
def contains_patient_info(self) -> bool:
    """
    Whether the field contains patient information, meaning any of:

    - primary PID
    - MPID
    - scrub-source (sensitive) information
    """
    holds_pid = self.primary_pid or self.master_pid
    return holds_pid or bool(self.scrub_src)
@property
def contains_scrub_src(self) -> bool:
    """
    Whether the field holds scrub-source information (sensitive
    information used for de-identification), i.e. whether a scrub-source
    setting is present.
    """
    return True if self.scrub_src else False
@property
def contains_vital_patient_info(self) -> bool:
    """
    Whether the field contains vital patient information, which means:

    - scrub-source (sensitive) information
    """
    is_scrub_source = self.contains_scrub_src
    return is_scrub_source
@property
def required(self) -> bool:
    """
    Whether the field is required, meaning either of:

    - chosen by the user to be translated into the destination
    - contains vital patient information (scrub-source information)
    """
    if self.omit:
        # Omitted rows are still needed if they hold vital patient info.
        return self.contains_vital_patient_info
    return True
def skip_row_by_value(self, value: Any) -> bool:
    """
    Decides whether a row should be skipped on the basis of this field's
    value: either the row has inclusion values and the value is not among
    them, or the value is among the row's exclusion values.

    Args:
        value: value to test
    """
    permitted = self._inclusion_values
    if permitted and value not in permitted:
        return True  # fails the inclusion test
    return value in self._exclusion_values
@property
def alter_methods(self) -> List[AlterMethod]:
    """
    All the alteration methods to be applied.

    Returns:
        our internal list of
        :class:`crate_anon.anonymise.altermethod.AlterMethod` objects
        (the list itself, not a copy)

    """
    methods = self._alter_methods
    return methods
@property
def skip_row_if_extract_text_fails(self) -> bool:
    """
    Whether the row should be skipped if text extraction fails: true if
    any of our alter methods asks for that.
    """
    for method in self._alter_methods:
        if method.skip_if_text_extract_fails:
            return True
    return False
@property
def extracting_text_altermethods(self) -> List[AlterMethod]:
    """
    The subset of our alteration methods that involve text extraction, in
    their existing order.

    Returns:
        list of :class:`crate_anon.anonymise.altermethod.AlterMethod`
        objects
    """
    return list(filter(lambda am: am.extract_text, self._alter_methods))
def remove_scrub_from_alter_methods(self) -> None:
    """
    Stop this row from being scrubbed, by switching off the "scrub" setting
    of every one of its alteration methods.
    """
    # log.debug(
    #     f"remove_scrub_from_alter_methods "
    #     f"[used for non-patient tables]: {self.src_signature}")
    for method in self._alter_methods:
        method.scrub = False
964 # -------------------------------------------------------------------------
965 # Other decisions
966 # -------------------------------------------------------------------------
@property
def using_fulltext_index(self) -> bool:
    """
    Whether the destination field is set to have a full-text index.
    """
    index_type = self.index
    return index_type == IndexType.FULLTEXT
975 # -------------------------------------------------------------------------
976 # SQLAlchemy types
977 # -------------------------------------------------------------------------
@property
def src_sqla_coltype(self) -> TypeEngine:
    """
    Returns the SQLAlchemy column type of the source column.

    If a type has been set explicitly (see :meth:`set_src_sqla_coltype`),
    that is returned. Otherwise, the type is derived from the source
    datatype string, interpreted using :attr:`src_dialect` -- so that an
    override dialect, if one was supplied, is respected here as it is
    elsewhere.
    """
    explicit_type = self._src_sqla_coltype
    if explicit_type:
        return explicit_type
    return get_sqla_coltype_from_dialect_str(
        self.src_datatype, self.src_dialect
    )
def set_src_sqla_coltype(self, sqla_coltype: TypeEngine) -> None:
    """
    Stores an explicit SQLAlchemy column type for the source column, to be
    used in preference to one derived from the source datatype string.
    """
    self._src_sqla_coltype = sqla_coltype
@property
def dest_should_be_encrypted_pid_type(self) -> bool:
    """
    Whether the destination column (if included) should be of the
    encrypted PID/MPID type: true for primary PIDs, third-party PIDs, and
    master PIDs.
    """
    any_pid = self.primary_pid or self.third_party_pid
    return any_pid or self.master_pid
@property
def dest_sqla_coltype(self) -> TypeEngine:
    """
    Returns the SQLAlchemy column type of the destination column.

    Note that this doesn't include nullable status. An SQLAlchemy column
    looks like Column(String(50), nullable=False) -- the type that we're
    fetching here is, for example, the String(50) part. For the full
    column, see ``dest_sqla_column`` below.

    Must not be called for a table comment row.
    """
    assert not self.is_table_comment

    explicit_type = self.dest_datatype
    if explicit_type:
        # User (or our autogeneration process) wants to override
        # the type.
        return get_sqla_coltype_from_dialect_str(
            explicit_type, self.dest_dialect
        )

    # Destination data type is not explicitly specified.
    # Is it a special type of field?
    if self.dest_should_be_encrypted_pid_type:
        return self.config.sqltype_encrypted_pid

    # Otherwise: convert the source column's SQLAlchemy type for the
    # destination dialect (expanding it if the field is being scrubbed).
    return convert_sqla_type_for_dialect(
        coltype=self.src_sqla_coltype,
        dialect=self.dest_dialect,
        expand_for_scrubbing=self.being_scrubbed,
    )
@property
def dest_sqla_column(self) -> Column:
    """
    Build the SQLAlchemy :class:`sqlalchemy.sql.schema.Column` describing
    the destination column: its name, type, comment, and primary-key /
    nullability settings.

    Must not be used on a table-comment row.
    """
    assert not self.is_table_comment
    column_name = self.dest_field
    column_type = self.dest_sqla_coltype
    description = self.comment or ""
    # "doc" is the Python-side description; "comment" is the SQL-side one
    # (supported from SQLAlchemy 1.2).
    column_options = dict(doc=description, comment=description)
    if self.pk:
        column_options.update(primary_key=True, autoincrement=False)
    if self.not_null or self.primary_pid:
        column_options["nullable"] = False
    return Column(column_name, column_type, **column_options)
def make_dest_datatype_explicit(self) -> None:
    """
    Fill in ``dest_datatype`` explicitly where it is currently blank.

    When a data dictionary is autocreated, ``dest_datatype`` is normally
    left unpopulated and the destination type is only implied. This writes
    the implied type (the string form of ``dest_sqla_coltype``) into the
    field. Table-comment rows and rows that already have a
    ``dest_datatype`` are left alone. Primarily for debugging.
    """
    if self.is_table_comment or self.dest_datatype:
        return
    self.dest_datatype = str(self.dest_sqla_coltype)
1068 # -------------------------------------------------------------------------
1069 # Validation
1070 # -------------------------------------------------------------------------
def check_valid(self) -> None:
    """
    Validate this row; on failure, log which row was at fault (with the
    traceback) and re-raise the original exception unchanged.

    Raises:
        :exc:`AssertionError`, :exc:`ValueError`
    """
    try:
        self._check_valid()
    except (AssertionError, ValueError):
        culprit = self.offender_description
        row_as_text = str(self)
        log.exception(f"Offending DD row [{culprit}]: {row_as_text}")
        raise
def check_prohibited_fieldnames(
    self, prohibited_fieldnames: Iterable[str]
) -> None:
    """
    Check that the destination field isn't a prohibited one.

    Args:
        prohibited_fieldnames: prohibited destination field names

    Raises:
        :exc:`ValueError` if ``dest_field`` is one of the prohibited names
        (the offending row is logged first).
    """
    if self.dest_field in prohibited_fieldnames:
        # Use log.error(), not log.exception(): no exception is being
        # handled at this point, so log.exception() would append a
        # meaningless "NoneType: None" traceback to the message.
        log.error(
            f"Offending DD row [{self.offender_description}]: "
            f"{str(self)}"
        )
        raise ValueError("Prohibited dest_field name")
def _check_valid(self) -> None:
    """
    Check internal validity and complain if invalid.

    The checks run in this order:

    - the source database is named and known to the config, and the source
      table is named;
    - unless the row is omitted, the destination table name is valid;
    - the source flags / scrub settings are mutually consistent.

    Validation stops there for omitted rows. Rows with no ``src_field`` are
    treated as table comments and get their own restricted checks. All
    other rows are then checked for: source datatype; destination
    table/field names and datatype; PID/MPID flag consistency; alteration
    methods versus the source column type; and index length.

    Raises:
        :exc:`AssertionError`, :exc:`ValueError`
    """
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check source database/table is OK
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    assert self.src_db, "Need src_db"
    if self.src_db not in self.config.source_db_names:
        raise ValueError(
            "Data dictionary row references non-existent source "
            "database"
        )
    # Per-source settings; used below for the PID/MPID field-name checks.
    srccfg = self.config.sources[self.src_db].srccfg

    assert self.src_table, "Need src_table"

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check destination table is OK
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    if not self.omit:
        assert self.dest_table, "Need dest_table"
        ensure_valid_table_name(self.dest_table)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check for conflicting or missing flags
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    if self.defines_primary_pids and not self.primary_pid:
        raise ValueError(
            f"All fields with "
            f"{self.SRC_FLAGS}={SrcFlag.DEFINES_PRIMARY_PIDS} "
            f"set must have {self.SRC_FLAGS}={SrcFlag.PRIMARY_PID} set"
        )

    if self.opt_out_info and not self.config.optout_col_values:
        raise ValueError(
            f"Fields with {self.SRC_FLAGS}={SrcFlag.OPT_OUT} "
            f"exist, but config's {AnonymiseConfigKeys.OPTOUT_COL_VALUES} "
            f"setting is empty"
        )

    # At most one of: primary PID, master PID, third-party PID, or an
    # explicit alter method.
    if (
        count_bool(
            [
                self.primary_pid,
                self.master_pid,
                self.third_party_pid,
                bool(self.alter_method),
            ]
        )
        > 1
    ):
        raise ValueError(
            f"Field can be any ONE of: "
            f"{self.SRC_FLAGS}={SrcFlag.PRIMARY_PID}, "
            f"{self.SRC_FLAGS}={SrcFlag.MASTER_PID}, "
            f"{self.SCRUB_SRC}={ScrubSrc.THIRDPARTY_XREF_PID}, or "
            f"{self.ALTER_METHOD} "
            f"(because those flags all imply a certain "
            f"{self.ALTER_METHOD})"
        )

    if self.required_scrubber and not self.scrub_src:
        raise ValueError(
            f"If you specify "
            f"{self.SRC_FLAGS}={SrcFlag.REQUIRED_SCRUBBER}, "
            f"you must specify {self.SCRUB_SRC}"
        )

    if self.add_src_hash:
        if not self.pk:
            raise ValueError(
                f"{self.SRC_FLAGS}={SrcFlag.ADD_SRC_HASH} "
                f"can only be set on "
                f"{self.SRC_FLAGS}={SrcFlag.PK} fields"
            )
        # Don't exclude the self.omit and SrcFlag.ADD_SRC_HASH here. That
        # is done as a table-level check in dd.py instead, in
        # DataDictionary.check_valid(), because if the whole table is being
        # omitted, this is OK.
        if self.index != IndexType.UNIQUE:
            raise ValueError(
                f"{self.SRC_FLAGS}={SrcFlag.ADD_SRC_HASH} fields require "
                f"{self.INDEX}=={IndexType.UNIQUE}"
            )
        if self.constant:
            raise ValueError(
                f"cannot mix {SrcFlag.ADD_SRC_HASH} flag with "
                f"{SrcFlag.CONSTANT} flag"
            )

    if self.constant:
        if not self.pk:
            raise ValueError(
                f"{self.SRC_FLAGS}={SrcFlag.CONSTANT} can only be set on "
                f"{self.SRC_FLAGS}={SrcFlag.PK} fields"
            )
        if self.index != IndexType.UNIQUE:
            raise ValueError(
                f"{self.SRC_FLAGS}={SrcFlag.CONSTANT} fields require "
                f"{self.INDEX}=={IndexType.UNIQUE}"
            )

    if self.addition_only:
        if not self.pk:
            raise ValueError(
                f"{self.SRC_FLAGS}={SrcFlag.ADDITION_ONLY} "
                f"can only be set on "
                f"{self.SRC_FLAGS}={SrcFlag.PK} fields"
            )

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # No more checks required if field will be omitted.
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    if self.omit:
        return

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Table comment? If so, and it's OK, then we'll end here.
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    if not self.src_field:
        # A row without a source field is a table comment: it must carry a
        # comment and nothing that only makes sense for a real column.
        assert self.comment, (
            "src_field can only be missing for table comments, "
            "but there is no comment"
        )
        assert (
            not self.src_datatype and not self.dest_datatype
        ), "Table comments cannot have datatypes"
        assert not self.src_flags, "Table comments cannot have any flags"
        assert not self.scrub_src, "Table comments cannot be scrub sources"
        assert not self.scrub_method, "Table comments cannot be scrubbed"
        assert (
            not self.inclusion_values and not self.exclusion_values
        ), "Table comments cannot have inclusion/exclusion values"
        assert not self.dest_field, "Table comments cannot have dest_field"
        assert (
            self.index == IndexType.NONE
        ), "Table comments cannot be indexed"
        return

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check other source field information
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    assert self.src_datatype, "Need src_datatype"

    # REMOVED 2016-06-04; fails with complex SQL Server types, which can
    # look like 'NVARCHAR(10) COLLATE "Latin1_General_CI_AS"'.
    #
    # if not is_sqltype_valid(self.src_datatype):
    #     raise ValueError(
    #         "Field has invalid source data type: {}".format(
    #             self.src_datatype))

    # Fetched once; used by the alteration-method checks below.
    src_sqla_coltype = self.src_sqla_coltype

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check destination table/field/datatype
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Table
    assert self.dest_field, "Need dest_field"
    ensure_valid_table_name(self.dest_table)
    if self.dest_table == self.config.temporary_tablename:
        raise ValueError(
            f"Destination tables can't be named "
            f"{self.config.temporary_tablename}, as that's the name set "
            f"in the config's {AnonymiseConfigKeys.TEMPORARY_TABLENAME} "
            f"variable"
        )
    # Field
    ensure_valid_field_name(self.dest_field)
    if self.dest_field == self.config.source_hash_fieldname:
        raise ValueError(
            f"Destination fields can't be named "
            f"{self.config.source_hash_fieldname}, as that's the name set "
            f"in the config's {AnonymiseConfigKeys.SOURCE_HASH_FIELDNAME} "
            f"variable"
        )
    elif self.dest_field == self.config.trid_fieldname:
        raise ValueError(
            f"Destination fields can't be named "
            f"{self.config.trid_fieldname}, as that's the name set "
            f"in the config's {AnonymiseConfigKeys.TRID_FIELDNAME} "
            f"variable"
        )
    # Ensure the destination table/column names are OK for the dialect.
    warn_if_identifier_long(
        self.dest_table, self.dest_field, self.dest_dialect_name
    )
    # Datatype
    if self.dest_datatype and not is_sqltype_valid(self.dest_datatype):
        raise ValueError(
            f"Field has invalid {self.DEST_DATATYPE}: "
            f"{self.dest_datatype}"
        )
    # Fetched once; used by the indexing check at the end.
    dest_sqla_coltype = self.dest_sqla_coltype

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check destination flags/special fields
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # PID/RID
    if self.matches_fielddef(srccfg.ddgen_per_table_pid_field):
        if not self.primary_pid:
            raise ValueError(
                f"All fields with {self.SRC_FIELD}={self.src_field!r} "
                f"used in output should have "
                f"{self.SRC_FLAGS}={SrcFlag.PRIMARY_PID} set"
            )
        if self.dest_field != self.config.research_id_fieldname:
            raise ValueError(
                f"Primary PID field should have {self.DEST_FIELD} = "
                f"{self.config.research_id_fieldname}"
            )
    # MPID/MRID
    if (
        self.matches_fielddef(srccfg.ddgen_master_pid_fieldname)
        and not self.master_pid
    ):
        raise ValueError(
            f"All fields with {self.SRC_FIELD} = "
            f"{srccfg.ddgen_master_pid_fieldname} used in output should "
            f"have {self.SRC_FLAGS}={SrcFlag.MASTER_PID} set"
        )
    # Anything that is hashed (but not self._add_src_hash -- added
    # separately):
    # NOTE(review): the condition below covers primary/master/third-party
    # PID fields (see dest_should_be_encrypted_pid_type), whereas the
    # message names ADD_SRC_HASH rather than third-party PID -- confirm
    # whether the wording is intended.
    if (
        self.dest_should_be_encrypted_pid_type
        and self.dest_datatype
        and self.dest_datatype != self.config.sqltype_encrypted_pid_as_sql
    ):
        raise ValueError(
            f"All {self.SRC_FLAGS}={SrcFlag.PRIMARY_PID}/"
            f"{self.SRC_FLAGS}={SrcFlag.MASTER_PID}/"
            f"{self.SRC_FLAGS}={SrcFlag.ADD_SRC_HASH} "
            f"fields used in output must have {self.DEST_DATATYPE} = "
            f"{self.config.sqltype_encrypted_pid_as_sql} "
            f"(determined by the config parameter "
            f"{AnonymiseConfigKeys.HASH_METHOD!r})"
        )

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check alteration methods
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    if self.has_special_alter_method and self._alter_methods:
        raise ValueError(
            f"Don't specify {self.ALTER_METHOD} "
            f"for PID/MPID/third-party PID fields; "
            f"these are handled specially"
        )
    # Each alteration method must suit the source column's type.
    for am in self._alter_methods:
        if am.truncate_date:
            if not (
                is_sqlatype_date(src_sqla_coltype)
                or is_sqlatype_text_over_one_char(src_sqla_coltype)
            ):
                raise ValueError(
                    f"Can't set {AlterMethodType.TRUNCATEDATE.value} "
                    f"for non-date/non-text field"
                )
        if am.extract_from_filename:
            if not is_sqlatype_text_over_one_char(src_sqla_coltype):
                raise ValueError(
                    f"For {self.ALTER_METHOD} = "
                    f"{AlterMethodType.FILENAME_TO_TEXT}, "
                    f"source field must contain a filename and therefore "
                    f"must be text type of >1 character"
                )
        if am.extract_from_blob:
            if not is_sqlatype_binary(src_sqla_coltype):
                raise ValueError(
                    f"For {self.ALTER_METHOD} = "
                    f"{AlterMethodType.BINARY_TO_TEXT}, "
                    f"source field must be of binary type"
                )

    # This error/warning too hard to be sure of with SQL Server odd
    # string types:
    # if [RENAMED: self._scrub] and not self._extract_text:
    #     if not is_sqltype_text_over_one_char(self.src_datatype):
    #         raise ValueError("Can't scrub in non-text field or "
    #                          "single-character text field")

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check indexing
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # A normal/unique index on a type that needs an index length must say
    # what that length is.
    if (
        self.index in (IndexType.NORMAL, IndexType.UNIQUE)
        and self.indexlen is None
        and does_sqlatype_require_index_len(dest_sqla_coltype)
    ):
        raise ValueError(
            f"Must specify {self.INDEXLEN} "
            f"to index a TEXT or BLOB field"
        )
1406 # -------------------------------------------------------------------------
1407 # Other stuff requiring config or database info
1408 # -------------------------------------------------------------------------
def set_from_src_db_info(
    self,
    src_db: str,
    src_table: str,
    src_field: str,
    src_datatype_sqltext: str,
    src_sqla_coltype: TypeEngine,
    dbconf: "DatabaseSafeConfig",
    comment: Optional[str] = None,
    nullable: bool = True,
    primary_key: bool = False,
    table_has_explicit_pk: bool = False,
    table_has_candidate_pk: bool = False,
) -> None:
    """
    Set up this DDR from a field in the source database, using options set
    in the config file. Used to draft a data dictionary. This is the
    first-draft classification of a given column, which the administrator
    should review and may then wish to edit.

    In order, this decides: whether the field is a PK / PID / MPID;
    whether it is opt-out information; whether and how it is a scrub
    source; the destination field name and SQL type; the alteration
    methods; the destination table name; the index type and length; and
    finally whether the field is omitted.

    NOTE(review): ``_opt_out_info`` and ``_required_scrubber`` are only
    ever set to True here, and ``_alter_methods`` is only appended to --
    presumably all three start from fresh defaults set elsewhere; confirm.

    Args:
        src_db:
            Source database name.
        src_table:
            Source table name.
        src_field:
            Source field (column) name.
        src_datatype_sqltext:
            Source string SQL type, e.g. ``"VARCHAR(100)"``.
        src_sqla_coltype:
            Source SQLAlchemy column type, e.g. ``Integer()``.
        dbconf:
            A :class:`crate_anon.anonymise.config.DatabaseSafeConfig`.
        comment:
            Textual comment.
        nullable:
            Whether the source can be NULL (True) or is NOT NULL
            (False).
        primary_key:
            Whether the source is marked as a primary key.
        table_has_explicit_pk:
            Whether the source database knows of a formal PK field for this
            table, whether or not this is it.
        table_has_candidate_pk:
            Whether the source table contains a field that matches
            CRATE's name detection criteria, whether or not this is it.
    """
    self.src_db = src_db
    self.src_table = src_table
    self.src_field = src_field
    self.src_datatype = src_datatype_sqltext
    self._src_sqla_coltype = src_sqla_coltype
    self._not_null = not nullable
    # Start with these flags cleared; the rules below set them as needed.
    self._pk = False
    self._add_src_hash = False
    self._primary_pid = False
    self._defines_primary_pids = False
    self._master_pid = False
    self._constant = False
    self._addition_only = False
    self.comment = comment
    self._from_file = False

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ddgen: Is the field special, such as a PK?
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    resembles_pk = self.matches_fielddef(dbconf.ddgen_pk_fields)
    if table_has_explicit_pk:
        if dbconf.ddgen_prefer_original_pk:
            # Use the database's PK.
            treat_as_pk = primary_key
        else:
            # The table has an explicit PK. However, we prefer to use our
            # name-based rules, if something matches that.
            if table_has_candidate_pk:
                # There's a name-based match in the table. Use it.
                treat_as_pk = resembles_pk
            else:
                # Nothing matches. So it's the original PK or nothing.
                treat_as_pk = primary_key
    else:
        # The table doesn't have an explicit PK, so we just follow our
        # name-based rules.
        treat_as_pk = resembles_pk
    if treat_as_pk:
        # Table primary key (e.g. arbitrary integer).
        self._pk = True
        # Constant if constant content is configured (globally or for this
        # table), unless the table is explicitly listed as non-constant.
        self._constant = (
            dbconf.ddgen_constant_content
            or self.matches_tabledef(dbconf.ddgen_constant_content_tables)
        ) and not self.matches_tabledef(
            dbconf.ddgen_nonconstant_content_tables
        )
        # A PK gets exactly one of "constant" or "add source hash".
        self._add_src_hash = not self._constant
        # Same pattern: addition-only if configured (globally or for this
        # table), unless the table is listed as allowing deletion.
        self._addition_only = (
            dbconf.ddgen_addition_only
            or self.matches_tabledef(dbconf.ddgen_addition_only_tables)
        ) and not self.matches_tabledef(
            dbconf.ddgen_deletion_possible_tables
        )

    if self.matches_fielddef(dbconf.ddgen_per_table_pid_field):
        # PID, e.g. local hospital number.
        self._primary_pid = True
        if self.matches_tabledef(dbconf.ddgen_table_defines_pids):
            self._defines_primary_pids = True

    if self.matches_fielddef(dbconf.ddgen_master_pid_fieldname):
        # MPID, e.g. NHS number.
        self._master_pid = True

    if self.matches_fielddef(dbconf.ddgen_pid_defining_fieldnames):
        # The PID in the "chief" patient table.
        self._defines_primary_pids = True

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ddgen: Does it indicate the patient wishes to opt out entirely?
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    if self.matches_fielddef(dbconf.ddgen_patient_opt_out_fields):
        self._opt_out_info = True

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ddgen: Does the field contain sensitive data?
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Patient scrub source takes precedence over third-party, which takes
    # precedence over third-party cross-reference PID.
    if (
        self.master_pid
        or self.defines_primary_pids
        or (
            self.primary_pid
            and dbconf.ddgen_add_per_table_pids_to_scrubber
        )
        or self.matches_fielddef(dbconf.ddgen_scrubsrc_patient_fields)
    ):
        self.scrub_src = ScrubSrc.PATIENT

    elif self.matches_fielddef(dbconf.ddgen_scrubsrc_thirdparty_fields):
        self.scrub_src = ScrubSrc.THIRDPARTY

    elif self.matches_fielddef(
        dbconf.ddgen_scrubsrc_thirdparty_xref_pid_fields
    ):
        self.scrub_src = ScrubSrc.THIRDPARTY_XREF_PID

    else:
        self.scrub_src = None  # type: Optional[str]

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ddgen: Is it a mandatory scrubbing field?
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    if self.matches_fielddef(dbconf.ddgen_required_scrubsrc_fields):
        self._required_scrubber = True

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ddgen: What kind of sensitive data? Date, text, number, code?
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Only scrub sources get a scrub method. First match wins: numeric,
    # date, code, phrase, and finally words as the fallback.
    if not self.scrub_src:
        self.scrub_method = ""

    elif (
        self.scrub_src is ScrubSrc.THIRDPARTY_XREF_PID
        or is_sqlatype_numeric(src_sqla_coltype)
        or self.matches_fielddef(dbconf.ddgen_per_table_pid_field)
        or self.matches_fielddef(dbconf.ddgen_master_pid_fieldname)
        or self.matches_fielddef(dbconf.ddgen_scrubmethod_number_fields)
    ):
        self.scrub_method = ScrubMethod.NUMERIC

    elif is_sqlatype_date(src_sqla_coltype) or self.matches_fielddef(
        dbconf.ddgen_scrubmethod_date_fields
    ):
        self.scrub_method = ScrubMethod.DATE

    elif self.matches_fielddef(dbconf.ddgen_scrubmethod_code_fields):
        self.scrub_method = ScrubMethod.CODE

    elif self.matches_fielddef(dbconf.ddgen_scrubmethod_phrase_fields):
        self.scrub_method = ScrubMethod.PHRASE

    else:
        self.scrub_method = ScrubMethod.WORDS

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ddgen: Do we want to change the destination fieldname?
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    if self.primary_pid:
        self.dest_field = self.config.research_id_fieldname
    elif self.master_pid:
        self.dest_field = self.config.master_research_id_fieldname
    else:
        self.dest_field = src_field
    if dbconf.ddgen_force_lower_case:
        self.dest_field = self.dest_field.lower()
    if dbconf.ddgen_convert_odd_chars_to_underscore:
        self.dest_field = str(self.dest_field)  # if this fails,
        # there's a Unicode problem
        self.dest_field = self.replace_odd_chars(self.dest_field)
        # ... this will choke on a Unicode string

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ddgen: Do we want to change the destination field SQL type?
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    if self.dest_should_be_encrypted_pid_type:
        self.dest_datatype = self.config.sqltype_encrypted_pid_as_sql
    else:
        self.dest_datatype = ""
    # ... and see also potential changes made below

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ddgen: How should we manipulate the destination?
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Set when the field's text is extracted from a file or a binary
    # column; such fields get extra handling after this if/elif chain.
    extracting_text = False

    if self.matches_fielddef(dbconf.ddgen_truncate_date_fields):
        # Date truncation
        self._alter_methods.append(
            AlterMethod(config=self.config, truncate_date=True)
        )
        # If we're truncating a date, we should also encourage a DATE
        # destination (as opposed to e.g. a DATETIME).
        self.dest_datatype = SQLTYPE_DATE

    elif self.matches_fielddef(dbconf.ddgen_filename_to_text_fields):
        # Read filename from database, read file, convert to text
        self._alter_methods.append(
            AlterMethod(config=self.config, extract_from_filename=True)
        )
        self.dest_datatype = giant_text_sqltype(self.dest_dialect)
        extracting_text = True

    elif self.matches_fielddef(dbconf.bin2text_dict.keys()):
        # Read binary data from database, convert to text
        # One alter method is added per matching entry; each entry pairs a
        # binary field definition with its "extension" field.
        for binfielddef, extfield in dbconf.bin2text_dict.items():
            if self.matches_fielddef(binfielddef):
                self._alter_methods.append(
                    AlterMethod(
                        config=self.config,
                        extract_from_blob=True,
                        extract_ext_field=extfield,
                    )
                )
        self.dest_datatype = giant_text_sqltype(self.dest_dialect)
        extracting_text = True

    elif (
        not self.primary_pid
        and not self.master_pid
        and not self.matches_fielddef(
            dbconf.ddgen_safe_fields_exempt_from_scrubbing
        )
        and dbconf.ddgen_min_length_for_scrubbing >= 1
        and is_sqlatype_text_of_length_at_least(
            src_sqla_coltype, dbconf.ddgen_min_length_for_scrubbing
        )
    ):
        # Text field meeting the criteria to scrub
        self._alter_methods.append(
            AlterMethod(config=self.config, scrub=True)
        )

    if extracting_text:
        # Scrub all extract-text fields, unless asked not to
        if not self.matches_fielddef(
            dbconf.ddgen_safe_fields_exempt_from_scrubbing
        ):
            self._alter_methods.append(
                AlterMethod(config=self.config, scrub=True)
            )
        # Set skip_if_text_extract_fails flag?
        if self.matches_fielddef(
            dbconf.ddgen_skip_row_if_extract_text_fails_fields
        ):
            self._alter_methods.append(
                AlterMethod(
                    config=self.config, skip_if_text_extract_fails=True
                )
            )

    for fieldspec, cfg_section in dbconf.ddgen_extra_hash_fields.items():
        # Hash something using an "extra" hasher.
        if self.matches_fielddef(fieldspec):
            self._alter_methods.append(
                AlterMethod(
                    config=self.config,
                    hash_=True,
                    hash_config_section=cfg_section,
                )
            )

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ddgen: Manipulate the destination table name?
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # https://stackoverflow.com/questions/10017147
    self.dest_table = src_table

    if dbconf.ddgen_force_lower_case:
        self.dest_table = self.dest_table.lower()

    if dbconf.ddgen_convert_odd_chars_to_underscore:
        self.dest_table = str(self.dest_table)
        # ... if this fails, there's a Unicode problem
        self.dest_table = self.replace_odd_chars(self.dest_table)

    for suffix in dbconf.ddgen_rename_tables_remove_suffixes:
        if self.dest_table.endswith(suffix):
            self.dest_table = self.dest_table[: -len(suffix)]  # remove it
            break  # only remove one suffix!

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ddgen: Should we index the destination?
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Read only now, after dest_datatype has had its final value set above.
    dest_sqla_type = self.dest_sqla_coltype

    if self._pk:
        self.index = IndexType.UNIQUE

    elif (
        self.primary_pid
        or self.master_pid
        or self.defines_primary_pids
        or self.dest_field == self.config.research_id_fieldname
    ):
        self.index = IndexType.NORMAL

    elif (
        dbconf.ddgen_allow_fulltext_indexing
        and does_sqlatype_merit_fulltext_index(
            src_sqla_coltype,
            min_length=dbconf.ddgen_freetext_index_min_length,
        )
    ):
        self.index = IndexType.FULLTEXT

    elif self.matches_fielddef(dbconf.ddgen_index_fields):
        self.index = IndexType.NORMAL

    else:
        self.index = IndexType.NONE

    # An index length is set only for normal/unique indexes on destination
    # types that need one.
    self.indexlen = (
        DEFAULT_INDEX_LEN
        if (
            self.index != IndexType.NONE
            and self.index != IndexType.FULLTEXT
            and does_sqlatype_require_index_len(dest_sqla_type)
        )
        else None
    )

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ddgen: Should we omit it (at least until a human has checked the DD)?
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    # In descending order of priority:
    if self.matches_fielddef(dbconf.ddgen_omit_fields):  # explicit
        # Explicit omission trumps everything else
        # (There are rare occasions with "additional" databases where we
        # may want to omit a PK/PID/MPID field.)
        self.omit = True

    elif self._pk or self.primary_pid or self.master_pid:
        # We always want PKs, and the translated PID/MPID (RID+TRID or
        # MRID respectively).
        self.omit = False

    elif bool(self.scrub_src):
        # Scrub-source fields are generally sensitive and therefore worthy
        # of omission, EXCEPT that if a date is marked for truncation, the
        # user probably wants it (truncated) to come through!
        if any(am.truncate_date for am in self._alter_methods):
            self.omit = False
        else:
            self.omit = True

    elif self.matches_fielddef(dbconf.ddgen_include_fields):  # explicit
        # Explicit inclusion next.
        self.omit = False

    else:
        self.omit = dbconf.ddgen_omit_by_default
@staticmethod
def replace_odd_chars(text: str) -> str:
    """
    Sanitise a table or field name so that it contains only printable,
    non-space ASCII characters, excluding ``()/|``.

    SQL Server and MySQL allow pretty much anything in a table or field
    name, but such characters could cause problems elsewhere.

    Args:
        text: the table or field name to sanitise

    Returns:
        The name with every run of disallowed characters (whitespace,
        control characters including DEL, and non-ASCII characters)
        replaced by a single underscore, and each of ``(``, ``)``, ``/``
        and ``|`` replaced by an underscore.
    """
    # Keep only 0x21-0x7E. The upper bound is 0x7E, not 0x7F: 0x7F is DEL,
    # a control character rather than a printable one. The "+" collapses
    # each run of unwanted characters into one underscore.
    text = re.sub(r"[^\x21-\x7E]+", "_", text)

    # Replace the remaining unwanted ASCII punctuation, one underscore per
    # character.
    return text.translate(str.maketrans("()/|", "____"))
def set_as_table_comment(
    self, src_db: str, src_table: str, comment: str
) -> None:
    """
    Turn this DDR into a special table-comment row, i.e. one that has no
    source field and just carries a comment for the table. (Used when
    drafting a data dictionary.)

    Args:
        src_db:
            Source database name.
        src_table:
            Source table name; also used as the destination table name.
        comment:
            Textual comment.
    """
    # Where the comment comes from.
    self.src_db = src_db
    self.src_table = src_table
    self.src_field = ""  # an empty source field marks a table comment
    # Where it goes, and what it says.
    self.dest_table = src_table
    self.comment = comment
    # Not omitted, and not read from a file.
    self.omit = self._from_file = False