Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2 

3""" 

4camcops_server/cc_modules/merge_db.py 

5 

6=============================================================================== 

7 

8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com). 

9 

10 This file is part of CamCOPS. 

11 

12 CamCOPS is free software: you can redistribute it and/or modify 

13 it under the terms of the GNU General Public License as published by 

14 the Free Software Foundation, either version 3 of the License, or 

15 (at your option) any later version. 

16 

17 CamCOPS is distributed in the hope that it will be useful, 

18 but WITHOUT ANY WARRANTY; without even the implied warranty of 

19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20 GNU General Public License for more details. 

21 

22 You should have received a copy of the GNU General Public License 

23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

24 

25=============================================================================== 

26 

27**Tool to merge data from one CamCOPS database into another.** 

28 

29Has special code to deal with old databases. 

30 

31""" 

32 

33import logging 

34from pprint import pformat 

35from typing import Any, cast, Dict, List, Optional, Type, TYPE_CHECKING 

36 

37from cardinal_pythonlib.logs import BraceStyleAdapter 

38from cardinal_pythonlib.sqlalchemy.merge_db import merge_db, TranslationContext 

39from cardinal_pythonlib.sqlalchemy.schema import get_table_names 

40from cardinal_pythonlib.sqlalchemy.session import get_safe_url_from_engine 

41from cardinal_pythonlib.sqlalchemy.table_identity import TableIdentity 

42from sqlalchemy.engine import create_engine 

43from sqlalchemy.engine.base import Engine 

44from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound 

45from sqlalchemy.orm.session import Session 

46from sqlalchemy.sql.expression import column, func, select, table, text 

47 

48from camcops_server.cc_modules.cc_audit import AuditEntry 

49from camcops_server.cc_modules.cc_constants import ( 

50 FP_ID_NUM, 

51 NUMBER_OF_IDNUMS_DEFUNCT, 

52) 

53from camcops_server.cc_modules.cc_db import GenericTabletRecordMixin 

54from camcops_server.cc_modules.cc_device import Device 

55from camcops_server.cc_modules.cc_dirtytables import DirtyTable 

56from camcops_server.cc_modules.cc_email import Email 

57from camcops_server.cc_modules.cc_exportmodels import ( 

58 ExportedTask, 

59 ExportedTaskEmail, 

60 ExportedTaskFileGroup, 

61 ExportedTaskHL7Message, 

62) 

63from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient 

64from camcops_server.cc_modules.cc_group import Group, group_group_table 

65from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition 

66from camcops_server.cc_modules.cc_membership import UserGroupMembership 

67from camcops_server.cc_modules.cc_patient import Patient 

68from camcops_server.cc_modules.cc_patientidnum import ( 

69 fake_tablet_id_for_patientidnum, 

70 PatientIdNum, 

71) 

72from camcops_server.cc_modules.cc_request import get_command_line_request 

73from camcops_server.cc_modules.cc_session import CamcopsSession 

74from camcops_server.cc_modules.cc_serversettings import ( 

75 server_stored_var_table_defunct, 

76 ServerSettings, 

77 ServerStoredVarNamesDefunct, 

78) 

79from camcops_server.cc_modules.cc_sqlalchemy import Base 

80from camcops_server.cc_modules.cc_taskindex import reindex_everything 

81from camcops_server.cc_modules.cc_user import ( 

82 SecurityAccountLockout, 

83 SecurityLoginFailure, 

84 User, 

85) 

86 

87if TYPE_CHECKING: 

88 from sqlalchemy.engine.result import ResultProxy 

89 

# Module logger, wrapped so that callers can use "{}"-style placeholders
# with separate arguments, e.g. log.info("x is {!r}", x).
log = BraceStyleAdapter(logging.getLogger(name=__name__))

# Not referenced in the visible part of this file; presumably a switch for
# dropping into pdb while debugging -- TODO confirm.
DEBUG_VIA_PDB = False

93 

94 

95# ============================================================================= 

96# Information relating to the source database 

97# ============================================================================= 

98 

def get_skip_tables(src_tables: List[str]) -> List[TableIdentity]:
    """
    Work out which tables in the metadata should be skipped because they are
    absent from the source database.

    Before doing so, verify that some core CamCOPS tables exist in the
    source; if they do not, the source is unlikely to be a CamCOPS database.

    Args:
        src_tables: names of every table in the source database

    Returns:
        list of
        :class:`cardinal_pythonlib.sqlalchemy.table_identity.TableIdentity`
        objects, one per table to skip

    Raises:
        :exc:`ValueError` if a core table is missing from the source

    Note that other tables to skip are defined in :func:`merge_camcops_db`.
    """
    # Sanity check: these tables must be present in any CamCOPS database.
    core_tablenames = (Patient.__tablename__, User.__tablename__)
    for core_tablename in core_tablenames:
        if core_tablename in src_tables:
            continue
        raise ValueError(
            f"Cannot proceed; table {core_tablename!r} missing from source; "
            f"unlikely that the source is any sort of old CamCOPS "
            f"database!")

    # Missing source tables are generally tolerated. Tables that are
    # eager-loaded via relationships would be a problem (for these merges,
    # that means PatientIdNum); the chosen approach is to disable eager
    # loading rather than to create such tables in the source.

    group_tablename = Group.__tablename__
    if group_tablename in src_tables:
        return []

    log.warning("No Group information in source database; skipping source "
                "table {!r}; will create a default group",
                group_tablename)
    return [TableIdentity(tablename=group_tablename)]

151 

152 

def get_src_iddefs(src_engine: Engine,
                   src_tables: List[str]) -> Dict[int, IdNumDefinition]:
    """
    Read the ID number definitions from the source database.

    Two layouts are understood: a modern source with an
    :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` table, and
    an older source that stored ID descriptions in the defunct server stored
    variable table. If neither table is present, a warning is logged and an
    empty dictionary is returned.

    Args:
        src_engine: source SQLAlchemy :class:`Engine`
        src_tables: names of every table in the source database

    Returns:
        dictionary mapping ``which_idnum`` to an
        :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` that is
        not attached to any database session
    """
    result = {}  # type: Dict[int, IdNumDefinition]

    if IdNumDefinition.__tablename__ in src_tables:
        # Modern source: read the IdNumDefinition table directly.
        log.info("Fetching source ID number definitions from {!r} table",
                 IdNumDefinition.__tablename__)
        # noinspection PyUnresolvedReferences
        query = (
            select([IdNumDefinition.which_idnum,
                    IdNumDefinition.description,
                    IdNumDefinition.short_description])
            .select_from(IdNumDefinition.__table__)
            .order_by(IdNumDefinition.which_idnum)
        )
        rows = src_engine.execute(query).fetchall()
        for which_idnum, description, short_description in rows:
            result[which_idnum] = IdNumDefinition(
                which_idnum=which_idnum,
                description=description,
                short_description=short_description
            )
        return result

    ssv = server_stored_var_table_defunct
    if ssv.name not in src_tables:
        log.warning("No information available on source ID number "
                    "descriptions")
        return result

    # Older source: descriptions are stored as named variables.
    log.info("Fetching source ID number definitions from {!r} table",
             ssv.name)

    def first_value_text(varname: str) -> Optional[str]:
        # valueText of the first row with this variable name, or None.
        found = src_engine.execute(
            select([ssv.columns.valueText])
            .select_from(ssv)
            .where(ssv.columns.name == varname)
        ).fetchall()
        return found[0][0] if found else None

    for which_idnum in range(1, NUMBER_OF_IDNUMS_DEFUNCT + 1):
        suffix = str(which_idnum)
        description = first_value_text(
            ServerStoredVarNamesDefunct.ID_DESCRIPTION_PREFIX + suffix)
        short_description = first_value_text(
            ServerStoredVarNamesDefunct.ID_SHORT_DESCRIPTION_PREFIX + suffix)
        result[which_idnum] = IdNumDefinition(
            which_idnum=which_idnum,
            description=description,
            short_description=short_description
        )
    return result

221 

222 

223# ============================================================================= 

224# Information relating to the destination database 

225# ============================================================================= 

226 

def group_exists(group_id: int, dst_session: Session) -> bool:
    """
    Check whether the destination session contains a group with a given ID.

    Args:
        group_id: integer group ID to look for
        dst_session: destination SQLAlchemy :class:`Session`

    Returns:
        whatever :meth:`Group.group_exists` reports for that ID (``True`` if
        such a group exists)
    """
    exists = Group.group_exists(group_id=group_id, dbsession=dst_session)
    return exists

236 

237 

def fetch_group_id_by_name(group_name: str, dst_session: Session) -> int:
    """
    Look up a group by name in the destination session and return its ID.
    If no group has that name, create one and return the new ID.

    Args:
        group_name: group name
        dst_session: destination SQLAlchemy :class:`Session`

    Returns:
        group ID in the destination database

    Raises:
        :exc:`MultipleResultsFound` if more than one group has that name
        (which would be a bug)
    """
    try:
        group = (
            dst_session.query(Group)
            .filter(Group.name == group_name)
            .one_or_none()
        )  # type: Optional[Group]
    except MultipleResultsFound:
        log.critical("Nasty bug: can't have two groups with the same name! "
                     "Group name was {!r}", group_name)
        raise

    if group is None:
        log.info("Creating new group named {!r}", group_name)
        group = Group()
        group.name = group_name
        dst_session.add(group)
        # Flushing makes the database assign the new group's PK:
        # https://stackoverflow.com/questions/1316952/sqlalchemy-flush-and-get-inserted-id  # noqa
        flush_session(dst_session)
        log.info("... new group has ID {!r}", group.id)

    return group.id

277 

278 

def get_dst_group(dest_groupnum: int,
                  dst_session: Session) -> Group:
    """
    Fetch the group with the given ID from the destination database, which
    must already contain it.

    Args:
        dest_groupnum: group number (ID) in the destination database
        dst_session: SQLAlchemy session for the destination database

    Returns:
        the :class:`camcops_server.cc_modules.cc_group.Group`

    Raises:
        :exc:`ValueError` if no such group exists;
        :exc:`MultipleResultsFound` if several do (a bug)
    """
    query = dst_session.query(Group).filter(Group.id == dest_groupnum)
    try:
        found = query.one()  # type: Group
    except NoResultFound:
        raise ValueError(f"Group with ID {dest_groupnum} missing from "
                         f"destination database")
    except MultipleResultsFound:
        log.critical("Nasty bug: can't have two groups with the same ID! "
                     "Group ID was {!r}", dest_groupnum)
        raise
    else:
        return found

310 

311 

def ensure_dest_iddef_exists(which_idnum: int,
                             dst_session: Session) -> IdNumDefinition:
    """
    Check that an ID number type exists in the destination database and
    return its definition. Nothing is created here; a missing definition is
    an error.

    Args:
        which_idnum: ID number type
        dst_session: SQLAlchemy session for the destination database

    Returns:
        the matching
        :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`

    Raises:
        :exc:`ValueError` if the definition is missing;
        :exc:`MultipleResultsFound` if there are several (a bug)
    """
    query = (
        dst_session.query(IdNumDefinition)
        .filter(IdNumDefinition.which_idnum == which_idnum)
    )
    try:
        found = query.one()  # type: IdNumDefinition
    except NoResultFound:
        raise ValueError(f"ID number type with which_idnum={which_idnum} "
                         f"missing from destination database")
    except MultipleResultsFound:
        log.critical("Nasty bug: can't have two ID number types with the same "
                     "which_idnum! which_idnum was {!r}", which_idnum)
        raise
    else:
        return found

340 

341 

def get_dst_iddef(dst_session: Session,
                  which_idnum: int) -> Optional[IdNumDefinition]:
    """
    Fetch an ID number definition from the destination database, if present.

    This does NOT require the definition to exist (contrast
    :func:`ensure_dest_iddef_exists`, which raises): it returns ``None`` when
    no matching definition is found.

    Args:
        dst_session: destination SQLAlchemy :class:`Session`
        which_idnum: integer expressing which ID number type to look up

    Returns:
        a :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`, or
        ``None`` if none was found
    """
    # first() returns None for no match and, unlike one(), does not raise if
    # several rows match (it returns one of them).
    return (
        dst_session.query(IdNumDefinition)
        .filter(IdNumDefinition.which_idnum == which_idnum)
        .first()
    )

362 

363 

364# ============================================================================= 

365# Extra translation to be applied to individual objects 

366# ============================================================================= 

367# The extra logic for this database: 

368 

def flush_session(dst_session: Session) -> None:
    """
    Flush pending changes in the destination SQLAlchemy session to the
    database (this does not commit), logging at debug level first.

    Args:
        dst_session: destination SQLAlchemy :class:`Session`
    """
    log.debug("Flushing session")
    dst_session.flush()

375 

376 

def ensure_default_group_id(trcon: TranslationContext) -> None:
    """
    Ensure that the :class:`TranslationContext` has a usable
    ``default_group_id`` key in its ``info`` dictionary. This is the ID, in
    the destination database, of the group to put records in where those
    records come from an older, pre-group-based CamCOPS database.

    The user may have specified that ``default_group_id`` on the command line,
    in which case we check that the group exists. Otherwise, they may have
    specified a ``default_group_name``, so we'll use the ID of that group
    (creating it if necessary) and store it in ``info["default_group_id"]``
    for next time. If they specified neither, we raise an
    :exc:`AssertionError`, because we have come to a situation where we need
    one or the other.

    Args:
        trcon: the :class:`TranslationContext`

    Raises:
        :exc:`AssertionError` if the specified default group ID does not exist
        in the destination database, or if neither a default group ID nor a
        default group name was specified
    """
    # These checks raise explicitly rather than using "assert", because
    # asserts are stripped when Python runs with optimization (-O). Without
    # the checks, a missing group name would go on to look up (and possibly
    # create) a group with an empty name.
    default_group_id = trcon.info["default_group_id"]  # type: Optional[int]
    if default_group_id is not None:
        # The user specified a group ID to use for records without one
        if not group_exists(group_id=default_group_id,
                            dst_session=trcon.dst_session):
            raise AssertionError(
                "User specified default_group_id={!r}, and object {!r} needs "
                "a _group_id (directly or indirectly), but that ID doesn't "
                "exist in the {!r} table of the destination database".format(
                    default_group_id, trcon.oldobj, Group.__tablename__)
            )
    else:
        default_group_name = trcon.info["default_group_name"]  # type: Optional[str]  # noqa
        if not default_group_name:
            raise AssertionError(
                "User specified neither default_group_id or "
                "default_group_name, but object {!r} needs a "
                "_group_id, directly or indirectly".format(trcon.oldobj)
            )
        default_group_id = fetch_group_id_by_name(
            group_name=default_group_name,
            dst_session=trcon.dst_session
        )
        trcon.info["default_group_id"] = default_group_id  # for next time!

417 

418 

# A former ensure_idnumdef() function used to create missing destination ID
# number definitions by copying them from the source database. It was
# superseded on 2019-03-05 by a more conservative mechanism: the user must set
# up destination ID number definitions manually and supply a "whichidnum_map"
# (consulted by get_dest_which_idnum()). The old code sat here as an unused,
# never-executed string literal; retrieve it from version control history if
# it is ever needed again.

467 

468 

def ensure_no_iddef_clash(src_iddef: IdNumDefinition,
                          dst_iddef: IdNumDefinition) -> None:
    """
    Ensure that a given source and destination pair of ID number definitions,
    which must match on ``which_idnum``, have the same description and short
    description, or raise :exc:`ValueError`.

    Args:
        src_iddef: source
            :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`
        dst_iddef: destination
            :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`

    Raises:
        :exc:`ValueError` if the ``description`` or ``short_description``
        differs (``description`` is checked first)
    """
    # Internal consistency check: callers must pass a matching pair.
    assert src_iddef.which_idnum == dst_iddef.which_idnum, (
        "Bug: ensure_no_iddef_clash() called with IdNumDefinition objects "
        "that don't share the same value for which_idnum (silly!)."
    )
    # The attribute names double as the field names in the error message.
    for attr in ("description", "short_description"):
        src_value = getattr(src_iddef, attr)
        dst_value = getattr(dst_iddef, attr)
        if src_value != dst_value:
            raise ValueError(
                f"ID {attr} mismatch for ID#{src_iddef.which_idnum}: "
                f"source {src_value!r}, destination {dst_value!r}"
            )

504 

505 

def log_warning_srcobj(srcobj: Any) -> None:
    """
    Log, at warning level, a pretty-printed dump of a source (old) object's
    instance attributes (its ``__dict__``).

    Args:
        srcobj: the source object
    """
    attributes_text = pformat(srcobj.__dict__)
    log.warning("Source was:\n\n{}\n\n", attributes_text)

514 

515 

def get_dest_groupnum(src_groupnum: int,
                      trcon: TranslationContext,
                      oldobj: Any) -> int:
    """
    For a given source group number, returns the corresponding destination
    group number, as given by the ``groupnum_map`` entry of the translation
    context's ``info`` dictionary (validating en route).

    Args:
        src_groupnum: the group number in the source database
        trcon: the :class:`TranslationContext`
        oldobj: the source object (logged if the mapping is missing)

    Returns:
        the corresponding group number in the destination database

    Raises:
        :exc:`ValueError` if ``src_groupnum`` has no entry in
        ``groupnum_map``
    """
    groupnum_map = trcon.info["groupnum_map"]  # type: Dict[int, int]
    if src_groupnum not in groupnum_map:
        log_warning_srcobj(oldobj)
        log.critical(
            "Old database contains group number {} and equivalent "
            "group in destination not known", src_groupnum)
        raise ValueError("Bad group mapping")
    return groupnum_map[src_groupnum]

542 

543 

def get_dest_which_idnum(src_which_idnum: int,
                         trcon: TranslationContext,
                         oldobj: Any) -> int:
    """
    Translate a source ID number type (``which_idnum``) into the destination
    ID number type, using the ``whichidnum_map`` entry of the translation
    context's ``info`` dictionary.

    Args:
        src_which_idnum: which_idnum in the source database
        trcon: the :class:`TranslationContext`
        oldobj: the source object (logged if the mapping is missing)

    Returns:
        the corresponding which_idnum in the destination database

    Raises:
        :exc:`ValueError` if there is no mapping for ``src_which_idnum``
    """
    mapping = trcon.info["whichidnum_map"]  # type: Dict[int, int]
    if src_which_idnum in mapping:
        return mapping[src_which_idnum]

    log_warning_srcobj(oldobj)
    log.critical(
        "Old database contains ID number definitions of type {} and "
        "equivalent ID number type in destination not known",
        src_which_idnum)
    raise ValueError("Bad ID number type mapping")

572 

573 

574# noinspection PyProtectedMember 

575def translate_fn(trcon: TranslationContext) -> None: 

576 """ 

577 Function to translate source objects to their destination counterparts, 

578 where special processing is required. Called as a callback from 

579 :func:`cardinal_pythonlib.sqlalchemy.merge_db.merge_db`. 

580 

581 Args: 

582 trcon: the :class:`TranslationContext`; all the relevant information is 

583 in here, and our function modifies its members. 

584 

585 This function does the following things: 

586 

587 - For any records uploaded from tablets: set ``_group_id``, if it's blank. 

588 

589 - For :class:`camcops_server.cc_modules.cc_user.User` objects: if an 

590 identical user is found in the destination database, merge on it rather 

591 than creating a new one. Users with matching usernames are considered to 

592 be identical. 

593 

594 - For :class:`Device` objects: if an identical device is found, merge on it 

595 rather than creating a new one. Devices with matching names are 

596 considered to be identical. 

597 

598 - For :class:`camcops_server.cc_modules.cc_group.Group` objects: if an 

599 identical group is found, merge on it rather than creating a new one. 

600 Groups with matching names are considered to be identical. 

601 

602 - For :class:`camcops_server.cc_modules.cc_patient.Patient` objects: if any 

603 have ID numbers in the old format (as columns in the Patient table), 

604 convert them to the :class:`PatientIdNum` system. 

605 

606 - If we're inserting a :class:`PatientIdNum`, make sure there is a 

607 corresponding 

608 :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`, and that 

609 it's valid. 

610 

611 - If we're merging from a more modern database with the 

612 :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` table, 

613 check our ID number definitions don't conflict. 

614 

615 - Check we're not creating duplicates for anything uploaded. 

616 

617 """ 

618 log.debug("Translating object from table: {!r}", trcon.tablename) 

619 oldobj = trcon.oldobj 

620 newobj = trcon.newobj 

621 # log.debug("Translating: {}", auto_repr(oldobj)) 

622 

623 # ------------------------------------------------------------------------- 

624 # Set _group_id correctly for tablet records 

625 # ------------------------------------------------------------------------- 

626 if isinstance(oldobj, GenericTabletRecordMixin): 

627 if ("_group_id" in trcon.missing_src_columns or 

628 oldobj._group_id is None): 

629 # ... order that "if" statement carefully; if the _group_id column 

630 # is missing from the source, don't touch oldobj._group_id or 

631 # it'll trigger a DB query that fails. 

632 # 

633 # Set _group_id because it's blank 

634 # 

635 ensure_default_group_id(trcon) 

636 default_group_id = trcon.info["default_group_id"] # type: int 

637 log.debug("Assiging new _group_id of {!r}", default_group_id) 

638 newobj._group_id = default_group_id 

639 else: 

640 # 

641 # Re-map _group_id 

642 # 

643 newobj._group_id = get_dest_groupnum(oldobj._group_id, 

644 trcon, oldobj) 

645 

646 # ------------------------------------------------------------------------- 

647 # If an identical user is found, merge on it rather than creating a new 

648 # one. Users with matching usernames are considered to be identical. 

649 # ------------------------------------------------------------------------- 

650 if trcon.tablename == User.__tablename__: 

651 src_user = cast(User, oldobj) 

652 src_username = src_user.username 

653 matching_user = ( 

654 trcon.dst_session.query(User) 

655 .filter(User.username == src_username) 

656 .one_or_none() 

657 ) # type: Optional[User] 

658 if matching_user is not None: 

659 log.debug("Matching User (username {!r}) found; merging", 

660 matching_user.username) 

661 trcon.newobj = matching_user # so that related records will work 

662 

663 # ------------------------------------------------------------------------- 

664 # If an identical device is found, merge on it rather than creating a 

665 # new one. Devices with matching names are considered to be identical. 

666 # ------------------------------------------------------------------------- 

667 if trcon.tablename == Device.__tablename__: 

668 src_device = cast(Device, oldobj) 

669 src_devicename = src_device.name 

670 matching_device = ( 

671 trcon.dst_session.query(Device) 

672 .filter(Device.name == src_devicename) 

673 .one_or_none() 

674 ) # type: Optional[Device] 

675 if matching_device is not None: 

676 log.debug("Matching Device (name {!r}) found; merging", 

677 matching_device.name) 

678 trcon.newobj = matching_device 

679 

680 # BUT BEWARE, BECAUSE IF YOU MERGE THE SAME DATABASE TWICE (even if 

681 # that's a silly thing to do...), MERGING DEVICES WILL BREAK THE KEY 

682 # RELATIONSHIPS. For example, 

683 # source: 

684 # pk = 1, id = 1, device = 100, era = 'NOW', current = 1 

685 # dest after first merge: 

686 # pk = 1, id = 1, device = 100, era = 'NOW', current = 1 

687 # dest after second merge: 

688 # pk = 1, id = 1, device = 100, era = 'NOW', current = 1 

689 # pk = 2, id = 1, device = 100, era = 'NOW', current = 1 

690 # ... so you get a clash/duplicate. 

691 # Mind you, that's fair, because there is a duplicate. 

692 # SO WE DO SEPARATE DUPLICATE CHECKING, below. 

693 

694 # ------------------------------------------------------------------------- 

695 # Don't copy Group records; the user must set these up manually and specify 

696 # groupnum_map, for safety 

697 # ------------------------------------------------------------------------- 

698 if trcon.tablename == Group.__tablename__: 

699 trcon.newobj = None # don't insert this object 

700 # ... don't set "newobj = None"; that wouldn't alter trcon 

701 # Now make sure the map is OK: 

702 src_group = cast(Group, oldobj) 

703 trcon.objmap[oldobj] = get_dst_group( 

704 dest_groupnum=get_dest_groupnum(src_group.id, trcon, src_group), 

705 dst_session=trcon.dst_session 

706 ) 

707 

708 # ------------------------------------------------------------------------- 

709 # If there are any patient numbers in the old format (as a set of 

710 # columns in the Patient table) which were not properly converted 

711 # to the new format (as individual records in the PatientIdNum 

712 # table), create new entries. 

713 # Only worth bothering with for _current entries. 

714 # (More explicitly: do not create new PatientIdNum entries for non-current 

715 # patients; it's very fiddly if there might be asynchrony between 

716 # Patient and PatientIdNum objects for that patient.) 

717 # ------------------------------------------------------------------------- 

718 if trcon.tablename == Patient.__tablename__: 

719 # (a) Find old patient numbers 

720 old_patient = cast(Patient, oldobj) 

721 # noinspection PyUnresolvedReferences 

722 src_pt_query = ( 

723 select([text('*')]) 

724 .select_from(table(trcon.tablename)) 

725 .where(column(Patient.id.name) == old_patient.id) 

726 .where(column(Patient._current.name) == True) # noqa: E712 

727 .where(column(Patient._device_id.name) == old_patient._device_id) 

728 .where(column(Patient._era.name) == old_patient._era) 

729 ) 

730 rows = trcon.src_session.execute(src_pt_query) # type: ResultProxy 

731 list_of_dicts = [dict(row.items()) for row in rows] 

732 assert len(list_of_dicts) == 1, ( 

733 "Failed to fetch old patient IDs correctly; bug?" 

734 ) 

735 old_patient_dict = list_of_dicts[0] 

736 

737 # (b) If any don't exist in the new database, create them. 

738 # -- no, that's not right; we will be processing Patient before 

739 # PatientIdNum, so that should be: if any don't exist in the *source* 

740 # database, create them. 

741 src_tables = trcon.src_table_names 

742 for src_which_idnum in range(1, NUMBER_OF_IDNUMS_DEFUNCT + 1): 

743 old_fieldname = FP_ID_NUM + str(src_which_idnum) 

744 idnum_value = old_patient_dict[old_fieldname] 

745 if idnum_value is None: 

746 # Old Patient record didn't contain this ID number 

747 continue 

748 # Old Patient record *did* contain the ID number... 

749 if PatientIdNum.__tablename__ in src_tables: 

750 # noinspection PyUnresolvedReferences 

751 src_idnum_query = ( 

752 select([func.count()]) 

753 .select_from(table(PatientIdNum.__tablename__)) 

754 .where(column(PatientIdNum.patient_id.name) == 

755 old_patient.id) 

756 .where(column(PatientIdNum._current.name) == 

757 old_patient._current) 

758 .where(column(PatientIdNum._device_id.name) == 

759 old_patient._device_id) 

760 .where(column(PatientIdNum._era.name) == 

761 old_patient._era) 

762 .where(column(PatientIdNum.which_idnum.name) == 

763 src_which_idnum) 

764 ) 

765 n_present = trcon.src_session.execute(src_idnum_query).scalar() 

766 # ^^^ 

767 # ! 

768 if n_present != 0: 

769 # There was already a PatientIdNum for this which_idnum 

770 continue 

771 pidnum = PatientIdNum() 

772 # PatientIdNum fields: 

773 pidnum.id = fake_tablet_id_for_patientidnum( 

774 patient_id=old_patient.id, which_idnum=src_which_idnum) 

775 # ... guarantees a pseudo client (tablet) PK 

776 pidnum.patient_id = old_patient.id 

777 pidnum.which_idnum = get_dest_which_idnum(src_which_idnum, 

778 trcon, oldobj) 

779 pidnum.idnum_value = idnum_value 

780 # GenericTabletRecordMixin fields: 

781 # _pk: autogenerated 

782 # noinspection PyUnresolvedReferences 

783 pidnum._device_id = ( 

784 trcon.objmap[old_patient._device].id 

785 ) 

786 pidnum._era = old_patient._era 

787 pidnum._current = old_patient._current 

788 pidnum._when_added_exact = old_patient._when_added_exact 

789 pidnum._when_added_batch_utc = old_patient._when_added_batch_utc 

790 # noinspection PyUnresolvedReferences 

791 pidnum._adding_user_id = ( 

792 trcon.objmap[old_patient._adding_user].id 

793 if old_patient._adding_user is not None else None 

794 ) 

795 pidnum._when_removed_exact = old_patient._when_removed_exact 

796 pidnum._when_removed_batch_utc = old_patient._when_removed_batch_utc # noqa 

797 # noinspection PyUnresolvedReferences 

798 pidnum._removing_user_id = ( 

799 trcon.objmap[old_patient._removing_user].id 

800 if old_patient._removing_user is not None else None 

801 ) 

802 # noinspection PyUnresolvedReferences 

803 pidnum._preserving_user_id = ( 

804 trcon.objmap[old_patient._preserving_user].id 

805 if old_patient._preserving_user is not None else None 

806 ) 

807 pidnum._forcibly_preserved = old_patient._forcibly_preserved 

808 pidnum._predecessor_pk = None # Impossible to calculate properly 

809 pidnum._successor_pk = None # Impossible to calculate properly 

810 pidnum._manually_erased = old_patient._manually_erased 

811 pidnum._manually_erased_at = old_patient._manually_erased_at 

812 # noinspection PyUnresolvedReferences 

813 pidnum._manually_erasing_user_id = ( 

814 trcon.objmap[old_patient._manually_erasing_user].id 

815 if old_patient._manually_erasing_user is not None else None 

816 ) 

817 pidnum._camcops_version = old_patient._camcops_version 

818 pidnum._addition_pending = old_patient._addition_pending 

819 pidnum._removal_pending = old_patient._removal_pending 

820 pidnum._group_id = newobj._group_id 

821 # ... will have been set above if it was blank 

822 

823 # OK. 

824 log.debug("Inserting new PatientIdNum: {}", pidnum) 

825 trcon.dst_session.add(pidnum) 

826 

827 # ------------------------------------------------------------------------- 

828 # If we're inserting a PatientIdNum, make sure there is a corresponding 

829 # IdNumDefinition, and that it's valid 

830 # ------------------------------------------------------------------------- 

831 if trcon.tablename == PatientIdNum.__tablename__: 

832 src_pidnum = cast(PatientIdNum, oldobj) 

833 src_which_idnum = src_pidnum.which_idnum 

834 # Is it present? 

835 if src_which_idnum is None: 

836 raise ValueError(f"Bad PatientIdNum: {src_pidnum!r}") 

837 # Ensure the new object has an appropriate ID number FK: 

838 dst_pidnum = cast(PatientIdNum, newobj) 

839 dst_pidnum.which_idnum = get_dest_which_idnum(src_which_idnum, 

840 trcon, oldobj) 

841 

842 # ------------------------------------------------------------------------- 

843 # If we're merging from a more modern database with the IdNumDefinition 

844 # table, skip source IdNumDefinition records; the user must set these up 

845 # manually and specify whichidnum_map, for safety 

846 # ------------------------------------------------------------------------- 

847 if trcon.tablename == IdNumDefinition.__tablename__: 

848 trcon.newobj = None # don't insert this object 

849 # ... don't set "newobj = None"; that wouldn't alter trcon 

850 # Now make sure the map is OK: 

851 src_iddef = cast(IdNumDefinition, oldobj) 

852 trcon.objmap[oldobj] = get_dst_iddef( 

853 which_idnum=get_dest_which_idnum(src_iddef.which_idnum, 

854 trcon, src_iddef), 

855 dst_session=trcon.dst_session 

856 ) 

857 

858 # ------------------------------------------------------------------------- 

859 # Check we're not creating duplicates for anything uploaded 

860 # ------------------------------------------------------------------------- 

861 if isinstance(oldobj, GenericTabletRecordMixin): 

862 # noinspection PyTypeChecker 

863 cls = newobj.__class__ # type: Type[GenericTabletRecordMixin] 

864 # Records uploaded from tablets must be unique on the combination of: 

865 # id = table PK 

866 # _device_id = device 

867 # _era = device era 

868 # _when_removed_exact = removal date or NULL 

869 # noinspection PyUnresolvedReferences 

870 exists_query = ( 

871 select([func.count()]) 

872 .select_from(table(trcon.tablename)) 

873 .where(column(cls.id.name) == oldobj.id) 

874 .where(column(cls._device_id.name) == 

875 trcon.objmap[oldobj._device].id) 

876 .where(column(cls._era.name) == oldobj._era) 

877 .where(column(cls._when_removed_exact.name) == 

878 oldobj._when_removed_exact) 

879 ) 

880 # Note re NULLs... Although it's an inconvenient truth in SQL that 

881 # SELECT NULL = NULL; -- returns NULL 

882 # in this code we have a comparison of a column to a Python value. 

883 # SQLAlchemy is clever and renders "IS NULL" if the Python value is 

884 # None, or an "=" comparison otherwise. 

885 # If we were comparing a column to another column, we'd have to do 

886 # more; e.g. 

887 # 

888 # WRONG one-to-one join to self: 

889 # 

890 # SELECT a._pk, b._pk, a._when_removed_exact 

891 # FROM phq9 a 

892 # INNER JOIN phq9 b 

893 # ON a._pk = b._pk 

894 # AND a._when_removed_exact = b._when_removed_exact; 

895 # 

896 # -- drops all rows 

897 # 

898 # CORRECT one-to-one join to self: 

899 # 

900 # SELECT a._pk, b._pk, a._when_removed_exact 

901 # FROM phq9 a 

902 # INNER JOIN phq9 b 

903 # ON a._pk = b._pk 

904 # AND (a._when_removed_exact = b._when_removed_exact 

905 # OR (a._when_removed_exact IS NULL AND 

906 # b._when_removed_exact IS NULL)); 

907 # 

908 # -- returns all rows 

909 n_exists = trcon.dst_session.execute(exists_query).scalar() 

910 if n_exists > 0: 

911 # noinspection PyUnresolvedReferences 

912 existing_rec_q = ( 

913 select(['*']) 

914 .select_from(table(trcon.tablename)) 

915 .where(column(cls.id.name) == oldobj.id) 

916 .where(column(cls._device_id.name) == 

917 trcon.objmap[oldobj._device].id) 

918 .where(column(cls._era.name) == oldobj._era) 

919 .where(column(cls._when_removed_exact.name) == 

920 oldobj._when_removed_exact) 

921 ) 

922 resultproxy = trcon.dst_session.execute(existing_rec_q).fetchall() 

923 existing_rec = [dict(row) for row in resultproxy] 

924 log.critical( 

925 "Source record, inheriting from GenericTabletRecordMixin and " 

926 "shown below, already exists in destination database... " 

927 "in table {t!r}, clashing on: " 

928 "id={i!r}, device_id={d!r}, era={e!r}, " 

929 "_when_removed_exact={w!r}.\n" 

930 "ARE YOU TRYING TO MERGE THE SAME DATABASE IN TWICE? " 

931 "DON'T.", 

932 t=trcon.tablename, 

933 i=oldobj.id, 

934 d=oldobj._device_id, 

935 e=oldobj._era, 

936 w=oldobj._when_removed_exact, 

937 ) 

938 if (trcon.tablename == PatientIdNum.__tablename__ and 

939 (oldobj.id % NUMBER_OF_IDNUMS_DEFUNCT == 0)): 

940 log.critical( 

941 'Since this error has occurred for table {t!r} ' 

942 '(and for id % {n} == 0), ' 

943 'this error may reflect a previous bug in the patient ID ' 

944 'number fix for the database upload script, in which all ' 

945 'ID numbers for patients with patient.id = n were given ' 

946 'patient_idnum.id = n * {n} themselves (or possibly were ' 

947 'all given patient_idnum.id = 0). ' 

948 'Fix this by running, on the source database:\n\n' 

949 ' UPDATE patient_idnum SET id = _pk;\n\n', 

950 t=trcon.tablename, 

951 n=NUMBER_OF_IDNUMS_DEFUNCT, 

952 ) 

953 # Print the actual instance last; accessing them via pformat can 

954 # lead to crashes if there are missing source fields, as an 

955 # on-demand SELECT is executed sometimes (e.g. when a PatientIdNum 

956 # is printed, its Patient is selected, including the [user] 

957 # 'fullname' attribute that is absent in old databases). 

958 # Not a breaking point, since we're going to crash anyway, but 

959 # inelegant. 

960 # Since lazy loading (etc.) is configured at query time, the best 

961 # thing (as per Michael Bayer) is to detach the object from the 

962 # session: 

963 # https://groups.google.com/forum/#!topic/sqlalchemy/X_wA8K97smE 

964 trcon.src_session.expunge(oldobj) # prevent implicit queries 

965 # Then all should work: 

966 log_warning_srcobj(oldobj) 

967 log.critical( 

968 "Existing record(s) in destination DB was/were:\n\n{}\n\n", 

969 pformat(existing_rec)) 

970 raise ValueError("Attempt to insert duplicate record; see log " 

971 "message above.") 

972 

973 

974# ============================================================================= 

975# Postprocess 

976# ============================================================================= 

977 

978# noinspection PyUnusedLocal 

979def postprocess(src_engine: Engine, dst_session: Session) -> None: 

980 """ 

981 Implement any extra processing after :func:`merge_db` has been called. 

982 

983 - Reindexes tasks. 

984 - Warns you about things that need to be done manually. 

985 

986 Args: 

987 src_engine: source database SQLAlchemy engine 

988 dst_session: destination database SQLAlchemy session 

989 """ 

990 log.info("Reindexing destination database") 

991 reindex_everything(dst_session) 

992 log.warning("NOT IMPLEMENTED AUTOMATICALLY: copying user/group mapping " 

993 "from table {!r}; do this by hand.", 

994 UserGroupMembership.__tablename__) 

995 log.warning("NOT IMPLEMENTED AUTOMATICALLY: copying group/group mapping " 

996 "from table {!r}; do this by hand.", group_group_table.name) 

997 

998 

999# ============================================================================= 

1000# Main 

1001# ============================================================================= 

1002 

1003def merge_camcops_db(src: str, 

1004 echo: bool, 

1005 report_every: int, 

1006 dummy_run: bool, 

1007 info_only: bool, 

1008 default_group_id: Optional[int], 

1009 default_group_name: Optional[str], 

1010 groupnum_map: Dict[int, int], 

1011 whichidnum_map: Dict[int, int], 

1012 skip_export_logs: bool = True, 

1013 skip_audit_logs: bool = True) -> None: 

1014 """ 

1015 Merge an existing database (with a pre-v2 or later structure) into a 

1016 comtemporary CamCOPS database. 

1017 

1018 Args: 

1019 src: 

1020 source database SQLAlchemy URL 

1021 

1022 echo: 

1023 echo the SQL that is produced? 

1024 

1025 report_every: 

1026 provide a progress report every *n* records 

1027 

1028 dummy_run: 

1029 don't alter the destination database 

1030 

1031 info_only: 

1032 show info, then stop 

1033 

1034 default_group_id: 

1035 integer group ID (in the destination database) to use for source 

1036 records that have no group (because they come from a very old 

1037 source database) but need one 

1038 

1039 default_group_name: 

1040 group name (in the destination database) to use for source 

1041 records that have no group (because they come from a very old 

1042 source database) but need one 

1043 

1044 groupnum_map: 

1045 dictionary mapping group ID values from the source database to 

1046 the destination database 

1047 

1048 whichidnum_map: 

1049 dictionary mapping ``which_idnum`` values from the source database 

1050 to the destination database 

1051 

1052 skip_export_logs: 

1053 skip export log tables 

1054 

1055 skip_audit_logs: 

1056 skip audit log table 

1057 

1058 """ 

1059 req = get_command_line_request() # requires manual COMMIT; see below 

1060 src_engine = create_engine(src, echo=echo, pool_pre_ping=True) 

1061 log.info("SOURCE: " + get_safe_url_from_engine(src_engine)) 

1062 log.info("DESTINATION: " + get_safe_url_from_engine(req.engine)) 

1063 log.info("Destination ID number type map (source:destination) is: {!r}", 

1064 whichidnum_map) 

1065 log.info("Group number type map (source:destination) is {!r}", 

1066 groupnum_map) 

1067 

1068 # Delay the slow import until we've checked our syntax 

1069 log.info("Loading all models...") 

1070 # noinspection PyUnresolvedReferences 

1071 import camcops_server.cc_modules.cc_all_models # delayed import # import side effects (ensure all models registered) # noqa 

1072 log.info("Models loaded.") 

1073 

1074 # Now, any special dependencies? 

1075 # From the point of view of translating any tablet-related fields, the 

1076 # actual (server) PK values are irrelevant; all relationships will be 

1077 # identical if you change any PK (not standard database practice, but 

1078 # convenient here). 

1079 # The dependencies that do matter are server-side things, like user_id 

1080 # variables. 

1081 

1082 # For debugging only, some junk: 

1083 # test_dependencies = [ 

1084 # TableDependency(parent_tablename="patient", 

1085 # child_tablename="_dirty_tables") 

1086 # ] 

1087 

1088 # ------------------------------------------------------------------------- 

1089 # Tables to skip 

1090 # ------------------------------------------------------------------------- 

1091 

1092 skip_tables = [ 

1093 # Transient stuff we don't want to copy across, or wouldn't want to 

1094 # overwrite the destination with, or where the PK structure has 

1095 # changed and we don't care about old data: 

1096 TableIdentity(tablename=x) 

1097 for x in [ 

1098 CamcopsSession.__tablename__, 

1099 DirtyTable.__tablename__, 

1100 ServerSettings.__tablename__, 

1101 SecurityAccountLockout.__tablename__, 

1102 SecurityLoginFailure.__tablename__, 

1103 UserGroupMembership.__tablename__, 

1104 group_group_table.name, 

1105 ] 

1106 ] 

1107 

1108 # Tedious and bulky stuff the user may want to skip: 

1109 if skip_export_logs: 

1110 skip_tables.extend([ 

1111 TableIdentity(tablename=x) 

1112 for x in [ 

1113 Email.__tablename__, 

1114 ExportRecipient.__tablename__, 

1115 ExportedTask.__tablename__, 

1116 ExportedTaskEmail.__tablename__, 

1117 ExportedTaskFileGroup.__tablename__, 

1118 ExportedTaskHL7Message.__tablename__, 

1119 ] 

1120 ]) 

1121 if skip_audit_logs: 

1122 skip_tables.append(TableIdentity(tablename=AuditEntry.__tablename__)) 

1123 

1124 # ------------------------------------------------------------------------- 

1125 # Initial operations on SOURCE database 

1126 # ------------------------------------------------------------------------- 

1127 

1128 src_tables = get_table_names(src_engine) 

1129 skip_tables += get_skip_tables(src_tables=src_tables) 

1130 src_iddefs = get_src_iddefs(src_engine, src_tables) 

1131 log.info("Source ID number definitions: {!r}", src_iddefs) 

1132 

1133 # ------------------------------------------------------------------------- 

1134 # Initial operations on DESTINATION database 

1135 # ------------------------------------------------------------------------- 

1136 dst_session = req.dbsession 

1137 # So that system users get the first ID (cosmetic!): 

1138 _ = User.get_system_user(dbsession=dst_session) 

1139 _ = Device.get_server_device(dbsession=dst_session) 

1140 

1141 # ------------------------------------------------------------------------- 

1142 # Set up source-to-destination mappings 

1143 # ------------------------------------------------------------------------- 

1144 

1145 # Map source to destination ID number types 

1146 for src_which_idnum, dest_which_idnum in whichidnum_map.items(): 

1147 assert isinstance(src_which_idnum, int) 

1148 assert isinstance(dest_which_idnum, int) 

1149 src_iddef = src_iddefs[src_which_idnum] 

1150 dst_iddef = ensure_dest_iddef_exists(dest_which_idnum, dst_session) 

1151 ensure_no_iddef_clash(src_iddef, dst_iddef) 

1152 

1153 # Map source to destination group numbers 

1154 for src_groupnum, dest_groupnum in groupnum_map.items(): 

1155 assert isinstance(src_groupnum, int) 

1156 assert isinstance(dest_groupnum, int) 

1157 _ = get_dst_group(dest_groupnum, dst_session) 

1158 

1159 # ------------------------------------------------------------------------- 

1160 # Merge 

1161 # ------------------------------------------------------------------------- 

1162 

1163 # Merge! It's easy... 

1164 trcon_info = dict(default_group_id=default_group_id, 

1165 default_group_name=default_group_name, 

1166 src_iddefs=src_iddefs, 

1167 whichidnum_map=whichidnum_map, 

1168 groupnum_map=groupnum_map) 

1169 merge_db( 

1170 base_class=Base, 

1171 src_engine=src_engine, 

1172 dst_session=dst_session, 

1173 allow_missing_src_tables=True, 

1174 allow_missing_src_columns=True, 

1175 translate_fn=translate_fn, 

1176 skip_tables=skip_tables, 

1177 only_tables=None, 

1178 tables_to_keep_pks_for=None, 

1179 # extra_table_dependencies=test_dependencies, 

1180 extra_table_dependencies=None, 

1181 dummy_run=dummy_run, 

1182 info_only=info_only, 

1183 report_every=report_every, 

1184 flush_per_table=True, 

1185 flush_per_record=False, 

1186 commit_with_flush=False, 

1187 commit_at_end=True, 

1188 prevent_eager_load=True, 

1189 trcon_info=trcon_info 

1190 ) 

1191 

1192 # ------------------------------------------------------------------------- 

1193 # Postprocess 

1194 # ------------------------------------------------------------------------- 

1195 

1196 postprocess(src_engine=src_engine, dst_session=dst_session) 

1197 

1198 # ------------------------------------------------------------------------- 

1199 # Done 

1200 # ------------------------------------------------------------------------- 

1201 

1202 dst_session.commit()