Coverage for anonymise/anonymise.py: 35%

774 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2026-01-09 10:40 -0600

1""" 

2crate_anon/anonymise/anonymise.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Anonymise one or more SQL-based source databases into a destination database 

27using a data dictionary.** 

28 

29""" 

30 

31# ============================================================================= 

32# Imports 

33# ============================================================================= 

34 

35import logging 

36import random 

37import sys 

38from datetime import datetime 

39from typing import Any, Dict, Iterable, Generator, List, Tuple, Union 

40 

41from cardinal_pythonlib.datetimefunc import get_now_utc_pendulum 

42from cardinal_pythonlib.sqlalchemy.core_query import count_star, exists_plain 

43from cardinal_pythonlib.sqlalchemy.insert_on_duplicate import ( 

44 insert_with_upsert_if_supported, 

45) 

46from cardinal_pythonlib.sqlalchemy.schema import ( 

47 add_index, 

48 get_column_names, 

49) 

50from sortedcontainers import SortedSet 

51from sqlalchemy.exc import DatabaseError, IntegrityError 

52from sqlalchemy.schema import Column, Index, MetaData, Table 

53from sqlalchemy.sql import column, func, or_, select, table, text 

54from sqlalchemy.types import Boolean 

55 

56from crate_anon.anonymise.config_singleton import config 

57from crate_anon.anonymise.constants import ( 

58 AnonymiseConfigKeys, 

59 AnonymiseDatabaseSafeConfigKeys, 

60 BIGSEP, 

61 DEFAULT_CHUNKSIZE, 

62 DEFAULT_REPORT_EVERY, 

63 IndexType, 

64 TABLE_KWARGS, 

65 SEP, 

66) 

67from crate_anon.anonymise.models import ( 

68 OptOutMpid, 

69 OptOutPid, 

70 PatientInfo, 

71 TridRecord, 

72) 

73from crate_anon.anonymise.patient import Patient 

74from crate_anon.anonymise.ddr import DataDictionaryRow 

75from crate_anon.common.file_io import ( 

76 gen_integers_from_file, 

77 gen_words_from_file, 

78) 

79from crate_anon.common.parallel import is_my_job_by_hash, is_my_job_by_int 

80from crate_anon.common.sql import matches_tabledef 

81 

# Module-level logger, named after this module.
log = logging.getLogger(name=__name__)

83 

84 

85# ============================================================================= 

86# Database queries 

87# ============================================================================= 

88 

89 

def identical_record_exists_by_hash(
    dest_table: str, pkfield: str, pkvalue: int, hashvalue: str
) -> bool:
    """
    Check whether the destination table already holds a row with this PK
    whose stored source hash equals ``hashvalue``.

    Args:
        dest_table: destination table name
        pkfield: name of the destination table's primary key (PK) column
        pkvalue: integer PK value to look for
        hashvalue: source hash that the matching row must carry

    Returns:
        whether such a row exists (as reported by ``exists_plain``)
    """
    pk_matches = column(pkfield) == pkvalue
    hash_matches = column(config.source_hash_fieldname) == hashvalue
    return exists_plain(
        config.destdb.session, dest_table, pk_matches, hash_matches
    )

109 

110 

def identical_record_exists_by_pk(
    dest_table: str, pkfield: str, pkvalue: int
) -> bool:
    """
    Check whether any row in the destination table has the given PK value.

    Args:
        dest_table: destination table name
        pkfield: name of the destination table's primary key (PK) column
        pkvalue: integer PK value to look for

    Returns:
        whether such a row exists (as reported by ``exists_plain``)
    """
    dest_session = config.destdb.session
    pk_condition = column(pkfield) == pkvalue
    return exists_plain(dest_session, dest_table, pk_condition)

125 

126 

127# ============================================================================= 

128# Database actions 

129# ============================================================================= 

130 

131 

def wipe_and_recreate_destination_db(
    incremental: bool = False, full_drop_only: bool = False
) -> None:
    """
    Drop and/or (re)create every destination table named in the data
    dictionary (DD).

    Args:
        incremental:
            Skip the drop phase; only create tables that don't yet exist.
        full_drop_only:
            Perform the drop phase and stop, without creating anything.
            Must not be combined with ``incremental``.

    Raises:
        RuntimeError: if a destination table, after creation, lacks a column
            that the DD's table definition contains.
    """
    assert not (incremental and full_drop_only)
    dest_engine = config.destdb.engine
    dd = config.dd

    log.info(
        "Dropping tables from destination database"
        if full_drop_only
        else f"Rebuilding destination database (incremental={incremental})"
    )

    if not incremental:
        # Drop every table the DD knows about (not only those receiving
        # content), so that editing the DD doesn't leave orphan tables.
        for name in dd.get_dest_tables_all():
            doomed = dd.get_dest_sqla_table(name)
            log.info(f"Dropping table: {name}")
            doomed.drop(dest_engine, checkfirst=True)
        if full_drop_only:
            return

    # Create (if absent) each table that will receive content, then compare
    # its actual columns with the DD's definition.
    for name in dd.get_dest_tables_included():
        wanted = dd.get_dest_sqla_table(name)
        log.info(f"Creating table: {name}")
        log.debug(repr(wanted))
        wanted.create(dest_engine, checkfirst=True)

        expected_cols = set(wanted.columns.keys())
        actual_cols = set(get_column_names(dest_engine, name))
        missing = list(expected_cols - actual_cols)
        if missing:
            raise RuntimeError(
                f"Missing fields in destination table {name}: {missing}"
            )
        extra = list(actual_cols - expected_cols)
        if extra:
            log.warning(f"Extra fields in destination table {name}: {extra}")

187 

188 

def delete_dest_rows_with_no_src_row(
    srcdbname: str,
    src_table: str,
    report_every: int = DEFAULT_REPORT_EVERY,
    chunksize: int = DEFAULT_CHUNKSIZE,
) -> None:
    """
    For a given source database/table, delete any rows in the corresponding
    destination table where there is no corresponding source row.

    - Can't do this in a single SQL command, since the engine can't
      necessarily see both databases.
    - Can't do this in a multiprocess way, because we're trying to do a
      ``DELETE WHERE NOT IN``.
    - However, we can get stupidly long query lists if we try to ``SELECT`` all
      the values and use a ``DELETE FROM x WHERE y NOT IN (v1, v2, v3, ...)``
      query. This crashes the MySQL connection, etc.
    - Therefore, we need a temporary table in the destination.

    If the source table has no PK, all destination rows are deleted. If the
    PK is marked addition-only, nothing is deleted.

    Args:
        srcdbname: name (as per the data dictionary) of the source database
        src_table: name of the source table
        report_every: report to the Python log every *n* records (a falsy
            value disables these progress messages)
        chunksize: insert records into the temporary table in batches of
            *n*; a falsy or non-positive value means all records are
            inserted in a single batch at the end
    """
    if not config.dd.has_active_destination(srcdbname, src_table):
        return
    dest_table_name = config.dd.get_dest_table_for_src_db_table(
        srcdbname, src_table
    )
    start = (
        f"delete_dest_rows_with_no_src_row: "
        f"{srcdbname}.{src_table} -> {config.destdb.name}.{dest_table_name}: "
    )
    log.info(start + "[WARNING: MAY BE SLOW]")

    metadata = MetaData()  # operate in isolation!
    destengine = config.destdb.engine
    destsession = config.destdb.session
    dest_table = config.dd.get_dest_sqla_table(dest_table_name)
    pkddr = config.dd.get_pk_ddr(srcdbname, src_table)

    # If there's no source PK, we just delete everything
    if not pkddr:
        log.info("... No source PK; deleting everything")
        destsession.execute(dest_table.delete())
        commit_destdb()
        return

    if pkddr.addition_only:
        log.info("... Table marked as addition-only; not deleting anything")
        return

    # Drop/create temporary table
    pkfield = "srcpk"
    temptable = Table(
        config.temporary_tablename,
        metadata,
        Column(pkfield, pkddr.dest_sqla_coltype, primary_key=True),
        **TABLE_KWARGS,
    )
    # THIS (ABOVE) IS WHAT CONSTRAINS A USER-DEFINED PK TO BE UNIQUE WITHIN ITS
    # TABLE.
    log.debug("... dropping temporary table")
    temptable.drop(destengine, checkfirst=True)
    log.debug("... making temporary table")
    temptable.create(destengine, checkfirst=True)

    # Populate temporary table, +/- PK translation
    n = count_star(config.sources[srcdbname].session, src_table)
    log.debug(f"... populating temporary table: {n} records to go")

    def insert(records_: List[Dict[str, Any]]) -> None:
        log.debug(start + f"... inserting {len(records_)} records")
        destsession.execute(temptable.insert(), records_)

    # Guard against chunksize being 0/None/negative: the old "i % chunksize"
    # test raised ZeroDivisionError (0) or TypeError (None).
    flush_every = chunksize if chunksize and chunksize > 0 else None
    records = []  # type: List[Dict[str, Any]]
    for i, pk in enumerate(
        gen_pks(srcdbname, src_table, pkddr.src_field), start=1
    ):
        if report_every and i % report_every == 0:
            log.debug(start + f"... src row# {i} / {n}")
        # The destination stores encrypted PIDs, so the comparison values
        # must be encrypted the same way.
        if pkddr.primary_pid:
            pk = config.encrypt_primary_pid(pk)
        elif pkddr.master_pid:
            pk = config.encrypt_master_pid(pk)
        records.append({pkfield: pk})
        if flush_every and len(records) >= flush_every:
            insert(records)
            records = []
    if records:  # remainder
        insert(records)
    commit_destdb()

    # Index -- not needed; the temporary table's column is a primary key.

    # DELETE FROM desttable WHERE destpk NOT IN (SELECT srcpk FROM temptable)
    log.debug("... deleting from destination where appropriate")
    query = dest_table.delete().where(
        ~column(pkddr.dest_field).in_(select(temptable.columns[pkfield]))
    )
    destsession.execute(query)
    commit_destdb()

    # Drop temporary table
    log.debug("... dropping temporary table")
    temptable.drop(destengine, checkfirst=True)

    # Commit
    commit_destdb()

300 

301 

def commit_destdb() -> None:
    """
    Commit pending work on the destination database (and reset row counts),
    by delegating to the config singleton's ``commit_dest_db()``.
    """
    config.commit_dest_db()

307 

308 

def commit_admindb() -> None:
    """
    Commit the admin database's ORM session.
    """
    admin_session = config.admindb.session
    admin_session.commit()

315 

316# ============================================================================= 

317# Opt-out 

318# ============================================================================= 

319 

320 

def opting_out_pid(pid: Union[int, str]) -> bool:
    """
    Report whether the patient with this PID has opted out.

    Args:
        pid: patient identifier (PID); ``None`` is treated as "not opted out"
    """
    return pid is not None and OptOutPid.opting_out(
        config.admindb.session, pid
    )

331 

332 

def opting_out_mpid(mpid: Union[int, str]) -> bool:
    """
    Report whether the patient with this master PID has opted out.

    Args:
        mpid: master patient identifier (MPID); ``None`` is treated as "not
            opted out"
    """
    return mpid is not None and OptOutMpid.opting_out(
        config.admindb.session, mpid
    )

343 

344 

def gen_optout_rids() -> Generator[str, None, None]:
    """
    Yield the research ID (RID) of every patient whose PID or MPID appears in
    the opt-out tables, so their data can be removed from the destination
    database.

    Yields:
        string: research ID (RID)
    """
    admin_session = config.admindb.session
    opted_out_pids = admin_session.query(OptOutPid.pid)
    opted_out_mpids = admin_session.query(OptOutMpid.mpid)
    rid_query = admin_session.query(PatientInfo.rid).filter(
        or_(
            PatientInfo.pid.in_(opted_out_pids),
            PatientInfo.mpid.in_(opted_out_mpids),
        )
    )
    yield from (row[0] for row in rid_query)

362 

363 

364# ============================================================================= 

365# Functions for getting PIDs from restricted set 

366# ============================================================================= 

367 

368 

def get_valid_pid_subset(given_pids: List[str]) -> List[str]:
    """
    Takes a list of PIDs and returns those in the list which are also in the
    database.

    A PID counts as "in the database" if it appears (compared as a string)
    among the non-NULL values of any data dictionary field that defines
    primary PIDs.

    If PIDs are configured to be integers, given values that ``int()``
    cannot convert are reported on stdout and excluded.

    Args:
        given_pids: candidate PIDs, as strings

    Returns:
        the valid PIDs, as ``str()`` of the database values, each listed once,
        in the order in which the database returned them
    """
    if config.pidtype_is_integer:
        # Remove non-integer values of pid if pids are supposed to be integer
        final_given_pids = []  # type: List[str]
        for pid in given_pids:
            try:
                int(pid)
            except (TypeError, ValueError):
                print(
                    f"pid '{pid}' should be in integer form. "
                    "Excluding value."
                )
                continue
            final_given_pids.append(pid)
    else:
        final_given_pids = given_pids
    wanted = set(final_given_pids)  # O(1) membership tests

    pids = []  # type: List[str]
    seen = set()  # a PID may be defined by several tables; list it once
    for ddr in config.dd.rows:
        if not ddr.defines_primary_pids:
            continue
        pidcol = column(ddr.src_field)
        session = config.sources[ddr.src_db].session
        query = (
            select(pidcol)
            .select_from(table(ddr.src_table))
            # Must be a SQL expression: the Python test "pidcol is not None"
            # is always True and would not exclude NULLs.
            .where(pidcol.isnot(None))
            .distinct()
        )
        for row in session.execute(query):
            pid = str(row[0])
            if pid in wanted and pid not in seen:
                seen.add(pid)
                pids.append(pid)

    return pids

407 

408 

def get_pid_subset_from_field(
    field: str, values_to_find: List[Any]
) -> List[Any]:
    """
    Takes a field name and elements from that field (values present in that
    field) and queries the database to find the PIDs associated with these
    values.

    Args:
        field: field name in the format ``database.table.field``
        values_to_find: values to look for (an empty list yields no PIDs)

    Returns:
        list of PIDs; empty if the database is unknown or has no data
        dictionary row defining primary PIDs

    Two cases are handled.

    - If ``field`` is in a table that defines primary PIDs, the PIDs are taken
      from the rows of that table in which ``field`` has one of the values.
    - Otherwise, the field's table is joined to a PID-defining table of the
      same database on ``<PID column> = <field>``.

    For example, suppose you have a source table called
    ``mydb.mystudyinfo`` like this:

    =============== ============================
    pid (INTEGER)   include_in_extract (VARCHAR)
    =============== ============================
    1               no
    2               0
    3               yes
    4               1
    5               definitely
    =============== ============================

    then a call like

    .. code-block:: python

        get_pid_subset_from_field("mydb.mystudyinfo.include_in_extract",
                                  ["yes", "1", "definitely"])

    should return ``[3, 4, 5]`` (in no guaranteed order), assuming that
    ``pid`` has been correctly marked as the PID column in the data
    dictionary.
    """
    pids = []  # type: List[Any]

    # Get database, table and field from 'field'
    db_parts = field.split(".")
    assert (
        len(db_parts) == 3
    ), "field parameter must be of the form 'db.table.field'"
    db, tablename, fieldname = db_parts

    try:
        session = config.sources[db].session
    except (KeyError, AttributeError):
        print(
            f"Unable to connect to database {db}. "
            f"Remember argument to '--restrict' must be of the form "
            f"'database.table.field', or be 'pid'."
        )
        return pids

    fieldcol = column(fieldname)
    row = None  # last PID-defining DD row seen for this database
    for ddr in config.dd.rows:
        if ddr.src_db != db or not ddr.defines_primary_pids:
            continue
        # Check if the field given is in the table with the pids
        if fieldname in config.dd.get_fieldnames_for_src_table(
            db, ddr.src_table
        ):
            pidcol = column(ddr.src_field)
            # Find pids corresponding to the given values of specified field
            query = (
                select(pidcol)
                .select_from(table(ddr.src_table))
                .where(fieldcol.in_(values_to_find))
                # Must be a SQL expression; Python "is not None" is just True.
                .where(pidcol.isnot(None))
                .distinct()
            )
            pids.extend(x[0] for x in session.execute(query))
            # As there is only one relevant database here, we return pids
            return pids
        row = ddr
    if row is None:
        # Didn't find one
        return []

    # The field isn't in a table with the primary PID. Join the PID-defining
    # table to the field's table. Values are passed as bound parameters
    # (previously they were pasted unquoted into raw SQL, which broke on
    # string values, allowed SQL injection and failed on an empty list).
    # NOTE(review): as in the previous raw-SQL version, the join equates the
    # PID with the *field's value*, not with a PID column of the field's
    # table; confirm this is the intended semantics.
    pid_table = table(row.src_table, column(row.src_field))
    field_table = table(tablename, column(fieldname))
    src_pid = pid_table.c[row.src_field]
    other_field = field_table.c[fieldname]
    query = (
        select(src_pid)
        .select_from(pid_table.join(field_table, src_pid == other_field))
        .where(other_field.in_(values_to_find))
        .where(src_pid.isnot(None))
        .distinct()
    )
    pids.extend(x[0] for x in session.execute(query))
    return pids

538 

539 

def fieldname_is_pid(field: str) -> bool:
    """
    Decide whether ``field`` denotes a primary PID.

    This is true for the literal ``'pid'``, and for any
    ``'database.table.field'`` string equal to the source signature of a data
    dictionary row that defines primary PIDs.
    """
    if field == "pid":
        return True
    return any(
        ddr.defines_primary_pids and field == ddr.src_signature
        for ddr in config.dd.rows
    )

554 

555 

def get_pids_from_file(field: str, filename: str) -> List[str]:
    """
    Read values from a file and turn them into a list of PIDs.

    Args:
        field:
            either the literal ``pid`` or a ``database.table.field`` name
        filename:
            file whose words are the values to look for.

            - If ``field`` is ``'pid'`` or names a source field that defines
              PIDs, the words are PIDs; only those present in the database
              are kept.
            - Otherwise, the words are looked up in ``field`` and the PIDs of
              the matching rows are returned (see
              :func:`get_pid_subset_from_field`).

    Returns:
        list of PIDs
    """
    is_pid_field = fieldname_is_pid(field)
    words = list(gen_words_from_file(filename))
    if is_pid_field:
        # The words are PIDs already; just validate them.
        return get_valid_pid_subset(words)
    # The words are values of some other field; map them to PIDs.
    return get_pid_subset_from_field(field, words)

591 

592 

def get_pids_from_list(field: str, values: List[str]) -> List[str]:
    """
    Turn a list of values into a list of PIDs.

    Args:
        field:
            either the literal ``pid`` or a ``database.table.field`` name
        values:
            values to look for.

            - If ``field`` is ``'pid'`` or names a source field that defines
              PIDs, the values should be PIDs; only those present in the
              database are kept.
            - Otherwise, the values are looked up in ``field`` and the PIDs of
              the matching rows are returned (see
              :func:`get_pid_subset_from_field`).

    Returns:
        list of PIDs
    """
    return (
        get_valid_pid_subset(values)
        if fieldname_is_pid(field)
        else get_pid_subset_from_field(field, values)
    )

623 

624 

def get_pids_from_limits(low: int, high: int) -> List[Any]:
    """
    Finds PIDs from the source database that are between ``low`` and ``high``
    inclusive.

    - The SQL ``BETWEEN`` operator is inclusive
      (https://www.w3schools.com/sql/sql_between.asp).
    - Every data dictionary field that defines primary PIDs is searched; a
      PID found in more than one such field is listed only once.

    Args:
        low: lower (inclusive) limit
        high: upper (inclusive) limit

    Returns:
        list of PIDs in this range
    """
    pids = []  # type: List[Any]
    seen = set()
    for ddr in config.dd.rows:
        if not ddr.defines_primary_pids:
            continue
        pidcol = column(ddr.src_field)
        session = config.sources[ddr.src_db].session
        query = (
            select(pidcol)
            .select_from(table(ddr.src_table))
            .where(pidcol.between(low, high))
            # Must be a SQL expression: the Python test "pidcol is not None"
            # is always True.
            .where(pidcol.isnot(None))
            .distinct()
        )
        for row in session.execute(query):
            pid = row[0]
            if pid not in seen:
                seen.add(pid)
                pids.append(pid)

    return pids

656 

657 

def get_pids_query_field_limits(field: str, low: int, high: int) -> List[Any]:
    """
    Takes a field name and queries the database to find the PIDs associated
    with records where ``field`` is in the range ``low`` to ``high`` inclusive.

    Args:
        field: field name in the format ``database.table.field``
        low: lower (inclusive) limit
        high: upper (inclusive) limit

    Returns:
        list of PIDs; empty if the database is unknown or has no data
        dictionary row defining primary PIDs

    Two cases are handled.

    - If ``field`` is in a table that defines primary PIDs, the PIDs are taken
      from the rows of that table in which ``field`` is in range.
    - Otherwise, the field's table is joined to a PID-defining table of the
      same database on ``<PID column> = <field>``.

    For example, suppose you have a source table called ``mydb.myoptouts`` like
    this:

    =============== ==========================
    pid (INTEGER)   opt_out_level (INTEGER)
    =============== ==========================
    1               0
    2               1
    3               2
    4               3
    5               4
    =============== ==========================

    then a call like

    .. code-block:: python

        get_pids_query_field_limits("mydb.myoptouts.opt_out_level", 2, 3)

    should return ``[3, 4]`` (in no guaranteed order), assuming that ``pid``
    has been correctly marked as the PID column in the data dictionary.
    """
    pids = []  # type: List[Any]
    # Get database, table and field from 'field'
    db_parts = field.split(".")
    assert (
        len(db_parts) == 3
    ), "field parameter must be of the form 'db.table.field'"
    db, tablename, fieldname = db_parts

    try:
        session = config.sources[db].session
    except (KeyError, AttributeError):
        print(
            f"Unable to connect to database {db}. "
            f"Remember argument to '--restrict' must be of the form "
            f"'database.table.field', or be 'pid'."
        )
        return pids

    fieldcol = column(fieldname)
    row = None  # last PID-defining DD row seen for this database
    for ddr in config.dd.rows:
        if ddr.src_db != db or not ddr.defines_primary_pids:
            continue
        # Check if the field given is in the table with the pids
        if fieldname in config.dd.get_fieldnames_for_src_table(
            ddr.src_db, ddr.src_table
        ):
            pidcol = column(ddr.src_field)
            # Find pids corresponding to the given values of specified field
            query = (
                select(pidcol)
                .select_from(table(ddr.src_table))
                .where(fieldcol.between(low, high))
                # Must be a SQL expression; Python "is not None" is just True.
                .where(pidcol.isnot(None))
                .distinct()
            )
            pids.extend(x[0] for x in session.execute(query))
            # As there is only one relevant database here, we return pids
            return pids
        row = ddr
    if row is None:
        # Didn't find one
        return []

    # The field isn't in a table with the primary PID. Join the PID-defining
    # table to the field's table. This replaces a raw-SQL string that had
    # stray '"' characters (a guaranteed syntax error) and interpolated
    # low/high directly; the limits are now bound parameters.
    # NOTE(review): as in the previous version, the join equates the PID with
    # the *field's value*, not with a PID column of the field's table;
    # confirm this is the intended semantics.
    pid_table = table(row.src_table, column(row.src_field))
    field_table = table(tablename, column(fieldname))
    src_pid = pid_table.c[row.src_field]
    other_field = field_table.c[fieldname]
    query = (
        select(src_pid)
        .select_from(pid_table.join(field_table, src_pid == other_field))
        .where(other_field.between(low, high))
        .where(src_pid.isnot(None))
        .distinct()
    )
    pids.extend(x[0] for x in session.execute(query))

    return pids

760 

761 

def get_pids_from_field_limits(field: str, low: int, high: int) -> List[Any]:
    """
    Turn an inclusive range on a field into a list of PIDs.

    Args:
        field:
            either the literal ``pid`` or a ``database.table.field`` name
        low:
            lower (inclusive) limit
        high:
            upper (inclusive) limit

    - If ``field`` is ``'pid'`` or names a source field that defines PIDs,
      the PIDs themselves must lie in the range (see
      :func:`get_pids_from_limits`).
    - Otherwise, rows whose ``field`` lies in the range are found and their
      PIDs returned (see :func:`get_pids_query_field_limits`).

    Returns:
        list of PIDs
    """
    if fieldname_is_pid(field):
        return get_pids_from_limits(low, high)
    return get_pids_query_field_limits(field, low, high)

796 

797 

798# ============================================================================= 

799# Generators. Anything reading the main database should use a generator, so the 

800# script can scale to databases of arbitrary size. 

801# ============================================================================= 

802 

803 

def gen_patient_ids(
    tasknum: int = 0,
    ntasks: int = 1,
    specified_pids: List[Union[int, str]] = None,
) -> Generator[Union[int, str], None, None]:
    """
    Generate patient IDs.

    Args:
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)
        specified_pids: optional list of PIDs to restrict ourselves to

    Yields:
        integer or string patient IDs (PIDs)

    Where the PIDs come from, in order of precedence:

    - ``config.debug_pid_list``, if set (a warning is logged). Each PID is
      coerced to ``int`` or ``str`` according to
      ``config.pidtype_is_integer``, and this task keeps only its share.
    - ``specified_pids``, if not ``None``. This task takes every
      ``ntasks``-th element, by list position.
    - Otherwise, the distinct non-NULL values of every data dictionary field
      that defines primary PIDs. Work is divided by
      ``patientnum % ntasks == tasknum`` in SQL for integers, or an
      equivalent hash-based test for string PIDs. When there is more than
      one defining field, a PID is yielded only once. If
      ``config.debug_max_n_patients`` is positive, no more than that many
      PIDs are yielded.
    """
    assert ntasks >= 1
    assert 0 <= tasknum < ntasks

    pid_is_integer = config.pidtype_is_integer
    distribute_by_hash = ntasks > 1 and not pid_is_integer

    # Debug option?
    if config.debug_pid_list:
        log.warning("USING MANUALLY SPECIFIED PATIENT ID LIST")
        for pid in config.debug_pid_list:
            if pid_is_integer:
                pid = int(pid)
                if is_my_job_by_int(pid, tasknum=tasknum, ntasks=ntasks):
                    yield pid
            else:
                pid = str(pid)
                if is_my_job_by_hash(pid, tasknum=tasknum, ntasks=ntasks):
                    yield pid
        return

    # Subset specified?
    if specified_pids is not None:
        for i, pid in enumerate(specified_pids):
            if i % ntasks == tasknum:
                yield pid
        return

    # Otherwise do it properly.
    # If PIDs are defined by >1 table, we need to keep track of what we've
    # processed; with only one table, we don't. We can't easily use the
    # mapping table (e.g. "if not patient_id_exists_in_mapping_db(...)"),
    # because that leads to thread/process locking for database access, so
    # we use a set.
    keeping_track = config.dd.n_definers > 1
    processed_ids = set()  # used only if keeping_track is True
    # ... POTENTIAL FOR MEMORY PROBLEM WITH V. BIG DB. For a huge number of
    # *patients* we'd need to stash the IDs in a temporary table instead;
    # a few million patients should be fine for a Python set.
    n_found = 0
    debuglimit = config.debug_max_n_patients
    for ddr in config.dd.rows:
        if not ddr.defines_primary_pids:
            continue
        log.debug(f"Looking for patient IDs in {ddr.src_table}.{ddr.src_field}")
        session = config.sources[ddr.src_db].session
        pidcol = column(ddr.src_field)
        query = (
            select(pidcol)
            .select_from(table(ddr.src_table))
            # Must be a SQL expression: the Python test "pidcol is not None"
            # is always True and would not exclude NULLs.
            .where(pidcol.isnot(None))
            .distinct()
            # .order_by(pidcol)  # no need to order by
        )
        if ntasks > 1 and pid_is_integer:
            # With integers, we can take our slice of the workload through a
            # restricted query.
            query = query.where(pidcol % ntasks == tasknum)
        result = session.execute(query)
        for row in result:
            # Extract patient ID
            pid = row[0]

            # Duff? (Defensive; NULLs should already be excluded in SQL.)
            if pid is None:
                log.warning("Patient ID is NULL")
                continue

            # Ensure type is correct -- even if we are querying from an integer
            # field and then behaving as if it is a string subsequently.
            # Note that e.g. SELECT '123' = 123 gives 1 (true), i.e. strings
            # can be compared to integers.
            if pid_is_integer:
                pid = int(pid)
            else:
                pid = str(pid)
                # Operating on non-integer PIDs and not our job?
                if distribute_by_hash and not is_my_job_by_hash(
                    pid, tasknum=tasknum, ntasks=ntasks
                ):
                    continue

            # Duplicate?
            if keeping_track:
                # Consider, for non-integer PIDs, storing the hash64 instead
                # of the raw value.
                if pid in processed_ids:
                    # we've done this one already; skip it this time
                    continue
                processed_ids.add(pid)

            # Valid one
            log.debug(f"Found patient id: {pid}")
            n_found += 1
            yield pid

            # Too many?
            if 0 < debuglimit <= n_found:
                log.warning(
                    f"Not fetching more than {debuglimit} "
                    f"patients (in total for this process) due to "
                    f"{AnonymiseConfigKeys.DEBUG_MAX_N_PATIENTS} limit"
                )
                result.close()
                # http://docs.sqlalchemy.org/en/latest/core/connections.html
                return

936 

937 

def estimate_count_patients() -> int:
    """
    Return a rough patient count for progress reporting.

    This sums the row counts of every source table containing a field that
    defines primary PIDs. Patients appearing in several such tables (or in
    several rows) are counted more than once. An exact distinct count would
    need the IDs fetched or staged somewhere, which isn't worth it for a
    progress indicator.
    """
    return sum(
        count_star(config.sources[ddr.src_db].session, ddr.src_table)
        for ddr in config.dd.rows
        if ddr.defines_primary_pids
    )

956 

957 

def gen_rows(
    dbname: str,
    sourcetable: str,
    sourcefields: Iterable[str],
    pid: Union[int, str] = None,
    intpkname: str = None,
    tasknum: int = 0,
    ntasks: int = 1,
    debuglimit: int = 0,
) -> Generator[List[Any], None, None]:
    """
    Generates rows from a source table:
    - ... each row being a list of values
    - ... each value corresponding to a field in sourcefields.
    - ... optionally restricted to a single patient

    If the table has a PK and we're operating in a multitasking situation,
    generate just the rows for this task (thread/process).

    The underlying query result is always closed when the generator
    finishes: on exhaustion, when the debug row limit is hit, or when the
    caller stops iterating early (including via an exception).

    Args:
        dbname: name (as per the data dictionary) of the source database
        sourcetable: name of the source table
        sourcefields: names of fields in the source table
        pid: patient ID (PID)
        intpkname: name of the integer PK column in the source table, if one
            exists
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)
        debuglimit: if specified, the maximum number of rows to process

    Yields:
        lists, each representing one row and containing values for each of the
        ``sourcefields``

    Raises:
        ValueError: if ``pid`` is given but the data dictionary defines no
            primary PID column for this table
    """
    t = config.sources[dbname].metadata.tables[sourcetable]
    q = select(*[column(c) for c in sourcefields]).select_from(t)
    # not ordered

    if pid is not None:
        # Restrict to one patient.
        pidcol_name = config.dd.get_pid_name(dbname, sourcetable)
        if not pidcol_name:
            # Same check (and message) as count_rows(); otherwise we would
            # build a query against column(None).
            raise ValueError(
                "No row in the data dictionary provides primary PID "
                f"information for db:{dbname}, table: {sourcetable}"
            )
        q = q.where(column(pidcol_name) == pid)
    elif intpkname is not None and ntasks > 1:
        # For non-patient tables: divide up rows across tasks.
        q = q.where(column(intpkname) % ntasks == tasknum)
        # This does not require a user-defined PK to be unique. But other
        # constraints do: see delete_dest_rows_with_no_src_row().

    db_table_tuple = (dbname, sourcetable)
    result = config.sources[dbname].session.execute(q)
    try:
        for row in result:
            if 0 < debuglimit <= config.rows_inserted_per_table[db_table_tuple]:
                if not config.warned_re_limits[db_table_tuple]:
                    log.warning(
                        f"Table {dbname}.{sourcetable}: not fetching more than "
                        f"{debuglimit} rows (in total for this process) due to "
                        f"{AnonymiseDatabaseSafeConfigKeys.DEBUG_ROW_LIMIT}"
                    )
                    config.warned_re_limits[db_table_tuple] = True
                return  # the finally clause closes the result
            config.notify_src_bytes_read(sys.getsizeof(row))  # ... approximate!
            yield list(row)
            # yield dict(zip(row.keys(), row))
            # see also https://stackoverflow.com/questions/19406859
            config.rows_inserted_per_table[db_table_tuple] += 1
    finally:
        # Release the cursor even if the consumer abandons this generator
        # part-way through; see
        # http://docs.sqlalchemy.org/en/latest/core/connections.html
        result.close()

1026 

1027 

def count_rows(
    dbname: str, sourcetable: str, pid: Union[int, str] = None
) -> int:
    """
    Return how many rows a source table holds, optionally for one PID only.

    The PID restriction mirrors the one applied by gen_rows(). Note that,
    unlike gen_rows(), no per-task division by integer PK and no debug row
    limit is applied here.

    Args:
        dbname: name (as per the data dictionary) of the source database
        sourcetable: name of the source table
        pid: patient ID (PID); if ``None``, all rows are counted

    Returns:
        the number of records

    Raises:
        ValueError: if ``pid`` is given but the data dictionary defines no
            primary PID column for this table
    """
    src_session = config.sources[dbname].session
    count_query = select(func.count()).select_from(table(sourcetable))
    if pid is None:
        return src_session.execute(count_query).scalar()
    pid_column_name = config.dd.get_pid_name(dbname, sourcetable)
    if not pid_column_name:
        raise ValueError(
            "No row in the data dictionary provides primary PID "
            f"information for db:{dbname}, table: {sourcetable}"
        )
    restricted = count_query.where(column(pid_column_name) == pid)
    return src_session.execute(restricted).scalar()

1055 

1056 

def gen_index_row_sets_by_table(
    tasknum: int = 0, ntasks: int = 1
) -> Generator[Tuple[str, List[DataDictionaryRow]], None, None]:
    """
    Generate ``table, list_of_dd_rows_for_indexed_fields`` tuples for all
    tables requiring indexing.

    Only data dictionary rows that request an index and are not omitted are
    considered. Tables are visited in sorted name order, and table number
    ``i`` goes to task ``i % ntasks``.

    Args:
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)

    Yields:
        tuple: ``table, list_of_dd_rows_for_indexed_fields`` for each table
        as above (rows keep their data dictionary order)

    """
    # Group the relevant rows by destination table in a single pass, rather
    # than rescanning every indexed row once per table.
    rows_by_table = {}  # type: Dict[str, List[DataDictionaryRow]]
    for ddr in config.dd.rows:
        if ddr.index != IndexType.NONE and not ddr.omit:
            rows_by_table.setdefault(ddr.dest_table, []).append(ddr)
    # Must sort for parallel processing consistency: every task must see
    # the tables in the same order to divide them up correctly.
    for i, t in enumerate(sorted(rows_by_table)):
        if i % ntasks != tasknum:
            continue
        yield t, rows_by_table[t]

1085 

1086 

def gen_nonpatient_tables_without_int_pk(
    tasknum: int = 0, ntasks: int = 1
) -> Generator[Tuple[str, str], None, None]:
    """
    Yield ``(source db name, source table)`` tuples for every table that

    (a) holds no patient information, and
    (b) lacks an integer PK.

    Work is shared out by position: this task takes entries whose index
    ``i`` satisfies ``i % ntasks == tasknum``.

    Args:
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)

    Yields:
        tuple: ``source_db_name, source_table`` for each table as above
    """
    # The data dictionary hands back a SortedSet, so every task sees the
    # same ordering and the positional split below is consistent.
    candidates = config.dd.get_src_dbs_tables_with_no_pt_info_no_pk()
    yield from (
        db_and_table  # a (dbname, table) tuple
        for position, db_and_table in enumerate(candidates)
        if position % ntasks == tasknum
    )

1110 

1111 

def gen_nonpatient_tables_with_int_pk() -> (
    Generator[Tuple[str, str, str], None, None]
):
    """
    Yield ``source_db_name, source_table, pk_name`` tuples for every table
    that

    (a) holds no patient information, and
    (b) does have an integer PK (whose column name is the third element).
    """
    for entry in config.dd.get_src_dbs_tables_with_no_pt_info_int_pk():
        dbname, tablename = entry[0], entry[1]
        yield dbname, tablename, config.dd.get_int_pk_name(dbname, tablename)

1128 

1129 

def gen_pks(
    srcdbname: str, tablename: str, pkname: str
) -> Generator[int, None, None]:
    """
    Generate PK values from a table.

    The query result is closed when the generator finishes, including when
    the caller stops iterating early.

    Args:
        srcdbname: name (as per the data dictionary) of the database
        tablename: name of the table
        pkname: name of the PK column

    Yields:
        int: each primary key
    """
    db = config.sources[srcdbname]
    t = db.metadata.tables[tablename]
    q = select(column(pkname)).select_from(t)
    result = db.session.execute(q)
    try:
        for row in result:
            yield row[0]
    finally:
        # Don't leave the cursor open if the generator is abandoned.
        result.close()

1150 

1151 

1152# ============================================================================= 

1153# Core functions 

1154# ============================================================================= 

1155# - For multithreaded use, the patients are divvied up across the threads. 

1156# - KEY THREADING RULE: ALL THREADS MUST HAVE FULLY INDEPENDENT DATABASE 

1157# CONNECTIONS. 

1158 

1159 

def process_table(
    sourcedbname: str,
    sourcetable: str,
    patient: Patient = None,
    incremental: bool = False,
    intpkname: str = None,
    tasknum: int = 0,
    ntasks: int = 1,
    free_text_limit: int = None,
    exclude_scrubbed_fields: bool = False,
) -> None:
    """
    Process a table. This can either be a patient table (in which case the
    patient's scrubber is applied and only rows for that patient are processed)
    or not (in which case the table is just copied).

    For each source row, values are transformed according to the data
    dictionary and inserted (with upsert where supported) into the
    destination table. Rows may be skipped because of inclusion/exclusion
    values or alteration methods. In incremental mode, rows may also be
    skipped when the destination already has an identical record (by source
    hash, or by PK for "constant" tables). The destination database is
    committed at the end.

    Args:
        sourcedbname:
            name (as per the data dictionary) of the source database
        sourcetable:
            name of the source table
        patient:
            :class:`crate_anon.anonymise.patient.Patient` object, or ``None``
            for non-patient tables
        incremental:
            perform an incremental update, rather than a full run?
        intpkname:
            name of the integer PK column in the source table
        tasknum:
            task number of this process (for dividing up work)
        ntasks:
            total number of processes (for dividing up work)
        free_text_limit:
            If specified, any text field longer than this will be excluded
        exclude_scrubbed_fields:
            Exclude all text fields which are being scrubbed.
    """
    from datetime import timezone  # for a non-deprecated UTC "now"

    start = f"process_table: {sourcedbname}.{sourcetable}:"
    pid = None if patient is None else patient.pid
    log.debug(f"{start} pid={pid}, incremental={incremental}")

    # Limit the data quantity for debugging?
    srccfg = config.sources[sourcedbname].srccfg
    if matches_tabledef(sourcetable, srccfg.debug_limited_tables):
        debuglimit = srccfg.debug_row_limit
        # log.debug(f"Limiting table {sourcetable} to {debuglimit} rows "
        #           f"(per process)")
    else:
        debuglimit = 0

    ddrows = config.dd.get_rows_for_src_table(sourcedbname, sourcetable)
    if all(ddr.omit for ddr in ddrows):
        log.debug("... ... all columns omitted.")
        return
    addhash = any(ddr.add_src_hash for ddr in ddrows)
    addtrid = any(ddr.primary_pid and not ddr.omit for ddr in ddrows)
    constant = any(ddr.constant for ddr in ddrows)
    # If addhash or constant is true AND we are not omitting all rows, then
    # the non-omitted rows will include the source PK (by the data dictionary's
    # validation process).
    ddrows = [
        ddr
        for ddr in ddrows
        if (
            (not ddr.omit)  # used for data
            or (addhash and ddr.scrub_src)  # used for hash
            or ddr.inclusion_values  # used for filter
            or ddr.exclusion_values  # used for filter
        )
    ]
    # Exclude all text fields over a chosen length
    if free_text_limit is not None:
        ddrows = [
            ddr
            for ddr in ddrows
            if (
                ddr.src_textlength is None
                or ddr.src_textlength <= free_text_limit
            )
        ]
    # Exclude all scrubbed fields if requested
    if exclude_scrubbed_fields:
        ddrows = [
            ddr
            for ddr in ddrows
            if (not ddr.src_is_textual) or (not ddr.being_scrubbed)
        ]
    if not ddrows:
        # No columns to process at all.
        return
    dest_table = ddrows[0].dest_table
    sourcefields = []  # type: List[str]
    pkfield_index = None
    src_pk_name = None
    dest_pk_name = None
    pid_is_hashed_in_dest = False
    for i, ddr in enumerate(ddrows):
        # log.debug(f"DD row: {str(ddr)}")
        if ddr.pk:
            pkfield_index = i
            src_pk_name = ddr.src_field
            dest_pk_name = ddr.dest_field
            if ddr.add_src_hash and ddr.primary_pid:
                # The PK is the PID, so the destination holds it encrypted;
                # PK lookups in the destination must use the encrypted form.
                pid_is_hashed_in_dest = True
        sourcefields.append(ddr.src_field)
    srchash = None
    timefield = config.timefield
    add_mrid_wherever_rid_added = config.add_mrid_wherever_rid_added
    mrid_fieldname = config.master_research_id_fieldname
    sqla_table = config.dd.get_dest_sqla_table(dest_table)
    session = config.destdb.session

    # Count what we'll do, so we can give a better indication of progress
    count = count_rows(sourcedbname, sourcetable, pid)
    n = 0
    recnum = tasknum or 0

    # Process the rows

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Generate data
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    for row in gen_rows(
        sourcedbname,
        sourcetable,
        sourcefields,
        pid,
        debuglimit=debuglimit,
        intpkname=intpkname,
        tasknum=tasknum,
        ntasks=ntasks,
    ):
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Reporting
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        n += 1
        if n % config.report_every_n_rows == 0:
            log.info(
                f"{start} processing record {recnum + 1}/{count}"
                f"{' for this patient' if pid is not None else ''} "
                f"({config.overall_progress()})"
            )
        # Each task handles every ntasks-th record, so step by ntasks.
        recnum += ntasks or 1
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Change detection: source hash and constant rows
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        pkvalue = None
        if pkfield_index is not None:
            pkvalue = row[pkfield_index]
            if pid_is_hashed_in_dest:
                pkvalue = config.encrypt_primary_pid(pkvalue)

        if addhash:
            srchash = config.hash_object(row)
            if incremental and identical_record_exists_by_hash(
                dest_table, dest_pk_name, pkvalue, srchash
            ):
                log.debug(
                    f"... ... skipping unchanged record (identical by hash): "
                    f"{sourcedbname}.{sourcetable}.{src_pk_name} = "
                    f"(destination) {dest_table}.{dest_pk_name} = "
                    f"{pkvalue}"
                )
                continue
        if constant:
            if incremental and identical_record_exists_by_pk(
                dest_table, dest_pk_name, pkvalue
            ):
                log.debug(
                    f"... ... skipping unchanged record (identical by PK and "
                    f"marked as constant): "
                    f"{sourcedbname}.{sourcetable}.{src_pk_name} = "
                    f"(destination) {dest_table}.{dest_pk_name} = "
                    f"{pkvalue}"
                )
                continue
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Iterate through values, altering them if necessary
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        destvalues = {}  # type: Dict[str, Any]
        skip_row = False
        for i, ddr in enumerate(ddrows):
            value = row[i]
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Skip row?
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            if ddr.skip_row_by_value(value):
                log.debug(
                    "... ... skipping row based on inclusion/exclusion values"
                )
                skip_row = True
                break  # skip row
            # NOTE: would be most efficient if ddrows were ordered with
            # inclusion/exclusion fields first. (Not yet done automatically.)

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Skip column?
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            if ddr.omit:
                continue  # skip column

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Value alteration: "special" methods (PID, MPID) or other methods
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            if ddr.primary_pid:
                assert str(value) == str(patient.pid), (
                    # We compare using str() because we may have an integer and
                    # a string version. These will hash to the same value (see
                    # scrub_tests.py).
                    f"PID mismatch from {ddr.src_signature}: "
                    f"str(value) = {str(value)!r} but "
                    f"str(patient.pid) = {str(patient.pid)!r}"
                )
                value = patient.rid
            elif ddr.master_pid:
                value = config.encrypt_master_pid(value)
            elif ddr.third_party_pid:
                # Third-party PID; we encrypt with the same hasher as for other
                # PIDs, so that de-identified records remain linkable.
                value = config.encrypt_primary_pid(value)
            else:
                # Value alteration: other methods
                for alter_method in ddr.alter_methods:
                    value, skip_row = alter_method.alter(
                        value=value,
                        ddr=ddr,
                        row=row,
                        ddrows=ddrows,
                        patient=patient,
                    )
                    if skip_row:
                        break  # from alter method loop

            if skip_row:
                break  # from data dictionary row (field) loop

            destvalues[ddr.dest_field] = value

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Skip the row?
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Checked BEFORE adding the timestamp, so that a row that produced no
        # data columns is still skipped rather than inserted as timestamp-only.
        if skip_row or not destvalues:
            continue  # next row

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Special timestamp field (naive UTC, set once per row)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if timefield:
            # Equivalent to the deprecated datetime.utcnow().
            destvalues[timefield] = datetime.now(timezone.utc).replace(
                tzinfo=None
            )

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Add extra columns?
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if addhash:
            destvalues[config.source_hash_fieldname] = srchash
        if addtrid:
            destvalues[config.trid_fieldname] = patient.trid
            if add_mrid_wherever_rid_added:
                destvalues[mrid_fieldname] = patient.mrid

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Insert values into database
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        q = insert_with_upsert_if_supported(
            table=sqla_table, values=destvalues, session=session
        )
        try:
            session.execute(q)
        except IntegrityError:
            log.warning(
                "Skipping record due to IntegrityError. Non-unique primary "
                f"key? {sourcedbname}.{sourcetable}.{src_pk_name} = "
                f"(destination) {dest_table}.{dest_pk_name} = "
                f"{pkvalue}"
            )

        # Trigger an early commit?
        config.notify_dest_db_transaction(
            n_rows=1, n_bytes=sys.getsizeof(destvalues)
        )  # ... approximate!
        # ... quicker than e.g. len(repr(...)), as judged by a timeit() call.

    log.debug(f"{start} finished: pid={pid}")
    commit_destdb()

1444 

1445 

def create_indexes(tasknum: int = 0, ntasks: int = 1) -> None:
    """
    Build the indexes that the data dictionary requests on destination
    tables.

    Tables are shared between tasks by gen_index_row_sets_by_table(). A
    primary-PID column also gets an index on the TRID column and, if
    configured, on the MRID column. Under SQL Server, full-text columns are
    gathered per table and indexed together at the end.

    Args:
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)
    """
    log.info(SEP + "Create indexes")
    engine = config.get_destdb_engine_outside_transaction()
    is_sql_server = engine.dialect.name == "mssql"
    # One inner list per table: columns awaiting a SQL Server full-text index.
    deferred_fulltext = []  # type: List[List[Column]]
    for tablename, tablerows in gen_index_row_sets_by_table(
        tasknum=tasknum, ntasks=ntasks
    ):
        sqla_table = config.dd.get_dest_sqla_table(tablename)
        table_fulltext_cols = []  # type: List[Column]
        for ddr in tablerows:
            col = sqla_table.columns[ddr.dest_field]
            wants_fulltext = ddr.index == IndexType.FULLTEXT
            if wants_fulltext and is_sql_server:
                # SQL Server allows only one full-text index per table,
                # though it may span several columns; collect these and
                # create them together below.
                table_fulltext_cols.append(col)
            else:
                add_index(
                    engine=engine,
                    sqla_column=col,
                    unique=(ddr.index == IndexType.UNIQUE),
                    fulltext=wants_fulltext,
                    length=ddr.indexlen,
                )
            if not ddr.primary_pid:
                continue
            # Companion indexes for the TRID and (optionally) MRID columns.
            add_index(
                engine,
                sqla_table.columns[config.trid_fieldname],
                unique=(ddr.index == IndexType.UNIQUE),
            )
            if config.add_mrid_wherever_rid_added:
                add_index(
                    engine,
                    sqla_table.columns[config.master_research_id_fieldname],
                    unique=False,  # see docs
                )
        if table_fulltext_cols:
            deferred_fulltext.append(table_fulltext_cols)
    # Now create the deferred SQL Server full-text indexes, one per table.
    for cols in deferred_fulltext:
        add_index(
            engine=engine,
            multiple_sqla_columns=cols,
            fulltext=True,
        )

1503 

1504 

def patient_processing_fn(
    tasknum: int = 0,
    ntasks: int = 1,
    incremental: bool = False,
    specified_pids: List[int] = None,
    free_text_limit: int = None,
    exclude_scrubbed_fields: bool = False,
) -> None:
    """
    Main function to anonymise patient data.

    - Iterate through patient IDs;
    - build the scrubber for each patient;
    - process source data for that patient, scrubbing it;
    - insert the patient into the mapping table in the admin database.

    Patients are skipped if they opt out by PID or MPID, if their mapping
    record cannot be saved (``DatabaseError``), or if mandatory scrubber
    fields have no data. Any exception raised while processing a table is
    logged and re-raised.

    Args:
        tasknum: task number of this process (for dividing up work)
        ntasks: total number of processes (for dividing up work)
        incremental: perform an incremental update, rather than a full run?
        specified_pids: if specified, restrict to specific PIDs
        free_text_limit: as per :func:`process_table`
        exclude_scrubbed_fields: as per :func:`process_table`
    """
    n_patients = estimate_count_patients() // ntasks
    i = 0
    for pid in gen_patient_ids(tasknum, ntasks, specified_pids=specified_pids):
        # gen_patient_ids() assigns the work to the appropriate thread/process
        i += 1
        log.info(
            f"Processing patient ID: {pid} (incremental={incremental}; "
            f"patient {i}/~{n_patients} for this process; "
            f"{config.overall_progress()})"
        )

        # Opt out based on PID?
        if opting_out_pid(pid):
            log.info("... opt out based on PID")
            continue
        # MPID information won't be present until we scan all the fields (which
        # we do as we build the scrubber).

        # Gather scrubbing information for a patient. (Will save.)
        # Fetch the session outside the try block, so the rollback in the
        # handler below always refers to a bound session.
        adminsession = config.admindb.session
        try:
            # In case anything else is in the transaction pending commit
            adminsession.commit()

            patient = Patient(pid)
        except DatabaseError:
            log.warning(
                f"Skipping patient with PID={pid} because the record could "
                "not be saved to the secret_map table"
            )
            adminsession.rollback()
            continue

        if patient.mandatory_scrubbers_unfulfilled:
            log.warning(
                f"Skipping patient with PID={pid} as the following scrub_src "
                f"fields are required and had no data: "
                f"{patient.mandatory_scrubbers_unfulfilled}"
            )
            continue

        # Opt out based on MPID?
        if opting_out_mpid(patient.mpid):
            log.info("... opt out based on MPID")
            continue

        patient_unchanged = patient.is_unchanged()
        if incremental:
            if patient_unchanged:
                log.debug("Scrubber unchanged; may save some time")
            else:
                log.debug("Scrubber new or changed; reprocessing in full")

        # For each source database/table...
        for d in config.dd.get_source_databases():
            log.debug(f"Patient {pid}, processing database: {d}")
            for t in config.dd.get_patient_src_tables_with_active_dest(d):
                log.debug(f"Patient {pid}, processing table {d}.{t}")
                try:
                    process_table(
                        d,
                        t,
                        patient=patient,
                        # Only skip unchanged rows if the scrubber itself is
                        # unchanged; otherwise all rows need re-scrubbing.
                        incremental=(incremental and patient_unchanged),
                        free_text_limit=free_text_limit,
                        exclude_scrubbed_fields=exclude_scrubbed_fields,
                    )
                except Exception:
                    log.critical(
                        "Error whilst processing - "
                        f"db: {d} table: {t}, patient id: {pid}"
                    )
                    raise

    commit_destdb()

1605 

1606 

def wipe_destination_data_for_opt_out_patients(
    report_every: int = 1000, chunksize: int = 10000
) -> None:
    """
    Delete any data from patients that have opted out (after their data was
    processed on a previous occasion).

    (Slightly complicated by the fact that the destination database can't
    necessarily 'see' the mapping database, so we need to cache the RID keys in
    the destination database temporarily.)

    Steps: (1-2) drop and recreate a temporary RID table in the destination
    database; (3) fill it with opt-out RIDs; (4) index it; (5) delete matching
    rows from every destination table holding patient information; (6) drop
    the temporary table; (7) delete the opted-out patients from the mapping
    table in the admin database.

    Args:
        report_every: report logging information every *n* records (falsy to
            disable)
        chunksize: insert records every *n* records (falsy to insert all
            records in a single batch at the end)
    """
    start = "wipe_opt_out_patients"
    log.info(start)

    adminsession = config.admindb.session
    metadata = MetaData()  # operate in isolation!
    destengine = config.destdb.engine
    destsession = config.destdb.session
    ridfield = config.research_id_fieldname

    # Drop/create temporary table
    pkfield = "rid"
    temptable = Table(
        config.temporary_tablename,
        metadata,
        Column(pkfield, config.sqltype_encrypted_pid, primary_key=True),
        **TABLE_KWARGS,
    )
    log.debug(start + ": 1. dropping temporary table")
    temptable.drop(destengine, checkfirst=True)  # use engine, not session
    log.debug(start + ": 2. making temporary table")
    temptable.create(destengine, checkfirst=True)  # use engine, not session

    log.debug(start + ": 3. populating temporary table with RIDs")

    def insert(records_: List[Dict[str, Any]]) -> None:
        # records_: a list of dictionaries
        # http://docs.sqlalchemy.org/en/latest/core/tutorial.html
        log.debug(start + f"... inserting {len(records_)} records")
        destsession.execute(temptable.insert(), records_)

    i = 0
    records = []  # type: List[Dict[str, Any]]
    for rid in gen_optout_rids():
        i += 1
        if report_every and i % report_every == 0:
            log.debug(start + f"... src row# {i}")
        records.append({pkfield: rid})  # a row is a dict of values
        # Guard against a zero/None chunk size (which would otherwise raise
        # ZeroDivisionError); in that case everything goes in as the remainder.
        if chunksize and i % chunksize == 0:
            insert(records)
            records = []
    if records:  # remainder
        insert(records)
    commit_destdb()

    log.debug(start + ": 4. creating index on temporary table")
    index = Index("_temptable_idx", temptable.columns[pkfield])
    index.create(destengine)  # use engine, not session

    # 5. For each patient destination table,
    #    DELETE FROM desttable WHERE rid IN (SELECT rid FROM temptable)
    log.debug(start + ": 5. deleting from destination table by opt-out RID")
    for dest_table_name in config.dd.get_dest_tables_with_patient_info():
        log.debug(start + f": ... {dest_table_name}")
        dest_table = config.dd.get_dest_sqla_table(dest_table_name)
        query = dest_table.delete().where(
            column(ridfield).in_(select(temptable.columns[pkfield]))
        )
        destsession.execute(query)
        commit_destdb()

    log.debug(start + ": 6. dropping temporary table")
    temptable.drop(destengine, checkfirst=True)  # use engine, not session
    commit_destdb()

    log.debug(start + ": 7. deleting opt-out patients from mapping table")
    adminsession.query(PatientInfo).filter(
        or_(
            PatientInfo.pid.in_(adminsession.query(OptOutPid.pid)),
            PatientInfo.mpid.in_(adminsession.query(OptOutMpid.mpid)),
        )
    ).delete(synchronize_session=False)
    commit_admindb()

1694 

1695 

def drop_remake(
    incremental: bool = False,
    skipdelete: bool = False,
    full_drop_only: bool = False,
) -> None:
    """
    Drop and rebuild (a) the mapping (admin) tables and (b) the destination
    tables.

    Args:
        incremental:
            Don't drop tables; instead, delete destination rows whose source
            rows no longer exist.
        skipdelete:
            For incremental updates, don't delete rows that are present in
            the destination but absent from the source.
        full_drop_only:
            Drop everything (including the opt-out tables) and do nothing
            else. Incompatible with ``incremental``.
    """
    assert not (full_drop_only and incremental)
    log.info(SEP + "Creating database structure +/- deleting dead data")
    admin_engine = config.admindb.engine

    # -------------------------------------------------------------------------
    # Mapping tables
    # -------------------------------------------------------------------------

    every_admin_table = (OptOutMpid, OptOutPid, PatientInfo, TridRecord)
    admin_tables_sans_optout = (PatientInfo, TridRecord)

    # Drop: everything, everything bar opt-outs, or (incremental) nothing.
    if full_drop_only:
        log.info("Dropping all admin tables")
        tables_to_drop = every_admin_table
    elif not incremental:
        log.info("Dropping admin tables except opt-out")
        tables_to_drop = admin_tables_sans_optout
    else:
        tables_to_drop = ()
    for tableclass in tables_to_drop:
        # noinspection PyUnresolvedReferences
        tableclass.__table__.drop(admin_engine, checkfirst=True)

    # Create (skipped when we are only dropping)
    if not full_drop_only:
        log.info("Creating admin tables")
        for tableclass in every_admin_table:
            # noinspection PyUnresolvedReferences
            tableclass.__table__.create(admin_engine, checkfirst=True)

    # -------------------------------------------------------------------------
    # Destination tables
    # -------------------------------------------------------------------------

    wipe_and_recreate_destination_db(
        incremental=incremental, full_drop_only=full_drop_only
    )
    # Only an incremental run, with deletion enabled, prunes destination rows
    # that no longer have a source row.
    prune_dead_rows = incremental and not skipdelete and not full_drop_only
    if not prune_dead_rows:
        return
    for dbname in config.dd.get_source_databases():
        for tablename in config.dd.get_src_tables(dbname):
            delete_dest_rows_with_no_src_row(
                dbname,
                tablename,
                report_every=config.report_every_n_rows,
                chunksize=config.chunksize,
            )

1767 

1768 

def gen_opt_out_pids_from_file(
    mpid: bool = False,
) -> Generator[Union[int, str], None, None]:
    """
    Yield opt-out PIDs (or MPIDs) read from the configured disk files.

    Args:
        mpid:
            yield MPIDs rather than PIDs, reading the files listed in
            ``config.optout_mpid_filenames`` instead of
            ``config.optout_pid_filenames``

    Yields:
        each PID (or MPID). Values are ``int`` if ``config.mpidtype_is_integer``
        (for MPIDs) or ``config.pidtype_is_integer`` (for PIDs) is true;
        otherwise they are ``str``.
    """
    if mpid:
        label, filenames, as_int = (
            "MPID",
            config.optout_mpid_filenames,
            config.mpidtype_is_integer,
        )
    else:
        label, filenames, as_int = (
            "PID",
            config.optout_pid_filenames,
            config.pidtype_is_integer,
        )
    if not filenames:
        log.info(f"... no opt-out {label} disk files in use")
        return
    reader = gen_integers_from_file if as_int else gen_words_from_file
    for filename in filenames:
        log.info(f"... {label} file: {filename}")
        yield from reader(filename)

1805 

1806 

def remove_invalid_bools_from_optout_values(
    optout_colname: str, values: List[Any]
) -> List[Any]:
    """
    Filter the opt-out values to be compared against a boolean column.

    Called when the column that defines opt-outs is of boolean type. Any value
    that is not a valid boolean (``True``/``False``) or ``None``/NULL is
    logged and dropped. The values that pass are returned in their original
    order. The membership test uses ``==``, so ``1`` and ``0`` (which compare
    equal to ``True`` and ``False``) are kept.
    """
    accepted_values = []  # type: List[Any]
    for candidate in values:
        if candidate in [None, True, False]:
            accepted_values.append(candidate)
            continue
        log.info(
            f"... ignoring non-boolean value ({candidate}), "
            f"type '{type(candidate).__name__}' for boolean column "
            f"'{optout_colname}'"
        )
    return accepted_values

1829 

1830 

def gen_opt_out_pids_from_database(
    mpid: bool = False,
) -> Generator[Any, None, None]:
    """
    Generate opt-out PIDs (or MPIDs) from the source databases.

    For each opt-out-defining field returned by
    ``config.dd.get_optout_defining_fields()`` that has an associated PID
    (or MPID) column, selects the distinct ID values from rows whose opt-out
    column holds one of ``config.optout_col_values``.

    Args:
        mpid:
            generate MPIDs, not PIDs

    Yields:
        each PID (or MPID). NULL (``None``) IDs are skipped, since they do not
        identify a patient. IDs are de-duplicated within each table (via
        DISTINCT) but not across tables, so an ID may be yielded more than
        once.
    """
    txt = "MPID" if mpid else "PID"
    found_one = False
    defining_fields = config.dd.get_optout_defining_fields()
    for t in defining_fields:
        src_db, src_table, optout_colname, pid_colname, mpid_colname = t
        id_colname = mpid_colname if mpid else pid_colname
        if not id_colname:
            # This opt-out field has no column for the kind of ID we want.
            continue
        found_one = True
        db_holder = config.sources[src_db]
        session = db_holder.session
        log.info(
            f"... {src_db}.{src_table}.{optout_colname} ({txt}={id_colname})"
        )
        sqla_table = db_holder.metadata.tables[src_table]
        optout_defining_col = sqla_table.columns[optout_colname]
        idcol = sqla_table.columns[id_colname]

        optout_col_values = config.optout_col_values

        # SQL Alchemy will raise a TypeError if optout_col_values includes a
        # string value such as "1" and the opt-out field is boolean. The
        # rationale behind this is explained at:
        # https://docs.sqlalchemy.org/en/14/changelog/migration_12.html#boolean-datatype-now-enforces-strict-true-false-none-values # noqa: E501
        # As optout_col_values could potentially include values for both
        # boolean and string opt-out fields, we just filter out invalid values
        # for boolean opt-out fields.
        if isinstance(optout_defining_col.type, Boolean):
            optout_col_values = remove_invalid_bools_from_optout_values(
                optout_colname, optout_col_values
            )

        query = select(idcol).select_from(sqla_table).distinct()

        if optout_col_values:
            # Note that if optout_col_values does not contain valid values,
            # this function plays it safe -- ALL PIDs from this table are
            # returned, i.e. everyone is opted out. (This is unlikely to happen
            # for Boolean columns, because validate_optouts() will have
            # pre-validated optout_col_values, but it is legitimate for other
            # types.)
            query = query.where(optout_defining_col.in_(optout_col_values))

        # no need for an order_by clause
        result = session.execute(query)
        for row in result:
            pid = row[0]
            if pid is None:
                # A NULL ID cannot identify anyone; don't pass it on to the
                # opt-out admin tables.
                continue
            yield pid
    if not found_one:
        log.info(f"... no opt-out-defining {txt} fields in data dictionary")

1896 

1897 

def setup_opt_out(incremental: bool = False) -> None:
    """
    Build the opt-out lists in the admin database.

    - Collects opt-out PIDs and MPIDs first from the configured disk files,
      then from the opt-out-defining fields of the source databases.
    - Adds each one via :class:`crate_anon.anonymise.models.OptOutPid` or
      :class:`crate_anon.anonymise.models.OptOutMpid`, then commits the admin
      session.

    Args:
        incremental:
            if true, afterwards call
            :func:`wipe_destination_data_for_opt_out_patients` to delete any
            destination data for opted-out patients. (A "full" run does not
            need this, because it deletes all the destination tables and
            starts again.)
    """
    log.info(SEP + "Managing opt-outs")
    admin_session = config.admindb.session
    # Pairs of (generate MPIDs rather than PIDs?, admin model to add to).
    id_kinds = ((False, OptOutPid), (True, OptOutMpid))

    log.info("Hunting for opt-out patients from disk file...")
    for want_mpid, model in id_kinds:
        for patient_id in gen_opt_out_pids_from_file(mpid=want_mpid):
            # noinspection PyTypeChecker
            model.add(admin_session, patient_id)

    log.info("Hunting for opt-out patients from database...")
    for want_mpid, model in id_kinds:
        for patient_id in gen_opt_out_pids_from_database(mpid=want_mpid):
            model.add(admin_session, patient_id)

    admin_session.commit()

    if incremental:
        wipe_destination_data_for_opt_out_patients()

1935 

1936 

def _process_nonpatient_table_logging_errors(
    db: str,
    table: str,
    intpkname: Union[str, None],
    tasknum: int,
    ntasks: int,
    incremental: bool,
    free_text_limit: int,
    exclude_scrubbed_fields: bool,
) -> None:
    """
    Run :func:`process_table` for one non-patient table. If it raises, log
    (at CRITICAL level) which table failed, then re-raise.
    """
    try:
        # noinspection PyTypeChecker
        process_table(
            db,
            table,
            patient=None,
            incremental=incremental,
            intpkname=intpkname,
            tasknum=tasknum,
            ntasks=ntasks,
            free_text_limit=free_text_limit,
            exclude_scrubbed_fields=exclude_scrubbed_fields,
        )
    except Exception:
        log.critical(f"Error whilst processing - db: {db} table: {table}")
        raise


def process_nonpatient_tables(
    tasknum: int = 0,
    ntasks: int = 1,
    incremental: bool = False,
    free_text_limit: int = None,
    exclude_scrubbed_fields: bool = False,
) -> None:
    """
    Copy every non-patient table to the destination, committing after each
    table.

    - Tables with an integer PK are processed by every task. ``tasknum`` and
      ``ntasks`` are passed on to :func:`process_table`, which divides the
      rows between tasks.
    - Tables without one are divided between tasks as whole tables, by
      ``gen_nonpatient_tables_without_int_pk``. Each table is then processed
      in single-task mode.

    Args:
        tasknum:
            task number of this process (for dividing up work)
        ntasks:
            total number of processes (for dividing up work)
        incremental:
            perform an incremental update, rather than a full run?
        free_text_limit:
            as per :func:`process_table`
        exclude_scrubbed_fields:
            as per :func:`process_table`
    """
    log.info(SEP + "Non-patient tables: (a) with integer PK")
    for db, table, pkname in gen_nonpatient_tables_with_int_pk():
        log.info(
            f"Processing non-patient table {db}.{table} (PK: {pkname}) "
            f"({config.overall_progress()})..."
        )
        _process_nonpatient_table_logging_errors(
            db,
            table,
            intpkname=pkname,
            tasknum=tasknum,
            ntasks=ntasks,
            incremental=incremental,
            free_text_limit=free_text_limit,
            exclude_scrubbed_fields=exclude_scrubbed_fields,
        )
        commit_destdb()

    log.info(SEP + "Non-patient tables: (b) without integer PK")
    for db, table in gen_nonpatient_tables_without_int_pk(
        tasknum=tasknum, ntasks=ntasks
    ):
        log.info(
            f"Processing non-patient table {db}.{table} "
            f"({config.overall_progress()})..."
        )
        # Tables were already shared out between processes, so don't also
        # split the rows of a single table: run it as task 0 of 1.
        _process_nonpatient_table_logging_errors(
            db,
            table,
            intpkname=None,
            tasknum=0,
            ntasks=1,
            incremental=incremental,
            free_text_limit=free_text_limit,
            exclude_scrubbed_fields=exclude_scrubbed_fields,
        )
        commit_destdb()

2015 

2016 

def process_patient_tables(
    tasknum: int = 0,
    ntasks: int = 1,
    incremental: bool = False,
    specified_pids: List[int] = None,
    free_text_limit: int = None,
    exclude_scrubbed_fields: bool = False,
) -> None:
    """
    Anonymise the patient tables (this process's share of them).

    All the real work is delegated to :func:`patient_processing_fn`. This
    wrapper logs which process is running, calls it, logs completion, and
    then commits the destination database.

    Args:
        tasknum:
            task number of this process (for dividing up work)
        ntasks:
            total number of processes (for dividing up work)
        incremental:
            perform an incremental update, rather than a full run?
        specified_pids:
            if specified, restrict to specific PIDs
        free_text_limit:
            as per :func:`process_table`
        exclude_scrubbed_fields:
            as per :func:`process_table`
    """
    # Several destination tables are written, so we commit only at the end.
    log.info(SEP + "Patient tables")
    log.info(
        "Single-threaded, single-process mode"
        if ntasks == 1
        else f"PROCESS {tasknum} (numbered from zero) OF {ntasks} PROCESSES"
    )

    patient_processing_fn(
        tasknum=tasknum,
        ntasks=ntasks,
        incremental=incremental,
        specified_pids=specified_pids,
        free_text_limit=free_text_limit,
        exclude_scrubbed_fields=exclude_scrubbed_fields,
    )

    log.info(
        f"Process {tasknum}: FINISHED ANONYMISATION"
        if ntasks > 1
        else "FINISHED ANONYMISATION"
    )

    # Final commit (expected to be redundant).
    commit_destdb()

2069 

2070 

def validate_optouts() -> None:
    """
    Ensure every boolean opt-out-defining column has a usable opt-out value.

    For each opt-out-defining field in the data dictionary whose source
    column has SQLAlchemy ``Boolean`` type, ``config.optout_col_values`` is
    passed through :func:`remove_invalid_bools_from_optout_values`. If no
    value survives, the opt-out query for that column could not be
    restricted, so this function refuses to continue.

    Raises:
        ValueError: if a boolean opt-out column has no valid opt-out values.
    """
    for src_db, src_table, optout_colname, _pid_col, _mpid_col in (
        config.dd.get_optout_defining_fields()
    ):
        table = config.sources[src_db].metadata.tables[src_table]
        column = table.columns[optout_colname]
        candidate_values = config.optout_col_values
        if not isinstance(column.type, Boolean):
            # Non-boolean columns accept any configured values.
            continue
        usable = remove_invalid_bools_from_optout_values(
            optout_colname, candidate_values
        )
        if not usable:
            raise ValueError(
                f"No valid opt-out values for column '{optout_colname}'"
            )

2092 

2093 

2094# ============================================================================= 

2095# Main 

2096# ============================================================================= 

2097 

2098 

def anonymise(
    incremental: bool = False,
    skipdelete: bool = False,
    dropremake: bool = False,
    full_drop_only: bool = False,
    optout: bool = False,
    patienttables: bool = False,
    nonpatienttables: bool = False,
    index: bool = False,
    restrict: str = "",
    restrict_file: str = "",
    restrict_limits: Tuple[Any, Any] = None,
    restrict_list: List[Any] = None,
    free_text_limit: int = None,
    exclude_scrubbed_fields: bool = False,
    nprocesses: int = 1,
    process: int = 0,
    skip_dd_check: bool = False,
    seed: str = "",
    chunksize: int = DEFAULT_CHUNKSIZE,
    reportevery: int = DEFAULT_REPORT_EVERY,
    echo: bool = False,
    debugscrubbers: bool = False,
    savescrubbers: bool = False,
) -> None:
    """
    Main entry point for anonymisation.

    Args:
        incremental:
            If true: incremental run, rather than full.
        skipdelete:
            (For "incremental".) Skip deletion of rows present in the
            destination but not the source.

        dropremake:
            If true: drop/remake destination tables.
        full_drop_only:
            If true: drop destination tables (even opt-out ones) and do nothing
            else.
        optout:
            If true: update opt-out list.
        patienttables:
            If true: process patient tables only (rather than all tables).
        nonpatienttables:
            If true: process non-patient tables only (rather than all tables).
        index:
            If true: create indexes only.

        restrict:
            Restrict to certain patients? Specify a field name, or ``pid``
            to restrict by patient IDs.
        restrict_file:
            (For "restrict".) Filename for permitted values.
        restrict_limits:
            (For "restrict".) Tuple of lower and upper limits to
            apply to the field.
        restrict_list:
            (For "restrict".) List of permitted values.
        free_text_limit:
            Filter out all free text over the specified length.
        exclude_scrubbed_fields:
            Exclude all text fields which are being scrubbed.

        nprocesses:
            Number of processes being run (of which this is one), for work
            allocation.
        process:
            Number of this process (from 0 to nprocesses - 1), for work
            allocation.
        skip_dd_check:
            If true: skip data dictionary validity check. (Useful in
            multiprocessing contexts when another process has already done
            this.)
        seed:
            Seed for random number generator (for TRID generation).
            Blank (or ``None``) for Python's default seeding, which uses
            operating-system randomness if available and the system time
            otherwise.
        chunksize:
            Number of records copied in a chunk when copying PKs from one
            database to another.

        reportevery:
            Report insert progress every n rows in verbose mode.
        echo:
            Echo SQL?
        debugscrubbers:
            Report sensitive scrubbing information, for debugging
        savescrubbers:
            Saves sensitive scrubbing information in admin database, for
            debugging

    Raises:
        ValueError:
            if ``nprocesses``/``process`` are inconsistent, if
            ``nprocesses > 1`` is combined with ``dropremake``, if
            ``restrict`` is given without a file, limits or list, or (via
            :func:`validate_optouts`) if the opt-out configuration is invalid.
    """
    # Validate args
    if nprocesses < 1:
        raise ValueError("--nprocesses must be >=1")
    if process < 0 or process >= nprocesses:
        raise ValueError(
            "--process argument must be from 0 to (nprocesses - 1) inclusive"
        )
    if nprocesses > 1 and dropremake:
        raise ValueError("Can't use nprocesses > 1 with --dropremake")

    # No specific stage requested means "do every stage".
    everything = not any(
        [dropremake, optout, nonpatienttables, patienttables, index]
    )

    # Load/validate config
    config.report_every_n_rows = reportevery
    config.chunksize = chunksize
    config.debug_scrubbers = debugscrubbers
    config.save_scrubbers = savescrubbers
    config.set_echo(echo)
    config.load_dd(check_against_source_db=not skip_dd_check)
    # The config must be valid:
    config.check_valid()

    if optout or everything:
        validate_optouts()

    # -------------------------------------------------------------------------
    # Setup
    # -------------------------------------------------------------------------

    pids = None
    if restrict:
        if restrict_file:
            pids = get_pids_from_file(restrict, restrict_file)
        elif restrict_limits:
            pids = get_pids_from_field_limits(
                restrict, restrict_limits[0], restrict_limits[1]
            )
        elif restrict_list:
            pids = get_pids_from_list(restrict, restrict_list)
        else:
            raise ValueError(
                "'--restrict' option requires one of "
                "'--file', '--limits' or '--list'"
            )
        if not pids:
            # NOTE(review): an empty result is still passed on below as
            # specified_pids; confirm that patient_processing_fn treats an
            # empty list as "no patients" rather than "no restriction".
            log.warning("No valid patient ids found for the conditions given")

    # Random number seed. A blank seed must become None: random.seed("")
    # would hash the empty string into a fixed seed, making every run (and
    # every process) produce the same sequence, contrary to the documented
    # "blank means default seeding" behaviour.
    random.seed(seed or None)

    # -------------------------------------------------------------------------
    # The main process
    # -------------------------------------------------------------------------

    log.info(BIGSEP + "Starting")
    start = get_now_utc_pendulum()

    # 1. Drop/remake tables. Single-tasking only.
    if full_drop_only:
        drop_remake(full_drop_only=True)
        return
    if dropremake or everything:
        drop_remake(incremental=incremental, skipdelete=skipdelete)

    # 2. Deal with opt-outs
    if optout or everything:
        setup_opt_out(incremental=incremental)

    # 3. Tables with patient info.
    # Process PER PATIENT, across all tables, because we have to synthesize
    # information to scrub across the entirety of that patient's record.
    if patienttables or everything:
        process_patient_tables(
            tasknum=process,
            ntasks=nprocesses,
            incremental=incremental,
            specified_pids=pids,
            free_text_limit=free_text_limit,
            exclude_scrubbed_fields=exclude_scrubbed_fields,
        )

    # 4. Tables without any patient ID (e.g. lookup tables). Process PER TABLE.
    if nonpatienttables or everything:
        process_nonpatient_tables(
            tasknum=process,
            ntasks=nprocesses,
            incremental=incremental,
            free_text_limit=free_text_limit,
            exclude_scrubbed_fields=exclude_scrubbed_fields,
        )

    # 5. Indexes. ALWAYS FASTEST TO DO THIS LAST. Process PER TABLE.
    if index or everything:
        create_indexes(tasknum=process, ntasks=nprocesses)

    log.info(BIGSEP + "Finished")
    end = get_now_utc_pendulum()
    time_taken = end - start
    log.info(f"Time taken: {time_taken.total_seconds()} seconds")
    # config.dd.debug_cache_hits()