Coverage for anonymise/dd.py: 27%

552 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2026-02-05 06:46 -0600

1""" 

2crate_anon/anonymise/dd.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Data dictionary classes for CRATE anonymiser.** 

27 

28The data dictionary is a TSV file, for ease of editing by multiple authors, 

29rather than a database table. 

30 

31""" 

32 

33# ============================================================================= 

34# Imports 

35# ============================================================================= 

36 

37from collections import Counter, OrderedDict 

38from dataclasses import dataclass 

39from functools import lru_cache 

40from itertools import zip_longest 

41import logging 

42import operator 

43from typing import ( 

44 AbstractSet, 

45 Any, 

46 Callable, 

47 Dict, 

48 List, 

49 Optional, 

50 Set, 

51 Tuple, 

52 TYPE_CHECKING, 

53 Union, 

54) 

55 

56from cardinal_pythonlib.sql.validation import is_sqltype_integer 

57from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName 

58from cardinal_pythonlib.sqlalchemy.schema import ( 

59 is_sqlatype_integer, 

60 is_sqlatype_string, 

61 is_sqlatype_text_over_one_char, 

62) 

63from sortedcontainers import SortedSet 

64from sqlalchemy import Column, Table, DateTime 

65from sqlalchemy.engine.interfaces import Dialect 

66from sqlalchemy.sql.sqltypes import String, TypeEngine 

67 

68# don't import config: circular dependency would have to be sorted out 

69from crate_anon.anonymise.constants import ( 

70 AlterMethodType, 

71 AnonymiseColumnComments, 

72 AnonymiseConfigKeys, 

73 Decision, 

74 IndexType, 

75 ScrubMethod, 

76 ScrubSrc, 

77 SrcFlag, 

78 TABLE_KWARGS, 

79 TridType, 

80) 

81from crate_anon.anonymise.ddr import DataDictionaryRow 

82from crate_anon.anonymise.scrub import PersonalizedScrubber 

83from crate_anon.common.spreadsheet import ( 

84 gen_rows_from_spreadsheet, 

85 SINGLE_SPREADSHEET_TYPE, 

86 write_spreadsheet, 

87) 

88from crate_anon.common.sql import matches_fielddef, ReflectedColumnInfo 

89 

90if TYPE_CHECKING: 

91 from crate_anon.anonymise.config import Config 

92 

log = logging.getLogger(__name__)


# =============================================================================
# Constants
# =============================================================================

# Number of characters needed to write the most negative 64-bit signed integer
# (-2^63) in decimal, minus sign included. See
# https://dev.mysql.com/doc/refman/8.0/en/integer-types.html
STRING_LENGTH_FOR_BIGINT = len(str(-(1 << 63)))

102 

103 

104# ============================================================================= 

105# Helper classes 

106# ============================================================================= 

107 

108 

@dataclass
class ScrubSourceFieldInfo:
    """
    Plain record describing one scrub-source field.

    NOTE(review): the precise meaning of each attribute is not established in
    this part of the file; the comments below are read off the attribute
    names and types only and should be confirmed against the code that
    creates these records.
    """

    # Presumably: is this field a master patient ID (MPID)? -- confirm
    is_mpid: bool
    # Presumably: patient (rather than third-party) information? -- confirm
    is_patient: bool
    # Presumably: whether scrubbing should recurse via this field -- confirm
    recurse: bool
    # Presumably: whether this field is required for scrubbing -- confirm
    required_scrubber: bool
    # Scrub method to apply (a ScrubMethod value).
    scrub_method: ScrubMethod
    # Textual signature identifying the field.
    signature: str
    # Name of the field holding the value.
    value_fieldname: str

118 

119 

@dataclass
class DDTableSummary:
    """
    Plain record summarizing one source table (and the destination table it
    maps to) in a data dictionary.

    All attributes are simple strings or booleans; field order is part of the
    positional constructor interface and must not be changed.
    """

    # -------------------------------------------------------------------------
    # Which table?
    # -------------------------------------------------------------------------
    src_db: str
    src_table: str

    # -------------------------------------------------------------------------
    # Source information
    # -------------------------------------------------------------------------
    src_has_pk: bool
    src_pk_fieldname: str
    src_constant: bool
    src_addition_only: bool
    src_defines_pid: bool
    src_has_pid: bool
    src_has_mpid: bool
    src_has_opt_out: bool
    src_has_patient_scrub_info: bool
    src_has_third_party_scrub_info: bool
    src_has_required_scrub_info: bool
    src_has_table_comment: bool

    # -------------------------------------------------------------------------
    # Destination information
    # -------------------------------------------------------------------------
    dest_table: str
    dest_has_rows: bool
    dest_add_src_hash: bool
    dest_being_scrubbed: bool

145 

146 

147# ============================================================================= 

148# Helper functions 

149# ============================================================================= 

150 

151 

def ensure_no_source_type_mismatch(
    ddr: DataDictionaryRow,
    config_sqlatype: Union[TypeEngine, String],
    primary_pid: bool = True,
) -> None:
    """
    Ensure that the source column type of a data dictionary row is compatible
    with what's expected from the config. We check this only for specific type
    of column (PID, MPID), because we need to know their data types concretely
    for the secret mapping table. The question is not whether the types are the
    same, but whether the value will fit into the config-determined type (for
    example, it's OK to convert an integer to a long-enough string but
    not necessarily the other way round).

    String types whose ``length`` is ``None`` are treated as unbounded: an
    unbounded config string accepts anything, whereas an unbounded source
    string cannot be guaranteed to fit into a bounded config string.

    Args:
        ddr:
            Data dictionary row.
        config_sqlatype:
            SQLAlchemy column type that would be expected based on the current
            config.
        primary_pid:
            Is this the main PID field? If false, it's the MPID.

    Raises:
        :exc:`ValueError` if the source type cannot be guaranteed to fit
        into the config type. (A string source with an integer config type
        only produces a logged warning.)
    """
    if primary_pid:
        human_type = "primary PID"
        configparam = AnonymiseConfigKeys.SQLATYPE_PID
    else:
        human_type = "master PID"
        configparam = AnonymiseConfigKeys.SQLATYPE_MPID
    rowtype = ddr.src_sqla_coltype
    suffix = ""
    error = True  # we may downgrade to a warning
    destination_is_integer = is_sqlatype_integer(config_sqlatype)
    assert destination_is_integer or is_sqlatype_string(config_sqlatype), (
        f"Bug: config parameter {configparam!r} has given a type of "
        f"{config_sqlatype}, which appears neither integer nor string."
    )
    if is_sqlatype_integer(rowtype):
        # ---------------------------------------------------------------------
        # Integer source
        # ---------------------------------------------------------------------
        if destination_is_integer:
            # Good enough. The only integer type we use for storing a PID/MPID
            # in the secret mapping table is BigInteger, so any integer type
            # should fit.
            return
        # Storing an integer in a string. This may be OK, if the string is
        # long enough. We could do detailed checks here based on the type
        # of integer, but we'll be simple.
        dest_length = config_sqlatype.length
        # A length of None means "no declared limit"; comparing an int with
        # None would raise TypeError, so handle it explicitly.
        if dest_length is None or STRING_LENGTH_FOR_BIGINT <= dest_length:
            # It'll fit!
            return
        suffix = (
            f"Using a bigger string field in the config (minimum "
            f"length {STRING_LENGTH_FOR_BIGINT}) would fix this."
        )
    elif is_sqlatype_string(rowtype):
        # ---------------------------------------------------------------------
        # String source
        # ---------------------------------------------------------------------
        if destination_is_integer:
            error = False
            suffix = (
                "Warning only: this is acceptable if, but only if, the source "
                "string fields contain only integers."
            )
        else:
            # Storing a string in a string. Fine if the destination is big
            # enough.
            dest_length = config_sqlatype.length
            # noinspection PyUnresolvedReferences
            src_length = rowtype.length
            if dest_length is None:
                # Unbounded destination: any source string fits.
                return
            if src_length is None:
                # Unbounded source (no declared length) into a bounded
                # destination: cannot be guaranteed to fit.
                suffix = (
                    "The source string field has no length limit, so no "
                    "fixed-length string field in the config is guaranteed "
                    "to hold it."
                )
            elif src_length <= dest_length:
                return
            else:
                suffix = (
                    f"Using a bigger string field in the config (minimum "
                    f"length {src_length}) would fix this."
                )
    else:
        # e.g. something silly like a DATETIME source
        pass
    # Generic error or warning:
    msg = (
        f"Source column {ddr.src_signature} is marked as a {human_type} field "
        f"but its type is {rowtype}, while the config thinks it should be "
        f"{config_sqlatype} (determined by the {configparam!r} parameter). "
        f"{suffix}"
    )
    if error:
        raise ValueError(msg)
    else:
        log.warning(msg)

245 

246 

247# ============================================================================= 

248# DataDictionary 

249# ============================================================================= 

250 

251 

252class DataDictionary: 

253 """ 

254 Class representing an entire data dictionary. 

255 """ 

256 

def __init__(self, config: "Config") -> None:
    """
    Create an empty data dictionary attached to a configuration.

    Args:
        config: :class:`crate_anon.anonymise.config.Config`
    """
    # Start with no rows and an empty (sorted) cache of source
    # database/table pairs.
    # noinspection PyArgumentList
    self.cached_srcdb_table_pairs = SortedSet()
    self.rows = []  # type: List[DataDictionaryRow]
    self.config = config

268 

269 # ------------------------------------------------------------------------- 

270 # Information 

271 # ------------------------------------------------------------------------- 

272 

@property
def n_rows(self) -> int:
    """
    How many rows does the data dictionary currently hold?
    """
    row_count = len(self.rows)
    return row_count

279 

@property
def dest_dialect(self) -> Dialect:
    """
    The SQLAlchemy :class:`Dialect` (e.g. MySQL, SQL Server...) of the
    destination database, as reported by the config.
    """
    cfg = self.config
    return cfg.dest_dialect

287 

@property
def dest_dialect_name(self) -> str:
    """
    The SQLAlchemy dialect name of the destination database, as reported by
    the config.
    """
    cfg = self.config
    return cfg.dest_dialect_name

294 

295 # ------------------------------------------------------------------------- 

296 # Loading 

297 # ------------------------------------------------------------------------- 

298 

def read_from_file(
    self,
    filename: str,
    check_valid: bool = True,
    override_dialect: Dialect = None,
) -> None:
    """
    Load the data dictionary from a spreadsheet-style file, replacing any
    rows already held.

    Args:
        filename:
            Filename to read.
        check_valid:
            Run a validity check after setting each row from its values?
        override_dialect:
            SQLAlchemy SQL dialect to enforce (e.g. for interpreting
            textual column types in the source database). By default, the
            source database's own dialect is used.
    """
    log.debug(f"Loading data dictionary: {filename}")
    # The row generator yields the header row first, then data rows.
    self._read_from_rows(
        gen_rows_from_spreadsheet(filename),
        check_valid=check_valid,
        override_dialect=override_dialect,
    )

323 

def _read_from_rows(
    self,
    rows: SINGLE_SPREADSHEET_TYPE,
    check_valid: bool = True,
    override_dialect: Dialect = None,
) -> None:
    """
    Internal function to read from a set of rows, whatever the underlying
    format.

    Args:
        rows:
            Iterable of rows (one per data dictionary row), each row being
            a list of values. The first row must be the header row.
        check_valid:
            Run a validity check after setting the values?
        override_dialect:
            SQLAlchemy SQL dialect to enforce (e.g. for interpreting
            textual column types in the source database). By default, the
            source database's own dialect is used.

    Raises:
        :exc:`ValueError` if there is no header row, if required headings
        are missing, or if a row fails to load or validate.
    """
    # Clear existing data
    self.rows = []  # type: List[DataDictionaryRow]

    # Accept any iterable (iter() of an iterator is the iterator itself).
    row_iter = iter(rows)

    # Headers
    # An empty input must be reported as a bad file, not leak StopIteration.
    headers = next(row_iter, None)
    if headers is None:
        raise ValueError(
            "Bad data dictionary file: it is empty (no header row found)."
        )
    if not all(x in headers for x in DataDictionaryRow.ROWNAMES):
        actual = "\n".join(
            f"{i}. {h}" for i, h in enumerate(headers, start=1)
        )
        desired = "\n".join(
            f"{i}. {h}"
            for i, h in enumerate(DataDictionaryRow.ROWNAMES, start=1)
        )
        raise ValueError(
            f"Bad data dictionary file. Data dictionaries must be in "
            f"tabular format and contain the following headings:\n\n"
            f"{desired}\n\n"
            f"but yours are:\n\n"
            f"{actual}"
        )
    log.debug("Data dictionary has correct header. Loading content...")

    # Data
    for values in row_iter:
        if len(values) < len(headers):
            # Short row: pad the missing trailing cells with blanks.
            valuedict = dict(zip_longest(headers, values, fillvalue=""))
        else:
            # Full (or over-long) row: surplus cells are ignored.
            valuedict = dict(zip(headers, values))
        ddr = DataDictionaryRow(self.config)
        try:
            ddr.set_from_dict(valuedict, override_dialect=override_dialect)
            if check_valid:
                ddr.check_valid()
        except ValueError:
            # Show the offending row before propagating the error.
            log.critical(f"Offending input: {valuedict}")
            raise
        self.rows.append(ddr)
    log.debug("... content loaded.")

    # Clear caches
    self.clear_caches()

387 

@classmethod
def create_from_file(
    cls,
    filename: str,
    config: "Config",
    check_valid: bool = True,
    override_dialect: Dialect = None,
) -> "DataDictionary":
    """
    Creates a new data dictionary by reading a file.

    Args:
        filename:
            Filename to read.
        config:
            :class:`crate_anon.anonymise.config.Config`
        check_valid:
            Run a validity check after setting each row from its values?
        override_dialect:
            SQLAlchemy SQL dialect to enforce; passed through to
            :meth:`read_from_file`.

    Returns:
        a new instance of the class this method was called on, populated
        from the file
    """
    # Instantiate via cls, so that subclasses get an instance of their own
    # class rather than of the base class.
    dd = cls(config)
    dd.read_from_file(
        filename,
        check_valid=check_valid,
        override_dialect=override_dialect,
    )
    return dd

406 

def draft_from_source_databases(self, report_every: int = 100) -> None:
    """
    Create a draft DD from a source database.

    Will skip any rows it knows about already (thus allowing the generation
    of incremental changes). This applies to table-comment rows as well as
    to column rows.

    Args:
        report_every: report to the Python log every *n* columns
    """
    log.info("Reading information for draft data dictionary")

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Scan databases
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    existing_signatures = {ddr.src_signature for ddr in self.rows}
    for pretty_dbname, db in self.config.sources.items():
        log.info(f"... database nice name = {pretty_dbname}")
        cfg = db.srccfg
        meta = db.metadata
        i = 0  # column counter for this database (for progress reports)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Scan each table
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        for t in meta.sorted_tables:
            tablename = t.name
            log.info(f"... ... table: {tablename}")

            # Skip table?
            if cfg.is_table_denied(tablename):
                log.debug(f"Skipping denied table: {tablename}")
                continue
            all_col_names = [c.name for c in t.columns]
            if cfg.does_table_fail_minimum_fields(all_col_names):
                log.debug(
                    f"Skipping table {t} because it fails "
                    f"minimum field requirements"
                )
                continue

            # Does the database know the PK for this table?
            table_has_explicit_pk = any(c.primary_key for c in t.columns)
            table_has_candidate_pk = any(
                matches_fielddef(tablename, c.name, cfg.ddgen_pk_fields)
                for c in t.columns
            )

            # Is there a table comment?
            if t.comment:
                comment_ddr = DataDictionaryRow(self.config)
                comment_ddr.set_as_table_comment(
                    src_db=pretty_dbname,
                    src_table=tablename,
                    comment=t.comment,
                )
                # Apply the same "already known?" test as for columns.
                # Otherwise an incremental draft appends a second comment
                # row for the table, which check_valid() rejects as a
                # duplicate.
                comment_sig = comment_ddr.src_signature
                if comment_sig in existing_signatures:
                    log.debug(
                        f"Skipping duplicated table comment: {tablename}"
                    )
                else:
                    existing_signatures.add(comment_sig)
                    self.rows.append(comment_ddr)

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Scan each column
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            for c in t.columns:
                i += 1
                if report_every and i % report_every == 0:
                    log.debug(f"... reading source field number {i}")
                log.debug(repr(c))
                r = ReflectedColumnInfo(c)

                # Skip column?
                if cfg.is_field_denied(r.columnname):
                    log.debug(
                        f"Skipping denied column: {r.tablename_columname}"
                    )
                    continue

                comment = r.comment
                if cfg.ddgen_append_source_info_to_comment:
                    comment += r.get_column_source_description()
                comment = comment.strip()

                # Create row
                ddr = DataDictionaryRow(self.config)
                ddr.set_from_src_db_info(
                    src_db=pretty_dbname,
                    src_table=tablename,
                    src_field=r.columnname,
                    src_datatype_sqltext=r.datatype_sqltext,
                    src_sqla_coltype=r.sqla_coltype,
                    dbconf=cfg,
                    comment=comment,
                    nullable=c.nullable,
                    primary_key=c.primary_key,
                    table_has_explicit_pk=table_has_explicit_pk,
                    table_has_candidate_pk=table_has_candidate_pk,
                )

                # ---------------------------------------------------------
                # If we have this one already, skip ASAP
                # This is how incremental data dictionaries get generated.
                # ---------------------------------------------------------
                sig = ddr.src_signature
                if sig in existing_signatures:
                    log.debug(
                        f"Skipping duplicated column: "
                        f"{r.tablename_columname}"
                    )
                    continue
                existing_signatures.add(sig)

                self.rows.append(ddr)

    log.info("... done")
    self.clear_caches()
    self.sort()

520 

def tidy_draft(self) -> None:
    """
    Adjusts a draft data dictionary so that it is logically consistent
    overall.

    What gets corrected:

    - Scrubbing is removed from rows in non-patient tables.
    - For SQL Server destinations: a FULLTEXT index is only kept if the
      table has a non-null column with a unique index, and at most one
      FULLTEXT index is kept per table.

    Test code for full-text index creation:

    .. code-block:: sql

        -- ----------------------------------------------------------------
        -- SQL Server: basic use
        -- ----------------------------------------------------------------

        USE mydb;
        CREATE FULLTEXT CATALOG default_fulltext_catalog AS DEFAULT;
        CREATE TABLE junk (intthing INT PRIMARY KEY, textthing VARCHAR(MAX));
        -- now find the name of the PK index (! -- by hand or see cardinal_pythonlib)
        CREATE FULLTEXT INDEX ON junk (textthing) KEY INDEX <pk_index_name>;

        -- ----------------------------------------------------------------
        -- SQL Server: it means it about the "NOT NULL" aspects, and a
        -- unique index is not enough
        -- ----------------------------------------------------------------

        USE mydb;
        DROP TABLE IF EXISTS rubbish;
        CREATE TABLE rubbish (a INT NOT NULL, b VARCHAR(MAX));
        CREATE UNIQUE INDEX rubbish_a ON rubbish (a);
        CREATE FULLTEXT INDEX ON rubbish (b) KEY INDEX rubbish_a;

        -- .. that works, but if you remove the "NOT NULL" from the table
        -- definition, it fails with:
        --
        -- 'rubbish_a' is not a valid index to enforce a full-text search
        -- key. A full-text search key must be a unique, non-nullable,
        -- single-column index which is not offline, is not defined on a
        -- non-deterministic or imprecise nonpersisted computed column,
        -- does not have a filter, and has maximum size of 900 bytes.
        -- Choose another index for the full-text key.

        -- ----------------------------------------------------------------
        -- MySQL: two FULLTEXT indexes on one table
        -- ----------------------------------------------------------------

        USE mydb;
        CREATE TABLE junk (intthing INT PRIMARY KEY, text1 LONGTEXT, text2 LONGTEXT);
        ALTER TABLE junk ADD FULLTEXT INDEX ftidx1 (text1);
        ALTER TABLE junk ADD FULLTEXT INDEX ftidx2 (text2);  -- OK
    """  # noqa: E501
    log.info("Tidying/correcting draft data dictionary")

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    log.info("... Ensuring we don't scrub in non-patient tables")
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    for src_db, src_table in self.get_src_db_tablepairs_w_no_pt_info():
        for row in self.get_rows_for_src_table(src_db, src_table):
            if not row.being_scrubbed:
                continue
            log.warning(
                f"Removing {AlterMethodType.SCRUBIN.value} from "
                f"{DataDictionaryRow.ALTER_METHOD} setting of "
                f"destination {row.dest_signature}, since that is not "
                f"a patient table"
            )
            row.remove_scrub_from_alter_methods()

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    log.info("... Make full-text indexes follow dialect rules")
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # https://docs.microsoft.com/en-us/sql/t-sql/statements/create-fulltext-index-transact-sql?view=sql-server-ver15  # noqa: E501
    if self.dest_dialect_name == SqlaDialectName.SQLSERVER:
        # -----------------------------------------------------------------
        # Checks for Microsoft SQL Server
        # -----------------------------------------------------------------
        for src_db, src_table in self.get_src_db_tablepairs():
            table_rows = self.get_rows_for_src_table(src_db, src_table)

            # Rule 1 (SQL Server): a table with a FULLTEXT index needs a
            # non-nullable column carrying a unique index. If there is no
            # such column, strip every FULLTEXT index from the table.
            has_unique_not_null = any(
                row.include
                and row.not_null
                and row.index == IndexType.UNIQUE
                for row in table_rows
            )
            if not has_unique_not_null:
                for row in table_rows:
                    if not (row.include and row.index == IndexType.FULLTEXT):
                        continue
                    log.warning(
                        f"To create a FULLTEXT index, SQL Server "
                        f"requires the table to have a non-nullable "
                        f"column with a unique index. Can't find one "
                        f"for destination table {row.dest_table!r}. "
                        f"Removing index from column "
                        f"{row.dest_field!r}."
                    )
                    row.index = IndexType.NONE

            # Rule 2 (SQL Server): one FULLTEXT index per table. (A single
            # FULLTEXT index could in principle span several columns, but
            # that is not supported here.) Keep the first, drop the rest.
            fulltext_seen = False
            for row in table_rows:
                if not (row.include and row.index == IndexType.FULLTEXT):
                    continue
                if not fulltext_seen:
                    fulltext_seen = True
                    continue
                log.warning(
                    f"SQL Server permits only one FULLTEXT index "
                    f"per table (and CRATE does not support "
                    f"multi-column full-text indexes). Since "
                    f"there is already one, removing the "
                    f"full-text index from "
                    f"{row.dest_table}.{row.dest_field}."
                )
                row.index = IndexType.NONE

    # MySQL: fine to have multiple FULLTEXT indexes in one table.
    # See test code above.

    log.info("... done")
    self.clear_caches()

653 

def make_dest_datatypes_explicit(self) -> None:
    """
    When a data dictionary is autocreated, the ``dest_datatype`` field is
    by default populated only implicitly, not explicitly. This asks every
    row to instantiate its value explicitly. Primarily for debugging.
    """
    for row in self.rows:
        row.make_dest_datatype_explicit()

662 

663 # ------------------------------------------------------------------------- 

664 # Sorting 

665 # ------------------------------------------------------------------------- 

666 

def sort(self) -> None:
    """
    Puts the rows in order of (lower-case) source database, source table,
    and source field.

    (Table comments, having no source field, will be first among rows for
    their tables.)
    """
    log.info("Sorting data dictionary")

    def sort_key(row: "DataDictionaryRow") -> Tuple[Any, Any, Any]:
        return (
            row.src_db_lowercase,
            row.src_table_lowercase,
            row.src_field_lowercase,
        )

    # Rebind to a new sorted list (rather than sorting in place).
    self.rows = sorted(self.rows, key=sort_key)
    log.info("... done")

684 

685 # ------------------------------------------------------------------------- 

686 # Validation 

687 # ------------------------------------------------------------------------- 

688 

def check_against_source_db(self) -> None:
    """
    Validate the data dictionary against the source database(s).

    As a side effect, each row is told (and caches) the SQLAlchemy type of
    its source column.

    Raises:
        :exc:`ValueError` on the first inconsistency found
    """
    for dbname in self.get_source_databases():
        db = self.config.sources[dbname]

        for tablename in self.get_src_tables(dbname):
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # A source table may feed only one destination table
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            dest_tables = self.get_dest_tables_for_src_db_table(
                dbname, tablename
            )
            if len(dest_tables) > 1:
                raise ValueError(
                    f"Source table {dbname}.{tablename} maps to >1 "
                    f"destination "
                    f"table: {', '.join(dest_tables)}"
                )

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # The source table must exist in the database
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            if tablename not in db.table_names:
                log.debug(
                    f"Source database {dbname!r} has tables: "
                    f"{db.table_names}"
                )
                raise ValueError(
                    f"Table {tablename!r} missing from source database "
                    f"{dbname!r}"
                )

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Row checks: preamble
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            table_rows = self.get_rows_for_src_table(dbname, tablename)
            # Rows may need to refer to each other later, so first give
            # every row its source column type.
            for row in table_rows:
                columns = db.metadata.tables[tablename].columns
                if row.src_field not in columns:
                    raise ValueError(
                        f"Column {row.src_field!r} missing from table "
                        f"{tablename!r} "
                        f"in source database {dbname!r}"
                    )
                # CACHES TYPE HERE:
                row.set_src_sqla_coltype(columns[row.src_field].type)

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # If PID field is required, is it present?
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Before 2021-12-07, we used to check r.master_pid, too.
            # However, if nothing is being scrubbed, then the lack of a
            # link via primary PID is a researcher inconvenience, not an
            # de-identification risk.
            if any(row.being_scrubbed for row in table_rows):
                if not self.get_pid_name(dbname, tablename):
                    raise ValueError(
                        f"Source table {dbname}.{tablename} has a "
                        f"{AlterMethodType.SCRUBIN.value!r} "
                        f"field but no primary patient ID field"
                    )

            pk_colname = None
            for row in table_rows:
                # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                # Data types for special rows
                # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                if row.primary_pid:
                    ensure_no_source_type_mismatch(
                        row, self.config.pidtype, primary_pid=True
                    )
                if row.master_pid:
                    ensure_no_source_type_mismatch(
                        row, self.config.mpidtype, primary_pid=False
                    )

                # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                # Too many PKs?
                # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                if row.pk:
                    if pk_colname:
                        raise ValueError(
                            f"Table {dbname}.{tablename} has >1 source PK "
                            f"set "
                            f"(previously {pk_colname!r}, "
                            f"now {row.src_field!r})."
                        )
                    pk_colname = row.src_field

                # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                # Duff alter method?
                # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                for method in row.alter_methods:
                    if not method.extract_from_blob:
                        continue
                    # Find the row (same table) for the field that should
                    # hold the extension/filename.
                    ext_row = None
                    for candidate in table_rows:
                        if candidate.src_field == method.extract_ext_field:
                            ext_row = candidate
                            break
                    if ext_row is None:
                        raise ValueError(
                            f"alter_method = {row.alter_method}, "
                            f"but field {method.extract_ext_field} "
                            f"not found in the same table"
                        )
                    if not is_sqlatype_text_over_one_char(
                        ext_row.src_sqla_coltype
                    ):
                        raise ValueError(
                            f"alter_method = {row.alter_method}, but "
                            f"field {method.extract_ext_field}, which "
                            f"should contain an extension or "
                            f"filename, is not text of >1 character"
                        )

806 

def check_valid(
    self,
    prohibited_fieldnames: List[str] = None,
    check_against_source_db: bool = True,
) -> None:
    """
    Validate the data dictionary: internally, and optionally also against
    the source database(s).

    Args:
        prohibited_fieldnames:
            list of prohibited destination fieldnames
        check_against_source_db:
            check validity against the source database(s)?

    Raises:
        :exc:`ValueError` if the DD is invalid
    """
    prohibited = (
        [] if prohibited_fieldnames is None else prohibited_fieldnames
    )  # type: List[str]
    log.info("Checking data dictionary...")
    if not self.rows:
        raise ValueError("Empty data dictionary")
    if not self.get_dest_tables_included():
        raise ValueError(
            "Empty data dictionary after removing redundant tables"
        )

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check (or re-check) individual rows
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    log.debug("Checking DD: individual row validity...")
    for row in self.rows:
        row.check_valid()

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Check collective consistency
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    log.debug("Checking DD: prohibited flags...")
    for dbname in self.get_source_databases():
        for tablename in self.get_src_tables(dbname):
            # This will have excluded all tables where all rows are
            # omitted. So now we have only active tables, for which we
            # cannot combine certain flags.
            # (We used to prohibit these combinations at all times, in the
            # DataDictionaryRow class, but it's inconvenient to have to
            # alter these flags if you want to omit the whole table.)
            for row in self.get_rows_for_src_table(dbname, tablename):
                if row.add_src_hash and row.omit:
                    raise ValueError(
                        f"Do not set {Decision.OMIT.value} on "
                        f"{DataDictionaryRow.SRC_FLAGS}="
                        f"{SrcFlag.ADD_SRC_HASH} fields -- "
                        f"currently set for {row.src_signature}"
                    )
                if row.constant and row.omit:
                    raise ValueError(
                        f"Do not set {Decision.OMIT.value} on "
                        f"{DataDictionaryRow.SRC_FLAGS}="
                        f"{SrcFlag.CONSTANT} fields -- "
                        f"currently set for {row.src_signature}"
                    )

    log.debug("Checking DD: table consistency...")
    for dbname, tablename in self.get_scrub_from_db_table_pairs():
        if not self.get_pid_name(dbname, tablename):
            raise ValueError(
                f"Scrub-source table {dbname}.{tablename} must have a "
                f"patient ID field "
                f"(one with flag {SrcFlag.PRIMARY_PID})"
            )

    log.debug("Checking DD: prohibited fieldnames...")
    if prohibited:
        for row in self.rows:
            row.check_prohibited_fieldnames(prohibited)

    log.debug("Checking DD: opt-out fields...")
    for (
        src_db,
        src_table,
        optout_colname,
        pid_colname,
        mpid_colname,
    ) in self.get_optout_defining_fields():
        if not (pid_colname or mpid_colname):
            raise ValueError(
                f"Field {src_db}.{src_table}.{optout_colname} has "
                f"{DataDictionaryRow.SRC_FLAGS}={SrcFlag.OPT_OUT} set, "
                f"but that table does not have a primary patient ID field "
                f"or a master patient ID field"
            )

    log.debug("Checking DD: destination tables...")
    for dest_table in self.get_dest_tables_included():
        src_pairs = self.get_src_dbs_tables_for_dest_table(dest_table)
        if len(src_pairs) > 1:
            sources = ", ".join(
                [f"{pair[0]}.{pair[1]}" for pair in src_pairs]
            )
            raise ValueError(
                f"Destination table {dest_table} is mapped to by multiple "
                f"source "
                f"databases: {sources}"
            )

    log.debug("Checking DD: duplicate source rows?")
    src_counts = Counter([row.src_signature for row in self.rows])
    src_duplicates = [sig for sig, n in src_counts.items() if n > 1]
    if src_duplicates:
        raise ValueError(f"Duplicate source rows: {src_duplicates}")

    log.debug("Checking DD: duplicate destination rows?")
    # Omitted rows produce no destination column, so they are ignored here.
    dst_counts = Counter(
        [row.dest_signature for row in self.rows if not row.omit]
    )
    dst_duplicates = [sig for sig, n in dst_counts.items() if n > 1]
    if dst_duplicates:
        raise ValueError(f"Duplicate destination rows: {dst_duplicates}")

    if check_against_source_db:
        log.debug("Checking DD against source database tables...")
        self.check_against_source_db()

    log.debug("Checking DD: global patient-defining fields...")
    n_definers = self.n_definers
    if n_definers == 0:
        if not self.config.allow_no_patient_info:
            raise ValueError(
                f"No patient-defining field! (And "
                f"{AnonymiseConfigKeys.ALLOW_NO_PATIENT_INFO} not set.)"
            )
        log.warning(
            "NO PATIENT-DEFINING FIELD! DATABASE(S) WILL "
            "BE COPIED, NOT ANONYMISED."
        )
    elif n_definers > 1:
        log.warning(
            f"Unusual: >1 field with "
            f"{DataDictionaryRow.SRC_FLAGS}="
            f"{SrcFlag.DEFINES_PRIMARY_PIDS} set."
        )

    log.debug("Checking DD: table comments")
    # At most one table-comment row is allowed per destination table.
    commented_tables = set()  # type: Set[str]
    for row in self.rows:
        if row.is_table_comment:
            if row.dest_table in commented_tables:
                raise ValueError(
                    f"More than one table comment for destination table "
                    f"{row.dest_table}"
                )
            commented_tables.add(row.dest_table)

    log.debug("... DD checked.")

960 

961 # ------------------------------------------------------------------------- 

962 # Saving 

963 # ------------------------------------------------------------------------- 

964 

def write(self, filename: str, filetype: Optional[str] = None) -> None:
    """
    Writes the data dictionary to a spreadsheet-style file, either using
    the filetype specified or autodetecting it from the filename.

    Args:
        filename:
            Name of file to write, or "-" for stdout (in which case the
            filetype is forced to TSV).
        filetype:
            File type as one of ``.ods``, ``.tsv``, or ``.xlsx``;
            alternatively, use ``None`` to autodetect from the filename.
    """
    log.info("Saving data dictionary...")
    # Output (and any filetype handling) is delegated to
    # write_spreadsheet(); we only supply the sheet contents.
    write_spreadsheet(filename, self._as_dict(), filetype=filetype)

981 

def _as_dict(self) -> Dict[str, Any]:
    """
    Builds the single-sheet structure used for writing spreadsheets: an
    ordered dictionary mapping the sheet name to a list of rows (the
    header row first, then one entry per data dictionary row).
    """
    sheet_rows = [DataDictionaryRow.header_row()]
    sheet_rows.extend(ddr.as_row() for ddr in self.rows)
    workbook = OrderedDict()
    workbook["data_dictionary"] = sheet_rows
    return workbook

994 

995 # ------------------------------------------------------------------------- 

996 # Global DD queries 

997 # ------------------------------------------------------------------------- 

998 

@property
def n_definers(self) -> int:
    """
    How many rows are flagged as defining primary patient IDs (i.e. the
    number of patient-defining columns).
    """
    return sum(1 for row in self.rows if row.defines_primary_pids)

1005 

@lru_cache(maxsize=None)
def get_source_databases(self) -> AbstractSet[str]:
    """
    Return a SortedSet of the source database names used by at least one
    "required" data dictionary row.
    """
    required_rows = (row for row in self.rows if row.required)
    return SortedSet(row.src_db for row in required_rows)

1012 

@lru_cache(maxsize=None)
def get_scrub_from_db_table_pairs(self) -> AbstractSet[Tuple[str, str]]:
    """
    Return a SortedSet of ``source_database_name, source_table`` tuples
    for tables with at least one field holding ``scrub_src`` (scrub-from)
    information.

    Rows are counted even if their omit flag is set.
    """
    pairs = SortedSet()
    for row in self.rows:
        if row.scrub_src:
            pairs.add((row.src_db, row.src_table))
    return pairs

1023 

@lru_cache(maxsize=None)
def get_src_db_tablepairs(self) -> AbstractSet[Tuple[str, str]]:
    """
    Return a SortedSet of every ``source_database_name, source_table``
    tuple mentioned anywhere in the data dictionary.
    """
    return SortedSet((row.src_db, row.src_table) for row in self.rows)

1031 

@lru_cache(maxsize=None)
def get_src_db_tablepairs_w_pt_info(self) -> AbstractSet[Tuple[str, str]]:
    """
    Return a SortedSet of ``source_database_name, source_table`` tuples
    for tables in which at least one row contains patient information.
    """
    pairs = SortedSet()
    for row in self.rows:
        if row.contains_patient_info:
            pairs.add((row.src_db, row.src_table))
    return pairs

1045 

def get_src_db_tablepairs_w_no_pt_info(
    self,
) -> AbstractSet[Tuple[str, str]]:
    """
    Return the ``source_database_name, source_table`` tuples for tables
    that contain no patient information: all source tables, minus those
    that have patient information.
    """
    everything = self.get_src_db_tablepairs()
    with_patient_info = self.get_src_db_tablepairs_w_pt_info()
    return everything - with_patient_info

1057 

def get_tables_w_no_pt_info(self) -> AbstractSet[str]:
    """
    Return a SortedSet of ``source_table`` names for tables that contain
    no patient information.

    Tables are compared by table name only (the source database name is
    not part of the comparison).
    """
    patient_tables = SortedSet(
        row.src_table for row in self.rows if row.contains_patient_info
    )
    every_table = SortedSet(row.src_table for row in self.rows)
    return every_table - patient_tables

1068 

def get_tables_w_scrub_src(self) -> AbstractSet[str]:
    """
    Return a SortedSet of ``source_table`` names for tables that contain
    ``scrub_src`` information, i.e. that contribute to anonymisation.
    """
    table_names = SortedSet()
    for row in self.rows:
        if row.contains_scrub_src:
            table_names.add(row.src_table)
    return table_names

1077 

@lru_cache(maxsize=None)
def get_src_db_tablepairs_w_int_pk(self) -> AbstractSet[Tuple[str, str]]:
    """
    Return a SortedSet of ``source_database_name, source_table`` tuples
    for tables that have an integer PK (as judged by
    :meth:`get_int_pk_ddr`).
    """
    pairs = SortedSet()
    for row in self.rows:
        if self.get_int_pk_ddr(row.src_db, row.src_table) is not None:
            pairs.add((row.src_db, row.src_table))
    return pairs

1091 

@lru_cache(maxsize=None)
def get_src_dbs_tables_with_no_pt_info_no_pk(
    self,
) -> AbstractSet[Tuple[str, str]]:
    """
    Return the ``source_database_name, source_table`` tuples where the
    table has no patient information and no integer PK.
    """
    remaining = self.get_src_db_tablepairs()
    remaining = remaining - self.get_src_db_tablepairs_w_pt_info()
    remaining = remaining - self.get_src_db_tablepairs_w_int_pk()
    return remaining

1105 

@lru_cache(maxsize=None)
def get_src_dbs_tables_with_no_pt_info_int_pk(
    self,
) -> AbstractSet[Tuple[str, str]]:
    """
    Return the ``source_database_name, source_table`` tuples where the
    table has no patient information and has an integer PK.
    """
    without_patient_info = (
        self.get_src_db_tablepairs()
        - self.get_src_db_tablepairs_w_pt_info()
    )
    with_int_pk = self.get_src_db_tablepairs_w_int_pk()
    return without_patient_info & with_int_pk  # set intersection

1118 

@lru_cache(maxsize=None)
def get_dest_tables_all(self) -> AbstractSet[str]:
    """
    Return a SortedSet of all destination table names (including tables
    that will receive no contents).
    """
    return SortedSet(row.dest_table for row in self.rows)

1126 

@lru_cache(maxsize=None)
def get_dest_tables_included(self) -> AbstractSet[str]:
    """
    Return a SortedSet of destination table names for tables with at
    least one column that is included (i.e. not omitted).
    """
    included_rows = (row for row in self.rows if not row.omit)
    return SortedSet(row.dest_table for row in included_rows)

1134 

@lru_cache(maxsize=None)
def get_dest_tables_with_patient_info(self) -> AbstractSet[str]:
    """
    Return a SortedSet of destination table names that have patient
    information (considering only rows that are not omitted).
    """
    table_names = SortedSet()
    for row in self.rows:
        if row.contains_patient_info and not row.omit:
            table_names.add(row.dest_table)
    return table_names

1148 

@lru_cache(maxsize=None)
def get_optout_defining_fields(
    self,
) -> AbstractSet[Tuple[str, str, str, str, str]]:
    """
    Return a SortedSet of ``src_db, src_table, src_field, pidfield,
    mpidfield`` tuples for rows that define opt-out information.

    The PID and MPID field names are looked up for the row's source table
    via :meth:`get_pid_name` and :meth:`get_mpid_name`.
    """
    optout_fields = SortedSet()
    for row in self.rows:
        if not row.opt_out_info:
            continue
        db, table = row.src_db, row.src_table
        optout_fields.add(
            (
                db,
                table,
                row.src_field,
                self.get_pid_name(row.src_db, row.src_table),
                self.get_mpid_name(row.src_db, row.src_table),
            )
        )
    return optout_fields

1170 

@lru_cache(maxsize=None)
def get_mandatory_scrubber_sigs(self) -> AbstractSet[str]:
    """
    Return a set of field signatures (strings of the format
    ``db.table.column``) for all rows representing "required scrubber"
    fields -- that is, rows that must have at least one non-NULL value for
    each patient, or the patient won't get processed.
    """
    return {row.src_signature for row in self.rows if row.required_scrubber}

1182 

def get_summary_info_for_table(
    self, src_db: str, src_table: str
) -> DDTableSummary:
    """
    Collates summary information for one source table, from all of its
    data dictionary rows (table comment rows included).

    Args:
        src_db:
            Source database name.
        src_table:
            Source table name.

    Returns:
        a :class:`DDTableSummary`; each flag in it is the "or" of the
        corresponding attribute across the table's rows.
    """
    table_rows = self.get_rows_for_src_table(
        src_db, src_table, skip_table_comments=False
    )

    # Accumulators: source side
    has_pk = False
    pk_fieldname = None  # type: Optional[str]
    is_constant = False
    is_addition_only = False
    defines_pid = False
    has_pid = False
    has_mpid = False
    has_opt_out = False
    has_patient_scrub = False
    has_third_party_scrub = False
    has_required_scrub = False
    has_table_comment = False
    # Accumulators: destination side
    dest_name = None  # type: Optional[str]
    dest_rows_present = False
    dest_src_hash = False
    dest_scrubbed = False

    for row in table_rows:
        # Source side
        has_pk = has_pk or row.pk
        if row.pk:
            # If several rows claim to be the PK, the last one wins.
            pk_fieldname = row.src_field
        is_constant = is_constant or row.constant
        is_addition_only = is_addition_only or row.addition_only
        defines_pid = defines_pid or row.defines_primary_pids
        has_pid = has_pid or row.primary_pid
        has_mpid = has_mpid or row.master_pid
        has_opt_out = has_opt_out or row.opt_out_info
        has_patient_scrub = (
            has_patient_scrub or row.contains_patient_scrub_src_info
        )
        has_third_party_scrub = (
            has_third_party_scrub or row.contains_third_party_info
        )
        has_required_scrub = has_required_scrub or row.required_scrubber
        has_table_comment = has_table_comment or row.is_table_comment
        # Destination side: the first non-empty destination name is kept.
        dest_name = dest_name or row.dest_table
        dest_rows_present = dest_rows_present or not row.omit
        dest_src_hash = dest_src_hash or row.add_src_hash
        dest_scrubbed = dest_scrubbed or row.being_scrubbed

    return DDTableSummary(
        # Which table?
        src_db=src_db,
        src_table=src_table,
        # Source info
        src_has_pk=has_pk,
        src_pk_fieldname=pk_fieldname,
        src_constant=is_constant,
        src_addition_only=is_addition_only,
        src_defines_pid=defines_pid,
        src_has_pid=has_pid,
        src_has_mpid=has_mpid,
        src_has_opt_out=has_opt_out,
        src_has_patient_scrub_info=has_patient_scrub,
        src_has_third_party_scrub_info=has_third_party_scrub,
        src_has_required_scrub_info=has_required_scrub,
        src_has_table_comment=has_table_comment,
        # Destination info
        dest_table=dest_name,
        dest_has_rows=dest_rows_present,
        dest_add_src_hash=dest_src_hash,
        dest_being_scrubbed=dest_scrubbed,
    )

1265 

def get_summary_info_all_tables(self) -> List[DDTableSummary]:
    """
    Returns summary information by table: one :class:`DDTableSummary` per
    source database/table pair, in the order given by
    :meth:`get_src_db_tablepairs`.
    """
    return [
        self.get_summary_info_for_table(db, table)
        for db, table in self.get_src_db_tablepairs()
    ]

1274 

1275 # ------------------------------------------------------------------------- 

1276 # Queries by source DB 

1277 # ------------------------------------------------------------------------- 

1278 

@lru_cache(maxsize=None)
def get_src_tables(self, src_db: str) -> AbstractSet[str]:
    """
    For a given source database name, return a SortedSet of all source
    tables that are required (that is, ones being copied and ones providing
    vital patient information).
    """
    table_names = SortedSet()
    for row in self.rows:
        if row.src_db == src_db and row.required:
            table_names.add(row.src_table)
    return table_names

1293 

@lru_cache(maxsize=None)
def get_src_tables_with_active_dest(self, src_db: str) -> AbstractSet[str]:
    """
    For a given source database name, return a SortedSet of its source
    tables that have an active destination (at least one row that is not
    omitted).
    """
    return SortedSet(
        row.src_table
        for row in self.rows
        if row.src_db == src_db and not row.omit
    )

1307 

@lru_cache(maxsize=None)
def get_src_tables_with_patient_info(
    self, src_db: str
) -> AbstractSet[str]:
    """
    For a given source database name, return a SortedSet of source tables
    that have patient information.
    """
    table_names = SortedSet()
    for row in self.rows:
        if row.src_db == src_db and row.contains_patient_info:
            table_names.add(row.src_table)
    return table_names

1323 

@lru_cache(maxsize=None)
def get_patient_src_tables_with_active_dest(
    self, src_db: str
) -> AbstractSet[str]:
    """
    For a given source database name, return the source tables that both
    have an active destination table and contain patient information.
    """
    active = self.get_src_tables_with_active_dest(src_db)
    with_patient_info = self.get_src_tables_with_patient_info(src_db)
    return active & with_patient_info

1335 

1336 # ------------------------------------------------------------------------- 

1337 # Queries by source DB/table 

1338 # ------------------------------------------------------------------------- 

1339 

@lru_cache(maxsize=None)
def get_dest_tables_for_src_db_table(
    self, src_db: str, src_table: str
) -> AbstractSet[str]:
    """
    For a given source database/table, return a SortedSet of destination
    tables (taken from rows that are not omitted).
    """
    dest_tables = SortedSet()
    for row in self.rows:
        if row.src_db != src_db or row.src_table != src_table:
            continue
        if row.omit:
            continue
        dest_tables.add(row.dest_table)
    return dest_tables

1359 

@lru_cache(maxsize=None)
def get_dest_table_for_src_db_table(
    self, src_db: str, src_table: str
) -> str:
    """
    For a given source database/table, return the single or the first
    destination table.

    Raises:
        IndexError:
            if the source table has no destination table.
    """
    dest_tables = list(
        self.get_dest_tables_for_src_db_table(src_db, src_table)
    )
    return dest_tables[0]

1371 

@lru_cache(maxsize=None)
def get_rows_for_src_table(
    self, src_db: str, src_table: str, skip_table_comments: bool = True
) -> AbstractSet[DataDictionaryRow]:
    """
    For a given source database name/table, return a SortedSet of DD rows.

    Args:
        src_db:
            Source database name.
        src_table:
            Source table name.
        skip_table_comments:
            Leave out rows that are table comments?
    """
    matching = SortedSet()
    for row in self.rows:
        if row.src_db != src_db or row.src_table != src_table:
            continue
        if skip_table_comments and row.is_table_comment:
            continue
        matching.add(row)
    return matching

1388 

@lru_cache(maxsize=None)
def get_fieldnames_for_src_table(
    self, src_db: str, src_table: str
) -> AbstractSet[str]:
    """
    For a given source database name/table, return a SortedSet of source
    field names.

    Rows with an empty source field name are skipped. The elements are the
    rows' ``src_field`` values, not the rows themselves.

    Args:
        src_db:
            Source database name.
        src_table:
            Source table name.
    """
    return SortedSet(
        [
            ddr.src_field
            for ddr in self.rows
            if ddr.src_db == src_db
            and ddr.src_table == src_table
            and ddr.src_field
        ]
    )

1406 

@lru_cache(maxsize=None)
def get_scrub_from_rows(
    self, src_db: str, src_table: str
) -> AbstractSet[DataDictionaryRow]:
    """
    Return a SortedSet of DD rows, for the given source database/table,
    for all fields containing ``scrub_src`` (scrub-from) information.

    Rows are included even if their omit flag is set.
    """
    scrub_rows = SortedSet()
    for row in self.rows:
        if not row.scrub_src:
            continue
        if row.src_db == src_db and row.src_table == src_table:
            scrub_rows.add(row)
    return scrub_rows

1427 

def get_scrub_from_rows_as_fieldinfo(
    self, src_db: str, src_table: str, depth: int, max_depth: int
) -> List[ScrubSourceFieldInfo]:
    """
    Returns the results of :meth:`get_scrub_from_rows` as a list of
    :class:`ScrubSourceFieldInfo` objects, which is more convenient for
    scrubbing.

    Args:
        src_db:
            Source database name.
        src_table:
            Source table name.
        depth:
            Current recursion depth for looking up third-party information.
        max_depth:
            Maximum permitted recursion depth for looking up third-party
            information.
    """
    # Only at depth 0 can a field be treated as patient-related; this
    # means that third-party information is never marked as
    # patient-related.
    at_top_level = depth == 0
    may_recurse = depth < max_depth
    return [
        ScrubSourceFieldInfo(
            is_mpid=(at_top_level and ddr.master_pid),
            is_patient=(at_top_level and ddr.scrub_src is ScrubSrc.PATIENT),
            recurse=(
                may_recurse
                and ddr.scrub_src is ScrubSrc.THIRDPARTY_XREF_PID
            ),
            required_scrubber=ddr.required_scrubber,
            scrub_method=PersonalizedScrubber.get_scrub_method(
                ddr.src_datatype, ddr.scrub_method
            ),
            signature=ddr.src_signature,
            value_fieldname=ddr.src_field,
        )
        for ddr in self.get_scrub_from_rows(src_db, src_table)
    ]

1471 

@lru_cache(maxsize=None)
def get_pk_ddr(
    self, src_db: str, src_table: str
) -> Optional[DataDictionaryRow]:
    """
    For a given source database name and table, return the first DD row
    marked as the PK for that table, whether integer or not.

    Will return ``None`` if no such data dictionary row exists.
    """
    pk_rows = (
        row
        for row in self.rows
        if row.src_db == src_db and row.src_table == src_table and row.pk
    )
    return next(pk_rows, None)

1486 

@lru_cache(maxsize=None)
def get_int_pk_ddr(
    self, src_db: str, src_table: str
) -> Optional[DataDictionaryRow]:
    """
    For a given source database name and table, return the first DD row
    that is marked as the PK and whose source datatype is an integer SQL
    type.

    Will return ``None`` if no such data dictionary row exists.
    """
    for row in self.rows:
        if row.src_db != src_db or row.src_table != src_table:
            continue
        if row.pk and is_sqltype_integer(row.src_datatype):
            return row
    return None

1506 

@lru_cache(maxsize=None)
def get_int_pk_name(self, src_db: str, src_table: str) -> Optional[str]:
    """
    For a given source database name and table, return the field name of
    the integer PK for that table (or ``None`` if there isn't one).
    """
    pk_row = self.get_int_pk_ddr(src_db, src_table)
    return None if pk_row is None else pk_row.src_field

1517 

@lru_cache(maxsize=None)
def has_active_destination(self, src_db: str, src_table: str) -> bool:
    """
    For a given source database name and table, does it have an active
    destination (that is, at least one row that is not omitted)?
    """
    return any(
        row.src_db == src_db
        and row.src_table == src_table
        and not row.omit
        for row in self.rows
    )

1532 

@lru_cache(maxsize=None)
def get_pid_name(self, src_db: str, src_table: str) -> Optional[str]:
    """
    For a given source database name and table: return the field name of
    the first field providing primary PID information (or ``None`` if
    there isn't one).
    """
    pid_fields = (
        row.src_field
        for row in self.rows
        if row.src_db == src_db
        and row.src_table == src_table
        and row.primary_pid
    )
    return next(pid_fields, None)

1548 

@lru_cache(maxsize=None)
def get_mpid_name(self, src_db: str, src_table: str) -> Optional[str]:
    """
    For a given source database name and table: return the field name of
    the first field providing master PID (MPID) information (or ``None``
    if there isn't one).
    """
    for row in self.rows:
        if row.src_db != src_db:
            continue
        if row.src_table != src_table:
            continue
        if row.master_pid:
            return row.src_field
    return None

1564 

1565 # ------------------------------------------------------------------------- 

1566 # Queries by destination table 

1567 # ------------------------------------------------------------------------- 

1568 

@lru_cache(maxsize=None)
def get_src_dbs_tables_for_dest_table(
    self, dest_table: str
) -> AbstractSet[Tuple[str, str]]:
    """
    For a given destination table, return a SortedSet of ``dbname, table``
    tuples, representing source database(s)/table(s).
    """
    sources = SortedSet()
    for row in self.rows:
        if row.dest_table == dest_table:
            sources.add((row.src_db, row.src_table))
    return sources

1584 

@lru_cache(maxsize=None)
def get_rows_for_dest_table(
    self, dest_table: str, skip_table_comments: bool = True
) -> AbstractSet[DataDictionaryRow]:
    """
    For a given destination table, return a SortedSet of DD rows that are
    not omitted.

    Args:
        dest_table:
            Destination table name.
        skip_table_comments:
            Leave out rows that are table comments?
    """
    matching = SortedSet()
    for row in self.rows:
        if row.dest_table != dest_table or row.omit:
            continue
        if skip_table_comments and row.is_table_comment:
            continue
        matching.add(row)
    return matching

1601 

1602 # ------------------------------------------------------------------------- 

1603 # SQLAlchemy Table objects 

1604 # ------------------------------------------------------------------------- 

1605 

@lru_cache(maxsize=None)
def get_dest_sqla_table(self, tablename: str) -> Table:
    """
    For a given destination table name, return an
    :class:`sqlalchemy.sql.schema.Table` object for the destination table
    (which we will create).

    Columns are added in this order: for each DD row, its destination
    column, then (if flagged) a source hash column, then (for a primary
    PID row) a TRID column; after all rows, an optional MRID column and
    an optional time-stamp column.
    """
    cfg = self.config
    metadata = cfg.destdb.metadata
    timefield = cfg.timefield
    auto_mrid = cfg.add_mrid_wherever_rid_added
    has_pid = False
    has_named_mrid = False
    cols = []  # type: List[Column]
    table_kwargs = {"extend_existing": True}  # type: Dict[str, Any]

    dd_rows = self.get_rows_for_dest_table(
        tablename, skip_table_comments=False
    )
    for ddr in dd_rows:
        if ddr.is_table_comment and not ddr.omit:
            # A table comment becomes a Table keyword argument, not a
            # column.
            table_kwargs["comment"] = ddr.comment
            continue
        cols.append(ddr.dest_sqla_column)
        if ddr.add_src_hash:
            cols.append(self._get_srchash_sqla_column())
        if ddr.primary_pid:
            cols.append(self._get_trid_sqla_column())
            has_pid = True
        if (
            ddr.master_pid
            and ddr.dest_field == cfg.master_research_id_fieldname
        ):
            # This table has an explicit MRID field with the expected
            # name. Note it, so that automatic MRID addition (alongside
            # RIDs) does not create the column twice.
            has_named_mrid = True

    if has_pid and auto_mrid and not has_named_mrid:
        cols.append(self._get_mrid_sqla_column())
    if timefield:
        cols.append(
            Column(
                timefield,
                DateTime,
                comment=AnonymiseColumnComments.TIMEFIELD_COMMENT,
            )
        )
    return Table(tablename, metadata, *cols, **TABLE_KWARGS, **table_kwargs)

1657 

def _get_srchash_sqla_column(self) -> Column:
    """
    Returns a :class:`sqlalchemy.sql.schema.Column` object for the
    "source hash" column (which is inserted into many destination tables
    so they can record the hash of their source, for change detection).
    """
    cfg = self.config
    column_name = cfg.source_hash_fieldname
    column_type = cfg.sqltype_encrypted_pid
    return Column(
        column_name,
        column_type,
        comment="Hashed amalgamation of all source fields",
    )

1669 

def _get_trid_sqla_column(self) -> Column:
    """
    Returns a :class:`sqlalchemy.sql.schema.Column` object for the "TRID"
    column. This is inserted into all patient-related destination tables
    as a high-speed (integer) but impermanent research ID -- a transient
    research ID (TRID). The column is not nullable.
    """
    column_name = self.config.trid_fieldname
    return Column(
        column_name,
        TridType,
        nullable=False,
        comment="Transient integer research ID (TRID)",
    )

1683 

def _get_mrid_sqla_column(self) -> Column:
    """
    Returns a :class:`sqlalchemy.sql.schema.Column` object for the "MRID"
    column. This is inserted into all patient-related destination tables
    where the flag 'add_mrid_wherever_rid_added' is set. The column is
    nullable.
    """
    cfg = self.config
    column_name = cfg.master_research_id_fieldname
    column_type = cfg.sqltype_encrypted_pid
    return Column(
        column_name,
        column_type,
        nullable=True,
        comment="Master research ID (MRID)",
    )

1696 

1697 # ------------------------------------------------------------------------- 

1698 # Clear caches 

1699 # ------------------------------------------------------------------------- 

1700 

def cached_funcs(self) -> List[Any]:
    """
    Returns a list of our methods that are cached. See
    :func:`clear_caches`.
    """
    global_queries = [
        self.get_source_databases,
        self.get_scrub_from_db_table_pairs,
        self.get_src_db_tablepairs,
        self.get_src_db_tablepairs_w_pt_info,
        self.get_src_db_tablepairs_w_int_pk,
        self.get_src_dbs_tables_with_no_pt_info_no_pk,
        self.get_src_dbs_tables_with_no_pt_info_int_pk,
        self.get_dest_tables_all,
        self.get_dest_tables_included,
        self.get_dest_tables_with_patient_info,
        self.get_optout_defining_fields,
        self.get_mandatory_scrubber_sigs,
    ]
    by_source_db = [
        self.get_src_tables,
        self.get_src_tables_with_active_dest,
        self.get_src_tables_with_patient_info,
        self.get_patient_src_tables_with_active_dest,
    ]
    by_source_db_and_table = [
        self.get_dest_tables_for_src_db_table,
        self.get_dest_table_for_src_db_table,
        self.get_rows_for_src_table,
        self.get_fieldnames_for_src_table,
        self.get_scrub_from_rows,
        self.get_pk_ddr,
        self.get_int_pk_ddr,
        self.get_int_pk_name,
        self.has_active_destination,
        self.get_pid_name,
        self.get_mpid_name,
    ]
    by_dest_table = [
        self.get_src_dbs_tables_for_dest_table,
        self.get_rows_for_dest_table,
    ]
    sqlalchemy_tables = [
        self.get_dest_sqla_table,
    ]
    return (
        global_queries
        + by_source_db
        + by_source_db_and_table
        + by_dest_table
        + sqlalchemy_tables
    )

1738 

def clear_caches(self) -> None:
    """
    Clear all our cached information, by calling ``cache_clear()`` on
    every method listed by :meth:`cached_funcs`.
    """
    for cached_method in self.cached_funcs():
        cached_method.cache_clear()

1745 

def debug_cache_hits(self) -> None:
    """
    Report cache hit information for our caches, to the Python log (at
    debug level): one message per method listed by :meth:`cached_funcs`.
    """
    cached_methods = self.cached_funcs()
    for func in cached_methods:
        log.debug(f"{func.__name__}: {func.cache_info()}")

1752 

1753 # ------------------------------------------------------------------------- 

1754 # Filtering 

1755 # ------------------------------------------------------------------------- 

1756 

KEEP_FUNCTION_TYPE = Callable[[DataDictionaryRow], bool]
# ... returns keep (True/False)

def remove_rows_by_filter(self, keep: KEEP_FUNCTION_TYPE) -> None:
    """
    Removes any rows that do not pass a filter function.
    (Retains table comment rows.)

    Args:
        keep:
            Function taking a data dictionary row as an argument, and
            returning a boolean of whether to keep the row. It is not
            called for table comment rows.
    """
    # Table comment rows are exempt from filtering (as in
    # omit_rows_by_filter), so they are kept without consulting keep().
    self.rows = [
        row for row in self.rows if row.is_table_comment or keep(row)
    ]

1771 

def omit_rows_by_filter(self, keep: KEEP_FUNCTION_TYPE) -> None:
    """
    Set to "omit" any rows that do not pass a filter function.
    Does not alter any rows already set to omit.
    (Skips table comment rows.)

    Args:
        keep:
            Function taking a data dictionary row as an argument, and
            returning a boolean of whether to keep the row.
    """
    for row in self.rows:
        if row.is_table_comment or row.omit:
            # Table comments are exempt; rows already omitted stay omitted.
            continue
        row.omit = not keep(row)

1788 

MODIFYING_KEEP_FUNCTION_TYPE = Callable[
    [DataDictionaryRow], Optional[DataDictionaryRow]
]
# returns the row (perhaps modified) to keep, or None to reject

def remove_rows_by_modifying_filter(
    self, keep_modify: MODIFYING_KEEP_FUNCTION_TYPE
) -> None:
    """
    Removes any rows that do not pass a filter function; allows the filter
    function to modify rows that are kept.
    (Retains table comment rows.)

    Args:
        keep_modify:
            Function taking a data dictionary row as an argument, and
            returning either the row (potentially modified) to retain it,
            or ``None`` to reject it. It is not called for table comment
            rows.
    """
    new_rows = []  # type: List[DataDictionaryRow]
    for row in self.rows:
        if row.is_table_comment:
            # Table comment rows bypass the filter and are kept as they
            # are, in their original position.
            new_rows.append(row)
            continue
        result = keep_modify(row)
        if result is not None:
            new_rows.append(result)
    self.rows = new_rows