Coverage for anonymise/dd.py: 27%
552 statements
« prev ^ index » next coverage.py v7.8.0, created at 2026-02-05 06:46 -0600
« prev ^ index » next coverage.py v7.8.0, created at 2026-02-05 06:46 -0600
1"""
2crate_anon/anonymise/dd.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Data dictionary classes for CRATE anonymiser.**
28The data dictionary is a TSV file, for ease of editing by multiple authors,
29rather than a database table.
31"""
33# =============================================================================
34# Imports
35# =============================================================================
37from collections import Counter, OrderedDict
38from dataclasses import dataclass
39from functools import lru_cache
40from itertools import zip_longest
41import logging
42import operator
43from typing import (
44 AbstractSet,
45 Any,
46 Callable,
47 Dict,
48 List,
49 Optional,
50 Set,
51 Tuple,
52 TYPE_CHECKING,
53 Union,
54)
56from cardinal_pythonlib.sql.validation import is_sqltype_integer
57from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
58from cardinal_pythonlib.sqlalchemy.schema import (
59 is_sqlatype_integer,
60 is_sqlatype_string,
61 is_sqlatype_text_over_one_char,
62)
63from sortedcontainers import SortedSet
64from sqlalchemy import Column, Table, DateTime
65from sqlalchemy.engine.interfaces import Dialect
66from sqlalchemy.sql.sqltypes import String, TypeEngine
68# don't import config: circular dependency would have to be sorted out
69from crate_anon.anonymise.constants import (
70 AlterMethodType,
71 AnonymiseColumnComments,
72 AnonymiseConfigKeys,
73 Decision,
74 IndexType,
75 ScrubMethod,
76 ScrubSrc,
77 SrcFlag,
78 TABLE_KWARGS,
79 TridType,
80)
81from crate_anon.anonymise.ddr import DataDictionaryRow
82from crate_anon.anonymise.scrub import PersonalizedScrubber
83from crate_anon.common.spreadsheet import (
84 gen_rows_from_spreadsheet,
85 SINGLE_SPREADSHEET_TYPE,
86 write_spreadsheet,
87)
88from crate_anon.common.sql import matches_fielddef, ReflectedColumnInfo
90if TYPE_CHECKING:
91 from crate_anon.anonymise.config import Config
93log = logging.getLogger(__name__)
96# =============================================================================
97# Constants
98# =============================================================================
# Number of characters needed to write -(2**63) in decimal, minus sign
# included. Used below as the minimum string length that is treated as able
# to hold any integer PID/MPID.
STRING_LENGTH_FOR_BIGINT = len(str(-(2**63)))
# = -2^63: https://dev.mysql.com/doc/refman/8.0/en/integer-types.html
104# =============================================================================
105# Helper classes
106# =============================================================================
@dataclass
class ScrubSourceFieldInfo:
    """
    Plain record of information about one scrub-source field.
    """

    # NOTE(review): the per-field notes below are inferred from the field
    # names and types alone; confirm against the code that builds these
    # objects.
    is_mpid: bool  # presumably: the field holds the master patient ID
    is_patient: bool  # presumably: patient (vs. third-party) information
    recurse: bool
    required_scrubber: bool
    scrub_method: ScrubMethod
    signature: str  # presumably a db.table.field style identifier
    value_fieldname: str
@dataclass
class DDTableSummary:
    """
    Plain record summarising one source table and its destination, as seen
    by the data dictionary.
    """

    # NOTE(review): field meanings are not established by this part of the
    # file; the grouping comments below are the original author's.

    # Which table?
    src_db: str
    src_table: str

    # Source information
    src_has_pk: bool
    src_pk_fieldname: str
    src_constant: bool
    src_addition_only: bool
    src_defines_pid: bool
    src_has_pid: bool
    src_has_mpid: bool
    src_has_opt_out: bool
    src_has_patient_scrub_info: bool
    src_has_third_party_scrub_info: bool
    src_has_required_scrub_info: bool
    src_has_table_comment: bool

    # Destination information
    dest_table: str
    dest_has_rows: bool
    dest_add_src_hash: bool
    dest_being_scrubbed: bool
147# =============================================================================
148# Helper functions
149# =============================================================================
def ensure_no_source_type_mismatch(
    ddr: DataDictionaryRow,
    config_sqlatype: Union[TypeEngine, String],
    primary_pid: bool = True,
) -> None:
    """
    Ensure that the source column type of a data dictionary row is compatible
    with what's expected from the config. We check this only for specific
    types of column (PID, MPID), because we need to know their data types
    concretely for the secret mapping table. The question is not whether the
    types are the same, but whether the value will fit into the
    config-determined type (for example, it's OK to convert an integer to a
    long-enough string but not necessarily the other way round).

    String types whose ``length`` is ``None`` (no declared maximum) are
    handled explicitly: an unbounded destination always fits; an unbounded
    source is not guaranteed to fit into a bounded destination.

    Args:
        ddr:
            Data dictionary row.
        config_sqlatype:
            SQLAlchemy column type that would be expected based on the current
            config.
        primary_pid:
            Is this the main PID field? If false, it's the MPID.

    Raises:
        :exc:`ValueError` if the source type cannot be stored safely in the
        config-determined type. (A string source with an integer destination
        only produces a logged warning.)
    """
    if primary_pid:
        human_type = "primary PID"
        configparam = AnonymiseConfigKeys.SQLATYPE_PID
    else:
        human_type = "master PID"
        configparam = AnonymiseConfigKeys.SQLATYPE_MPID
    rowtype = ddr.src_sqla_coltype
    suffix = ""
    error = True  # we may downgrade to a warning
    destination_is_integer = is_sqlatype_integer(config_sqlatype)
    assert destination_is_integer or is_sqlatype_string(config_sqlatype), (
        f"Bug: config parameter {configparam!r} has given a type of "
        f"{config_sqlatype}, which appears neither integer nor string."
    )
    if is_sqlatype_integer(rowtype):
        # ---------------------------------------------------------------------
        # Integer source
        # ---------------------------------------------------------------------
        if destination_is_integer:
            # Good enough. The only integer type we use for storing a PID/MPID
            # in the secret mapping table is BigInteger, so any integer type
            # should fit.
            return
        # Storing an integer in a string. This may be OK, if the string is
        # long enough. We could do detailed checks here based on the type
        # of integer, but we'll be simple.
        dest_length = getattr(config_sqlatype, "length", None)
        if dest_length is None or STRING_LENGTH_FOR_BIGINT <= dest_length:
            # It'll fit! (length=None means no declared limit.)
            return
        suffix = (
            f"Using a bigger string field in the config (minimum "
            f"length {STRING_LENGTH_FOR_BIGINT}) would fix this."
        )
    elif is_sqlatype_string(rowtype):
        # ---------------------------------------------------------------------
        # String source
        # ---------------------------------------------------------------------
        if destination_is_integer:
            error = False
            suffix = (
                "Warning only: this is acceptable if, but only if, the source "
                "string fields contain only integers."
            )
        else:
            # Storing a string in a string. Fine if the destination is big
            # enough.
            dest_length = getattr(config_sqlatype, "length", None)
            if dest_length is None:
                # Unbounded destination: anything fits.
                return
            src_length = getattr(rowtype, "length", None)
            if src_length is None:
                # Comparing None with an int would raise TypeError; report
                # the real problem instead.
                suffix = (
                    "The source string field has no declared maximum "
                    "length, so its values are not guaranteed to fit."
                )
            elif src_length <= dest_length:
                return
            else:
                suffix = (
                    f"Using a bigger string field in the config (minimum "
                    f"length {src_length}) would fix this."
                )
    else:
        # e.g. something silly like a DATETIME source
        pass
    # Generic error or warning:
    msg = (
        f"Source column {ddr.src_signature} is marked as a {human_type} field "
        f"but its type is {rowtype}, while the config thinks it should be "
        f"{config_sqlatype} (determined by the {configparam!r} parameter). "
        f"{suffix}"
    )
    if error:
        raise ValueError(msg)
    else:
        log.warning(msg)
247# =============================================================================
248# DataDictionary
249# =============================================================================
252class DataDictionary:
253 """
254 Class representing an entire data dictionary.
255 """
def __init__(self, config: "Config") -> None:
    """
    Create an empty data dictionary bound to a config.

    Args:
        config: :class:`crate_anon.anonymise.config.Config`
    """
    # noinspection PyArgumentList
    self.cached_srcdb_table_pairs = SortedSet()
    self.rows = []  # type: List[DataDictionaryRow]
    self.config = config
269 # -------------------------------------------------------------------------
270 # Information
271 # -------------------------------------------------------------------------
273 @property
274 def n_rows(self) -> int:
275 """
276 Number of rows.
277 """
278 return len(self.rows)
280 @property
281 def dest_dialect(self) -> Dialect:
282 """
283 Returns the SQLAlchemy :class:`Dialect` (e.g. MySQL, SQL Server...) for
284 the destination database.
285 """
286 return self.config.dest_dialect
288 @property
289 def dest_dialect_name(self) -> str:
290 """
291 Returns the SQLAlchemy dialect name for the destination database.
292 """
293 return self.config.dest_dialect_name
295 # -------------------------------------------------------------------------
296 # Loading
297 # -------------------------------------------------------------------------
def read_from_file(
    self,
    filename: str,
    check_valid: bool = True,
    override_dialect: Dialect = None,
) -> None:
    """
    Load the data dictionary from a spreadsheet-style file, replacing any
    rows already held.

    Args:
        filename:
            Filename to read.
        check_valid:
            Run a validity check after setting each row from its values?
        override_dialect:
            SQLAlchemy SQL dialect to enforce (e.g. for interpreting
            textual column types in the source database). By default, the
            source database's own dialect is used.
    """
    log.debug(f"Loading data dictionary: {filename}")
    # The row generator is handed straight to the format-agnostic reader.
    self._read_from_rows(
        gen_rows_from_spreadsheet(filename),
        check_valid=check_valid,
        override_dialect=override_dialect,
    )
324 def _read_from_rows(
325 self,
326 rows: SINGLE_SPREADSHEET_TYPE,
327 check_valid: bool = True,
328 override_dialect: Dialect = None,
329 ) -> None:
330 """
331 Internal function to read from a set of rows, whatever the underlying
332 format.
334 Args:
335 rows:
336 Iterable of rows (one per data dictionary row), each row being
337 a list of values.
338 check_valid:
339 Run a validity check after setting the values?
340 override_dialect:
341 SQLAlchemy SQL dialect to enforce (e.g. for interpreting
342 textual column types in the source database). By default, the
343 source database's own dialect is used.
344 """
345 # Clear existing data
346 self.rows = [] # type: List[DataDictionaryRow]
348 # Headers
349 # noinspection PyTypeChecker
350 headers = next(rows)
351 if not all(x in headers for x in DataDictionaryRow.ROWNAMES):
352 actual = "\n".join(
353 f"{i}. {h}" for i, h in enumerate(headers, start=1)
354 )
355 desired = "\n".join(
356 f"{i}. {h}"
357 for i, h in enumerate(DataDictionaryRow.ROWNAMES, start=1)
358 )
359 raise ValueError(
360 f"Bad data dictionary file. Data dictionaries must be in "
361 f"tabular format and contain the following headings:\n\n"
362 f"{desired}\n\n"
363 f"but yours are:\n\n"
364 f"{actual}"
365 )
366 log.debug("Data dictionary has correct header. Loading content...")
368 # Data
369 for values in rows:
370 if len(values) < len(headers):
371 valuedict = dict(zip_longest(headers, values, fillvalue=""))
372 else:
373 valuedict = dict(zip(headers, values))
374 ddr = DataDictionaryRow(self.config)
375 try:
376 ddr.set_from_dict(valuedict, override_dialect=override_dialect)
377 if check_valid:
378 ddr.check_valid()
379 except ValueError:
380 log.critical(f"Offending input: {valuedict}")
381 raise
382 self.rows.append(ddr)
383 log.debug("... content loaded.")
385 # Clear caches
386 self.clear_caches()
388 @classmethod
389 def create_from_file(
390 cls,
391 filename: str,
392 config: "Config",
393 check_valid: bool = True,
394 override_dialect: Dialect = None,
395 ) -> "DataDictionary":
396 """
397 Creates a new data dictionary by reading a file.
398 """
399 dd = DataDictionary(config)
400 dd.read_from_file(
401 filename,
402 check_valid=check_valid,
403 override_dialect=override_dialect,
404 )
405 return dd
def draft_from_source_databases(self, report_every: int = 100) -> None:
    """
    Create a draft DD from a source database.

    Will skip any rows it knows about already (thus allowing the generation
    of incremental changes). That applies to table-comment rows as well as
    to column rows.

    Args:
        report_every: report to the Python log every *n* columns
    """
    log.info("Reading information for draft data dictionary")

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Scan databases
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    existing_signatures = {ddr.src_signature for ddr in self.rows}
    for pretty_dbname, db in self.config.sources.items():
        log.info(f"... database nice name = {pretty_dbname}")
        cfg = db.srccfg
        meta = db.metadata
        i = 0  # column counter for progress reports; restarts per database
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Scan each table
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        for t in meta.sorted_tables:
            tablename = t.name
            log.info(f"... ... table: {tablename}")

            # Skip table?
            if cfg.is_table_denied(tablename):
                log.debug(f"Skipping denied table: {tablename}")
                continue
            all_col_names = [c.name for c in t.columns]
            if cfg.does_table_fail_minimum_fields(all_col_names):
                log.debug(
                    f"Skipping table {t} because it fails "
                    f"minimum field requirements"
                )
                continue

            # Does the database know the PK for this table?
            table_has_explicit_pk = any(c.primary_key for c in t.columns)
            table_has_candidate_pk = any(
                matches_fielddef(tablename, c.name, cfg.ddgen_pk_fields)
                for c in t.columns
            )

            # Is there a table comment?
            if t.comment:
                comment_ddr = DataDictionaryRow(self.config)
                comment_ddr.set_as_table_comment(
                    src_db=pretty_dbname,
                    src_table=tablename,
                    comment=t.comment,
                )
                # Apply the same "already known?" test as for columns;
                # otherwise an incremental run adds a second comment row.
                comment_sig = comment_ddr.src_signature
                if comment_sig in existing_signatures:
                    log.debug(
                        f"Skipping duplicated table comment: {tablename}"
                    )
                else:
                    existing_signatures.add(comment_sig)
                    self.rows.append(comment_ddr)

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Scan each column
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            for c in t.columns:
                i += 1
                if report_every and i % report_every == 0:
                    log.debug(f"... reading source field number {i}")
                log.debug(repr(c))
                r = ReflectedColumnInfo(c)

                # Skip column?
                if cfg.is_field_denied(r.columnname):
                    log.debug(
                        f"Skipping denied column: {r.tablename_columname}"
                    )
                    continue

                comment = r.comment
                if cfg.ddgen_append_source_info_to_comment:
                    comment += r.get_column_source_description()
                comment = comment.strip()

                # Create row
                ddr = DataDictionaryRow(self.config)
                ddr.set_from_src_db_info(
                    src_db=pretty_dbname,
                    src_table=tablename,
                    src_field=r.columnname,
                    src_datatype_sqltext=r.datatype_sqltext,
                    src_sqla_coltype=r.sqla_coltype,
                    dbconf=cfg,
                    comment=comment,
                    nullable=c.nullable,
                    primary_key=c.primary_key,
                    table_has_explicit_pk=table_has_explicit_pk,
                    table_has_candidate_pk=table_has_candidate_pk,
                )

                # ---------------------------------------------------------
                # If we have this one already, skip ASAP
                # This is how incremental data dictionaries get generated.
                # ---------------------------------------------------------
                sig = ddr.src_signature
                if sig in existing_signatures:
                    log.debug(
                        f"Skipping duplicated column: "
                        f"{r.tablename_columname}"
                    )
                    continue
                existing_signatures.add(sig)

                self.rows.append(ddr)

    log.info("... done")
    self.clear_caches()
    self.sort()
def tidy_draft(self) -> None:
    """
    Adjust a draft data dictionary so that it is logically consistent.

    Two corrections are made:

    - Scrubbing is removed from rows of non-patient tables.
    - For SQL Server destinations, FULLTEXT indexes are removed where the
      dialect cannot support them: SQL Server only supports one FULLTEXT
      index per table, and only if the table has a non-null column with a
      unique index.

    Test code for full-text index creation:

    .. code-block:: sql

        -- ----------------------------------------------------------------
        -- SQL Server: basic use
        -- ----------------------------------------------------------------

        USE mydb;
        CREATE FULLTEXT CATALOG default_fulltext_catalog AS DEFAULT;
        CREATE TABLE junk (intthing INT PRIMARY KEY, textthing VARCHAR(MAX));
        -- now find the name of the PK index (! -- by hand or see cardinal_pythonlib)
        CREATE FULLTEXT INDEX ON junk (textthing) KEY INDEX <pk_index_name>;

        -- ----------------------------------------------------------------
        -- SQL Server: it means it about the "NOT NULL" aspects, and a
        -- unique index is not enough
        -- ----------------------------------------------------------------

        USE mydb;
        DROP TABLE IF EXISTS rubbish;
        CREATE TABLE rubbish (a INT NOT NULL, b VARCHAR(MAX));
        CREATE UNIQUE INDEX rubbish_a ON rubbish (a);
        CREATE FULLTEXT INDEX ON rubbish (b) KEY INDEX rubbish_a;

        -- .. that works, but if you remove the "NOT NULL" from the table
        -- definition, it fails with:
        --
        -- 'rubbish_a' is not a valid index to enforce a full-text search
        -- key. A full-text search key must be a unique, non-nullable,
        -- single-column index which is not offline, is not defined on a
        -- non-deterministic or imprecise nonpersisted computed column,
        -- does not have a filter, and has maximum size of 900 bytes.
        -- Choose another index for the full-text key.

        -- ----------------------------------------------------------------
        -- MySQL: two FULLTEXT indexes on one table
        -- ----------------------------------------------------------------

        USE mydb;
        CREATE TABLE junk (intthing INT PRIMARY KEY, text1 LONGTEXT, text2 LONGTEXT);
        ALTER TABLE junk ADD FULLTEXT INDEX ftidx1 (text1);
        ALTER TABLE junk ADD FULLTEXT INDEX ftidx2 (text2);  -- OK
    """  # noqa: E501
    log.info("Tidying/correcting draft data dictionary")

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    log.info("... Ensuring we don't scrub in non-patient tables")
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    for src_db, src_table in self.get_src_db_tablepairs_w_no_pt_info():
        for row in self.get_rows_for_src_table(src_db, src_table):
            if not row.being_scrubbed:
                continue
            log.warning(
                f"Removing {AlterMethodType.SCRUBIN.value} from "
                f"{DataDictionaryRow.ALTER_METHOD} setting of "
                f"destination {row.dest_signature}, since that is not "
                f"a patient table"
            )
            row.remove_scrub_from_alter_methods()

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    log.info("... Make full-text indexes follow dialect rules")
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # https://docs.microsoft.com/en-us/sql/t-sql/statements/create-fulltext-index-transact-sql?view=sql-server-ver15  # noqa: E501
    if self.dest_dialect_name == SqlaDialectName.SQLSERVER:
        # -----------------------------------------------------------------
        # Checks for Microsoft SQL Server
        # -----------------------------------------------------------------
        for src_db, src_table in self.get_src_db_tablepairs():
            table_rows = self.get_rows_for_src_table(src_db, src_table)

            # SQL Server: a table with a FULLTEXT index needs a column
            # that is non-nullable and has a unique index.
            has_key_column = any(
                row.include
                and row.not_null
                and row.index == IndexType.UNIQUE
                for row in table_rows
            )
            if not has_key_column:
                for row in table_rows:
                    if not (
                        row.include and row.index == IndexType.FULLTEXT
                    ):
                        continue
                    log.warning(
                        f"To create a FULLTEXT index, SQL Server "
                        f"requires the table to have a non-nullable "
                        f"column with a unique index. Can't find one "
                        f"for destination table {row.dest_table!r}. "
                        f"Removing index from column "
                        f"{row.dest_field!r}."
                    )
                    row.index = IndexType.NONE

            # SQL Server: only one FULLTEXT index per table. (In
            # principle one FULLTEXT index can cover several columns; we
            # don't support that.) The first survives; later ones go.
            fulltext_seen = False
            for row in table_rows:
                if not (row.include and row.index == IndexType.FULLTEXT):
                    continue
                if not fulltext_seen:
                    fulltext_seen = True
                    continue
                log.warning(
                    f"SQL Server permits only one FULLTEXT index "
                    f"per table (and CRATE does not support "
                    f"multi-column full-text indexes). Since "
                    f"there is already one, removing the "
                    f"full-text index from "
                    f"{row.dest_table}.{row.dest_field}."
                )
                row.index = IndexType.NONE

    # MySQL: fine to have multiple FULLTEXT indexes in one table.
    # See test code above.

    log.info("... done")
    self.clear_caches()
654 def make_dest_datatypes_explicit(self) -> None:
655 """
656 By default, when autocreating a data dictionary, the ``dest_datatype``
657 field is not populated explicit, just implicitly. This option makes
658 them explicit by instantiating those values. Primarily for debugging.
659 """
660 for ddr in self.rows:
661 ddr.make_dest_datatype_explicit()
663 # -------------------------------------------------------------------------
664 # Sorting
665 # -------------------------------------------------------------------------
def sort(self) -> None:
    """
    Sort the rows by lower-case source database, table and field.

    (Table comments, having no source field, will be first among rows for
    their tables.)
    """
    log.info("Sorting data dictionary")
    sort_key = operator.attrgetter(
        "src_db_lowercase",
        "src_table_lowercase",
        "src_field_lowercase",
    )
    # Rebind to a new list (rather than sorting in place), as before.
    self.rows = sorted(self.rows, key=sort_key)
    log.info("... done")
685 # -------------------------------------------------------------------------
686 # Validation
687 # -------------------------------------------------------------------------
def check_against_source_db(self) -> None:
    """
    Validate the data dictionary against the source database(s).

    As a side effect, each row is told the SQLAlchemy type of its source
    column, which later checks rely on.

    Raises:
        :exc:`ValueError` on the first inconsistency found.
    """
    for dbname in self.get_source_databases():
        db = self.config.sources[dbname]

        for tablename in self.get_src_tables(dbname):
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # A source table may map to only one destination table
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            dest_tables = self.get_dest_tables_for_src_db_table(
                dbname, tablename
            )
            if len(dest_tables) > 1:
                joined = ", ".join(dest_tables)
                raise ValueError(
                    f"Source table {dbname}.{tablename} maps to >1 "
                    f"destination table: {joined}"
                )

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # The source table must exist in the database
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            if tablename not in db.table_names:
                log.debug(
                    f"Source database {dbname!r} has tables: "
                    f"{db.table_names}"
                )
                raise ValueError(
                    f"Table {tablename!r} missing from source database "
                    f"{dbname!r}"
                )

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Preamble: rows may be cross-referenced below, so every row
            # must know its source column type first.
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            rows = self.get_rows_for_src_table(dbname, tablename)
            for row in rows:
                columns = db.metadata.tables[tablename].columns
                if row.src_field not in columns:
                    raise ValueError(
                        f"Column {row.src_field!r} missing from table "
                        f"{tablename!r} in source database {dbname!r}"
                    )
                # CACHES TYPE HERE
                row.set_src_sqla_coltype(columns[row.src_field].type)

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # A PID field is required if anything is being scrubbed.
            # Before 2021-12-07, we used to check r.master_pid, too.
            # However, if nothing is being scrubbed, then the lack of a
            # link via primary PID is a researcher inconvenience, not a
            # de-identification risk.
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            if any(row.being_scrubbed for row in rows) and not (
                self.get_pid_name(dbname, tablename)
            ):
                raise ValueError(
                    f"Source table {dbname}.{tablename} has a "
                    f"{AlterMethodType.SCRUBIN.value!r} "
                    f"field but no primary patient ID field"
                )

            pk_colname = None
            for row in rows:
                # Data types for special rows
                if row.primary_pid:
                    ensure_no_source_type_mismatch(
                        row, self.config.pidtype, primary_pid=True
                    )
                if row.master_pid:
                    ensure_no_source_type_mismatch(
                        row, self.config.mpidtype, primary_pid=False
                    )

                # Too many PKs?
                if row.pk:
                    if pk_colname:
                        raise ValueError(
                            f"Table {dbname}.{tablename} has >1 source PK "
                            f"set (previously {pk_colname!r}, "
                            f"now {row.src_field!r})."
                        )
                    pk_colname = row.src_field

                # Duff alter method? Blob extraction needs a companion
                # text column (extension or filename) in the same table.
                for am in row.alter_methods:
                    if not am.extract_from_blob:
                        continue
                    extrow = next(
                        (
                            other
                            for other in rows
                            if other.src_field == am.extract_ext_field
                        ),
                        None,
                    )
                    if extrow is None:
                        raise ValueError(
                            f"alter_method = {row.alter_method}, "
                            f"but field {am.extract_ext_field} "
                            f"not found in the same table"
                        )
                    if is_sqlatype_text_over_one_char(
                        extrow.src_sqla_coltype
                    ):
                        continue
                    raise ValueError(
                        f"alter_method = {row.alter_method}, but "
                        f"field {am.extract_ext_field}, which "
                        f"should contain an extension or "
                        f"filename, is not text of >1 character"
                    )
def check_valid(
    self,
    prohibited_fieldnames: List[str] = None,
    check_against_source_db: bool = True,
) -> None:
    """
    Validate the data dictionary: each row individually, the rows
    collectively, and (optionally) against the source database(s).

    Args:
        prohibited_fieldnames:
            list of prohibited destination fieldnames
        check_against_source_db:
            check validity against the source database(s)?

    Raises:
        :exc:`ValueError` if the DD is invalid
    """
    prohibited_fieldnames = prohibited_fieldnames or []
    log.info("Checking data dictionary...")
    if not self.rows:
        raise ValueError("Empty data dictionary")
    if not self.get_dest_tables_included():
        raise ValueError(
            "Empty data dictionary after removing redundant tables"
        )

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check (or re-check) individual rows
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    log.debug("Checking DD: individual row validity...")
    for row in self.rows:
        row.check_valid()

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check collective consistency
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    log.debug("Checking DD: prohibited flags...")
    for dbname in self.get_source_databases():
        for tablename in self.get_src_tables(dbname):
            # This will have excluded all tables where all rows are
            # omitted. So now we have only active tables, for which we
            # cannot combine certain flags.
            # (We used to prohibit these combinations at all times, in the
            # DataDictionaryRow class, but it's inconvenient to have to
            # alter these flags if you want to omit the whole table.)
            for row in self.get_rows_for_src_table(dbname, tablename):
                if not row.omit:
                    continue
                if row.add_src_hash:
                    raise ValueError(
                        f"Do not set {Decision.OMIT.value} on "
                        f"{DataDictionaryRow.SRC_FLAGS}="
                        f"{SrcFlag.ADD_SRC_HASH} fields -- "
                        f"currently set for {row.src_signature}"
                    )
                if row.constant:
                    raise ValueError(
                        f"Do not set {Decision.OMIT.value} on "
                        f"{DataDictionaryRow.SRC_FLAGS}="
                        f"{SrcFlag.CONSTANT} fields -- "
                        f"currently set for {row.src_signature}"
                    )

    log.debug("Checking DD: table consistency...")
    for dbname, tablename in self.get_scrub_from_db_table_pairs():
        if self.get_pid_name(dbname, tablename):
            continue
        raise ValueError(
            f"Scrub-source table {dbname}.{tablename} must have a "
            f"patient ID field "
            f"(one with flag {SrcFlag.PRIMARY_PID})"
        )

    log.debug("Checking DD: prohibited fieldnames...")
    if prohibited_fieldnames:
        for row in self.rows:
            row.check_prohibited_fieldnames(prohibited_fieldnames)

    log.debug("Checking DD: opt-out fields...")
    for (
        src_db,
        src_table,
        optout_colname,
        pid_colname,
        mpid_colname,
    ) in self.get_optout_defining_fields():
        if not (pid_colname or mpid_colname):
            raise ValueError(
                f"Field {src_db}.{src_table}.{optout_colname} has "
                f"{DataDictionaryRow.SRC_FLAGS}={SrcFlag.OPT_OUT} set, "
                f"but that table does not have a primary patient ID field "
                f"or a master patient ID field"
            )

    log.debug("Checking DD: destination tables...")
    for dest_table in self.get_dest_tables_included():
        sources = self.get_src_dbs_tables_for_dest_table(dest_table)
        if len(sources) > 1:
            sources_str = ", ".join(f"{s[0]}.{s[1]}" for s in sources)
            raise ValueError(
                f"Destination table {dest_table} is mapped to by multiple "
                f"source databases: {sources_str}"
            )

    log.debug("Checking DD: duplicate source rows?")
    src_counts = Counter(row.src_signature for row in self.rows)
    src_duplicates = [sig for sig, n in src_counts.items() if n > 1]
    if src_duplicates:
        raise ValueError(f"Duplicate source rows: {src_duplicates}")

    log.debug("Checking DD: duplicate destination rows?")
    # Omitted rows do not reach the destination, so they cannot clash.
    dst_counts = Counter(
        row.dest_signature for row in self.rows if not row.omit
    )
    dst_duplicates = [sig for sig, n in dst_counts.items() if n > 1]
    if dst_duplicates:
        raise ValueError(f"Duplicate destination rows: {dst_duplicates}")

    if check_against_source_db:
        log.debug("Checking DD against source database tables...")
        self.check_against_source_db()

    log.debug("Checking DD: global patient-defining fields...")
    n_definers = self.n_definers
    if n_definers > 1:
        log.warning(
            f"Unusual: >1 field with "
            f"{DataDictionaryRow.SRC_FLAGS}="
            f"{SrcFlag.DEFINES_PRIMARY_PIDS} set."
        )
    elif n_definers == 0:
        if not self.config.allow_no_patient_info:
            raise ValueError(
                f"No patient-defining field! (And "
                f"{AnonymiseConfigKeys.ALLOW_NO_PATIENT_INFO} not set.)"
            )
        log.warning(
            "NO PATIENT-DEFINING FIELD! DATABASE(S) WILL "
            "BE COPIED, NOT ANONYMISED."
        )

    log.debug("Checking DD: table comments")
    table_comments_seen = set()  # type: Set[str]
    for row in self.rows:
        if not row.is_table_comment:
            continue
        if row.dest_table in table_comments_seen:
            raise ValueError(
                f"More than one table comment for destination table "
                f"{row.dest_table}"
            )
        table_comments_seen.add(row.dest_table)

    log.debug("... DD checked.")
961 # -------------------------------------------------------------------------
962 # Saving
963 # -------------------------------------------------------------------------
def write(self, filename: str, filetype: str = None) -> None:
    """
    Save the data dictionary as a spreadsheet, with the file type either
    given explicitly or autodetected from the filename.

    Args:
        filename:
            Name of file to write, or "-" for stdout (in which case the
            filetype is forced to TSV).
        filetype:
            File type as one of ``.ods``, ``.tsv``, or ``.xlsx``;
            alternatively, use ``None`` to autodetect from the filename.
    """
    log.info("Saving data dictionary...")
    write_spreadsheet(filename, self._as_dict(), filetype=filetype)
def _as_dict(self) -> Dict[str, Any]:
    """
    Build the ordered mapping handed to the spreadsheet writer: a single
    sheet, ``data_dictionary``, whose first row is the header row and whose
    remaining rows are the data dictionary rows, in order.
    """
    sheet_rows = [DataDictionaryRow.header_row()]
    sheet_rows.extend(ddr.as_row() for ddr in self.rows)
    return OrderedDict([("data_dictionary", sheet_rows)])
995 # -------------------------------------------------------------------------
996 # Global DD queries
997 # -------------------------------------------------------------------------
@property
def n_definers(self) -> int:
    """
    Count of rows flagged as defining primary patient IDs (the
    patient-defining columns).
    """
    return sum(1 for row in self.rows if row.defines_primary_pids)
@lru_cache(maxsize=None)
def get_source_databases(self) -> AbstractSet[str]:
    """
    Return a SortedSet of the source database names used by rows that are
    marked as required.
    """
    db_names = (row.src_db for row in self.rows if row.required)
    return SortedSet(db_names)
@lru_cache(maxsize=None)
def get_scrub_from_db_table_pairs(self) -> AbstractSet[Tuple[str, str]]:
    """
    Return a SortedSet of ``source_database_name, source_table`` tuples
    for rows carrying ``scrub_src`` (scrub-from) information.
    """
    # Rows are included even if their omit flag is set.
    pairs = (
        (row.src_db, row.src_table) for row in self.rows if row.scrub_src
    )
    return SortedSet(pairs)
@lru_cache(maxsize=None)
def get_src_db_tablepairs(self) -> AbstractSet[Tuple[str, str]]:
    """
    Return a SortedSet of every ``source_database_name, source_table``
    tuple mentioned by any row.
    """
    return SortedSet((row.src_db, row.src_table) for row in self.rows)
@lru_cache(maxsize=None)
def get_src_db_tablepairs_w_pt_info(self) -> AbstractSet[Tuple[str, str]]:
    """
    Return a SortedSet of ``source_database_name, source_table`` tuples
    for tables having at least one row that contains patient information.
    """
    pairs = SortedSet()
    for row in self.rows:
        if row.contains_patient_info:
            pairs.add((row.src_db, row.src_table))
    return pairs
def get_src_db_tablepairs_w_no_pt_info(
    self,
) -> AbstractSet[Tuple[str, str]]:
    """
    Return the ``source_database_name, source_table`` tuples for tables
    that contain no patient information: all table pairs minus those with
    patient information.
    """
    every_pair = self.get_src_db_tablepairs()
    pairs_with_pt_info = self.get_src_db_tablepairs_w_pt_info()
    return every_pair - pairs_with_pt_info
def get_tables_w_no_pt_info(self) -> AbstractSet[str]:
    """
    Return a SortedSet of ``source_table`` names for tables that contain no
    patient information. Tables are compared by name only, without the
    source database name.
    """
    every_table = SortedSet(row.src_table for row in self.rows)
    pt_info_tables = SortedSet(
        row.src_table for row in self.rows if row.contains_patient_info
    )
    return every_table - pt_info_tables
def get_tables_w_scrub_src(self) -> AbstractSet[str]:
    """
    Return a SortedSet of ``source_table`` names for tables having rows
    with ``scrub_src`` information, i.e. that contribute to anonymisation.
    """
    table_names = (
        row.src_table for row in self.rows if row.contains_scrub_src
    )
    return SortedSet(table_names)
@lru_cache(maxsize=None)
def get_src_db_tablepairs_w_int_pk(self) -> AbstractSet[Tuple[str, str]]:
    """
    Return a SortedSet of ``source_database_name, source_table`` tuples
    for tables that have an integer PK, as reported by
    :meth:`get_int_pk_ddr`.
    """
    pairs = SortedSet()
    for row in self.rows:
        if self.get_int_pk_ddr(row.src_db, row.src_table) is not None:
            pairs.add((row.src_db, row.src_table))
    return pairs
@lru_cache(maxsize=None)
def get_src_dbs_tables_with_no_pt_info_no_pk(
    self,
) -> AbstractSet[Tuple[str, str]]:
    """
    Return the ``source_database_name, source_table`` tuples for tables
    that have neither patient information nor an integer PK.
    """
    without_pt_info = (
        self.get_src_db_tablepairs()
        - self.get_src_db_tablepairs_w_pt_info()
    )
    return without_pt_info - self.get_src_db_tablepairs_w_int_pk()
@lru_cache(maxsize=None)
def get_src_dbs_tables_with_no_pt_info_int_pk(
    self,
) -> AbstractSet[Tuple[str, str]]:
    """
    Return the ``source_database_name, source_table`` tuples for tables
    that have no patient information but do have an integer PK.
    """
    without_pt_info = (
        self.get_src_db_tablepairs()
        - self.get_src_db_tablepairs_w_pt_info()
    )
    # Set intersection: keep only those that also have an integer PK.
    return without_pt_info & self.get_src_db_tablepairs_w_int_pk()
@lru_cache(maxsize=None)
def get_dest_tables_all(self) -> AbstractSet[str]:
    """
    Return a SortedSet of every destination table name, including tables
    that will receive no contents.
    """
    return SortedSet(row.dest_table for row in self.rows)
@lru_cache(maxsize=None)
def get_dest_tables_included(self) -> AbstractSet[str]:
    """
    Return a SortedSet of destination table names having at least one row
    that is not omitted.
    """
    included = (row.dest_table for row in self.rows if not row.omit)
    return SortedSet(included)
@lru_cache(maxsize=None)
def get_dest_tables_with_patient_info(self) -> AbstractSet[str]:
    """
    Return a SortedSet of destination table names fed by at least one
    non-omitted row that contains patient information.
    """
    tables = SortedSet()
    for row in self.rows:
        if row.contains_patient_info and not row.omit:
            tables.add(row.dest_table)
    return tables
@lru_cache(maxsize=None)
def get_optout_defining_fields(
    self,
) -> AbstractSet[Tuple[str, str, str, str, str]]:
    """
    Return a SortedSet of ``src_db, src_table, src_field, pidfield,
    mpidfield`` tuples, one per row that defines opt-out information. The
    PID and MPID field names are looked up for the row's source table.
    """
    optout_rows = (row for row in self.rows if row.opt_out_info)
    return SortedSet(
        (
            row.src_db,
            row.src_table,
            row.src_field,
            self.get_pid_name(row.src_db, row.src_table),
            self.get_mpid_name(row.src_db, row.src_table),
        )
        for row in optout_rows
    )
@lru_cache(maxsize=None)
def get_mandatory_scrubber_sigs(self) -> AbstractSet[str]:
    """
    Return the set of source field signatures (strings of the format
    ``db.table.column``) for all "required scrubber" rows -- fields that
    must have at least one non-NULL value for each patient, or the patient
    won't get processed.
    """
    return {row.src_signature for row in self.rows if row.required_scrubber}
def get_summary_info_for_table(
    self, src_db: str, src_table: str
) -> DDTableSummary:
    """
    Summarise one source table by combining the flags of all of its data
    dictionary rows (table comment rows included).

    Each "has/is" flag becomes set once any row sets it. The PK field name
    is taken from the last PK row seen, and the destination table from the
    first row with a non-empty destination table name.
    """
    # Source-side accumulators
    has_pk = False
    pk_fieldname = None  # type: Optional[str]
    constant = False
    addition_only = False
    defines_pid = False
    has_pid = False
    has_mpid = False
    has_opt_out = False
    has_patient_scrub_info = False
    has_third_party_scrub_info = False
    has_required_scrub_info = False
    has_table_comment = False
    # Destination-side accumulators
    dest_table = None  # type: Optional[str]
    dest_has_rows = False
    dest_add_src_hash = False
    dest_being_scrubbed = False

    # Each accumulator is only consulted/updated while it is still falsy,
    # so a row's attribute is read only when it could change the result.
    for row in self.get_rows_for_src_table(
        src_db, src_table, skip_table_comments=False
    ):
        # Source
        if not has_pk:
            has_pk = row.pk
        if row.pk:
            pk_fieldname = row.src_field
        if not constant:
            constant = row.constant
        if not addition_only:
            addition_only = row.addition_only
        if not defines_pid:
            defines_pid = row.defines_primary_pids
        if not has_pid:
            has_pid = row.primary_pid
        if not has_mpid:
            has_mpid = row.master_pid
        if not has_opt_out:
            has_opt_out = row.opt_out_info
        if not has_patient_scrub_info:
            has_patient_scrub_info = row.contains_patient_scrub_src_info
        if not has_third_party_scrub_info:
            has_third_party_scrub_info = row.contains_third_party_info
        if not has_required_scrub_info:
            has_required_scrub_info = row.required_scrubber
        if not has_table_comment:
            has_table_comment = row.is_table_comment
        # Destination
        if not dest_table:
            dest_table = row.dest_table
        if not dest_has_rows:
            dest_has_rows = not row.omit
        if not dest_add_src_hash:
            dest_add_src_hash = row.add_src_hash
        if not dest_being_scrubbed:
            dest_being_scrubbed = row.being_scrubbed

    return DDTableSummary(
        # Which table?
        src_db=src_db,
        src_table=src_table,
        # Source info
        src_has_pk=has_pk,
        src_pk_fieldname=pk_fieldname,
        src_constant=constant,
        src_addition_only=addition_only,
        src_defines_pid=defines_pid,
        src_has_pid=has_pid,
        src_has_mpid=has_mpid,
        src_has_opt_out=has_opt_out,
        src_has_patient_scrub_info=has_patient_scrub_info,
        src_has_third_party_scrub_info=has_third_party_scrub_info,
        src_has_required_scrub_info=has_required_scrub_info,
        src_has_table_comment=has_table_comment,
        # Destination info
        dest_table=dest_table,
        dest_has_rows=dest_has_rows,
        dest_add_src_hash=dest_add_src_hash,
        dest_being_scrubbed=dest_being_scrubbed,
    )
def get_summary_info_all_tables(self) -> List[DDTableSummary]:
    """
    Return one table summary per source database/table pair, in the order
    given by :meth:`get_src_db_tablepairs`.
    """
    return [
        self.get_summary_info_for_table(db_name, table_name)
        for db_name, table_name in self.get_src_db_tablepairs()
    ]
1275 # -------------------------------------------------------------------------
1276 # Queries by source DB
1277 # -------------------------------------------------------------------------
@lru_cache(maxsize=None)
def get_src_tables(self, src_db: str) -> AbstractSet[str]:
    """
    For a given source database name, return a SortedSet of its source
    tables that are required (that is, ones being copied and ones providing
    vital patient information).
    """
    required_tables = (
        row.src_table
        for row in self.rows
        if row.src_db == src_db and row.required
    )
    return SortedSet(required_tables)
@lru_cache(maxsize=None)
def get_src_tables_with_active_dest(self, src_db: str) -> AbstractSet[str]:
    """
    For a given source database name, return a SortedSet of its source
    tables having at least one row that is not omitted (an active
    destination).
    """
    tables = SortedSet()
    for row in self.rows:
        if row.src_db == src_db and not row.omit:
            tables.add(row.src_table)
    return tables
@lru_cache(maxsize=None)
def get_src_tables_with_patient_info(
    self, src_db: str
) -> AbstractSet[str]:
    """
    For a given source database name, return a SortedSet of its source
    tables having at least one row that contains patient information.
    """
    pt_tables = (
        row.src_table
        for row in self.rows
        if row.src_db == src_db and row.contains_patient_info
    )
    return SortedSet(pt_tables)
@lru_cache(maxsize=None)
def get_patient_src_tables_with_active_dest(
    self, src_db: str
) -> AbstractSet[str]:
    """
    For a given source database name, return the source tables that both
    have an active destination and contain patient information (the
    intersection of the two corresponding queries).
    """
    active_tables = self.get_src_tables_with_active_dest(src_db)
    patient_tables = self.get_src_tables_with_patient_info(src_db)
    return active_tables & patient_tables
1336 # -------------------------------------------------------------------------
1337 # Queries by source DB/table
1338 # -------------------------------------------------------------------------
@lru_cache(maxsize=None)
def get_dest_tables_for_src_db_table(
    self, src_db: str, src_table: str
) -> AbstractSet[str]:
    """
    For a given source database/table, return a SortedSet of the
    destination tables named by its non-omitted rows.
    """
    dest_tables = SortedSet()
    for row in self.rows:
        if (
            row.src_db == src_db
            and row.src_table == src_table
            and not row.omit
        ):
            dest_tables.add(row.dest_table)
    return dest_tables
@lru_cache(maxsize=None)
def get_dest_table_for_src_db_table(
    self, src_db: str, src_table: str
) -> str:
    """
    For a given source database/table, return its destination table: the
    only one, or the first if there are several.

    Raises:
        IndexError: if the source table has no destination table.
    """
    dest_tables = self.get_dest_tables_for_src_db_table(src_db, src_table)
    return list(dest_tables)[0]
@lru_cache(maxsize=None)
def get_rows_for_src_table(
    self, src_db: str, src_table: str, skip_table_comments: bool = True
) -> AbstractSet[DataDictionaryRow]:
    """
    For a given source database name/table, return a SortedSet of its DD
    rows, leaving out table comment rows unless ``skip_table_comments`` is
    false.
    """
    matching_rows = (
        row
        for row in self.rows
        if row.src_db == src_db
        and row.src_table == src_table
        and not (skip_table_comments and row.is_table_comment)
    )
    return SortedSet(matching_rows)
@lru_cache(maxsize=None)
def get_fieldnames_for_src_table(
    self, src_db: str, src_table: str
) -> AbstractSet[str]:
    """
    For a given source database name/table, return a SortedSet of source
    field names.

    Rows with an empty ``src_field`` are skipped. The elements are field
    names (``ddr.src_field``), not data dictionary rows, so the return
    type is a set of strings.
    """
    return SortedSet(
        [
            ddr.src_field
            for ddr in self.rows
            if ddr.src_db == src_db
            and ddr.src_table == src_table
            and ddr.src_field  # ignore rows with no field name
        ]
    )
@lru_cache(maxsize=None)
def get_scrub_from_rows(
    self, src_db: str, src_table: str
) -> AbstractSet[DataDictionaryRow]:
    """
    For a given source database name/table, return a SortedSet of the DD
    rows whose fields contain ``scrub_src`` (scrub-from) information.
    """
    # Rows are included even if their omit flag is set.
    scrub_rows = (
        row
        for row in self.rows
        if (
            row.scrub_src
            and row.src_db == src_db
            and row.src_table == src_table
        )
    )
    return SortedSet(scrub_rows)
def get_scrub_from_rows_as_fieldinfo(
    self, src_db: str, src_table: str, depth: int, max_depth: int
) -> List[ScrubSourceFieldInfo]:
    """
    Return the rows from :meth:`get_scrub_from_rows` converted to a list
    of :class:`ScrubSourceFieldInfo` objects, which is more convenient for
    scrubbing.

    Args:
        src_db:
            Source database name.
        src_table:
            Source table name.
        depth:
            Current recursion depth for looking up third-party information.
        max_depth:
            Maximum permitted recursion depth for looking up third-party
            information.
    """
    # Only at depth 0 can a field be marked as patient-related (MPID or
    # patient scrub source); this means that third-party information is
    # never marked as patient-related.
    at_top_level = depth == 0
    may_recurse = depth < max_depth
    return [
        ScrubSourceFieldInfo(
            is_mpid=(at_top_level and row.master_pid),
            is_patient=(at_top_level and row.scrub_src is ScrubSrc.PATIENT),
            recurse=(
                may_recurse
                and row.scrub_src is ScrubSrc.THIRDPARTY_XREF_PID
            ),
            required_scrubber=row.required_scrubber,
            scrub_method=PersonalizedScrubber.get_scrub_method(
                row.src_datatype, row.scrub_method
            ),
            signature=row.src_signature,
            value_fieldname=row.src_field,
        )
        for row in self.get_scrub_from_rows(src_db, src_table)
    ]
@lru_cache(maxsize=None)
def get_pk_ddr(
    self, src_db: str, src_table: str
) -> Optional[DataDictionaryRow]:
    """
    For a given source database name and table, return the first DD row
    flagged as the PK for that table, whether integer or not.

    Will return ``None`` if no such data dictionary row exists.
    """
    pk_rows = (
        row
        for row in self.rows
        if row.src_db == src_db and row.src_table == src_table and row.pk
    )
    return next(pk_rows, None)
@lru_cache(maxsize=None)
def get_int_pk_ddr(
    self, src_db: str, src_table: str
) -> Optional[DataDictionaryRow]:
    """
    For a given source database name and table, return the first DD row
    that is flagged as the PK and whose source datatype is an integer SQL
    type.

    Will return ``None`` if no such data dictionary row exists.
    """
    int_pk_rows = (
        row
        for row in self.rows
        if (
            row.src_db == src_db
            and row.src_table == src_table
            and row.pk
            and is_sqltype_integer(row.src_datatype)
        )
    )
    return next(int_pk_rows, None)
@lru_cache(maxsize=None)
def get_int_pk_name(self, src_db: str, src_table: str) -> Optional[str]:
    """
    For a given source database name and table, return the field name of
    the integer PK for that table (or ``None`` if there isn't one).
    """
    pk_row = self.get_int_pk_ddr(src_db, src_table)
    return None if pk_row is None else pk_row.src_field
@lru_cache(maxsize=None)
def has_active_destination(self, src_db: str, src_table: str) -> bool:
    """
    For a given source database name and table: does it have an active
    destination, i.e. at least one row that is not omitted?
    """
    return any(
        row.src_db == src_db
        and row.src_table == src_table
        and not row.omit
        for row in self.rows
    )
@lru_cache(maxsize=None)
def get_pid_name(self, src_db: str, src_table: str) -> Optional[str]:
    """
    For a given source database name and table: return the field name of
    the first row providing primary PID information (or ``None`` if there
    isn't one).
    """
    pid_fields = (
        row.src_field
        for row in self.rows
        if (
            row.src_db == src_db
            and row.src_table == src_table
            and row.primary_pid
        )
    )
    return next(pid_fields, None)
@lru_cache(maxsize=None)
def get_mpid_name(self, src_db: str, src_table: str) -> Optional[str]:
    """
    For a given source database name and table: return the field name of
    the first row providing master PID (MPID) information (or ``None`` if
    there isn't one).
    """
    for row in self.rows:
        if row.src_db != src_db or row.src_table != src_table:
            continue
        if row.master_pid:
            return row.src_field
    return None
1565 # -------------------------------------------------------------------------
1566 # Queries by destination table
1567 # -------------------------------------------------------------------------
@lru_cache(maxsize=None)
def get_src_dbs_tables_for_dest_table(
    self, dest_table: str
) -> AbstractSet[Tuple[str, str]]:
    """
    For a given destination table, return a SortedSet of ``dbname, table``
    tuples naming the source database(s)/table(s) whose rows target it.
    """
    sources = (
        (row.src_db, row.src_table)
        for row in self.rows
        if row.dest_table == dest_table
    )
    return SortedSet(sources)
@lru_cache(maxsize=None)
def get_rows_for_dest_table(
    self, dest_table: str, skip_table_comments: bool = True
) -> AbstractSet[DataDictionaryRow]:
    """
    For a given destination table, return a SortedSet of its non-omitted
    DD rows, leaving out table comment rows unless ``skip_table_comments``
    is false.
    """
    dest_rows = (
        row
        for row in self.rows
        if row.dest_table == dest_table
        and not row.omit
        and not (skip_table_comments and row.is_table_comment)
    )
    return SortedSet(dest_rows)
1602 # -------------------------------------------------------------------------
1603 # SQLAlchemy Table objects
1604 # -------------------------------------------------------------------------
@lru_cache(maxsize=None)
def get_dest_sqla_table(self, tablename: str) -> Table:
    """
    For a given destination table name, return an
    :class:`sqlalchemy.sql.schema.Table` object for the destination table
    (which we will create).

    Columns come from the table's DD rows, in order. After a row's own
    column, a source hash column is added if the row asks for one, and a
    TRID column if the row is a primary PID. An MRID column and a time
    column may be appended at the end, depending on the config.
    """
    cfg = self.config
    metadata = cfg.destdb.metadata
    timefield = cfg.timefield
    add_mrid_wherever_rid_added = cfg.add_mrid_wherever_rid_added

    table_kwargs = {"extend_existing": True}  # type: Dict[str, Any]
    cols = []  # type: List[Column]
    has_pid = False
    has_named_mrid = False

    for row in self.get_rows_for_dest_table(
        tablename, skip_table_comments=False
    ):
        if row.is_table_comment and not row.omit:
            # A table comment becomes a Table keyword argument, not a
            # column.
            table_kwargs["comment"] = row.comment
            continue
        cols.append(row.dest_sqla_column)
        if row.add_src_hash:
            cols.append(self._get_srchash_sqla_column())
        if row.primary_pid:
            cols.append(self._get_trid_sqla_column())
            has_pid = True
        if (
            row.master_pid
            and row.dest_field == cfg.master_research_id_fieldname
        ):
            # The table already has an explicit MRID field with the
            # expected name; remember that, so that an automatic MRID is
            # not added as well.
            has_named_mrid = True

    wants_auto_mrid = (
        has_pid and add_mrid_wherever_rid_added and not has_named_mrid
    )
    if wants_auto_mrid:
        cols.append(self._get_mrid_sqla_column())
    if timefield:
        cols.append(
            Column(
                timefield,
                DateTime,
                comment=AnonymiseColumnComments.TIMEFIELD_COMMENT,
            )
        )
    return Table(tablename, metadata, *cols, **TABLE_KWARGS, **table_kwargs)
def _get_srchash_sqla_column(self) -> Column:
    """
    Build a :class:`sqlalchemy.sql.schema.Column` for the "source hash"
    column (which is inserted into many destination tables so they can
    record the hash of their source, for change detection). Its name and
    SQL type are taken from the config.
    """
    cfg = self.config
    return Column(
        cfg.source_hash_fieldname,
        cfg.sqltype_encrypted_pid,
        comment="Hashed amalgamation of all source fields",
    )
def _get_trid_sqla_column(self) -> Column:
    """
    Build a :class:`sqlalchemy.sql.schema.Column` for the "TRID" column:
    the transient research ID, a high-speed (integer) but impermanent
    research ID inserted into all patient-related destination tables. The
    column is not nullable.
    """
    trid_name = self.config.trid_fieldname
    return Column(
        trid_name,
        TridType,
        nullable=False,
        comment="Transient integer research ID (TRID)",
    )
def _get_mrid_sqla_column(self) -> Column:
    """
    Build a :class:`sqlalchemy.sql.schema.Column` for the "MRID" (master
    research ID) column. This is inserted into all patient-related
    destination tables where the flag 'add_mrid_wherever_rid_added' is set.
    The column is nullable.
    """
    cfg = self.config
    return Column(
        cfg.master_research_id_fieldname,
        cfg.sqltype_encrypted_pid,
        nullable=True,
        comment="Master research ID (MRID)",
    )
1697 # -------------------------------------------------------------------------
1698 # Clear caches
1699 # -------------------------------------------------------------------------
def cached_funcs(self) -> List[Any]:
    """
    Return the list of our cached methods, for use by
    :func:`clear_caches` and :func:`debug_cache_hits`.
    """
    funcs = [
        # Global DD queries
        self.get_source_databases,
        self.get_scrub_from_db_table_pairs,
        self.get_src_db_tablepairs,
        self.get_src_db_tablepairs_w_pt_info,
        self.get_src_db_tablepairs_w_int_pk,
        self.get_src_dbs_tables_with_no_pt_info_no_pk,
        self.get_src_dbs_tables_with_no_pt_info_int_pk,
        self.get_dest_tables_all,
        self.get_dest_tables_included,
        self.get_dest_tables_with_patient_info,
        self.get_optout_defining_fields,
        self.get_mandatory_scrubber_sigs,
        # Queries by source DB
        self.get_src_tables,
        self.get_src_tables_with_active_dest,
        self.get_src_tables_with_patient_info,
        self.get_patient_src_tables_with_active_dest,
        # Queries by source DB/table
        self.get_dest_tables_for_src_db_table,
        self.get_dest_table_for_src_db_table,
        self.get_rows_for_src_table,
        self.get_fieldnames_for_src_table,
        self.get_scrub_from_rows,
        self.get_pk_ddr,
        self.get_int_pk_ddr,
        self.get_int_pk_name,
        self.has_active_destination,
        self.get_pid_name,
        self.get_mpid_name,
        # Queries by destination table
        self.get_src_dbs_tables_for_dest_table,
        self.get_rows_for_dest_table,
        # SQLAlchemy Table objects
        self.get_dest_sqla_table,
    ]  # type: List[Any]
    return funcs
def clear_caches(self) -> None:
    """
    Empty the cache of every method listed by :meth:`cached_funcs`.
    """
    for cached_method in self.cached_funcs():
        cached_method.cache_clear()
def debug_cache_hits(self) -> None:
    """
    Write the cache statistics of every method listed by
    :meth:`cached_funcs` to the Python log, at debug level.
    """
    for cached_method in self.cached_funcs():
        log.debug(f"{cached_method.__name__}: {cached_method.cache_info()}")
1753 # -------------------------------------------------------------------------
1754 # Filtering
1755 # -------------------------------------------------------------------------
# Type of a row filter function: takes a data dictionary row and returns
# True to keep the row or False to reject it.
KEEP_FUNCTION_TYPE = Callable[[DataDictionaryRow], bool]
def remove_rows_by_filter(self, keep: KEEP_FUNCTION_TYPE) -> None:
    """
    Removes any rows that do not pass a filter function.
    (Retains table comment rows.)

    Table comment rows are kept unconditionally and are never passed to
    the filter function.

    Args:
        keep:
            Function taking a data dictionary row as an argument, and
            returning a boolean of whether to keep the row.
    """
    # NOTE(review): cached query results are not invalidated here;
    # presumably callers use clear_caches() when needed -- confirm.
    self.rows = [
        row for row in self.rows if row.is_table_comment or keep(row)
    ]
def omit_rows_by_filter(self, keep: KEEP_FUNCTION_TYPE) -> None:
    """
    Mark as "omit" every row that fails a filter function. Rows already
    set to omit are left alone, and table comment rows are skipped.

    Args:
        keep:
            Function taking a data dictionary row as an argument, and
            returning a boolean of whether to keep the row.
    """
    for ddr in self.rows:
        if ddr.is_table_comment or ddr.omit:
            continue
        ddr.omit = not keep(ddr)
# Type of a modifying row filter function: takes a data dictionary row and
# returns the row (perhaps modified) to keep, or None to reject it.
MODIFYING_KEEP_FUNCTION_TYPE = Callable[
    [DataDictionaryRow], Optional[DataDictionaryRow]
]
def remove_rows_by_modifying_filter(
    self, keep_modify: MODIFYING_KEEP_FUNCTION_TYPE
) -> None:
    """
    Removes any rows that do not pass a filter function; allows the filter
    function to modify rows that are kept.
    (Retains table comment rows.)

    Table comment rows are kept unchanged, in their original position, and
    are never passed to the filter function.

    Args:
        keep_modify:
            Function taking a data dictionary row as an argument, and
            returning either the row (potentially modified) to retain it,
            or ``None`` to reject it.
    """
    # NOTE(review): cached query results are not invalidated here;
    # presumably callers use clear_caches() when needed -- confirm.
    new_rows = []  # type: List[DataDictionaryRow]
    for row in self.rows:
        if row.is_table_comment:
            # Retain table comments as they are; previously they were
            # skipped without being kept, and so silently dropped.
            new_rows.append(row)
            continue
        result = keep_modify(row)
        if result is not None:
            new_rows.append(result)
    self.rows = new_rows