Coverage for anonymise/anonymise.py: 35%
774 statements
« prev ^ index » next coverage.py v7.8.0, created at 2026-01-09 10:40 -0600
« prev ^ index » next coverage.py v7.8.0, created at 2026-01-09 10:40 -0600
1"""
2crate_anon/anonymise/anonymise.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Anonymise one or more SQL-based source databases into a destination database
27using a data dictionary.**
29"""
31# =============================================================================
32# Imports
33# =============================================================================
35import logging
36import random
37import sys
38from datetime import datetime
39from typing import Any, Dict, Iterable, Generator, List, Tuple, Union
41from cardinal_pythonlib.datetimefunc import get_now_utc_pendulum
42from cardinal_pythonlib.sqlalchemy.core_query import count_star, exists_plain
43from cardinal_pythonlib.sqlalchemy.insert_on_duplicate import (
44 insert_with_upsert_if_supported,
45)
46from cardinal_pythonlib.sqlalchemy.schema import (
47 add_index,
48 get_column_names,
49)
50from sortedcontainers import SortedSet
51from sqlalchemy.exc import DatabaseError, IntegrityError
52from sqlalchemy.schema import Column, Index, MetaData, Table
53from sqlalchemy.sql import column, func, or_, select, table, text
54from sqlalchemy.types import Boolean
56from crate_anon.anonymise.config_singleton import config
57from crate_anon.anonymise.constants import (
58 AnonymiseConfigKeys,
59 AnonymiseDatabaseSafeConfigKeys,
60 BIGSEP,
61 DEFAULT_CHUNKSIZE,
62 DEFAULT_REPORT_EVERY,
63 IndexType,
64 TABLE_KWARGS,
65 SEP,
66)
67from crate_anon.anonymise.models import (
68 OptOutMpid,
69 OptOutPid,
70 PatientInfo,
71 TridRecord,
72)
73from crate_anon.anonymise.patient import Patient
74from crate_anon.anonymise.ddr import DataDictionaryRow
75from crate_anon.common.file_io import (
76 gen_integers_from_file,
77 gen_words_from_file,
78)
79from crate_anon.common.parallel import is_my_job_by_hash, is_my_job_by_int
80from crate_anon.common.sql import matches_tabledef
82log = logging.getLogger(__name__)
85# =============================================================================
86# Database queries
87# =============================================================================
def identical_record_exists_by_hash(
    dest_table: str, pkfield: str, pkvalue: int, hashvalue: str
) -> bool:
    """
    Does the destination table contain a row with this PK whose stored source
    hash equals the supplied hash?

    Args:
        dest_table: name of the destination table
        pkfield: name of the primary key (PK) column in the destination table
        pkvalue: integer value of the PK in the destination table
        hashvalue: hash of the source

    Returns:
        the result of ``exists_plain()`` for the two combined criteria
    """
    dest_session = config.destdb.session
    pk_matches = column(pkfield) == pkvalue
    hash_matches = column(config.source_hash_fieldname) == hashvalue
    return exists_plain(dest_session, dest_table, pk_matches, hash_matches)
def identical_record_exists_by_pk(
    dest_table: str, pkfield: str, pkvalue: int
) -> bool:
    """
    Does the destination table contain any row with this PK value?

    Args:
        dest_table: name of the destination table
        pkfield: name of the primary key (PK) column in the destination table
        pkvalue: integer value of the PK in the destination table

    Returns:
        the result of ``exists_plain()`` for the PK criterion
    """
    dest_session = config.destdb.session
    pk_matches = column(pkfield) == pkvalue
    return exists_plain(dest_session, dest_table, pk_matches)
127# =============================================================================
128# Database actions
129# =============================================================================
def wipe_and_recreate_destination_db(
    incremental: bool = False, full_drop_only: bool = False
) -> None:
    """
    Drop and/or (re)create the destination tables described by the data
    dictionary (DD).

    Args:
        incremental:
            Skip the drop phase; only create tables that don't exist yet.
        full_drop_only:
            Perform the drop phase and then stop, without creating anything.
            Incompatible with ``incremental``.

    Raises:
        AssertionError: if both flags are set
        RuntimeError: if a created table lacks a column that the DD expects
    """
    assert not (incremental and full_drop_only)
    engine = config.destdb.engine
    dd = config.dd

    if not full_drop_only:
        log.info(
            f"Rebuilding destination database (incremental={incremental})"
        )
    else:
        log.info("Dropping tables from destination database")

    # Drop phase: every table the DD knows about, so that altering the DD
    # does not leave orphan tables behind.
    if not incremental:
        for name in dd.get_dest_tables_all():
            doomed = dd.get_dest_sqla_table(name)
            log.info(f"Dropping table: {name}")
            doomed.drop(engine, checkfirst=True)
    if full_drop_only:
        return

    # Create phase: only tables that will receive content; then verify that
    # the columns actually present match the definition.
    for name in dd.get_dest_tables_included():
        wanted = dd.get_dest_sqla_table(name)
        log.info(f"Creating table: {name}")
        log.debug(repr(wanted))
        wanted.create(engine, checkfirst=True)

        expected_cols = set(wanted.columns.keys())
        actual_cols = set(get_column_names(engine, name))
        missing = list(expected_cols - actual_cols)
        extra = list(actual_cols - expected_cols)
        if missing:
            raise RuntimeError(
                f"Missing fields in destination table {name}: {missing}"
            )
        if extra:
            log.warning(f"Extra fields in destination table {name}: {extra}")
def delete_dest_rows_with_no_src_row(
    srcdbname: str,
    src_table: str,
    report_every: int = DEFAULT_REPORT_EVERY,
    chunksize: int = DEFAULT_CHUNKSIZE,
) -> None:
    """
    For a given source database/table, delete any rows in the corresponding
    destination table where there is no corresponding source row.

    - Can't do this in a single SQL command, since the engine can't
      necessarily see both databases.
    - Can't do this in a multiprocess way, because we're trying to do a
      ``DELETE WHERE NOT IN``.
    - However, we can get stupidly long query lists if we try to ``SELECT`` all
      the values and use a ``DELETE FROM x WHERE y NOT IN (v1, v2, v3, ...)``
      query. This crashes the MySQL connection, etc.
    - Therefore, we need a temporary table in the destination.

    Args:
        srcdbname: name (as per the data dictionary) of the source database
        src_table: name of the source table
        report_every:
            report to the Python log every *n* records (falsy: never report)
        chunksize:
            insert records every *n* records (falsy: insert everything in a
            single batch at the end)
    """
    if not config.dd.has_active_destination(srcdbname, src_table):
        return
    dest_table_name = config.dd.get_dest_table_for_src_db_table(
        srcdbname, src_table
    )
    start = (
        f"delete_dest_rows_with_no_src_row: "
        f"{srcdbname}.{src_table} -> {config.destdb.name}.{dest_table_name}: "
    )
    log.info(start + "[WARNING: MAY BE SLOW]")

    metadata = MetaData()  # operate in isolation!
    destengine = config.destdb.engine
    destsession = config.destdb.session
    dest_table = config.dd.get_dest_sqla_table(dest_table_name)
    pkddr = config.dd.get_pk_ddr(srcdbname, src_table)

    # If there's no source PK, we just delete everything
    if not pkddr:
        log.info("... No source PK; deleting everything")
        destsession.execute(dest_table.delete())
        commit_destdb()
        return

    if pkddr.addition_only:
        log.info("... Table marked as addition-only; not deleting anything")
        return

    # Drop/create temporary table
    pkfield = "srcpk"
    temptable = Table(
        config.temporary_tablename,
        metadata,
        Column(pkfield, pkddr.dest_sqla_coltype, primary_key=True),
        **TABLE_KWARGS,
    )
    # THIS (ABOVE) IS WHAT CONSTRAINS A USER-DEFINED PK TO BE UNIQUE WITHIN ITS
    # TABLE.
    log.debug("... dropping temporary table")
    temptable.drop(destengine, checkfirst=True)
    log.debug("... making temporary table")
    temptable.create(destengine, checkfirst=True)

    # Populate temporary table, +/- PK translation
    n = count_star(config.sources[srcdbname].session, src_table)
    log.debug(f"... populating temporary table: {n} records to go")

    def insert(records_: List[Dict[str, Any]]) -> None:
        log.debug(start + f"... inserting {len(records_)} records")
        destsession.execute(temptable.insert(), records_)

    records = []  # type: List[Dict[str, Any]]
    for i, pk in enumerate(
        gen_pks(srcdbname, src_table, pkddr.src_field), start=1
    ):
        if report_every and i % report_every == 0:
            log.debug(start + f"... src row# {i} / {n}")
        # Destination PKs that are PIDs/MPIDs are stored encrypted, so the
        # source value must be translated before comparison.
        if pkddr.primary_pid:
            pk = config.encrypt_primary_pid(pk)
        elif pkddr.master_pid:
            pk = config.encrypt_master_pid(pk)
        records.append({pkfield: pk})
        # Guard against chunksize == 0 (which would otherwise raise
        # ZeroDivisionError); in that case everything goes in one batch.
        if chunksize and i % chunksize == 0:
            insert(records)
            records = []
    if records:  # remainder
        insert(records)
    commit_destdb()

    # No separate index is needed: srcpk is already the primary key.

    # DELETE FROM desttable
    # WHERE destpk NOT IN (SELECT srcpk FROM temptable)
    log.debug("... deleting from destination where appropriate")
    query = dest_table.delete().where(
        ~column(pkddr.dest_field).in_(select(temptable.columns[pkfield]))
    )
    destsession.execute(query)
    commit_destdb()

    # Drop temporary table
    log.debug("... dropping temporary table")
    temptable.drop(destengine, checkfirst=True)

    # Commit
    commit_destdb()
def commit_destdb() -> None:
    """
    Commit the destination database by delegating to
    ``config.commit_dest_db()``.
    """
    commit = config.commit_dest_db
    commit()
def commit_admindb() -> None:
    """
    Commit the admin database's session (``config.admindb.session``).
    """
    admin_session = config.admindb.session
    admin_session.commit()
316# =============================================================================
317# Opt-out
318# =============================================================================
def opting_out_pid(pid: Union[int, str]) -> bool:
    """
    Has the patient with this PID opted out?

    Args:
        pid: patient identifier (PID); ``None`` is treated as "not opting out"

    Returns:
        ``False`` for a ``None`` PID; otherwise whatever
        ``OptOutPid.opting_out()`` reports for the admin database session.
    """
    return pid is not None and OptOutPid.opting_out(
        config.admindb.session, pid
    )
def opting_out_mpid(mpid: Union[int, str]) -> bool:
    """
    Has the patient with this MPID opted out?

    Args:
        mpid:
            master patient identifier (MPID); ``None`` is treated as "not
            opting out"

    Returns:
        ``False`` for a ``None`` MPID; otherwise whatever
        ``OptOutMpid.opting_out()`` reports for the admin database session.
    """
    return mpid is not None and OptOutMpid.opting_out(
        config.admindb.session, mpid
    )
def gen_optout_rids() -> Generator[str, None, None]:
    """
    Generate the RIDs of patients who have opted out, by PID or by MPID, so
    that their information can be wiped from the destination database.

    Yields:
        string: research ID (RID)
    """
    admin_session = config.admindb.session
    pid_opted_out = PatientInfo.pid.in_(admin_session.query(OptOutPid.pid))
    mpid_opted_out = PatientInfo.mpid.in_(
        admin_session.query(OptOutMpid.mpid)
    )
    rid_query = admin_session.query(PatientInfo.rid).filter(
        or_(pid_opted_out, mpid_opted_out)
    )
    # Each result row holds a single column: the RID.
    yield from (rid_row[0] for rid_row in rid_query)
364# =============================================================================
365# Functions for getting PIDs from restricted set
366# =============================================================================
def get_valid_pid_subset(given_pids: List[str]) -> List[str]:
    """
    Takes a list of PIDs and returns those in the list which are also in the
    database.

    Args:
        given_pids: candidate PIDs, as strings

    Returns:
        the PIDs (as strings) that are found in at least one source column
        that defines primary PIDs. Each PID appears once, even if several
        tables define it.

    If the config says PIDs are integers, candidates that can't be converted
    to ``int`` are reported on stdout and excluded.
    """
    if config.pidtype_is_integer:
        # Remove non-integer values of pid if pids are supposed to be integer
        final_given_pids = []  # type: List[str]
        for pid in given_pids:
            try:
                int(pid)
            except (TypeError, ValueError):
                print(
                    f"pid '{pid}' should be in integer form. "
                    f"Excluding value."
                )
            else:
                final_given_pids.append(pid)
    else:
        final_given_pids = given_pids

    # Set membership keeps the check O(1) per database PID.
    wanted = set(final_given_pids)

    pids = []  # type: List[str]
    seen = set()  # PIDs already returned (avoid duplicates across tables)
    for ddr in config.dd.rows:
        if not ddr.defines_primary_pids:
            continue
        pidcol = column(ddr.src_field)
        session = config.sources[ddr.src_db].session
        query = (
            select(pidcol)
            .select_from(table(ddr.src_table))
            # NB "pidcol is not None" would be a Python identity test (always
            # True), not SQL; isnot(None) emits "IS NOT NULL".
            .where(pidcol.isnot(None))
            .distinct()
        )
        for db_row in session.execute(query):
            pid = str(db_row[0])
            if pid in wanted and pid not in seen:
                seen.add(pid)
                pids.append(pid)

    return pids
def get_pid_subset_from_field(
    field: str, values_to_find: List[Any]
) -> List[Any]:
    """
    Takes a field name and elements from that field (values present in that
    field) and queries the database to find the PIDs associated with these
    values.

    Args:
        field: field name in the format ``database.table.field``
        values_to_find: values to look for

    Returns:
        list of PIDs

    Raises:
        AssertionError: if ``field`` is not of the form ``db.table.field``

    For example, suppose you have a source table called
    ``mydb.mystudyinfo`` like this:

    ===============  ============================
    pid (INTEGER)    include_in_extract (VARCHAR)
    ===============  ============================
    1                no
    2                0
    3                yes
    4                1
    5                definitely
    ===============  ============================

    then a call like

    .. code-block:: python

        get_pid_subset_from_field("mydb.mystudyinfo.include_in_extract",
                                  ["yes", "1", "definitely"])

    should return ``[3, 4, 5]``, assuming that ``pid`` has been correctly
    marked as the PID column in the data dictionary.
    """
    pids = []  # type: List[Any]

    # Get database, table and field from 'field'
    db_parts = field.split(".")
    assert (
        len(db_parts) == 3
    ), "field parameter must be of the form 'db.table.field'"
    db, tablename, fieldname = db_parts

    try:
        session = config.sources[db].session
    except (KeyError, AttributeError):
        print(
            f"Unable to connect to database {db}. "
            f"Remember argument to '--restrict' must be of the form "
            f"'database.table.field', or be 'pid'."
        )
        return pids

    fieldcol = column(fieldname)
    row = None  # for type checker
    for ddr in config.dd.rows:
        if ddr.src_db != db or not ddr.defines_primary_pids:
            continue
        # Check if the field given is in the table with the pids
        if fieldname in config.dd.get_fieldnames_for_src_table(
            db, ddr.src_table
        ):
            pidcol = column(ddr.src_field)
            session = config.sources[ddr.src_db].session
            # Find pids corresponding to the given values of specified field.
            # NB "pidcol is not None" would be a Python identity test (always
            # True); isnot(None) emits SQL "IS NOT NULL".
            query = (
                select(pidcol)
                .select_from(table(ddr.src_table))
                .where(fieldcol.in_(values_to_find) & pidcol.isnot(None))
                .distinct()
            )
            result = session.execute(query)
            pids.extend([x[0] for x in result])
            # As there is only one relevant database here, we return pids
            return pids
        # Mark out row of dd with primary pid for relevant database
        row = ddr
    if row is None:
        # Didn't find one
        return []

    # Deal with the case where the field specified isn't in the table with
    # the primary pid.
    if not values_to_find:
        # "IN ()" is not valid SQL, and nothing can match an empty list.
        return pids

    source_field = row.src_field
    source_table = row.src_table
    # Pass the values as bound parameters (:value_0, :value_1, ...) rather
    # than splicing them into the SQL text: this quotes strings correctly,
    # copes with non-string values, and prevents SQL injection.
    params = {f"value_{i}": v for i, v in enumerate(values_to_find)}
    placeholders = ", ".join(f":{name}" for name in params)

    # NOTE(review): the join condition equates the PID column with the
    # searched field itself, which looks questionable -- confirm intent.
    # Identifiers (table/field names) cannot be bound parameters; they come
    # from the data dictionary / the 'field' argument.
    sql = text(
        f"""
        SELECT {source_table}.{source_field}
        FROM {source_table}
        JOIN {tablename}
        ON {source_table}.{source_field} = {tablename}.{fieldname}
        WHERE {tablename}.{fieldname} IN ({placeholders})
        AND {source_table}.{source_field} IS NOT NULL
        """
    )
    result = session.execute(sql, params)

    # TODO: convert this raw SQL to a SQLAlchemy Core join; should be
    # possible.

    pids.extend([x[0] for x in result])
    return pids
def fieldname_is_pid(field: str) -> bool:
    """
    Decide whether ``field`` denotes a PID field.

    Returns ``True`` if ``field`` is the literal ``'pid'``, or if it equals
    the source signature (``'database.table.field'``) of any data dictionary
    row that defines primary PIDs; otherwise ``False``.
    """
    if field == "pid":
        return True
    return any(
        ddr.defines_primary_pids and field == ddr.src_signature
        for ddr in config.dd.rows
    )
def get_pids_from_file(field: str, filename: str) -> List[str]:
    """
    Read values from a file and translate them into a list of PIDs.

    Args:
        field:
            a fieldname of the format ``database.table.field``, or the literal
            ``pid``
        filename:
            A file containing words that represent values to look for, as
            follows.

            - If ``field`` is the string literal ``'pid'``, or is the name of
              a source database field containing PIDs, then the values in the
              file should be PIDs. We check that they are valid.
            - If it's another kind of field, look for values (from the file)
              in this field, and return the value of the PID column from the
              same row of the table. (See :func:`get_pid_subset_from_field`.)

    Returns:
        list of PIDs
    """
    is_pid_field = fieldname_is_pid(field)
    words = list(gen_words_from_file(filename))
    if is_pid_field:
        # The file holds PIDs directly; keep only those that really exist.
        return get_valid_pid_subset(words)
    # The file holds values of some other field; look up the matching PIDs.
    return get_pid_subset_from_field(field, words)
def get_pids_from_list(field: str, values: List[str]) -> List[str]:
    """
    Translate a list of values for a given field into a list of PIDs.

    Args:
        field:
            a fieldname of the format ``database.table.field``, or the literal
            ``pid``
        values:
            Values to look for, as follows.

            - If ``field`` is the string literal ``'pid'``, or is the name of
              a source database field containing PIDs, then the values
              should be PIDs. We check that they are valid.
            - If it's another kind of field, look for the values in this field,
              and return the value of the PID column from the same row of the
              table. (See :func:`get_pid_subset_from_field`.)

    Returns:
        list of PIDs
    """
    if fieldname_is_pid(field):
        return get_valid_pid_subset(values)
    return get_pid_subset_from_field(field, values)
def get_pids_from_limits(low: int, high: int) -> List[Any]:
    """
    Finds PIDs from the source database that are between ``low`` and ``high``
    inclusive.

    - The SQL ``BETWEEN`` operator is inclusive
      (https://www.w3schools.com/sql/sql_between.asp).

    Args:
        low: lower (inclusive) limit
        high: upper (inclusive) limit

    Returns:
        list of PIDs in this range, gathered from every source column that
        the data dictionary marks as defining primary PIDs
    """
    pids = []  # type: List[Any]
    for ddr in config.dd.rows:
        if not ddr.defines_primary_pids:
            continue
        pidcol = column(ddr.src_field)
        session = config.sources[ddr.src_db].session
        query = (
            select(pidcol)
            .select_from(table(ddr.src_table))
            # NB "pidcol is not None" would be a Python identity test (always
            # True); isnot(None) emits SQL "IS NOT NULL".
            .where(pidcol.between(low, high) & pidcol.isnot(None))
            .distinct()
        )
        result = session.execute(query)
        pids.extend([x[0] for x in result])

    return pids
def get_pids_query_field_limits(field: str, low: int, high: int) -> List[Any]:
    """
    Takes a field name and queries the database to find the PIDs associated
    with records where ``field`` is in the range ``low`` to ``high`` inclusive.

    Args:
        field: field name in the format ``database.table.field``
        low: lower (inclusive) limit
        high: upper (inclusive) limit

    Returns:
        list of PIDs

    Raises:
        AssertionError: if ``field`` is not of the form ``db.table.field``

    For example, suppose you have a source table called ``mydb.myoptouts`` like
    this:

    ===============  ==========================
    pid (INTEGER)    opt_out_level (INTEGER)
    ===============  ==========================
    1                0
    2                1
    3                2
    4                3
    5                4
    ===============  ==========================

    then a call like

    .. code-block:: python

        get_pids_query_field_limits("mydb.myoptouts.opt_out_level", 2, 3)

    should return ``[3, 4]``, assuming that ``pid`` has been correctly marked
    as the PID column in the data dictionary.
    """
    pids = []  # type: List[Any]
    # Get database, table and field from 'field'
    db_parts = field.split(".")
    assert (
        len(db_parts) == 3
    ), "field parameter must be of the form 'db.table.field'"
    db, tablename, fieldname = db_parts

    try:
        session = config.sources[db].session
    except (KeyError, AttributeError):
        print(
            f"Unable to connect to database {db}. "
            f"Remember argument to '--restrict' must be of the form "
            f"'database.table.field', or be 'pid'."
        )
        return pids

    fieldcol = column(fieldname)
    row = None
    for ddr in config.dd.rows:
        if ddr.src_db != db or not ddr.defines_primary_pids:
            continue
        # Check if the field given is in the table with the pids
        if fieldname in config.dd.get_fieldnames_for_src_table(
            ddr.src_db, ddr.src_table
        ):
            pidcol = column(ddr.src_field)
            session = config.sources[ddr.src_db].session
            # Find pids whose row has the specified field within the range.
            # NB "pidcol is not None" would be a Python identity test (always
            # True); isnot(None) emits SQL "IS NOT NULL".
            query = (
                select(pidcol)
                .select_from(table(ddr.src_table))
                .where(fieldcol.between(low, high) & pidcol.isnot(None))
                .distinct()
            )
            result = session.execute(query)
            pids.extend([x[0] for x in result])
            # As there is only one relevant database here, we return pids
            return pids
        # Mark out row of dd with primary pid for relevant database
        row = ddr
    if row is None:
        # Didn't find one
        return []

    # Deal with case where the field specified isn't in the table
    # with the primary pid
    source_field = row.src_field
    source_table = row.src_table
    # The limits are passed as bound parameters. (Stray double-quote
    # characters previously embedded in this SQL text made it invalid.)
    # NOTE(review): the join condition equates the PID column with the
    # searched field itself, which looks questionable -- confirm intent.
    sql = text(
        f"""
        SELECT {source_table}.{source_field}
        FROM {source_table}
        JOIN {tablename}
        ON {source_table}.{source_field} = {tablename}.{fieldname}
        WHERE ({tablename}.{fieldname} BETWEEN :low AND :high)
        AND {source_table}.{source_field} IS NOT NULL
        """
    )
    result = session.execute(sql, {"low": low, "high": high})

    # TODO: convert this raw SQL to a SQLAlchemy Core join.

    pids.extend([x[0] for x in result])

    return pids
def get_pids_from_field_limits(field: str, low: int, high: int) -> List[Any]:
    """
    Translate a field name plus an inclusive range into a list of PIDs.

    Args:
        field:
            a fieldname of the format ``database.table.field``, or the literal
            ``pid``
        low:
            lower (inclusive) limit
        high:
            upper (inclusive) limit

    The range is used as follows.

    - If ``field`` is the string literal ``'pid'``, or is the name of
      a source database field containing PIDs, then fetch PIDs in the specified
      range. (See :func:`get_pids_from_limits`.)
    - If it's another kind of field, look for rows where this field is in the
      specified range, and return the value of the PID column from the same row
      of the table. (See :func:`get_pids_query_field_limits`.)

    Returns:
        list of PIDs
    """
    if fieldname_is_pid(field):
        return get_pids_from_limits(low, high)
    return get_pids_query_field_limits(field, low, high)
798# =============================================================================
799# Generators. Anything reading the main database should use a generator, so the
800# script can scale to databases of arbitrary size.
801# =============================================================================
def gen_patient_ids(
    tasknum: int = 0,
    ntasks: int = 1,
    specified_pids: List[Union[int, str]] = None,
) -> Generator[Union[int, str], None, None]:
    """
    Generate patient IDs.

    Args:
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)
        specified_pids: optional list of PIDs to restrict ourselves to

    Yields:
        integer or string patient IDs (PIDs)

    Raises:
        AssertionError: if ``ntasks < 1`` or ``tasknum`` is out of range

    - Assigns work to threads/processes, via the simple expedient of processing
      only those patient ID numbers where ``patientnum % ntasks == tasknum``
      (for integers), or an equivalent method for string PIDs.
    - Sources are tried in this order: the config's debugging PID list, if
      set; then ``specified_pids``, if given (shared out by list position);
      otherwise the source database columns that define primary PIDs.
    """

    assert ntasks >= 1
    assert 0 <= tasknum < ntasks

    pid_is_integer = config.pidtype_is_integer
    distribute_by_hash = ntasks > 1 and not pid_is_integer

    # If we're going to define based on >1 table, we need to keep track of
    # what we've processed. However, if we only have one table, we don't.
    # We can't use the mapping table easily (*), because it leads to thread/
    # process locking for database access. So we use a set.
    # (*) if not patient_id_exists_in_mapping_db(admindb, patient_id): ...

    # Debug option?
    if config.debug_pid_list:
        log.warning("USING MANUALLY SPECIFIED PATIENT ID LIST")
        for pid in config.debug_pid_list:
            if pid_is_integer:
                pid = int(pid)
                if is_my_job_by_int(pid, tasknum=tasknum, ntasks=ntasks):
                    yield pid
            else:
                pid = str(pid)
                if is_my_job_by_hash(pid, tasknum=tasknum, ntasks=ntasks):
                    yield pid
        return

    # Subset specified?
    if specified_pids is not None:
        for i, pid in enumerate(specified_pids):
            if i % ntasks == tasknum:
                yield pid
        return

    # Otherwise do it properly:
    keeping_track = config.dd.n_definers > 1
    processed_ids = set()  # used only if keeping_track is True
    # ... POTENTIAL FOR MEMORY PROBLEM WITH V. BIG DB
    # ... if we ever get near that limit (for a huge number of *patients*,
    # which is much less likely than a huge number of other records), we'd
    # need to generate the IDs and stash them in a temporary table, then
    # work through that. However, a few million patients should be fine
    # for a Python set on realistic computers.
    n_found = 0
    debuglimit = config.debug_max_n_patients
    for ddr in config.dd.rows:
        if not ddr.defines_primary_pids:
            continue
        log.debug(
            f"Looking for patient IDs in {ddr.src_table}.{ddr.src_field}"
        )
        session = config.sources[ddr.src_db].session
        pidcol = column(ddr.src_field)
        query = (
            select(pidcol)
            .select_from(table(ddr.src_table))
            # NB "pidcol is not None" would be a Python identity test (always
            # True); isnot(None) emits SQL "IS NOT NULL".
            .where(pidcol.isnot(None))
            .distinct()
            # .order_by(pidcol)  # no need to order by
        )
        if ntasks > 1 and pid_is_integer:
            # With integers, we can take our slice of the workload through a
            # restricted query.
            query = query.where(pidcol % ntasks == tasknum)
        result = session.execute(query)
        for row in result:
            # Extract patient ID
            pid = row[0]

            # Duff? (Defensive only: the query already excludes NULLs.)
            if pid is None:
                log.warning("Patient ID is NULL")
                continue

            # Ensure type is correct -- even if we are querying from an integer
            # field and then behaving as if it is a string subsequently.
            # Note that e.g. SELECT '123' = 123 gives 1 (true), i.e. strings
            # can be compared to integers.
            if pid_is_integer:
                pid = int(pid)
            else:
                pid = str(pid)
                # Operating on non-integer PIDs and not our job?
                if distribute_by_hash and not is_my_job_by_hash(
                    pid, tasknum=tasknum, ntasks=ntasks
                ):
                    continue

            # Duplicate?
            if keeping_track:
                # Consider, for non-integer PIDs, storing the hash64 instead
                # of the raw value.
                if pid in processed_ids:
                    # we've done this one already; skip it this time
                    continue
                processed_ids.add(pid)

            # Valid one
            log.debug(f"Found patient id: {pid}")
            n_found += 1
            yield pid

            # Too many?
            if 0 < debuglimit <= n_found:
                log.warning(
                    f"Not fetching more than {debuglimit} "
                    f"patients (in total for this process) due to "
                    f"{AnonymiseConfigKeys.DEBUG_MAX_N_PATIENTS} limit"
                )
                result.close()
                # http://docs.sqlalchemy.org/en/latest/core/connections.html
                return
def estimate_count_patients() -> int:
    """
    Return an approximate number of patients in the source database(s).

    The figure is the sum of the row counts of every table that contains a
    PID-defining column. An exact count would mean de-duplicating patients
    across tables and databases, either in Python (which could be huge) or in
    a temporary table; that isn't worth the effort for a number that only
    feeds a progress indicator.
    """
    return sum(
        count_star(config.sources[ddr.src_db].session, ddr.src_table)
        for ddr in config.dd.rows
        if ddr.defines_primary_pids
    )
def gen_rows(
    dbname: str,
    sourcetable: str,
    sourcefields: Iterable[str],
    pid: Union[int, str] = None,
    intpkname: str = None,
    tasknum: int = 0,
    ntasks: int = 1,
    debuglimit: int = 0,
) -> Generator[List[Any], None, None]:
    """
    Generate rows from a source table, each as a list of values in the order
    of ``sourcefields``.

    - If ``pid`` is given, only that patient's rows are fetched.
    - Otherwise, if an integer PK name is given and there is more than one
      task, only rows where ``pk % ntasks == tasknum`` are fetched.

    Args:
        dbname: name (as per the data dictionary) of the source database
        sourcetable: name of the source table
        sourcefields: names of fields in the source table
        pid: patient ID (PID)
        intpkname: name of the integer PK column in the source table, if one
            exists
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)
        debuglimit: if positive, stop once this many rows have been counted
            for this table (in ``config.rows_inserted_per_table``)

    Yields:
        lists, each representing one row and containing values for each of the
        ``sourcefields``
    """
    source = config.sources[dbname]
    src_table_obj = source.metadata.tables[sourcetable]
    wanted_columns = [column(fieldname) for fieldname in sourcefields]
    query = select(*wanted_columns).select_from(src_table_obj)  # not ordered

    if pid is not None:
        # Restrict to one patient.
        pidcol_name = config.dd.get_pid_name(dbname, sourcetable)
        query = query.where(column(pidcol_name) == pid)
    elif intpkname is not None and ntasks > 1:
        # Non-patient table: share rows out across tasks.
        # This does not require a user-defined PK to be unique. But other
        # constraints do: see delete_dest_rows_with_no_src_row().
        query = query.where(column(intpkname) % ntasks == tasknum)

    key = (dbname, sourcetable)
    result = source.session.execute(query)
    for db_row in result:
        limit_reached = 0 < debuglimit <= config.rows_inserted_per_table[key]
        if limit_reached:
            # Warn only once per table.
            if not config.warned_re_limits[key]:
                log.warning(
                    f"Table {dbname}.{sourcetable}: not fetching more than "
                    f"{debuglimit} rows (in total for this process) due to "
                    f"{AnonymiseDatabaseSafeConfigKeys.DEBUG_ROW_LIMIT}"
                )
                config.warned_re_limits[key] = True
            result.close()
            # http://docs.sqlalchemy.org/en/latest/core/connections.html
            return
        config.notify_src_bytes_read(sys.getsizeof(db_row))  # approximate!
        yield list(db_row)
        # Counted only after the consumer resumes the generator.
        config.rows_inserted_per_table[key] += 1
def count_rows(
    dbname: str, sourcetable: str, pid: Union[int, str] = None
) -> int:
    """
    Count rows in a source table, optionally restricted to a single patient.

    Intended as the counting companion to ``gen_rows()``.

    Args:
        dbname: name (as per the data dictionary) of the source database
        sourcetable: name of the source table
        pid: patient ID (PID); if ``None``, every row in the table is counted

    Returns:
        the number of records

    Raises:
        ValueError: if a PID is supplied but the data dictionary offers no
            primary PID column for this database/table
    """
    src_session = config.sources[dbname].session
    counting_query = select(func.count()).select_from(table(sourcetable))
    if pid is None:
        # Whole-table count.
        return src_session.execute(counting_query).scalar()

    pid_column = config.dd.get_pid_name(dbname, sourcetable)
    if not pid_column:
        raise ValueError(
            "No row in the data dictionary provides primary PID "
            f"information for db:{dbname}, table: {sourcetable}"
        )
    per_patient_query = counting_query.where(column(pid_column) == pid)
    return src_session.execute(per_patient_query).scalar()
def gen_index_row_sets_by_table(
    tasknum: int = 0, ntasks: int = 1
) -> Generator[Tuple[str, List[DataDictionaryRow]], None, None]:
    """
    Generate ``table, list_of_dd_rows_for_indexed_fields`` tuples for every
    destination table that needs indexing, restricted to the share of tables
    belonging to this task.

    Args:
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)

    Yields:
        tuple: ``table, list_of_dd_rows_for_indexed_fields`` for each table
        as above
    """
    # Data dictionary rows that ask for an index and are not omitted.
    indexed_rows = []  # type: List[DataDictionaryRow]
    for ddr in config.dd.rows:
        if ddr.index != IndexType.NONE and not ddr.omit:
            indexed_rows.append(ddr)

    # A sorted collection is essential: every parallel process must see the
    # tables in the same order, and plain set() ordering varies.
    dest_tables = SortedSet(r.dest_table for r in indexed_rows)
    for position, dest_table in enumerate(dest_tables):
        if position % ntasks == tasknum:
            rows_for_table = [
                r for r in indexed_rows if r.dest_table == dest_table
            ]
            yield dest_table, rows_for_table
def gen_nonpatient_tables_without_int_pk(
    tasknum: int = 0, ntasks: int = 1
) -> Generator[Tuple[str, str], None, None]:
    """
    Generate ``(source db name, source table)`` tuples for the tables that

    (a) hold no patient information, and
    (b) lack an integer PK,

    restricted to the share of tables belonging to this task.

    Args:
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)

    Yields:
        tuple: ``source_db_name, source_table`` for each table as above
    """
    # The data dictionary hands back a SortedSet, so the ordering is stable
    # across processes and dividing by position is safe.
    candidates = config.dd.get_src_dbs_tables_with_no_pt_info_no_pk()
    for position, db_and_table in enumerate(candidates):
        if position % ntasks == tasknum:
            yield db_and_table  # a (dbname, table) tuple
def gen_nonpatient_tables_with_int_pk() -> (
    Generator[Tuple[str, str, str], None, None]
):
    """
    Generate ``source_db_name, source_table, pk_name`` tuples for the tables
    that

    (a) hold no patient information, and
    (b) do have an integer PK.

    The PK name is looked up in the data dictionary for each table.
    """
    for entry in config.dd.get_src_dbs_tables_with_no_pt_info_int_pk():
        source_db, source_table = entry[0], entry[1]
        yield (
            source_db,
            source_table,
            config.dd.get_int_pk_name(source_db, source_table),
        )
def gen_pks(
    srcdbname: str, tablename: str, pkname: str
) -> Generator[int, None, None]:
    """
    Generate the value of the PK column for every row of a source table.

    Args:
        srcdbname: name (as per the data dictionary) of the database
        tablename: name of the table
        pkname: name of the PK column

    Yields:
        int: each primary key
    """
    source = config.sources[srcdbname]
    source_table = source.metadata.tables[tablename]
    pk_query = select(column(pkname)).select_from(source_table)
    for record in source.session.execute(pk_query):
        # Single-column query: the PK is the first (only) element.
        yield record[0]
1152# =============================================================================
1153# Core functions
1154# =============================================================================
1155# - For multithreaded use, the patients are divvied up across the threads.
1156# - KEY THREADING RULE: ALL THREADS MUST HAVE FULLY INDEPENDENT DATABASE
1157# CONNECTIONS.
def process_table(
    sourcedbname: str,
    sourcetable: str,
    patient: Patient = None,
    incremental: bool = False,
    intpkname: str = None,
    tasknum: int = 0,
    ntasks: int = 1,
    free_text_limit: int = None,
    exclude_scrubbed_fields: bool = False,
) -> None:
    """
    Process a table. This can either be a patient table (in which case the
    patient's scrubber is applied and only rows for that patient are processed)
    or not (in which case the table is just copied).

    In outline:

    - work out which data dictionary rows (columns) are needed, either for
      output, for the source hash, or for inclusion/exclusion filtering;
    - read source rows via :func:`gen_rows`;
    - in incremental mode, skip rows already present in the destination
      (matched by source hash, or by PK if any column is marked constant);
    - transform each value (PID/MPID encryption or the column's alter
      methods);
    - insert/upsert the resulting row into the destination table;
    - commit the destination database at the end.

    Args:
        sourcedbname:
            name (as per the data dictionary) of the source database
        sourcetable:
            name of the source table
        patient:
            :class:`crate_anon.anonymise.patient.Patient` object, or ``None``
            for non-patient tables
        incremental:
            perform an incremental update, rather than a full run?
        intpkname:
            name of the integer PK column in the source table
        tasknum:
            task number of this process (for dividing up work)
        ntasks:
            total number of processes (for dividing up work)
        free_text_limit:
            If specified, any text field longer than this will be excluded
        exclude_scrubbed_fields:
            Exclude all text fields which are being scrubbed.
    """
    start = f"process_table: {sourcedbname}.{sourcetable}:"
    pid = None if patient is None else patient.pid
    log.debug(f"{start} pid={pid}, incremental={incremental}")

    # Limit the data quantity for debugging?
    # (A limit of 0 means "no limit"; see the check in gen_rows().)
    srccfg = config.sources[sourcedbname].srccfg
    if matches_tabledef(sourcetable, srccfg.debug_limited_tables):
        debuglimit = srccfg.debug_row_limit
        # log.debug(f"Limiting table {sourcetable} to {debuglimit} rows "
        #           f"(per process)")
    else:
        debuglimit = 0

    ddrows = config.dd.get_rows_for_src_table(sourcedbname, sourcetable)
    if all(ddr.omit for ddr in ddrows):
        log.debug("... ... all columns omitted.")
        return
    # Table-level flags, derived from the full set of DD rows BEFORE any
    # filtering below.
    addhash = any(ddr.add_src_hash for ddr in ddrows)
    addtrid = any(ddr.primary_pid and not ddr.omit for ddr in ddrows)
    constant = any(ddr.constant for ddr in ddrows)
    # If addhash or constant is true AND we are not omitting all rows, then
    # the non-omitted rows will include the source PK (by the data dictionary's
    # validation process).
    ddrows = [
        ddr
        for ddr in ddrows
        if (
            (not ddr.omit)  # used for data
            or (addhash and ddr.scrub_src)  # used for hash
            or ddr.inclusion_values  # used for filter
            or ddr.exclusion_values  # used for filter
        )
    ]
    # Exclude all text fields over a chosen length
    if free_text_limit is not None:
        ddrows = [
            ddr
            for ddr in ddrows
            if (
                ddr.src_textlength is None
                or ddr.src_textlength <= free_text_limit
            )
        ]
    # Exclude all scrubbed fields if requested
    if exclude_scrubbed_fields:
        ddrows = [
            ddr
            for ddr in ddrows
            if (not ddr.src_is_textual) or (not ddr.being_scrubbed)
        ]
    if not ddrows:
        # No columns to process at all.
        return
    # The destination table name is taken from the first remaining DD row.
    dest_table = ddrows[0].dest_table
    # sourcefields is built in the same order as ddrows, so that row[i] (from
    # gen_rows) corresponds to ddrows[i] in the loops below.
    sourcefields = []  # type: List[str]
    pkfield_index = None
    src_pk_name = None
    dest_pk_name = None
    pid_is_hashed_in_dest = False
    for i, ddr in enumerate(ddrows):
        # log.debug(f"DD row: {str(ddr)}")
        if ddr.pk:
            pkfield_index = i
            src_pk_name = ddr.src_field
            dest_pk_name = ddr.dest_field
            if ddr.add_src_hash and ddr.primary_pid:
                # The PK is also the primary PID, so the destination holds
                # its encrypted form; the PK value must be encrypted before
                # being used to look up destination records.
                pid_is_hashed_in_dest = True
        sourcefields.append(ddr.src_field)
    srchash = None
    # Cache config lookups used for every row.
    timefield = config.timefield
    add_mrid_wherever_rid_added = config.add_mrid_wherever_rid_added
    mrid_fieldname = config.master_research_id_fieldname
    sqla_table = config.dd.get_dest_sqla_table(dest_table)
    session = config.destdb.session

    # Count what we'll do, so we can give a better indication of progress
    count = count_rows(sourcedbname, sourcetable, pid)
    n = 0  # rows seen by this process
    recnum = tasknum or 0  # approximate position within the whole table

    # Process the rows

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Generate data
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    for row in gen_rows(
        sourcedbname,
        sourcetable,
        sourcefields,
        pid,
        debuglimit=debuglimit,
        intpkname=intpkname,
        tasknum=tasknum,
        ntasks=ntasks,
    ):
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Reporting
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        n += 1
        if n % config.report_every_n_rows == 0:
            log.info(
                f"{start} processing record {recnum + 1}/{count}"
                f"{' for this patient' if pid is not None else ''} "
                f"({config.overall_progress()})"
            )
        # Advance by the number of tasks, since each task handles only a
        # share of the rows.
        recnum += ntasks or 1
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Change detection: source hash and constant rows
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        pkvalue = None
        if pkfield_index is not None:
            pkvalue = row[pkfield_index]
            if pid_is_hashed_in_dest:
                pkvalue = config.encrypt_primary_pid(pkvalue)

        if addhash:
            # The hash is computed for every row (not just in incremental
            # mode) because it is also stored in the destination row below.
            srchash = config.hash_object(row)
            if incremental and identical_record_exists_by_hash(
                dest_table, dest_pk_name, pkvalue, srchash
            ):
                log.debug(
                    f"... ... skipping unchanged record (identical by hash): "
                    f"{sourcedbname}.{sourcetable}.{src_pk_name} = "
                    f"(destination) {dest_table}.{dest_pk_name} = "
                    f"{pkvalue}"
                )
                continue
        if constant:
            if incremental and identical_record_exists_by_pk(
                dest_table, dest_pk_name, pkvalue
            ):
                log.debug(
                    f"... ... skipping unchanged record (identical by PK and "
                    f"marked as constant): "
                    f"{sourcedbname}.{sourcetable}.{src_pk_name} = "
                    f"(destination) {dest_table}.{dest_pk_name} = "
                    f"{pkvalue}"
                )
                continue
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Iterate through values, altering them if necessary
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        destvalues = {}  # type: Dict[str, Any]
        skip_row = False
        for i, ddr in enumerate(ddrows):
            value = row[i]
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Skip row?
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            if ddr.skip_row_by_value(value):
                log.debug(
                    "... ... skipping row based on inclusion/exclusion values"
                )
                skip_row = True
                break  # skip row
            # NOTE: would be most efficient if ddrows were ordered with
            # inclusion/exclusion fields first. (Not yet done automatically.)

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Skip column?
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Omitted columns may still be present in ddrows (for hashing or
            # filtering) but are never written to the destination.
            if ddr.omit:
                continue  # skip column

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Value alteration: "special" methods (PID, MPID) or other methods
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            if ddr.primary_pid:
                # NOTE(review): this sanity check is an assert, so it is
                # skipped when Python runs with -O.
                assert str(value) == str(patient.pid), (
                    # We compare using str() because we may have an integer and
                    # a string version. These will hash to the same value (see
                    # scrub_tests.py).
                    f"PID mismatch from {ddr.src_signature}: "
                    f"str(value) = {str(value)!r} but "
                    f"str(patient.pid) = {str(patient.pid)!r}"
                )
                value = patient.rid
            elif ddr.master_pid:
                value = config.encrypt_master_pid(value)
            elif ddr.third_party_pid:
                # Third-party PID; we encrypt with the same hasher as for other
                # PIDs, so that de-identified records remain linkable.
                value = config.encrypt_primary_pid(value)
            else:
                # Value alteration: other methods
                # Methods are applied in sequence, each receiving the output
                # of the previous one; any of them may veto the whole row.
                for alter_method in ddr.alter_methods:
                    value, skip_row = alter_method.alter(
                        value=value,
                        ddr=ddr,
                        row=row,
                        ddrows=ddrows,
                        patient=patient,
                    )
                    if skip_row:
                        break  # from alter method loop

            if skip_row:
                break  # from data dictionary row (field) loop

            destvalues[ddr.dest_field] = value

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Special timestamp field
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # This sits inside the field loop, so the timestamp is only set
            # once at least one real value has been stored; a row with no
            # stored values therefore leaves destvalues empty (see the skip
            # check below).
            if timefield:
                destvalues[timefield] = datetime.utcnow()

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Skip the row?
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if skip_row or not destvalues:
            continue  # next row

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Add extra columns?
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if addhash:
            destvalues[config.source_hash_fieldname] = srchash
        if addtrid:
            destvalues[config.trid_fieldname] = patient.trid
            if add_mrid_wherever_rid_added:
                destvalues[mrid_fieldname] = patient.mrid

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Insert values into database
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        q = insert_with_upsert_if_supported(
            table=sqla_table, values=destvalues, session=session
        )
        try:
            session.execute(q)
        except IntegrityError:
            # Deliberately best-effort: warn and carry on with the next row.
            log.warning(
                "Skipping record due to IntegrityError. Non-unique primary "
                f"key? {sourcedbname}.{sourcetable}.{src_pk_name} = "
                f"(destination) {dest_table}.{dest_pk_name} = "
                f"{pkvalue}"
            )

        # Trigger an early commit?
        config.notify_dest_db_transaction(
            n_rows=1, n_bytes=sys.getsizeof(destvalues)
        )  # ... approximate!
        # ... quicker than e.g. len(repr(...)), as judged by a timeit() call.

    log.debug(f"{start} finished: pid={pid}")
    commit_destdb()
def create_indexes(tasknum: int = 0, ntasks: int = 1) -> None:
    """
    Create indexes for the destination tables assigned to this task.

    Under SQL Server, full-text indexes are collected per table and created
    at the end, since only one full-text index (possibly spanning several
    columns) is permitted per table there.

    Args:
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)
    """
    log.info(SEP + "Create indexes")
    engine = config.get_destdb_engine_outside_transaction()
    is_mssql = engine.dialect.name == "mssql"
    deferred_fulltext = []  # type: List[List[Column]]

    table_jobs = gen_index_row_sets_by_table(tasknum=tasknum, ntasks=ntasks)
    for tablename, tablerows in table_jobs:
        dest_columns = config.dd.get_dest_sqla_table(tablename).columns
        fulltext_for_table = []  # type: List[Column]
        for tr in tablerows:
            target_column = dest_columns[tr.dest_field]
            wants_fulltext = tr.index == IndexType.FULLTEXT
            wants_unique = tr.index == IndexType.UNIQUE

            if is_mssql and wants_fulltext:
                # SQL Server: defer, so that a single multi-column full-text
                # index can be built for the table afterwards.
                fulltext_for_table.append(target_column)
            else:
                add_index(
                    engine=engine,
                    sqla_column=target_column,
                    unique=wants_unique,
                    fulltext=wants_fulltext,
                    length=tr.indexlen,
                )

            if not tr.primary_pid:
                continue
            # The primary PID column brings companion TRID (and perhaps MRID)
            # columns, which get indexes of their own.
            add_index(
                engine,
                dest_columns[config.trid_fieldname],
                unique=wants_unique,
            )
            if config.add_mrid_wherever_rid_added:
                add_index(
                    engine,
                    dest_columns[config.master_research_id_fieldname],
                    unique=False,  # see docs
                )
        if fulltext_for_table:
            deferred_fulltext.append(fulltext_for_table)

    # Now the SQL Server full-text indexes, one per table, if any:
    for column_group in deferred_fulltext:
        add_index(
            engine=engine,
            multiple_sqla_columns=column_group,
            fulltext=True,
        )
def patient_processing_fn(
    tasknum: int = 0,
    ntasks: int = 1,
    incremental: bool = False,
    specified_pids: List[int] = None,
    free_text_limit: int = None,
    exclude_scrubbed_fields: bool = False,
) -> None:
    """
    Main function to anonymise patient data.

    For each patient ID assigned to this task:

    - skip the patient if they have opted out (by PID or, later, by MPID);
    - build the patient object (and thereby their scrubber), which also saves
      the patient to the mapping table in the admin database;
    - process every patient table in every source database for that patient.

    Args:
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)
        incremental: perform an incremental update, rather than a full run?
        specified_pids: if specified, restrict to specific PIDs
        free_text_limit: as per :func:`process_table`
        exclude_scrubbed_fields: as per :func:`process_table`
    """
    approx_patients_for_task = estimate_count_patients() // ntasks
    # gen_patient_ids() hands out only the PIDs belonging to this task.
    pid_stream = gen_patient_ids(
        tasknum, ntasks, specified_pids=specified_pids
    )
    for patient_number, pid in enumerate(pid_stream, start=1):
        log.info(
            f"Processing patient ID: {pid} (incremental={incremental}; "
            f"patient {patient_number}/~{approx_patients_for_task} "
            f"for this process; "
            f"{config.overall_progress()})"
        )

        # Opt out based on PID?
        if opting_out_pid(pid):
            log.info("... opt out based on PID")
            continue
        # The MPID is not known yet: it is discovered while scanning the
        # patient's fields, which happens as the scrubber is built.

        # Gather scrubbing information for a patient. (Will save.)
        try:
            adminsession = config.admindb.session
            # Flush out anything already pending in the transaction first.
            adminsession.commit()

            patient = Patient(pid)
        except DatabaseError:
            log.warning(
                f"Skipping patient with PID={pid} because the record could "
                "not be saved to the secret_map table"
            )
            adminsession.rollback()
            continue

        if patient.mandatory_scrubbers_unfulfilled:
            log.warning(
                f"Skipping patient with PID={pid} as the following scrub_src "
                f"fields are required and had no data: "
                f"{patient.mandatory_scrubbers_unfulfilled}"
            )
            continue

        # Opt out based on MPID?
        if opting_out_mpid(patient.mpid):
            log.info("... opt out based on MPID")
            continue

        patient_unchanged = patient.is_unchanged()
        if incremental:
            log.debug(
                "Scrubber unchanged; may save some time"
                if patient_unchanged
                else "Scrubber new or changed; reprocessing in full"
            )
        # Rows may only be skipped incrementally if the scrubber itself is
        # unchanged; otherwise everything for this patient is redone.
        incremental_for_patient = incremental and patient_unchanged

        # For each source database/table...
        for src_db in config.dd.get_source_databases():
            log.debug(f"Patient {pid}, processing database: {src_db}")
            patient_tables = (
                config.dd.get_patient_src_tables_with_active_dest(src_db)
            )
            for src_table in patient_tables:
                log.debug(
                    f"Patient {pid}, processing table {src_db}.{src_table}"
                )
                try:
                    process_table(
                        src_db,
                        src_table,
                        patient=patient,
                        incremental=incremental_for_patient,
                        free_text_limit=free_text_limit,
                        exclude_scrubbed_fields=exclude_scrubbed_fields,
                    )
                except Exception:
                    # Say where we were, then let the error propagate.
                    log.critical(
                        "Error whilst processing - "
                        f"db: {src_db} table: {src_table}, patient id: {pid}"
                    )
                    raise

    commit_destdb()
def wipe_destination_data_for_opt_out_patients(
    report_every: int = 1000, chunksize: int = 10000
) -> None:
    """
    Delete any data from patients that have opted out (after their data was
    processed on a previous occasion).

    (Slightly complicated by the fact that the destination database can't
    necessarily 'see' the mapping database, so we need to cache the RID keys in
    the destination database temporarily.)

    Steps:

    1-2. drop and recreate a temporary table in the destination database;
    3. fill it with the RIDs of opt-out patients;
    4. index it;
    5. delete rows with those RIDs from every destination table holding
       patient information;
    6. drop the temporary table;
    7. delete the opt-out patients from the mapping table (admin database).

    Args:
        report_every:
            report logging information every *n* records (0 disables this
            reporting)
        chunksize:
            insert records every *n* records (0 means insert everything in a
            single batch at the end)
    """
    start = "wipe_opt_out_patients"
    log.info(start)

    adminsession = config.admindb.session
    metadata = MetaData()  # operate in isolation!
    destengine = config.destdb.engine
    destsession = config.destdb.session
    ridfield = config.research_id_fieldname

    # Drop/create temporary table
    pkfield = "rid"
    temptable = Table(
        config.temporary_tablename,
        metadata,
        Column(pkfield, config.sqltype_encrypted_pid, primary_key=True),
        **TABLE_KWARGS,
    )
    log.debug(start + ": 1. dropping temporary table")
    temptable.drop(destengine, checkfirst=True)  # use engine, not session
    log.debug(start + ": 2. making temporary table")
    temptable.create(destengine, checkfirst=True)  # use engine, not session

    log.debug(start + ": 3. populating temporary table with RIDs")

    def insert(records_: List[Dict[str, Any]]) -> None:
        # records_: a list of dictionaries
        # http://docs.sqlalchemy.org/en/latest/core/tutorial.html
        log.debug(start + f"... inserting {len(records_)} records")
        destsession.execute(temptable.insert(), records_)

    i = 0
    records = []  # type: List[Dict[str, Any]]
    for rid in gen_optout_rids():
        i += 1
        if report_every and i % report_every == 0:
            log.debug(start + f"... src row# {i}")
        records.append({pkfield: rid})  # a row is a dict of values
        # Guard against chunksize == 0 (which would otherwise raise
        # ZeroDivisionError), in the same way as for report_every.
        if chunksize and i % chunksize == 0:
            insert(records)
            records = []  # type: List[Dict[str, Any]]
    if records:  # remainder
        insert(records)
    commit_destdb()

    log.debug(start + ": 4. creating index on temporary table")
    index = Index("_temptable_idx", temptable.columns[pkfield])
    index.create(destengine)  # use engine, not session

    # 5. For each patient destination table,
    #    DELETE FROM desttable WHERE rid IN (SELECT rid FROM temptable)
    log.debug(start + ": 5. deleting from destination table by opt-out RID")
    for dest_table_name in config.dd.get_dest_tables_with_patient_info():
        log.debug(start + f": ... {dest_table_name}")
        dest_table = config.dd.get_dest_sqla_table(dest_table_name)
        query = dest_table.delete().where(
            column(ridfield).in_(select(temptable.columns[pkfield]))
        )
        destsession.execute(query)
        commit_destdb()

    log.debug(start + ": 6. dropping temporary table")
    temptable.drop(destengine, checkfirst=True)  # use engine, not session
    commit_destdb()

    log.debug(start + ": 7. deleting opt-out patients from mapping table")
    # synchronize_session=False: bulk delete without trying to update
    # objects already loaded in the session.
    adminsession.query(PatientInfo).filter(
        or_(
            PatientInfo.pid.in_(adminsession.query(OptOutPid.pid)),
            PatientInfo.mpid.in_(adminsession.query(OptOutMpid.mpid)),
        )
    ).delete(synchronize_session=False)
    commit_admindb()
def drop_remake(
    incremental: bool = False,
    skipdelete: bool = False,
    full_drop_only: bool = False,
) -> None:
    """
    Drop and rebuild (a) mapping table, (b) destination tables.

    Args:
        incremental:
            Doesn't drop tables; just deletes destination information where
            source information no longer exists.
        skipdelete:
            For incremental updates, skip deletion of rows present in the
            destination but not the source
        full_drop_only:
            Performs a full drop (even opt-out tables) and does nothing else.
            Incompatible with ``incremental``.
    """
    assert not (full_drop_only and incremental)
    log.info(SEP + "Creating database structure +/- deleting dead data")
    admin_engine = config.admindb.engine

    # -------------------------------------------------------------------------
    # Mapping tables
    # -------------------------------------------------------------------------

    every_admin_table = (OptOutMpid, OptOutPid, PatientInfo, TridRecord)
    admin_tables_bar_opt_out = (PatientInfo, TridRecord)

    # Which admin tables are dropped depends on the mode.
    if full_drop_only:
        log.info("Dropping all admin tables")
        doomed = every_admin_table
    elif not incremental:
        log.info("Dropping admin tables except opt-out")
        doomed = admin_tables_bar_opt_out
    else:
        # Incremental mode: keep everything.
        doomed = ()
    for table_class in doomed:
        # noinspection PyUnresolvedReferences
        table_class.__table__.drop(admin_engine, checkfirst=True)

    # (Re)create all admin tables, unless we are only dropping.
    if not full_drop_only:
        log.info("Creating admin tables")
        for table_class in every_admin_table:
            # noinspection PyUnresolvedReferences
            table_class.__table__.create(admin_engine, checkfirst=True)

    # -------------------------------------------------------------------------
    # Destination tables
    # -------------------------------------------------------------------------

    wipe_and_recreate_destination_db(
        incremental=incremental, full_drop_only=full_drop_only
    )

    # Dead destination rows are only hunted down for incremental runs that
    # have not asked to skip the deletion.
    if incremental and not skipdelete and not full_drop_only:
        for src_db in config.dd.get_source_databases():
            for src_table in config.dd.get_src_tables(src_db):
                delete_dest_rows_with_no_src_row(
                    src_db,
                    src_table,
                    report_every=config.report_every_n_rows,
                    chunksize=config.chunksize,
                )
def gen_opt_out_pids_from_file(
    mpid: bool = False,
) -> Generator[Union[int, str], None, None]:
    """
    Generate opt-out PIDs (or MPIDs) read from the configured disk files.

    Args:
        mpid:
            generate MPIDs, not PIDs (and therefore use
            ``config.optout_mpid_filenames``, not
            ``config.optout_pid_filenames``, as the set of filenames to read)

    Yields:
        each PID (or MPID), which will be either ``str`` or ``int`` depending
        on the value of ``config.mpidtype_is_integer`` or
        ``config.pidtype_is_integer``.
    """
    label, filenames, as_int = (
        ("MPID", config.optout_mpid_filenames, config.mpidtype_is_integer)
        if mpid
        else ("PID", config.optout_pid_filenames, config.pidtype_is_integer)
    )
    if not filenames:
        log.info(f"... no opt-out {label} disk files in use")
        return

    # Integer IDs and string IDs are parsed by different file readers.
    read_ids = gen_integers_from_file if as_int else gen_words_from_file
    for filename in filenames:
        log.info(f"... {label} file: {filename}")
        yield from read_ids(filename)
def remove_invalid_bools_from_optout_values(
    optout_colname: str, values: List[Any]
) -> List[Any]:
    """
    Filter a list of opt-out-defining values for use with a boolean column.

    Anything that does not compare equal to ``None``, ``True`` or ``False``
    is reported in the log and dropped; everything else is returned, in the
    original order.

    Args:
        optout_colname: name of the boolean opt-out column (for the log)
        values: candidate values

    Returns:
        the values that pass the test
    """
    acceptable = (None, True, False)
    kept = []  # type: List[Any]
    for candidate in values:
        if candidate in acceptable:
            kept.append(candidate)
        else:
            type_name = type(candidate).__name__
            log.info(
                f"... ignoring non-boolean value ({candidate}), "
                f"type '{type_name}' for boolean column '{optout_colname}'"
            )
    return kept
def gen_opt_out_pids_from_database(
    mpid: bool = False,
) -> Generator[Any, None, None]:
    """
    Generate opt-out PIDs (or MPIDs) from the source database(s), using the
    opt-out-defining fields named in the data dictionary.

    Args:
        mpid: generate MPIDs, not PIDs

    Yields:
        each PID (or MPID)
    """
    label = "MPID" if mpid else "PID"
    any_usable_field = False
    for (
        src_db,
        src_table,
        optout_colname,
        pid_colname,
        mpid_colname,
    ) in config.dd.get_optout_defining_fields():
        id_colname = mpid_colname if mpid else pid_colname
        if not id_colname:
            # This table has no column of the kind of ID we are after.
            continue
        any_usable_field = True

        source = config.sources[src_db]
        src_session = source.session
        log.info(
            f"... {src_db}.{src_table}.{optout_colname} ({label}={id_colname})"
        )
        src_sqla_table = source.metadata.tables[src_table]
        defining_column = src_sqla_table.columns[optout_colname]
        id_column = src_sqla_table.columns[id_colname]

        permitted_values = config.optout_col_values

        # SQLAlchemy raises a TypeError if a string such as "1" is compared
        # against a Boolean column; for the reasoning, see
        # https://docs.sqlalchemy.org/en/14/changelog/migration_12.html#boolean-datatype-now-enforces-strict-true-false-none-values  # noqa: E501
        # The configured values may serve both boolean and string opt-out
        # fields, so for a boolean field we simply discard the values that
        # are not valid for it.
        if isinstance(defining_column.type, Boolean):
            permitted_values = remove_invalid_bools_from_optout_values(
                optout_colname, permitted_values
            )

        id_query = select(id_column).select_from(src_sqla_table).distinct()
        if permitted_values:
            # With no valid values left, no restriction is applied and so
            # ALL IDs from this table are returned, i.e. everyone is opted
            # out: the function plays it safe. (Unlikely for Boolean columns,
            # because validate_optouts() will have pre-validated the values,
            # but legitimate for other types.)
            id_query = id_query.where(defining_column.in_(permitted_values))

        # Ordering is irrelevant here.
        for record in src_session.execute(id_query):
            yield record[0]

    if not any_usable_field:
        log.info(f"... no opt-out-defining {label} fields in data dictionary")
def setup_opt_out(incremental: bool = False) -> None:
    """
    - Searches the disk files and then the source database(s) for PID/MPID
      values of patients who wish to opt out.
    - Adds them to the admin tables for
      :class:`crate_anon.anonymise.models.OptOutPid` and
      :class:`crate_anon.anonymise.models.OptOutMpid`.

    Args:
        incremental:
            after adding opt-out patients, delete any data for them found
            in the destination database. (Unnecessary for "full" rather than
            "incremental" runs, since "full" runs delete all the destination
            tables and start again.)
    """
    log.info(SEP + "Managing opt-outs")
    adminsession = config.admindb.session

    # Each source is consulted twice: first for PIDs, then for MPIDs.
    hunts = (
        (
            "Hunting for opt-out patients from disk file...",
            gen_opt_out_pids_from_file,
        ),
        (
            "Hunting for opt-out patients from database...",
            gen_opt_out_pids_from_database,
        ),
    )
    for announcement, find_ids in hunts:
        log.info(announcement)
        for pid in find_ids():
            # noinspection PyTypeChecker
            OptOutPid.add(adminsession, pid)
        for mpid in find_ids(mpid=True):
            # noinspection PyTypeChecker
            OptOutMpid.add(adminsession, mpid)

    adminsession.commit()

    if incremental:
        wipe_destination_data_for_opt_out_patients()
def process_nonpatient_tables(
    tasknum: int = 0,
    ntasks: int = 1,
    incremental: bool = False,
    free_text_limit: int = None,
    exclude_scrubbed_fields: bool = False,
) -> None:
    """
    Copies all non-patient tables.

    - Tables with an integer PK are visited by every process, and the rows
      within each table may be shared out between processes.
    - Tables without one are shared out whole, each going to one process in
      parallel mode.

    Args:
        tasknum:
            task number of this process (for dividing up work)
        ntasks:
            total number of processes (for dividing up work)
        incremental:
            perform an incremental update, rather than a full run?
        free_text_limit:
            as per :func:`process_table`
        exclude_scrubbed_fields:
            as per :func:`process_table`
    """

    def copy_one_table(
        src_db: str,
        src_table: str,
        pk_name: Union[str, None],
        task: int,
        n_tasks: int,
    ) -> None:
        # Copy a single table, announcing where we were if anything fails,
        # and commit the destination database afterwards.
        try:
            # noinspection PyTypeChecker
            process_table(
                src_db,
                src_table,
                patient=None,
                incremental=incremental,
                intpkname=pk_name,
                tasknum=task,
                ntasks=n_tasks,
                free_text_limit=free_text_limit,
                exclude_scrubbed_fields=exclude_scrubbed_fields,
            )
        except Exception:
            log.critical(
                f"Error whilst processing - db: {src_db} table: {src_table}"
            )
            raise
        commit_destdb()

    log.info(SEP + "Non-patient tables: (a) with integer PK")
    for d, t, pkname in gen_nonpatient_tables_with_int_pk():
        log.info(
            f"Processing non-patient table {d}.{t} (PK: {pkname}) "
            f"({config.overall_progress()})..."
        )
        copy_one_table(d, t, pkname, tasknum, ntasks)

    log.info(SEP + "Non-patient tables: (b) without integer PK")
    whole_tables = gen_nonpatient_tables_without_int_pk(
        tasknum=tasknum, ntasks=ntasks
    )
    for d, t in whole_tables:
        log.info(
            f"Processing non-patient table {d}.{t} "
            f"({config.overall_progress()})..."
        )
        # Single-task mode is forced here: the parallelism has already been
        # achieved by giving different tables to different processes, so the
        # rows within a table must not be divided again.
        copy_one_table(d, t, None, 0, 1)
def process_patient_tables(
    tasknum: int = 0,
    ntasks: int = 1,
    incremental: bool = False,
    specified_pids: List[int] = None,
    free_text_limit: int = None,
    exclude_scrubbed_fields: bool = False,
) -> None:
    """
    Anonymise all patient tables, optionally as one of several parallel
    processes.

    This is a thin wrapper: it logs the processing mode, delegates the real
    work to :func:`patient_processing_fn`, logs completion, and commits the
    destination database.

    Args:
        tasknum:
            task number of this process (for dividing up work)
        ntasks:
            total number of processes (for dividing up work)
        incremental:
            perform an incremental update, rather than a full run?
        specified_pids:
            if specified, restrict to specific PIDs
        free_text_limit:
            as per :func:`process_table`
        exclude_scrubbed_fields:
            as per :func:`process_table`

    """
    # Several destination tables are written to, so the commit comes right
    # at the end.
    log.info(SEP + "Patient tables")
    start_message = (
        "Single-threaded, single-process mode"
        if ntasks == 1
        else f"PROCESS {tasknum} (numbered from zero) OF {ntasks} PROCESSES"
    )
    log.info(start_message)

    patient_processing_fn(
        tasknum=tasknum,
        ntasks=ntasks,
        incremental=incremental,
        specified_pids=specified_pids,
        free_text_limit=free_text_limit,
        exclude_scrubbed_fields=exclude_scrubbed_fields,
    )

    finish_message = (
        f"Process {tasknum}: FINISHED ANONYMISATION"
        if ntasks > 1
        else "FINISHED ANONYMISATION"
    )
    log.info(finish_message)

    # Final commit (should be redundant).
    commit_destdb()
def validate_optouts():
    """
    Validate the opt-out definitions; raise ``ValueError`` if any is unusable.

    For every opt-out-defining field reported by the data dictionary, the
    source column is looked up and the configured opt-out values
    (``config.optout_col_values``) are checked against it. For a Boolean
    column, the values are first filtered through
    :func:`remove_invalid_bools_from_optout_values`.

    Raises:
        ValueError:
            if no opt-out values remain for a defining column
    """
    for (
        src_db,
        src_table,
        optout_colname,
        _pid_colname,
        _mpid_colname,
    ) in config.dd.get_optout_defining_fields():
        source_tables = config.sources[src_db].metadata.tables
        defining_column = source_tables[src_table].columns[optout_colname]

        usable_values = config.optout_col_values
        if isinstance(defining_column.type, Boolean):
            # Only values meaningful for a Boolean column can be matched.
            usable_values = remove_invalid_bools_from_optout_values(
                optout_colname, usable_values
            )

        if usable_values:
            continue
        raise ValueError(
            f"No valid opt-out values for column '{optout_colname}'"
        )
2094# =============================================================================
2095# Main
2096# =============================================================================
def anonymise(
    incremental: bool = False,
    skipdelete: bool = False,
    dropremake: bool = False,
    full_drop_only: bool = False,
    optout: bool = False,
    patienttables: bool = False,
    nonpatienttables: bool = False,
    index: bool = False,
    restrict: str = "",
    restrict_file: str = "",
    restrict_limits: Tuple[Any, Any] = None,
    restrict_list: List[Any] = None,
    free_text_limit: int = None,
    exclude_scrubbed_fields: bool = False,
    nprocesses: int = 1,
    process: int = 0,
    skip_dd_check: bool = False,
    seed: str = "",
    chunksize: int = DEFAULT_CHUNKSIZE,
    reportevery: int = DEFAULT_REPORT_EVERY,
    echo: bool = False,
    debugscrubbers: bool = False,
    savescrubbers: bool = False,
) -> None:
    """
    Main entry point for anonymisation.

    If none of ``dropremake``, ``optout``, ``nonpatienttables``,
    ``patienttables`` or ``index`` is set, all of those stages are run, in
    this order: drop/remake, opt-outs, patient tables, non-patient tables,
    indexes.

    Args:
        incremental:
            If true: incremental run, rather than full.
        skipdelete:
            (For "incremental".) Skip deletion of rows present in the
            destination but not the source.

        dropremake:
            If true: drop/remake destination tables.
        full_drop_only:
            If true: drop destination tables (even opt-out ones) and do nothing
            else.
        optout:
            If true: update opt-out list.
        patienttables:
            If true: process patient tables only (rather than all tables).
        nonpatienttables:
            If true: process non-patient tables only (rather than all tables).
        index:
            If true: create indexes only.

        restrict:
            Restrict to certain patients? Specify a field name, or ``pid``
            to restrict by patient IDs.
        restrict_file:
            (For "restrict".) Filename for permitted values.
        restrict_limits:
            (For "restrict".) Tuple of lower and upper limits to
            apply to the field.
        restrict_list:
            (For "restrict".) List of permitted values.
        free_text_limit:
            Filter out all free text over the specified length.
        exclude_scrubbed_fields:
            Exclude all text fields which are being scrubbed.

        nprocesses:
            Number of processes being run (of which this is one), for work
            allocation.
        process:
            Number of this process (from 0 to nprocesses - 1), for work
            allocation.
        skip_dd_check:
            If true: skip data dictionary validity check. (Useful in
            multiprocessing contexts when another process has already done
            this.)
        seed:
            Seed for random number generator (for TRID generation).
            Blank (or ``None``) for Python's default seeding (system time,
            or an operating system randomness source if available).
        chunksize:
            Number of records copied in a chunk when copying PKs from one
            database to another.

        reportevery:
            Report insert progress every n rows in verbose mode.
        echo:
            Echo SQL?
        debugscrubbers:
            Report sensitive scrubbing information, for debugging
        savescrubbers:
            Saves sensitive scrubbing information in admin database, for
            debugging

    Raises:
        ValueError:
            if ``nprocesses``/``process`` are out of range; if
            ``nprocesses > 1`` is combined with ``dropremake``; if
            ``restrict`` is given without a file, limits or list; or if the
            opt-out definitions are invalid (see :func:`validate_optouts`).

    """
    # Validate args
    if nprocesses < 1:
        raise ValueError("--nprocesses must be >=1")
    if process < 0 or process >= nprocesses:
        raise ValueError(
            "--process argument must be from 0 to (nprocesses - 1) inclusive"
        )
    if nprocesses > 1 and dropremake:
        raise ValueError("Can't use nprocesses > 1 with --dropremake")

    # No specific stage requested means "do all of them".
    everything = not any(
        [dropremake, optout, nonpatienttables, patienttables, index]
    )

    # Load/validate config
    config.report_every_n_rows = reportevery
    config.chunksize = chunksize
    config.debug_scrubbers = debugscrubbers
    config.save_scrubbers = savescrubbers
    config.set_echo(echo)
    config.load_dd(check_against_source_db=not skip_dd_check)
    # The config must be valid:
    config.check_valid()

    if optout or everything:
        validate_optouts()

    # -------------------------------------------------------------------------
    # Setup
    # -------------------------------------------------------------------------

    pids = None
    if restrict:
        if restrict_file:
            pids = get_pids_from_file(restrict, restrict_file)
        elif restrict_limits:
            pids = get_pids_from_field_limits(
                restrict, restrict_limits[0], restrict_limits[1]
            )
        elif restrict_list:
            pids = get_pids_from_list(restrict, restrict_list)
        else:
            raise ValueError(
                "'--restrict' option requires one of "
                "'--file', '--limits' or '--list'"
            )
        if not pids:
            log.warning("No valid patient ids found for the conditions given")

    # Random number seed.
    # An empty string is itself a valid (and fully deterministic) seed for
    # random.seed(), so a blank seed must be translated to None to obtain
    # the documented default (time/OS-based) seeding. Otherwise every run,
    # and every parallel process, would generate the same random sequence.
    random.seed(seed if seed else None)

    # -------------------------------------------------------------------------
    # The main process
    # -------------------------------------------------------------------------

    log.info(BIGSEP + "Starting")
    start = get_now_utc_pendulum()

    # 1. Drop/remake tables. Single-tasking only.
    if full_drop_only:
        drop_remake(full_drop_only=True)
        return
    if dropremake or everything:
        drop_remake(incremental=incremental, skipdelete=skipdelete)

    # 2. Deal with opt-outs
    if optout or everything:
        setup_opt_out(incremental=incremental)

    # 3. Tables with patient info.
    #    Process PER PATIENT, across all tables, because we have to synthesize
    #    information to scrub across the entirety of that patient's record.
    if patienttables or everything:
        process_patient_tables(
            tasknum=process,
            ntasks=nprocesses,
            incremental=incremental,
            specified_pids=pids,
            free_text_limit=free_text_limit,
            exclude_scrubbed_fields=exclude_scrubbed_fields,
        )

    # 4. Tables without any patient ID (e.g. lookup tables). Process PER TABLE.
    if nonpatienttables or everything:
        process_nonpatient_tables(
            tasknum=process,
            ntasks=nprocesses,
            incremental=incremental,
            free_text_limit=free_text_limit,
            exclude_scrubbed_fields=exclude_scrubbed_fields,
        )

    # 5. Indexes. ALWAYS FASTEST TO DO THIS LAST. Process PER TABLE.
    if index or everything:
        create_indexes(tasknum=process, ntasks=nprocesses)

    log.info(BIGSEP + "Finished")
    end = get_now_utc_pendulum()
    time_taken = end - start
    log.info(f"Time taken: {time_taken.total_seconds()} seconds")
    # config.dd.debug_cache_hits()