Coverage for anonymise/config.py: 62%
415 statements
« prev ^ index » next coverage.py v7.8.0, created at 2026-01-07 15:56 -0600
« prev ^ index » next coverage.py v7.8.0, created at 2026-01-07 15:56 -0600
1"""
2crate_anon/anonymise/config.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Config class for CRATE anonymiser.**
28Thoughts on configuration method
30- First version used a ``Config()`` class, which initializes with blank
31 values. The ``anonymise_cli.py`` file creates a config singleton and passes
32 it around. Then when its ``set()`` method is called, it reads a config file
33 and instantiates its settings. An option exists to print a draft config
34 without ever reading one from disk.
36 Advantage: easy to start the program without a valid config file (e.g. to
37 print one).
39 Disadvantage: modules can't be sure that a config is properly instantiated
40 before they are loaded, so you can't easily define a class according to
41 config settings (you'd have to have a class factory, which gets ugly).
43- The Django method is to have a configuration file (e.g. ``settings.py``,
44 which can import from other things) that is read by Django and then becomes
45 importable by anything at startup as ``django.conf.settings``. (I've added
46 local settings via an environment variable.) The way Django knows where to
47 look is via this in ``manage.py``:
49 .. code-block:: python
51 os.environ.setdefault("DJANGO_SETTINGS_MODULE",
52 "crate_anon.crateweb.config.settings")
54 Advantage: setting the config file via an environment variable (read when
55 the config file loads) allows guaranteed config existence as other modules
56 start.
58 Further advantage: config filenames not on command line, therefore not
59 visible to ``ps``.
61 Disadvantage: how do you override with a command-line (argparse) setting?
62 .. though: who cares?
64 To print a config using that file: raise an exception on nonexistent
65 config, and catch it with a special entry point script.
67- See also
68 https://stackoverflow.com/questions/7443366/argument-passing-strategy-environment-variables-vs-command-line
70"""
72# =============================================================================
73# Imports
74# =============================================================================
76import fnmatch
77from io import StringIO
78import logging
79import os
80import sys
81import traceback
82from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING, Union, Set
84from cardinal_pythonlib.hash import GenericHasher, make_hasher
85from cardinal_pythonlib.logs import remove_all_logger_handlers
86from cardinal_pythonlib.sql.validation import (
87 ensure_valid_field_name,
88 ensure_valid_table_name,
89)
90from cardinal_pythonlib.sqlalchemy.schema import is_sqlatype_integer
91from cardinal_pythonlib.sizeformatter import sizeof_fmt
92import regex
93from sqlalchemy import BigInteger, create_engine, String
94from sqlalchemy.dialects.mssql.base import dialect as ms_sql_server_dialect
95from sqlalchemy.dialects.mysql.base import dialect as mysql_dialect
96from sqlalchemy.engine.base import Engine
97from sqlalchemy.engine.interfaces import Dialect
98from sqlalchemy.sql.sqltypes import TypeEngine
100# noinspection PyPep8Naming
101from crate_anon.anonymise.constants import (
102 ANON_CONFIG_ENV_VAR,
103 AnonymiseConfigDefaults as DA,
104 AnonymiseConfigKeys as AK,
105 AnonymiseDatabaseSafeConfigDefaults as DS,
106 AnonymiseDatabaseSafeConfigKeys as SK,
107 DEFAULT_CHUNKSIZE,
108 DEFAULT_REPORT_EVERY,
109 HashConfigKeys as HK,
110 SEP,
111)
112from crate_anon.anonymise.demo_config import get_demo_config
113from crate_anon.anonymise.dd import DataDictionary
114from crate_anon.anonymise.scrub import (
115 NonspecificScrubber,
116 WordList,
117)
118from crate_anon.common.constants import RUNNING_WITHOUT_CONFIG
119from crate_anon.common.extendedconfigparser import (
120 ConfigSection,
121 ExtendedConfigParser,
122)
123from crate_anon.common.sql import TransactionSizeLimiter
124from crate_anon.nlp_manager.constants import DatabaseConfigKeys
126if TYPE_CHECKING:
127 from crate_anon.anonymise.dbholder import DatabaseHolder
129log = logging.getLogger(__name__)
132# =============================================================================
133# Config/databases
134# =============================================================================
137class DatabaseSafeConfig:
138 """
139 Class representing non-sensitive configuration information about a
140 source database.
141 """
143 def __init__(self, parser: ExtendedConfigParser, section: str) -> None:
144 """
145 Read from a configparser section.
147 Args:
148 parser: configparser object
149 section: section name
150 """
151 cfg = ConfigSection(section=section, parser=parser)
153 cfg.require_absent(
154 SK.DEPRECATED_DDGEN_TABLE_BLACKLIST,
155 f"Replace {SK.DEPRECATED_DDGEN_TABLE_BLACKLIST!r} with "
156 f"{SK.DDGEN_TABLE_DENYLIST!r}",
157 )
158 cfg.require_absent(
159 SK.DEPRECATED_DDGEN_TABLE_WHITELIST,
160 f"Replace {SK.DEPRECATED_DDGEN_TABLE_WHITELIST!r} with "
161 f"{SK.DDGEN_TABLE_ALLOWLIST!r}",
162 )
163 cfg.require_absent(
164 SK.DEPRECATED_DDGEN_FIELD_BLACKLIST,
165 f"Replace {SK.DEPRECATED_DDGEN_FIELD_BLACKLIST!r} with "
166 f"{SK.DDGEN_FIELD_DENYLIST!r}",
167 )
168 cfg.require_absent(
169 SK.DEPRECATED_DDGEN_FIELD_WHITELIST,
170 f"Replace {SK.DEPRECATED_DDGEN_FIELD_WHITELIST!r} with "
171 f"{SK.DDGEN_FIELD_ALLOWLIST!r}",
172 )
174 self.ddgen_append_source_info_to_comment = cfg.opt_bool(
175 SK.DDGEN_APPEND_SOURCE_INFO_TO_COMMENT,
176 DS.DDGEN_APPEND_SOURCE_INFO_TO_COMMENT,
177 )
178 self.ddgen_omit_by_default = cfg.opt_bool(
179 SK.DDGEN_OMIT_BY_DEFAULT, DS.DDGEN_OMIT_BY_DEFAULT
180 )
181 self.ddgen_omit_fields = cfg.opt_multiline(SK.DDGEN_OMIT_FIELDS)
182 self.ddgen_include_fields = cfg.opt_multiline(SK.DDGEN_INCLUDE_FIELDS)
184 self.ddgen_per_table_pid_field = cfg.opt_str(
185 SK.DDGEN_PER_TABLE_PID_FIELD
186 )
187 self.ddgen_table_defines_pids = cfg.opt_str(
188 SK.DDGEN_TABLE_DEFINES_PIDS
189 )
190 self.ddgen_add_per_table_pids_to_scrubber = cfg.opt_bool(
191 SK.DDGEN_ADD_PER_TABLE_PIDS_TO_SCRUBBER,
192 DS.DDGEN_ADD_PER_TABLE_PIDS_TO_SCRUBBER,
193 )
194 self.ddgen_master_pid_fieldname = cfg.opt_str(
195 SK.DDGEN_MASTER_PID_FIELDNAME
196 )
197 self.ddgen_table_denylist = cfg.opt_multiline(SK.DDGEN_TABLE_DENYLIST)
198 self.ddgen_table_allowlist = cfg.opt_multiline(
199 SK.DDGEN_TABLE_ALLOWLIST
200 )
201 self.ddgen_table_require_field_absolute = cfg.opt_multiline(
202 SK.DDGEN_TABLE_REQUIRE_FIELD_ABSOLUTE
203 )
204 self.ddgen_table_require_field_conditional = (
205 cfg.opt_multiline_csv_pairs(
206 SK.DDGEN_TABLE_REQUIRE_FIELD_CONDITIONAL
207 )
208 )
209 self.ddgen_field_denylist = cfg.opt_multiline(SK.DDGEN_FIELD_DENYLIST)
210 self.ddgen_field_allowlist = cfg.opt_multiline(
211 SK.DDGEN_FIELD_ALLOWLIST
212 )
213 self.ddgen_pk_fields = cfg.opt_multiline(SK.DDGEN_PK_FIELDS)
214 self.ddgen_prefer_original_pk = cfg.opt_bool(
215 SK.DDGEN_PREFER_ORIGINAL_PK, DS.DDGEN_PREFER_ORIGINAL_PK
216 )
218 self.ddgen_constant_content = cfg.opt_bool(
219 SK.DDGEN_CONSTANT_CONTENT, DS.DDGEN_CONSTANT_CONTENT
220 )
221 self.ddgen_constant_content_tables = cfg.opt_str(
222 SK.DDGEN_CONSTANT_CONTENT_TABLES
223 )
224 self.ddgen_nonconstant_content_tables = cfg.opt_str(
225 SK.DDGEN_NONCONSTANT_CONTENT_TABLES
226 )
227 self.ddgen_addition_only = cfg.opt_bool(
228 SK.DDGEN_ADDITION_ONLY, DS.DDGEN_ADDITION_ONLY
229 )
230 self.ddgen_addition_only_tables = cfg.opt_str(
231 SK.DDGEN_ADDITION_ONLY_TABLES
232 )
233 self.ddgen_deletion_possible_tables = cfg.opt_str(
234 SK.DDGEN_DELETION_POSSIBLE_TABLES
235 )
237 self.ddgen_pid_defining_fieldnames = cfg.opt_multiline(
238 SK.DDGEN_PID_DEFINING_FIELDNAMES
239 )
240 self.ddgen_scrubsrc_patient_fields = cfg.opt_multiline(
241 SK.DDGEN_SCRUBSRC_PATIENT_FIELDS
242 )
243 self.ddgen_scrubsrc_thirdparty_fields = cfg.opt_multiline(
244 SK.DDGEN_SCRUBSRC_THIRDPARTY_FIELDS
245 )
246 self.ddgen_scrubsrc_thirdparty_xref_pid_fields = cfg.opt_multiline(
247 SK.DDGEN_SCRUBSRC_THIRDPARTY_XREF_PID_FIELDS
248 )
249 self.ddgen_required_scrubsrc_fields = cfg.opt_multiline(
250 SK.DDGEN_REQUIRED_SCRUBSRC_FIELDS
251 )
252 self.ddgen_scrubmethod_code_fields = cfg.opt_multiline(
253 SK.DDGEN_SCRUBMETHOD_CODE_FIELDS
254 )
255 self.ddgen_scrubmethod_date_fields = cfg.opt_multiline(
256 SK.DDGEN_SCRUBMETHOD_DATE_FIELDS
257 )
258 self.ddgen_scrubmethod_number_fields = cfg.opt_multiline(
259 SK.DDGEN_SCRUBMETHOD_NUMBER_FIELDS
260 )
261 self.ddgen_scrubmethod_phrase_fields = cfg.opt_multiline(
262 SK.DDGEN_SCRUBMETHOD_PHRASE_FIELDS
263 )
264 self.ddgen_safe_fields_exempt_from_scrubbing = cfg.opt_multiline(
265 SK.DDGEN_SAFE_FIELDS_EXEMPT_FROM_SCRUBBING
266 )
267 self.ddgen_min_length_for_scrubbing = cfg.opt_int(
268 SK.DDGEN_MIN_LENGTH_FOR_SCRUBBING,
269 DS.DDGEN_MIN_LENGTH_FOR_SCRUBBING,
270 )
272 self.ddgen_truncate_date_fields = cfg.opt_multiline(
273 SK.DDGEN_TRUNCATE_DATE_FIELDS
274 )
275 self.ddgen_filename_to_text_fields = cfg.opt_multiline(
276 SK.DDGEN_FILENAME_TO_TEXT_FIELDS
277 )
279 self.bin2text_dict = cfg.opt_multiline_csv_pairs(
280 SK.DDGEN_BINARY_TO_TEXT_FIELD_PAIRS
281 )
282 self.ddgen_skip_row_if_extract_text_fails_fields = cfg.opt_multiline(
283 SK.DDGEN_SKIP_ROW_IF_EXTRACT_TEXT_FAILS_FIELDS
284 )
285 self.ddgen_rename_tables_remove_suffixes = cfg.opt_multiline(
286 SK.DDGEN_RENAME_TABLES_REMOVE_SUFFIXES, as_words=True
287 )
289 self.ddgen_index_fields = cfg.opt_multiline(SK.DDGEN_INDEX_FIELDS)
290 self.ddgen_allow_fulltext_indexing = cfg.opt_bool(
291 SK.DDGEN_ALLOW_FULLTEXT_INDEXING, DS.DDGEN_ALLOW_FULLTEXT_INDEXING
292 )
293 self.ddgen_freetext_index_min_length = cfg.opt_int(
294 SK.DDGEN_FREETEXT_INDEX_MIN_LENGTH,
295 DS.DDGEN_FREETEXT_INDEX_MIN_LENGTH,
296 )
298 self.ddgen_force_lower_case = cfg.opt_bool(
299 SK.DDGEN_FORCE_LOWER_CASE, DS.DDGEN_FORCE_LOWER_CASE
300 )
301 self.ddgen_convert_odd_chars_to_underscore = cfg.opt_bool(
302 SK.DDGEN_CONVERT_ODD_CHARS_TO_UNDERSCORE,
303 DS.DDGEN_CONVERT_ODD_CHARS_TO_UNDERSCORE,
304 )
306 self.debug_row_limit = cfg.opt_int(
307 SK.DEBUG_ROW_LIMIT, DS.DEBUG_ROW_LIMIT
308 )
309 self.debug_limited_tables = cfg.opt_multiline(SK.DEBUG_LIMITED_TABLES)
311 self.ddgen_patient_opt_out_fields = cfg.opt_multiline(
312 SK.DDGEN_PATIENT_OPT_OUT_FIELDS
313 )
315 self.ddgen_extra_hash_fields = cfg.opt_multiline_csv_pairs(
316 SK.DDGEN_EXTRA_HASH_FIELDS
317 )
318 # ... key: fieldspec
319 # ... value: hash_config_section_name
321 self.pidtype = BigInteger()
322 self.mpidtype = BigInteger()
324 def is_table_denied(self, table: str) -> bool:
325 """
326 Is the table name denylisted (and not also allowlisted)?
327 """
328 for allow in self.ddgen_table_allowlist:
329 r = regex.compile(fnmatch.translate(allow), regex.IGNORECASE)
330 if r.match(table):
331 return False
332 for deny in self.ddgen_table_denylist:
333 r = regex.compile(fnmatch.translate(deny), regex.IGNORECASE)
334 if r.match(table):
335 return True
336 return False
338 def is_field_denied(self, field: str) -> bool:
339 """
340 Is the field name denylisted (and not also allowlisted)?
341 """
342 for allow in self.ddgen_field_allowlist:
343 r = regex.compile(fnmatch.translate(allow), regex.IGNORECASE)
344 if r.match(field):
345 return True
346 for deny in self.ddgen_field_denylist:
347 r = regex.compile(fnmatch.translate(deny), regex.IGNORECASE)
348 if r.match(field):
349 return True
350 return False
352 def does_table_fail_minimum_fields(self, colnames: List[str]) -> bool:
353 """
354 For use when creating a data dictionary automatically:
356 Does a table with the specified column names fail our minimum
357 requirements? These requirements are set by our
358 ``ddgen_table_require_field_absolute`` and
359 ``ddgen_table_require_field_conditional`` configuration parameters.
361 Args:
362 colnames: list of column names for the table
364 Returns:
365 does it fail?
366 """
367 for abs_req in self.ddgen_table_require_field_absolute:
368 if abs_req not in colnames:
369 log.debug(
370 f"Table fails minimum field requirements: no column "
371 f"named {abs_req!r}"
372 )
373 return True
374 for (
375 if_field,
376 then_field,
377 ) in self.ddgen_table_require_field_conditional.items():
378 if if_field in colnames and then_field not in colnames:
379 log.debug(
380 f"Table fails minimum field requirements: "
381 f"field {if_field!r} present but "
382 f"field {then_field!r} absent"
383 )
384 return True
385 return False
388# =============================================================================
389# ExtraHashconfig
390# =============================================================================
393def get_extra_hasher(
394 parser: ExtendedConfigParser, section: str
395) -> GenericHasher:
396 """
397 Read hasher configuration from a configparser section, and return the
398 hasher.
400 Args:
401 parser: configparser object
402 section: section name
404 Returns:
405 the hasher
406 """
407 cfg = ConfigSection(section=section, parser=parser)
408 hash_method = cfg.opt_str(HK.HASH_METHOD, required=True)
409 secret_key = cfg.opt_str(HK.SECRET_KEY, required=True)
410 return make_hasher(hash_method, secret_key)
413# =============================================================================
414# WordAlternatives
415# =============================================================================
418def get_word_alternatives(filenames: List[str]) -> List[List[str]]:
419 """
420 Reads in a list of word alternatives, from one or more
421 comma-separated-value (CSV) text files (also accepting comment lines
422 starting with #, and allowing different numbers of columns on different
423 lines).
425 All entries on one line will be substituted for each other, if alternatives
426 are enabled.
428 Produces a list of equivalent-word lists.
430 Arbitrarily, uses upper case internally. (All CRATE regex replacements are
431 case-insensitive.)
433 An alternatives file might look like this:
435 .. code-block:: none
437 # Street types
438 # https://en.wikipedia.org/wiki/Street_suffix
440 avenue, circus, close, crescent, drive, gardens, grove, hill, lane, mead, mews, place, rise, road, row, square, street, vale, way, wharf
442 Args:
443 filenames: filenames to read from
445 Returns:
446 a list of lists of equivalent words
448 """ # noqa: E501
449 alternatives = [] # type: List[List[str]]
450 all_words_seen = set() # type: Set[str]
451 for filename in filenames:
452 with open(filename, "r") as alt_file:
453 for line in alt_file:
454 line = line.strip()
455 if not line: # blank line
456 continue
457 if line.startswith("#"): # comment line
458 continue
459 equivalent_words = [w.strip().upper() for w in line.split(",")]
460 equivalent_words = [
461 w for w in equivalent_words if w
462 ] # remove empties
463 if len(equivalent_words) < 2:
464 continue
465 for w in equivalent_words:
466 if w in all_words_seen:
467 raise ValueError(
468 f"Word {w!r} appears twice in "
469 f"alternatives list! Invalid"
470 )
471 all_words_seen.add(w)
472 alternatives.append(equivalent_words)
473 return alternatives
476# =============================================================================
477# get_sqlatype
478# =============================================================================
481def get_sqlatype(sqlatype: str) -> TypeEngine:
482 """
483 Converts a string, like "VARCHAR(10)", to an SQLAlchemy type.
485 Since we might have to return String(length=...), we have to return
486 an instance, not a class.
487 """
488 if sqlatype == "BigInteger":
489 return BigInteger()
490 r = regex.compile(r"String\((\d+)\)") # e.g. String(50)
491 try:
492 m = r.match(sqlatype)
493 length = int(m.group(1))
494 return String(length)
495 except (AttributeError, ValueError):
496 raise ValueError(
497 f"Bad SQLAlchemy type specification for "
498 f"PID/MPID columns: {sqlatype!r}"
499 )
502# =============================================================================
503# Config
504# =============================================================================
507class Config:
508 """
509 Class representing the main CRATE anonymiser configuration.
510 """
512 def __init__(
513 self, open_databases: bool = True, mock: bool = False
514 ) -> None:
515 """
516 Read the config from the file specified in the ``CRATE_ANON_CONFIG``
517 environment variable.
519 Args:
520 open_databases: open SQLAlchemy connections to the databases?
521 mock: create mock (dummy) config?
522 """
524 # Get filename
525 try:
526 self.config_filename = os.environ[ANON_CONFIG_ENV_VAR] # may raise
527 assert self.config_filename # may raise
528 filename = self.config_filename
529 fileobj = None
530 except (KeyError, AssertionError):
531 if RUNNING_WITHOUT_CONFIG or mock:
532 # Running in a mock environment; no config required
533 filename = None
534 fileobj = StringIO(get_demo_config())
535 else:
536 print(
537 f"You must set the {ANON_CONFIG_ENV_VAR} environment "
538 f"variable to point to a CRATE anonymisation config file, "
539 f"or specify it on the command line."
540 )
541 traceback.print_exc() # full details, please
542 sys.exit(1)
544 cfg = ConfigSection(
545 section=AK.SECTION_MAIN, filename=filename, fileobj=fileobj
546 )
547 parser = cfg.parser
549 def get_database(
550 section_: str,
551 name: str,
552 srccfg_: DatabaseSafeConfig = None,
553 with_session: bool = False,
554 with_conn: bool = True,
555 reflect: bool = True,
556 ) -> "DatabaseHolder":
557 return parser.get_database(
558 section_,
559 dbname=name,
560 srccfg=srccfg_,
561 with_session=with_session,
562 with_conn=with_conn,
563 reflect=reflect,
564 )
566 # ---------------------------------------------------------------------
567 # Data dictionary
568 # ---------------------------------------------------------------------
570 self.data_dictionary_filename = cfg.opt_str(
571 AK.DATA_DICTIONARY_FILENAME
572 )
574 # ---------------------------------------------------------------------
575 # Critical field types
576 # ---------------------------------------------------------------------
578 self.pidtype = get_sqlatype(
579 cfg.opt_str(AK.SQLATYPE_PID, DA.SQLATYPE_PID)
580 )
581 self.pidtype_is_integer = is_sqlatype_integer(self.pidtype)
582 self.mpidtype = get_sqlatype(
583 cfg.opt_str(AK.SQLATYPE_MPID, DA.SQLATYPE_MPID)
584 )
585 self.mpidtype_is_integer = is_sqlatype_integer(self.mpidtype)
587 # ---------------------------------------------------------------------
588 # Encryption phrases/passwords
589 # ---------------------------------------------------------------------
591 self.hash_method = cfg.opt_str(AK.HASH_METHOD, DA.HASH_METHOD)
592 self.per_table_patient_id_encryption_phrase = cfg.opt_str(
593 AK.PER_TABLE_PATIENT_ID_ENCRYPTION_PHRASE
594 )
595 self.master_patient_id_encryption_phrase = cfg.opt_str(
596 AK.MASTER_PATIENT_ID_ENCRYPTION_PHRASE
597 )
598 self.change_detection_encryption_phrase = cfg.opt_str(
599 AK.CHANGE_DETECTION_ENCRYPTION_PHRASE
600 )
601 _extra_hash_config_section_names = cfg.opt_multiline(
602 AK.EXTRA_HASH_CONFIG_SECTIONS
603 )
605 self.extra_hashers = {} # type: Dict[str, GenericHasher]
606 for hasher_name in _extra_hash_config_section_names:
607 self.extra_hashers[hasher_name] = get_extra_hasher(
608 parser, hasher_name
609 )
610 # Load encryption keys and create hashers
611 dummyhash = make_hasher(self.hash_method, "dummysalt")
612 encrypted_length = dummyhash.output_length()
614 self.sqltype_encrypted_pid = String(encrypted_length)
615 self.sqltype_encrypted_pid_as_sql = str(self.sqltype_encrypted_pid)
616 # ... VARCHAR(32) for MD5; VARCHAR(64) for SHA-256; VARCHAR(128) for
617 # SHA-512.
619 if not self.per_table_patient_id_encryption_phrase:
620 raise ValueError(
621 f"Missing {AK.PER_TABLE_PATIENT_ID_ENCRYPTION_PHRASE}"
622 )
623 self.primary_pid_hasher = make_hasher(
624 self.hash_method, self.per_table_patient_id_encryption_phrase
625 )
627 if not self.master_patient_id_encryption_phrase:
628 raise ValueError(
629 f"Missing {AK.MASTER_PATIENT_ID_ENCRYPTION_PHRASE}"
630 )
631 self.master_pid_hasher = make_hasher(
632 self.hash_method, self.master_patient_id_encryption_phrase
633 )
635 if not self.change_detection_encryption_phrase:
636 raise ValueError(
637 f"Missing {AK.CHANGE_DETECTION_ENCRYPTION_PHRASE}"
638 )
639 self.change_detection_hasher = make_hasher(
640 self.hash_method, self.change_detection_encryption_phrase
641 )
643 # ---------------------------------------------------------------------
644 # Text extraction
645 # ---------------------------------------------------------------------
647 self.extract_text_extensions_case_sensitive = cfg.opt_bool(
648 AK.EXTRACT_TEXT_EXTENSIONS_CASE_SENSITIVE,
649 DA.EXTRACT_TEXT_EXTENSIONS_CASE_SENSITIVE,
650 )
651 self.extract_text_extensions_permitted = cfg.opt_multiline(
652 AK.EXTRACT_TEXT_EXTENSIONS_PERMITTED
653 )
654 self.extract_text_extensions_prohibited = cfg.opt_multiline(
655 AK.EXTRACT_TEXT_EXTENSIONS_PROHIBITED
656 )
657 self.extract_text_plain = cfg.opt_bool(
658 AK.EXTRACT_TEXT_PLAIN, DA.EXTRACT_TEXT_PLAIN
659 )
660 self.extract_text_width = cfg.opt_int(
661 AK.EXTRACT_TEXT_WIDTH, DA.EXTRACT_TEXT_WIDTH
662 )
664 # ---------------------------------------------------------------------
665 # Anonymisation
666 # ---------------------------------------------------------------------
668 cfg.require_absent(
669 AK.DEPRECATED_WHITELIST_FILENAMES,
670 f"Replace {AK.DEPRECATED_WHITELIST_FILENAMES!r} with "
671 f"{AK.ALLOWLIST_FILENAMES!r}",
672 )
673 cfg.require_absent(
674 AK.DEPRECATED_BLACKLIST_FILENAMES,
675 f"Replace {AK.DEPRECATED_BLACKLIST_FILENAMES!r} with "
676 f"{AK.DENYLIST_FILENAMES!r}",
677 )
679 self.allow_no_patient_info = cfg.opt_bool(
680 AK.ALLOW_NO_PATIENT_INFO, DA.ALLOW_NO_PATIENT_INFO
681 )
682 self.allowlist_filenames = cfg.opt_multiline(AK.ALLOWLIST_FILENAMES)
683 self.anonymise_codes_at_word_boundaries_only = cfg.opt_bool(
684 AK.ANONYMISE_CODES_AT_WORD_BOUNDARIES_ONLY,
685 DA.ANONYMISE_CODES_AT_WORD_BOUNDARIES_ONLY,
686 )
687 self.anonymise_codes_at_numeric_boundaries_only = cfg.opt_bool(
688 AK.ANONYMISE_CODES_AT_NUMERIC_BOUNDARIES_ONLY,
689 DA.ANONYMISE_CODES_AT_NUMERIC_BOUNDARIES_ONLY,
690 )
691 self.anonymise_dates_at_word_boundaries_only = cfg.opt_bool(
692 AK.ANONYMISE_DATES_AT_WORD_BOUNDARIES_ONLY,
693 DA.ANONYMISE_DATES_AT_WORD_BOUNDARIES_ONLY,
694 )
695 self.anonymise_numbers_at_word_boundaries_only = cfg.opt_bool(
696 AK.ANONYMISE_NUMBERS_AT_WORD_BOUNDARIES_ONLY,
697 DA.ANONYMISE_NUMBERS_AT_WORD_BOUNDARIES_ONLY,
698 )
699 self.anonymise_numbers_at_numeric_boundaries_only = cfg.opt_bool(
700 AK.ANONYMISE_NUMBERS_AT_NUMERIC_BOUNDARIES_ONLY,
701 DA.ANONYMISE_NUMBERS_AT_NUMERIC_BOUNDARIES_ONLY,
702 )
703 self.anonymise_strings_at_word_boundaries_only = cfg.opt_bool(
704 AK.ANONYMISE_STRINGS_AT_WORD_BOUNDARIES_ONLY,
705 DA.ANONYMISE_STRINGS_AT_WORD_BOUNDARIES_ONLY,
706 )
707 self.denylist_filenames = cfg.opt_multiline(AK.DENYLIST_FILENAMES)
708 self.denylist_files_as_phrases = cfg.opt_bool(
709 AK.DENYLIST_FILES_AS_PHRASES, DA.DENYLIST_FILES_AS_PHRASES
710 )
711 self.denylist_use_regex = cfg.opt_bool(
712 AK.DENYLIST_USE_REGEX, DA.DENYLIST_USE_REGEX
713 )
714 self.min_string_length_for_errors = cfg.opt_int(
715 AK.MIN_STRING_LENGTH_FOR_ERRORS, DA.MIN_STRING_LENGTH_FOR_ERRORS
716 )
717 self.min_string_length_to_scrub_with = cfg.opt_int(
718 AK.MIN_STRING_LENGTH_TO_SCRUB_WITH,
719 DA.MIN_STRING_LENGTH_TO_SCRUB_WITH,
720 )
721 self.nonspecific_scrubber_first = cfg.opt_bool(
722 AK.NONSPECIFIC_SCRUBBER_FIRST, DA.NONSPECIFIC_SCRUBBER_FIRST
723 )
724 self.phrase_alternative_word_filenames = cfg.opt_multiline(
725 AK.PHRASE_ALTERNATIVE_WORD_FILENAMES
726 )
727 self.replace_all_dates_with = cfg.opt_str(
728 AK.REPLACE_ALL_DATES_WITH, DA.REPLACE_ALL_DATES_WITH
729 )
730 self.replace_patient_info_with = cfg.opt_str(
731 AK.REPLACE_PATIENT_INFO_WITH, DA.REPLACE_PATIENT_INFO_WITH
732 )
733 self.replace_third_party_info_with = cfg.opt_str(
734 AK.REPLACE_THIRD_PARTY_INFO_WITH, DA.REPLACE_THIRD_PARTY_INFO_WITH
735 )
736 self.replace_nonspecific_info_with = cfg.opt_str(
737 AK.REPLACE_NONSPECIFIC_INFO_WITH, DA.REPLACE_NONSPECIFIC_INFO_WITH
738 )
739 self.scrub_all_dates = cfg.opt_bool(
740 AK.SCRUB_ALL_DATES, DA.SCRUB_ALL_DATES
741 )
742 self.scrub_all_email_addresses = cfg.opt_bool(
743 AK.SCRUB_ALL_EMAIL_ADDRESSES, DA.SCRUB_ALL_EMAIL_ADDRESSES
744 )
745 self.scrub_all_numbers_of_n_digits = cfg.opt_multiline_int(
746 AK.SCRUB_ALL_NUMBERS_OF_N_DIGITS, minimum=1
747 )
748 self.scrub_all_uk_postcodes = cfg.opt_bool(
749 AK.SCRUB_ALL_UK_POSTCODES, DA.SCRUB_ALL_UK_POSTCODES
750 )
751 self.scrub_string_suffixes = cfg.opt_multiline(
752 AK.SCRUB_STRING_SUFFIXES
753 )
754 self.string_max_regex_errors = cfg.opt_int(
755 AK.STRING_MAX_REGEX_ERRORS, DA.STRING_MAX_REGEX_ERRORS
756 )
757 self.thirdparty_xref_max_depth = cfg.opt_int(
758 AK.THIRDPARTY_XREF_MAX_DEPTH, DA.THIRDPARTY_XREF_MAX_DEPTH
759 )
760 self.timefield = cfg.opt_str(AK.TIMEFIELD_NAME, DA.TIMEFIELD_NAME)
762 # Get all extra regexes
763 if parser.has_section(AK.SECTION_EXTRA_REGEXES):
764 self.extra_regexes = [
765 x[1] for x in parser.items(AK.SECTION_EXTRA_REGEXES)
766 ]
767 else:
768 self.extra_regexes = [] # type: List[str]
770 if not self.extract_text_extensions_case_sensitive:
771 self.extract_text_extensions_permitted = [
772 x.upper() for x in self.extract_text_extensions_permitted
773 ]
774 self.extract_text_extensions_permitted = [
775 x.upper() for x in self.extract_text_extensions_permitted
776 ]
778 # allowlist, denylist, nonspecific scrubber, alternative words
779 self.allowlist = WordList(
780 filenames=self.allowlist_filenames,
781 hasher=self.change_detection_hasher,
782 )
783 self.denylist = WordList(
784 filenames=self.denylist_filenames,
785 as_phrases=self.denylist_files_as_phrases,
786 replacement_text=self.replace_nonspecific_info_with,
787 hasher=self.change_detection_hasher,
788 at_word_boundaries_only=(
789 self.anonymise_strings_at_word_boundaries_only # flexible
790 if self.denylist_use_regex
791 else True # required by Flashtext
792 ),
793 max_errors=0,
794 regex_method=self.denylist_use_regex,
795 )
796 self.nonspecific_scrubber = NonspecificScrubber(
797 replacement_text=self.replace_nonspecific_info_with,
798 hasher=self.change_detection_hasher,
799 anonymise_codes_at_word_boundaries_only=(
800 self.anonymise_codes_at_word_boundaries_only
801 ),
802 anonymise_dates_at_word_boundaries_only=(
803 self.anonymise_dates_at_word_boundaries_only
804 ),
805 anonymise_numbers_at_word_boundaries_only=(
806 self.anonymise_numbers_at_word_boundaries_only
807 ),
808 denylist=self.denylist,
809 scrub_all_numbers_of_n_digits=self.scrub_all_numbers_of_n_digits,
810 scrub_all_uk_postcodes=self.scrub_all_uk_postcodes,
811 scrub_all_dates=self.scrub_all_dates,
812 scrub_all_email_addresses=self.scrub_all_email_addresses,
813 replacement_text_all_dates=self.replace_all_dates_with,
814 extra_regexes=self.extra_regexes,
815 )
816 self.phrase_alternative_words = get_word_alternatives(
817 self.phrase_alternative_word_filenames
818 )
820 # ---------------------------------------------------------------------
821 # Output fields and formatting
822 # ---------------------------------------------------------------------
824 self.research_id_fieldname = cfg.opt_str(
825 AK.RESEARCH_ID_FIELDNAME, DA.RESEARCH_ID_FIELDNAME
826 )
827 self.trid_fieldname = cfg.opt_str(AK.TRID_FIELDNAME, DA.TRID_FIELDNAME)
828 self.master_research_id_fieldname = cfg.opt_str(
829 AK.MASTER_RESEARCH_ID_FIELDNAME, DA.MASTER_RESEARCH_ID_FIELDNAME
830 )
831 self.add_mrid_wherever_rid_added = cfg.opt_bool(
832 AK.ADD_MRID_WHEREVER_RID_ADDED, DA.ADD_MRID_WHEREVER_RID_ADDED
833 )
834 self.source_hash_fieldname = cfg.opt_str(
835 AK.SOURCE_HASH_FIELDNAME, DA.SOURCE_HASH_FIELDNAME
836 )
838 # ---------------------------------------------------------------------
839 # Destination database configuration
840 # ---------------------------------------------------------------------
842 self.max_rows_before_commit = cfg.opt_int(
843 AK.MAX_ROWS_BEFORE_COMMIT, DA.MAX_ROWS_BEFORE_COMMIT
844 )
845 self.max_bytes_before_commit = cfg.opt_int(
846 AK.MAX_BYTES_BEFORE_COMMIT, DA.MAX_BYTES_BEFORE_COMMIT
847 )
848 self.temporary_tablename = cfg.opt_str(
849 AK.TEMPORARY_TABLENAME, DA.TEMPORARY_TABLENAME
850 )
852 # ---------------------------------------------------------------------
853 # Databases
854 # ---------------------------------------------------------------------
856 destination_database_cfg_section = cfg.opt_str(AK.DESTINATION_DATABASE)
857 self._destination_database_url = parser.get_str(
858 destination_database_cfg_section,
859 DatabaseConfigKeys.URL,
860 required=True,
861 )
862 admin_database_cfg_section = cfg.opt_str(AK.ADMIN_DATABASE)
863 if destination_database_cfg_section == admin_database_cfg_section:
864 raise ValueError(
865 "Destination and admin databases mustn't be the same"
866 )
867 source_database_cfg_sections = cfg.opt_multiline(AK.SOURCE_DATABASES)
868 self._source_db_names = source_database_cfg_sections
869 if destination_database_cfg_section in source_database_cfg_sections:
870 raise ValueError(
871 "Destination database mustn't be listed as a "
872 "source database"
873 )
874 if admin_database_cfg_section in source_database_cfg_sections:
875 raise ValueError(
876 "Admin database mustn't be listed as a " "source database"
877 )
879 if RUNNING_WITHOUT_CONFIG:
880 self.destdb = None # type: Optional[DatabaseHolder]
881 self._dest_dialect = mysql_dialect
882 self._destdb_transaction_limiter = None
883 else:
884 self.destdb = get_database(
885 destination_database_cfg_section,
886 name=destination_database_cfg_section,
887 with_session=open_databases,
888 with_conn=False,
889 reflect=False,
890 )
891 if not self.destdb:
892 raise ValueError("Destination database misconfigured")
893 if open_databases:
894 self._dest_dialect = self.destdb.engine.dialect
895 else: # in context of web framework, some sort of default
896 self._dest_dialect = mysql_dialect
897 self._destdb_transaction_limiter = TransactionSizeLimiter(
898 session=self.destdb.session,
899 max_bytes_before_commit=self.max_bytes_before_commit,
900 max_rows_before_commit=self.max_rows_before_commit,
901 )
903 if RUNNING_WITHOUT_CONFIG:
904 self.admindb = None # type: Optional[DatabaseHolder]
905 else:
906 self.admindb = get_database(
907 admin_database_cfg_section,
908 name=admin_database_cfg_section,
909 with_session=open_databases,
910 with_conn=False,
911 reflect=open_databases,
912 )
913 if not self.admindb:
914 raise ValueError("Admin database misconfigured")
916 self.sources = {} # type: Dict[str, DatabaseHolder]
917 self.src_dialects = {} # type: Dict[str, Dialect]
918 for sourcedb_name in source_database_cfg_sections:
919 if RUNNING_WITHOUT_CONFIG:
920 continue
921 log.info(f"Adding source database: {sourcedb_name}")
922 srccfg = DatabaseSafeConfig(parser, sourcedb_name)
923 srcdb = get_database(
924 sourcedb_name,
925 srccfg_=srccfg,
926 name=sourcedb_name,
927 with_session=open_databases,
928 with_conn=False,
929 reflect=open_databases,
930 )
931 if not srcdb:
932 raise ValueError(
933 f"Source database {sourcedb_name} misconfigured"
934 )
935 self.sources[sourcedb_name] = srcdb
936 if open_databases:
937 self.src_dialects[sourcedb_name] = srcdb.engine.dialect
938 else: # in context of web framework
939 self.src_dialects[sourcedb_name] = ms_sql_server_dialect
941 # ---------------------------------------------------------------------
942 # Processing options
943 # ---------------------------------------------------------------------
945 self.debug_max_n_patients = cfg.opt_int(
946 AK.DEBUG_MAX_N_PATIENTS, DA.DEBUG_MAX_N_PATIENTS
947 )
948 self.debug_pid_list = cfg.opt_multiline(AK.DEBUG_PID_LIST)
950 # ---------------------------------------------------------------------
951 # Opting out entirely
952 # ---------------------------------------------------------------------
954 self.optout_pid_filenames = cfg.opt_multiline(AK.OPTOUT_PID_FILENAMES)
955 self.optout_mpid_filenames = cfg.opt_multiline(
956 AK.OPTOUT_MPID_FILENAMES
957 )
958 self.optout_col_values = cfg.opt_pyvalue_list(AK.OPTOUT_COL_VALUES)
960 # ---------------------------------------------------------------------
961 # Rest of initialization
962 # ---------------------------------------------------------------------
964 self.dd = DataDictionary(self)
966 self.rows_in_transaction = 0
967 self.bytes_in_transaction = 0
968 self.rows_inserted_per_table = {} # type: Dict[Tuple[str, str], int]
969 self.warned_re_limits = {} # type: Dict[Tuple[str, str], bool]
971 self.report_every_n_rows = DEFAULT_REPORT_EVERY
972 self.chunksize = DEFAULT_CHUNKSIZE
973 self.debug_scrubbers = False
974 self.save_scrubbers = False
976 self._src_bytes_read = 0
977 self._dest_bytes_written = 0
978 self._echo = False
980 def get_destdb_engine_outside_transaction(self) -> Engine:
981 """
982 Get a standalone SQLAlchemy Engine for the destination database, and
983 configure itself so transactions aren't used (equivalently:
984 ``autocommit`` is True; equivalently, the database commits after every
985 statement).
987 See
988 https://github.com/mkleehammer/pyodbc/wiki/Database-Transaction-Management
990 Returns:
991 the Engine
992 """
993 url = self._destination_database_url
994 return create_engine(
995 url,
996 echo=self._echo,
997 connect_args={"autocommit": True}, # for pyodbc
998 future=True,
999 )
1001 def overall_progress(self) -> str:
1002 """
1003 Returns a formatted description of the number of bytes read from the
1004 source database(s) and written to the destination database.
1006 (The Config is used to keep track of progress, via
1007 :func:`notify_src_bytes_read` and :func:`notify_dest_db_transaction`.)
1008 """
1009 return (
1010 f"{sizeof_fmt(self._src_bytes_read)} read, "
1011 f"{sizeof_fmt(self._dest_bytes_written)} written"
1012 )
1014 def load_dd(self, check_against_source_db: bool = True) -> None:
1015 """
1016 Loads the data dictionary (DD) into the config.
1018 Args:
1019 check_against_source_db:
1020 check DD validity against the source database?
1021 """
1022 log.info(
1023 SEP + f"Loading data dictionary: {self.data_dictionary_filename}"
1024 )
1025 self.dd.read_from_file(self.data_dictionary_filename)
1026 self.dd.check_valid(
1027 prohibited_fieldnames=[
1028 self.source_hash_fieldname,
1029 self.trid_fieldname,
1030 ],
1031 check_against_source_db=check_against_source_db,
1032 )
1033 self.init_row_counts()
1035 def init_row_counts(self) -> None:
1036 """
1037 Initialize the "number of rows inserted" counts to zero for all source
1038 tables.
1039 """
1040 self.rows_inserted_per_table = {} # type: Dict[Tuple[str, str], int]
1041 for db_table_tuple in self.dd.get_src_db_tablepairs():
1042 self.rows_inserted_per_table[db_table_tuple] = 0
1043 self.warned_re_limits[db_table_tuple] = False
1045 def check_valid(self) -> None:
1046 """
1047 Raise :exc:`ValueError` if the config is invalid.
1048 """
1050 # Destination databases
1051 if not self.destdb:
1052 raise ValueError(f"No {AK.DESTINATION_DATABASE} specified.")
1053 if not self.admindb:
1054 raise ValueError(f"No {AK.ADMIN_DATABASE} specified.")
1056 # Test table names
1057 if not self.temporary_tablename:
1058 raise ValueError(f"No {AK.TEMPORARY_TABLENAME} specified.")
1059 ensure_valid_table_name(self.temporary_tablename)
1061 # Test field names
1062 def validate_fieldattr(name: str) -> None:
1063 if not getattr(self, name):
1064 raise ValueError("Blank fieldname: " + name)
1065 ensure_valid_field_name(getattr(self, name))
1067 specialfieldlist = [
1068 # Our attributes have the same names as these parameters:
1069 AK.RESEARCH_ID_FIELDNAME,
1070 AK.TRID_FIELDNAME,
1071 AK.MASTER_RESEARCH_ID_FIELDNAME,
1072 AK.SOURCE_HASH_FIELDNAME,
1073 ]
1074 fieldset = set() # type: Set[str]
1075 for attrname in specialfieldlist:
1076 validate_fieldattr(attrname)
1077 fieldset.add(getattr(self, attrname))
1078 if len(fieldset) != len(specialfieldlist):
1079 raise ValueError(
1080 "Config: these must all be DIFFERENT fieldnames: "
1081 + ",".join(specialfieldlist)
1082 )
1084 # Test strings
1085 if not self.replace_patient_info_with:
1086 raise ValueError(f"Blank {AK.REPLACE_PATIENT_INFO_WITH}")
1087 if not self.replace_third_party_info_with:
1088 raise ValueError(f"Blank {AK.REPLACE_THIRD_PARTY_INFO_WITH}")
1089 if not self.replace_nonspecific_info_with:
1090 raise ValueError(f"Blank {AK.REPLACE_NONSPECIFIC_INFO_WITH}")
1091 replacements = list(
1092 {
1093 self.replace_patient_info_with,
1094 self.replace_third_party_info_with,
1095 self.replace_nonspecific_info_with,
1096 }
1097 )
1098 if len(replacements) != 3:
1099 # So inadvisable that we prevent it.
1100 raise ValueError(
1101 f"{AK.REPLACE_PATIENT_INFO_WITH}, "
1102 f"{AK.REPLACE_THIRD_PARTY_INFO_WITH}, and "
1103 f"{AK.REPLACE_NONSPECIFIC_INFO_WITH} should all be distinct"
1104 )
1106 # Regex
1107 if self.string_max_regex_errors < 0:
1108 raise ValueError(f"{AK.STRING_MAX_REGEX_ERRORS} < 0, nonsensical")
1109 if self.min_string_length_for_errors < 1:
1110 raise ValueError(
1111 f"{AK.MIN_STRING_LENGTH_FOR_ERRORS} < 1, nonsensical"
1112 )
1113 if self.min_string_length_to_scrub_with < 1:
1114 raise ValueError(
1115 f"{AK.MIN_STRING_LENGTH_TO_SCRUB_WITH} < 1, nonsensical"
1116 )
1118 # Source databases
1119 if not self.sources:
1120 raise ValueError("No source databases specified.")
1121 for dbname, dbinfo in self.sources.items():
1122 cfg = dbinfo.srccfg
1123 if cfg.ddgen_per_table_pid_field:
1124 ensure_valid_field_name(cfg.ddgen_per_table_pid_field)
1125 if cfg.ddgen_per_table_pid_field == self.source_hash_fieldname:
1126 raise ValueError(
1127 f"Config: {SK.DDGEN_PER_TABLE_PID_FIELD} "
1128 f"parameter can't be the same as "
1129 f"{AK.SOURCE_HASH_FIELDNAME}"
1130 )
1131 if cfg.ddgen_master_pid_fieldname:
1132 ensure_valid_field_name(cfg.ddgen_master_pid_fieldname)
1133 if (
1134 cfg.ddgen_master_pid_fieldname
1135 == self.source_hash_fieldname
1136 ):
1137 raise ValueError(
1138 f"Config: {SK.DDGEN_MASTER_PID_FIELDNAME} "
1139 f"parameter can't be the same as "
1140 f"{AK.SOURCE_HASH_FIELDNAME}"
1141 )
1143 # OK!
1144 log.debug("Config validated.")
1146 def encrypt_primary_pid(self, pid: Union[int, str]) -> str:
1147 """
1148 Encrypt a primary patient ID (PID), producing a research ID (RID).
1149 """
1150 if pid is None: # this is very unlikely!
1151 raise ValueError("Trying to hash NULL PID!")
1152 # ... see encrypt_master_pid() below
1153 return self.primary_pid_hasher.hash(pid)
1155 def encrypt_master_pid(self, mpid: Union[int, str]) -> Optional[str]:
1156 """
1157 Encrypt a master PID, producing a master RID (MRID).
1158 """
1159 if mpid is None:
1160 return None
1161 # potentially: risk of revealing the hash
1162 # and DEFINITELY: two patients, both with no NHS number, must not
1163 # be equated on the hash (e.g. hash(None) -> hash("None") -> ...)!
1164 return self.master_pid_hasher.hash(mpid)
1166 def hash_object(self, x: Any) -> str:
1167 """
1168 Hashes an object using our ``change_detection_hasher``.
1170 We could use Python's build-in :func:`hash` function, which produces a
1171 64-bit unsigned integer (calculated from: ``sys.maxint``). However,
1172 there is an outside chance that someone uses a single-field table and
1173 therefore that this is vulnerable to content discovery via a dictionary
1174 attack. Thus, we should use a better version.
1175 """
1176 return self.change_detection_hasher.hash(repr(x))
1178 def get_extra_hasher(self, hasher_name: str) -> GenericHasher:
1179 """
1180 Return a named hasher from our ``extra_hashers`` dictionary.
1182 Args:
1183 hasher_name: name of the hasher
1185 Returns:
1186 the hasher
1188 Raises:
1189 :exc:`ValueError` if it doesn't exist
1190 """
1191 if hasher_name not in self.extra_hashers.keys():
1192 raise ValueError(
1193 f"Extra hasher {hasher_name} requested but doesn't exist; "
1194 f"check you have listed it in "
1195 f"{AK.EXTRA_HASH_CONFIG_SECTIONS!r} in the config file"
1196 )
1197 return self.extra_hashers[hasher_name]
1199 @property
1200 def source_db_names(self) -> List[str]:
1201 """
1202 Get all source database names.
1203 """
1204 return self._source_db_names
1206 def set_echo(self, echo: bool) -> None:
1207 """
1208 Sets the "echo" property for all our SQLAlchemy database connections.
1210 Args:
1211 echo: show SQL?
1212 """
1213 self._echo = echo
1214 self.admindb.engine.echo = echo
1215 self.destdb.engine.echo = echo
1216 for db in self.sources.values():
1217 db.engine.echo = echo
1218 # Now, SQLAlchemy will mess things up by adding an additional handler.
1219 # So, bye-bye:
1220 for logname in (
1221 "sqlalchemy.engine.base.Engine",
1222 "sqlalchemy.engine.base.OptionEngine",
1223 ):
1224 logger = logging.getLogger(logname)
1225 # log.critical(logger.__dict__)
1226 remove_all_logger_handlers(logger)
1228 def get_src_dialect(self, src_db: str) -> Dialect:
1229 """
1230 Returns the SQLAlchemy :class:`Dialect` (e.g. MySQL, SQL Server...) for
1231 the specified source database.
1232 """
1233 return self.src_dialects[src_db]
1235 @property
1236 def dest_dialect(self) -> Dialect:
1237 """
1238 Returns the SQLAlchemy :class:`Dialect` (e.g. MySQL, SQL Server...) for
1239 the destination database.
1240 """
1241 return self._dest_dialect
1243 @property
1244 def dest_dialect_name(self) -> str:
1245 """
1246 Returns the SQLAlchemy name for the destination database dialect (e.g.
1247 ``mysql``).
1248 """
1249 return self._dest_dialect.name
1251 def commit_dest_db(self) -> None:
1252 """
1253 Executes a ``COMMIT`` on the destination database.
1254 """
1255 self._destdb_transaction_limiter.commit()
1257 def notify_src_bytes_read(self, n_bytes: int) -> None:
1258 """
1259 Use this function to tell the config how many bytes have been read
1260 from the source database. See, for example, :func:`overall_progress`.
1262 Args:
1263 n_bytes: the number of bytes read
1264 """
1265 self._src_bytes_read += n_bytes
1267 def notify_dest_db_transaction(self, n_rows: int, n_bytes: int) -> None:
1268 """
1269 Use this function to tell the config how many rows and bytes have been
1270 written to the source database. See, for example,
1271 :func:`overall_progress`.
1273 Note that this may trigger a ``COMMIT``, via our
1274 :class:`crate_anon.common.sql.TransactionSizeLimiter`.
1276 Args:
1277 n_rows: the number of rows written
1278 n_bytes: the number of bytes written
1279 """
1280 self._destdb_transaction_limiter.notify(n_rows=n_rows, n_bytes=n_bytes)
1281 # ... may trigger a commit
1282 self._dest_bytes_written += n_bytes
1284 def extract_text_extension_permissible(self, extension: str) -> bool:
1285 """
1286 Is this file extension (e.g. ``.doc``, ``.txt``) one that the config
1287 permits to use for text extraction?
1289 See the config options ``extract_text_extensions_permitted`` and
1290 ``extract_text_extensions_prohibited``.
1292 Args:
1293 extension: file extension, beginning with ``.``
1295 Returns:
1296 permitted?
1298 """
1299 if not self.extract_text_extensions_case_sensitive:
1300 extension = extension.upper()
1301 if self.extract_text_extensions_permitted:
1302 return extension in self.extract_text_extensions_permitted
1303 return extension not in self.extract_text_extensions_prohibited