Coverage for cc_modules/merge_db.py: 18%

283 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

1""" 

2camcops_server/cc_modules/merge_db.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Tool to merge data from one CamCOPS database into another.** 

27 

28Has special code to deal with old databases. 

29 

30""" 

31 

32import logging 

33from pprint import pformat 

34from typing import Any, cast, Dict, List, Optional, Type 

35 

36from cardinal_pythonlib.sqlalchemy.merge_db import ( 

37 merge_db, 

38 TableDependency, 

39 TranslationContext, 

40) 

41from cardinal_pythonlib.sqlalchemy.schema import get_table_names 

42from cardinal_pythonlib.sqlalchemy.session import get_safe_url_from_engine 

43from cardinal_pythonlib.sqlalchemy.table_identity import TableIdentity 

44from sqlalchemy.engine import create_engine 

45from sqlalchemy.engine.base import Engine 

46from sqlalchemy.engine.result import Result 

47from sqlalchemy.engine.row import RowMapping 

48from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound 

49from sqlalchemy.orm.session import Session, sessionmaker 

50from sqlalchemy.sql.expression import column, func, select, table, text 

51 

52from camcops_server.cc_modules.cc_audit import AuditEntry 

53from camcops_server.cc_modules.cc_constants import ( 

54 FP_ID_NUM, 

55 NUMBER_OF_IDNUMS_DEFUNCT, 

56) 

57from camcops_server.cc_modules.cc_db import GenericTabletRecordMixin 

58from camcops_server.cc_modules.cc_device import Device 

59from camcops_server.cc_modules.cc_dirtytables import DirtyTable 

60from camcops_server.cc_modules.cc_email import Email 

61from camcops_server.cc_modules.cc_exportmodels import ( 

62 ExportedTask, 

63 ExportedTaskEmail, 

64 ExportedTaskFileGroup, 

65 ExportedTaskHL7Message, 

66) 

67from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient 

68from camcops_server.cc_modules.cc_group import Group, group_group_table 

69from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition 

70from camcops_server.cc_modules.cc_membership import UserGroupMembership 

71from camcops_server.cc_modules.cc_patient import Patient 

72from camcops_server.cc_modules.cc_patientidnum import ( 

73 fake_tablet_id_for_patientidnum, 

74 PatientIdNum, 

75) 

76from camcops_server.cc_modules.cc_request import get_command_line_request 

77from camcops_server.cc_modules.cc_session import CamcopsSession 

78from camcops_server.cc_modules.cc_serversettings import ( 

79 server_stored_var_table_defunct, 

80 ServerSettings, 

81 ServerStoredVarNamesDefunct as StoredVarDefunct, 

82) 

83from camcops_server.cc_modules.cc_sqlalchemy import Base 

84from camcops_server.cc_modules.cc_taskindex import reindex_everything 

85from camcops_server.cc_modules.cc_user import ( 

86 SecurityAccountLockout, 

87 SecurityLoginFailure, 

88 User, 

89) 

90 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Debugging switch, off by default.
# NOTE(review): presumably enables a pdb-based debugging path elsewhere in
# this module; its use is not visible in this section -- confirm.
DEBUG_VIA_PDB = False

94 

95 

96# ============================================================================= 

97# Information relating to the source database 

98# ============================================================================= 

99 

100 

def get_skip_tables(src_tables: List[str]) -> List[TableIdentity]:
    """
    Work out which tables in our metadata must be skipped during the merge
    because the source database does not contain them.

    As a precondition, verifies that the core CamCOPS tables (patient and
    user) exist in the source.

    Args:
        src_tables: list of all table names in the source database

    Returns:
        list of
        :class:`cardinal_pythonlib.sqlalchemy.table_identity.TableIdentity`
        objects representing tables to skip

    Raises:
        :exc:`ValueError` if a core table is missing from the source

    Note that other tables to skip are defined in :func:`merge_camcops_db`.

    """
    # Sanity check: the source must at least look like a CamCOPS database.
    for tname in (Patient.__tablename__, User.__tablename__):
        if tname in src_tables:
            continue
        raise ValueError(
            f"Cannot proceed; table {tname!r} missing from source; "
            f"unlikely that the source is any sort of old CamCOPS "
            f"database!"
        )

    # Missing source tables are generally tolerated. The exception would be
    # tables that are automatically eager-loaded via relationships (in
    # CamCOPS: Patient, User, PatientIdNum; for merges, that means
    # PatientIdNum). Rather than creating PatientIdNum in the source
    # database when it is absent, eager loading is disabled instead, so
    # nothing needs doing about that here.

    group_tablename = Group.__tablename__
    if group_tablename in src_tables:
        return []

    log.warning(
        f"No Group information in source database; skipping source "
        f"table {Group.__tablename__!r}; will create a default group"
    )
    return [TableIdentity(tablename=group_tablename)]

155 

156 

def get_src_iddefs(
    src_engine: Engine, src_tables: List[str]
) -> Dict[int, IdNumDefinition]:
    """
    Read every ID number definition that the source database knows about.

    Two source layouts are handled: a modern database with a dedicated
    ID-number-definition table, and an older one in which descriptions are
    held in the (defunct) server stored variable table. If neither table is
    present, a warning is logged and an empty dictionary is returned.

    Args:
        src_engine: source SQLAlchemy :class:`Engine`
        src_tables: list of all table names in the source database

    Returns:
        dictionary: ``{which_idnum: idnumdef}`` mappings, where each
        ``idnumdef`` is a
        :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` not
        attached to any database session
    """
    legacy_table = server_stored_var_table_defunct
    found: Dict[int, IdNumDefinition] = {}

    with src_engine.connect() as connection:

        def legacy_value(varname: str) -> Optional[str]:
            # Look up one named value in the old stored-variable table;
            # None if there is no such row.
            stmt = (
                select(legacy_table.columns.valueText)
                .select_from(legacy_table)
                .where(legacy_table.columns.name == varname)
            )
            matches = connection.execute(stmt).fetchall()
            return matches[0][0] if matches else None

        if IdNumDefinition.__tablename__ in src_tables:
            # Modern source: definitions live in their own table.
            log.info(
                f"Fetching source ID number definitions from "
                f"{IdNumDefinition.__tablename__!r} table"
            )
            # noinspection PyUnresolvedReferences
            modern_query = (
                select(
                    IdNumDefinition.which_idnum,
                    IdNumDefinition.description,
                    IdNumDefinition.short_description,
                )
                .select_from(IdNumDefinition.__table__)
                .order_by(IdNumDefinition.which_idnum)
            )
            fetched = connection.execute(modern_query).fetchall()
            for which_idnum, description, short_description in fetched:
                found[which_idnum] = IdNumDefinition(
                    which_idnum=which_idnum,
                    description=description,
                    short_description=short_description,
                )
        elif server_stored_var_table_defunct.name in src_tables:
            # Older source: one stored variable per description, for a fixed
            # number of ID types. An entry is created for every ID type, with
            # None for any description that is absent.
            log.info(
                f"Fetching source ID number definitions from "
                f"{server_stored_var_table_defunct.name!r} table"
            )
            for which_idnum in range(1, NUMBER_OF_IDNUMS_DEFUNCT + 1):
                suffix = str(which_idnum)
                description = legacy_value(
                    StoredVarDefunct.ID_DESCRIPTION_PREFIX + suffix
                )
                short_description = legacy_value(
                    StoredVarDefunct.ID_SHORT_DESCRIPTION_PREFIX + suffix
                )
                found[which_idnum] = IdNumDefinition(
                    which_idnum=which_idnum,
                    description=description,
                    short_description=short_description,
                )
        else:
            log.warning(
                "No information available on source ID number descriptions"
            )

    return found

236 

237 

238# ============================================================================= 

239# Information relating to the destination database 

240# ============================================================================= 

241 

242 

def group_exists(group_id: int, dst_session: Session) -> bool:
    """
    Report whether the destination database has a group with this ID.

    Thin wrapper around :meth:`Group.group_exists`.

    Args:
        group_id: integer group ID
        dst_session: destination SQLAlchemy :class:`Session`

    Returns:
        the result of :meth:`Group.group_exists` for that session and ID
    """
    found = Group.group_exists(dbsession=dst_session, group_id=group_id)
    return found

252 

253 

def fetch_group_id_by_name(group_name: str, dst_session: Session) -> int:
    """
    Look up a group by name in the destination session and return its ID,
    creating the group first if no group has that name.

    Args:
        group_name: group name
        dst_session: destination SQLAlchemy :class:`Session`

    Returns:
        group ID in the destination database

    Raises:
        :exc:`MultipleResultsFound` if more than one group has that name
        (which indicates a bug)

    """
    by_name = dst_session.query(Group).filter(Group.name == group_name)
    try:
        # one() raises unless there is exactly one match
        group: Group = by_name.one()
    except NoResultFound:
        log.info(f"Creating new group named {group_name!r}")
        group = Group()
        group.name = group_name
        dst_session.add(group)
        # Flushing makes the database assign the primary key; see
        # https://stackoverflow.com/questions/1316952/sqlalchemy-flush-and-get-inserted-id  # noqa
        flush_session(dst_session)
        log.info(f"... new group has ID {group.id!r}")
    except MultipleResultsFound:
        log.critical(
            f"Nasty bug: can't have two groups with the same name! "
            f"Group name was {group_name!r}"
        )
        raise
    return group.id

293 

294 

def get_dst_group(dest_groupnum: int, dst_session: Session) -> Group:
    """
    Fetch the group with the given number from the destination database,
    insisting that it exists.

    Args:
        dest_groupnum: group number
        dst_session: SQLAlchemy session for the destination database

    Returns:
        the group

    Raises:
        :exc:`ValueError` if no such group exists;
        :exc:`MultipleResultsFound` if several groups share that ID
    """
    by_id = dst_session.query(Group).filter(Group.id == dest_groupnum)
    try:
        # one() raises unless there is exactly one match
        group: Group = by_id.one()
    except NoResultFound:
        raise ValueError(
            f"Group with ID {dest_groupnum} missing from "
            f"destination database"
        )
    except MultipleResultsFound:
        log.critical(
            f"Nasty bug: can't have two groups with the same ID! "
            f"Group ID was {dest_groupnum!r}"
        )
        raise
    return group

327 

328 

def ensure_dest_iddef_exists(
    which_idnum: int, dst_session: Session
) -> IdNumDefinition:
    """
    Fetch the ID number definition of the given type from the destination
    database, insisting that it exists.

    Args:
        which_idnum: ID number type
        dst_session: SQLAlchemy session for the destination database

    Returns:
        the matching
        :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`

    Raises:
        :exc:`ValueError` if no such definition exists;
        :exc:`MultipleResultsFound` if several definitions share that type
    """
    by_type = dst_session.query(IdNumDefinition).filter(
        IdNumDefinition.which_idnum == which_idnum
    )
    try:
        # one() raises unless there is exactly one match
        iddef: IdNumDefinition = by_type.one()
    except NoResultFound:
        raise ValueError(
            f"ID number type with which_idnum={which_idnum} "
            f"missing from destination database"
        )
    except MultipleResultsFound:
        log.critical(
            f"Nasty bug: can't have two ID number types with the same "
            f"which_idnum! which_idnum was {which_idnum!r}"
        )
        raise
    return iddef

362 

363 

def get_dst_iddef(
    dst_session: Session, which_idnum: int
) -> Optional[IdNumDefinition]:
    """
    Look up an ID number definition in the destination database.

    Unlike :func:`ensure_dest_iddef_exists`, this does not insist that the
    definition exists: the first match is returned, or ``None`` if there is
    no match.

    Args:
        dst_session: destination SQLAlchemy :class:`Session`
        which_idnum: integer expressing which ID number type to look up

    Returns:
        an :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`, or
        ``None`` if none was found

    """
    candidates = dst_session.query(IdNumDefinition).filter(
        IdNumDefinition.which_idnum == which_idnum
    )
    return candidates.first()

385 

386 

387# ============================================================================= 

388# Extra translation to be applied to individual objects 

389# ============================================================================= 

390# The extra logic for this database: 

391 

392 

def flush_session(dst_session: Session) -> None:
    """
    Flush the destination SQLAlchemy session, logging at debug level first.

    Args:
        dst_session: destination SQLAlchemy :class:`Session`
    """
    log.debug("Flushing session")
    dst_session.flush()

399 

400 

def ensure_default_group_id(trcon: TranslationContext) -> None:
    """
    Ensure that the :class:`TranslationContext` has a usable
    ``default_group_id`` in its ``info`` dictionary. This is the ID, in the
    destination database, of the group to put records in where those records
    come from an older, pre-group-based CamCOPS database.

    The user may have specified that ``default_group_id`` on the command
    line. Otherwise, they may have specified a ``default_group_name``, so
    we'll use the ID of that group (creating it if necessary). If they
    specified neither, we raise an :exc:`AssertionError`, because we have
    come to a situation where we need one or the other.

    The checks raise :exc:`AssertionError` explicitly rather than using the
    ``assert`` statement, so that they remain active when Python runs with
    optimization (``-O``), which strips ``assert`` statements.

    Args:
        trcon: the :class:`TranslationContext`; its ``info`` dictionary must
            contain the keys ``default_group_id`` and ``default_group_name``

    Raises:
        :exc:`AssertionError` if the user-specified ``default_group_id``
        does not exist in the destination database, or if neither a default
        group ID nor a default group name was specified

    """
    default_group_id: Optional[int] = trcon.info["default_group_id"]
    if default_group_id is not None:
        # The user specified a group ID to use for records without one;
        # it must already exist in the destination.
        if not group_exists(
            group_id=default_group_id, dst_session=trcon.dst_session
        ):
            raise AssertionError(
                "User specified default_group_id={!r}, and object {!r} needs "
                "a _group_id (directly or indirectly), but that ID doesn't "
                "exist in the {!r} table of the destination database".format(
                    default_group_id, trcon.oldobj, Group.__tablename__
                )
            )
    else:
        default_group_name: Optional[str] = trcon.info["default_group_name"]
        if not default_group_name:
            raise AssertionError(
                "User specified neither default_group_id or "
                "default_group_name, but object {!r} needs a "
                "_group_id, directly or indirectly".format(trcon.oldobj)
            )
        # Look the group up by name, creating it if necessary.
        default_group_id = fetch_group_id_by_name(
            group_name=default_group_name, dst_session=trcon.dst_session
        )
    trcon.info["default_group_id"] = default_group_id  # for next time!

442 

443 

def ensure_no_iddef_clash(
    src_iddef: IdNumDefinition, dst_iddef: IdNumDefinition
) -> None:
    """
    Check that a source ID number definition and its destination counterpart
    agree on both their description and their short description.

    The ``which_idnum`` values of the two definitions are deliberately NOT
    compared, because they may legitimately differ during a database merge.

    Args:
        src_iddef: source
            :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`
        dst_iddef: destination
            :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`

    Raises:
        :exc:`ValueError` on the first mismatch (description is checked
        before short description)
    """
    for attr in ("description", "short_description"):
        src_value = getattr(src_iddef, attr)
        dst_value = getattr(dst_iddef, attr)
        if src_value == dst_value:
            continue
        raise ValueError(
            "ID {} mismatch for ID#{}: source {!r}, "
            "destination {!r}".format(
                attr, src_iddef.which_idnum, src_value, dst_value
            )
        )

479 

480 

def log_warning_srcobj(srcobj: Any) -> None:
    """
    Write a pretty-printed dump of a source (old) object's ``__dict__`` to
    the log, at warning level.

    Args:
        srcobj: the source object
    """
    dumped = pformat(srcobj.__dict__)
    log.warning(f"Source was:\n\n{dumped}\n\n")

489 

490 

def get_dest_groupnum(
    src_groupnum: int, trcon: TranslationContext, oldobj: Any
) -> int:
    """
    Translate a source group number into the matching destination group
    number, using the ``groupnum_map`` held in the translation context's
    ``info`` dictionary.

    Args:
        src_groupnum: the group number in the source database
        trcon: the :class:`TranslationContext`
        oldobj: the source object (logged if the lookup fails)

    Returns:
        the corresponding group number in the destination database

    Raises:
        :exc:`ValueError` if the source group number is not in the map
    """
    groupnum_map: Dict[int, int] = trcon.info["groupnum_map"]
    if src_groupnum in groupnum_map:
        return groupnum_map[src_groupnum]

    # Unknown mapping: show the offending source object, then fail.
    log_warning_srcobj(oldobj)
    log.critical(
        f"Old database contains group number {src_groupnum} and "
        f"equivalent group in destination not known; {groupnum_map=}"
    )
    raise ValueError("Bad group mapping")

518 

519 

def get_dest_which_idnum(
    src_which_idnum: int, trcon: TranslationContext, oldobj: Any
) -> int:
    """
    Translate a source ID number type into the matching destination ID
    number type, using the ``whichidnum_map`` held in the translation
    context's ``info`` dictionary.

    Args:
        src_which_idnum: which_idnum in the source database
        trcon: the :class:`TranslationContext`
        oldobj: the source object (logged if the lookup fails)

    Returns:
        the corresponding which_idnum in the destination database

    Raises:
        :exc:`ValueError` if the source ID number type is not in the map

    """
    whichidnum_map: Dict[int, int] = trcon.info["whichidnum_map"]
    if src_which_idnum in whichidnum_map:
        return whichidnum_map[src_which_idnum]

    # Unknown mapping: show the offending source object, then fail.
    log_warning_srcobj(oldobj)
    log.critical(
        f"Old database contains ID number definitions of type "
        f"{src_which_idnum} and equivalent ID number type in destination "
        f"not known; {whichidnum_map=}"
    )
    raise ValueError("Bad ID number type mapping")

549 

550 

551# noinspection PyProtectedMember 

552def camcops_mergedb_translate_fn(trcon: TranslationContext) -> None: 

553 """ 

554 Function to translate source objects to their destination counterparts, 

555 where special processing is required. Called as a callback from 

556 :func:`cardinal_pythonlib.sqlalchemy.merge_db.merge_db`. 

557 

558 Args: 

559 trcon: the :class:`TranslationContext`; all the relevant information is 

560 in here, and our function modifies its members. 

561 

562 This function does the following things: 

563 

564 - For any records uploaded from tablets: set ``_group_id``, if it's blank. 

565 

566 - For :class:`camcops_server.cc_modules.cc_user.User` objects: if an 

567 identical user is found in the destination database, merge on it rather 

568 than creating a new one. Users with matching usernames are considered to 

569 be identical. 

570 

571 - For :class:`Device` objects: if an identical device is found, merge on it 

572 rather than creating a new one. Devices with matching names are 

573 considered to be identical. 

574 

575 - For :class:`camcops_server.cc_modules.cc_group.Group` objects: if an 

576 identical group is found, merge on it rather than creating a new one. 

577 Groups with matching names are considered to be identical. 

578 

579 - For :class:`camcops_server.cc_modules.cc_patient.Patient` objects: if any 

580 have ID numbers in the old format (as columns in the Patient table), 

581 convert them to the :class:`PatientIdNum` system. 

582 

583 - If we're inserting a :class:`PatientIdNum`, make sure there is a 

584 corresponding 

585 :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`, and that 

586 it's valid. 

587 

588 - If we're merging from a more modern database with the 

589 :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` table, 

590 check our ID number definitions don't conflict. 

591 

592 - Check we're not creating duplicates for anything uploaded. 

593 

594 """ 

595 log.debug(f"Translating object from table: {trcon.tablename!r}") 

596 oldobj = trcon.oldobj 

597 newobj = trcon.newobj 

598 

599 # ------------------------------------------------------------------------- 

600 # Set _group_id correctly for tablet records 

601 # ------------------------------------------------------------------------- 

602 if isinstance(oldobj, GenericTabletRecordMixin): 

603 if ( 

604 "_group_id" in trcon.missing_src_columns 

605 or oldobj._group_id is None 

606 ): 

607 # ... order that "if" statement carefully; if the _group_id column 

608 # is missing from the source, don't touch oldobj._group_id or 

609 # it'll trigger a DB query that fails. 

610 # 

611 # Set _group_id because it's blank 

612 # 

613 ensure_default_group_id(trcon) 

614 default_group_id: int = trcon.info["default_group_id"] 

615 log.debug(f"Assigning new _group_id of {default_group_id!r}") 

616 newobj._group_id = default_group_id # type: ignore[attr-defined] 

617 else: 

618 # 

619 # Re-map _group_id 

620 # 

621 newobj._group_id = get_dest_groupnum( # type: ignore[attr-defined] 

622 oldobj._group_id, trcon, oldobj 

623 ) 

624 

625 # ------------------------------------------------------------------------- 

626 # If an identical user is found, merge on it rather than creating a new 

627 # one. Users with matching usernames are considered to be identical. 

628 # ------------------------------------------------------------------------- 

629 if trcon.tablename == User.__tablename__: 

630 src_user = cast(User, oldobj) 

631 src_username = src_user.username 

632 matching_user: Optional[User] = ( 

633 trcon.dst_session.query(User) 

634 .filter(User.username == src_username) 

635 .one_or_none() 

636 ) 

637 if matching_user is not None: 

638 log.debug( 

639 f"Matching User (username {matching_user.username!r}) found; " 

640 f"merging" 

641 ) 

642 trcon.newobj = matching_user # so that related records will work 

643 

644 # ------------------------------------------------------------------------- 

645 # If an identical device is found, merge on it rather than creating a 

646 # new one. Devices with matching names are considered to be identical. 

647 # ------------------------------------------------------------------------- 

648 if trcon.tablename == Device.__tablename__: 

649 # log.debug(f"considering: {trcon=}") 

650 src_device = cast(Device, oldobj) 

651 src_devicename = src_device.name 

652 matching_device: Optional[Device] = ( 

653 trcon.dst_session.query(Device) 

654 .filter(Device.name == src_devicename) 

655 .one_or_none() 

656 ) 

657 if matching_device is not None: 

658 log.debug( 

659 f"Matching Device (name {matching_device.name!r}) found; " 

660 f"merging" 

661 ) 

662 trcon.newobj = matching_device 

663 

664 # BUT BEWARE, BECAUSE IF YOU MERGE THE SAME DATABASE TWICE (even if 

665 # that's a silly thing to do...), MERGING DEVICES WILL BREAK THE KEY 

666 # RELATIONSHIPS. For example, 

667 # source: 

668 # pk = 1, id = 1, device = 100, era = 'NOW', current = 1 

669 # dest after first merge: 

670 # pk = 1, id = 1, device = 100, era = 'NOW', current = 1 

671 # dest after second merge: 

672 # pk = 1, id = 1, device = 100, era = 'NOW', current = 1 

673 # pk = 2, id = 1, device = 100, era = 'NOW', current = 1 

674 # ... so you get a clash/duplicate. 

675 # Mind you, that's fair, because there is a duplicate. 

676 # SO WE DO SEPARATE DUPLICATE CHECKING, below. 

677 

678 # ------------------------------------------------------------------------- 

679 # Don't copy Group records; the user must set these up manually and specify 

680 # groupnum_map, for safety 

681 # ------------------------------------------------------------------------- 

682 if trcon.tablename == Group.__tablename__: 

683 trcon.newobj = None # don't insert this object 

684 # ... don't set "newobj = None"; that wouldn't alter trcon 

685 # Now make sure the map is OK: 

686 src_group = cast(Group, oldobj) 

687 trcon.objmap[oldobj] = get_dst_group( 

688 dest_groupnum=get_dest_groupnum(src_group.id, trcon, src_group), 

689 dst_session=trcon.dst_session, 

690 ) 

691 

692 # ------------------------------------------------------------------------- 

693 # If there are any patient numbers in the old format (as a set of 

694 # columns in the Patient table) which were not properly converted 

695 # to the new format (as individual records in the PatientIdNum 

696 # table), create new entries. 

697 # Only worth bothering with for _current entries. 

698 # (More explicitly: do not create new PatientIdNum entries for non-current 

699 # patients; it's very fiddly if there might be asynchrony between 

700 # Patient and PatientIdNum objects for that patient.) 

701 # ------------------------------------------------------------------------- 

702 if trcon.tablename == Patient.__tablename__: 

703 # (a) Find old patient numbers 

704 old_patient = cast(Patient, oldobj) 

705 # noinspection PyUnresolvedReferences 

706 src_pt_query = ( 

707 select(text("*")) 

708 .select_from(table(trcon.tablename)) 

709 .where(column(Patient.id.name) == old_patient.id) 

710 .where(column(Patient._current.name) == True) # noqa: E712 

711 .where(column(Patient._device_id.name) == old_patient._device_id) 

712 .where(column(Patient._era.name) == old_patient._era) 

713 ) 

714 

715 result: Result = trcon.src_session.execute(src_pt_query) 

716 list_of_dicts: List[RowMapping] = result.mappings().fetchall() # type: ignore[assignment] # noqa: E501 

717 assert ( 

718 len(list_of_dicts) == 1 

719 ), "Failed to fetch old patient IDs correctly; bug?" 

720 old_patient_dict = list_of_dicts[0] 

721 

722 # (b) If any don't exist in the new database, create them. 

723 # -- no, that's not right; we will be processing Patient before 

724 # PatientIdNum, so that should be: if any don't exist in the *source* 

725 # database, create them. 

726 src_tables = trcon.src_table_names 

727 for src_which_idnum in range(1, NUMBER_OF_IDNUMS_DEFUNCT + 1): 

728 old_fieldname = FP_ID_NUM + str(src_which_idnum) 

729 idnum_value = old_patient_dict.get(old_fieldname) 

730 if idnum_value is None: 

731 # Old Patient record didn't contain this ID number 

732 # (value None -- or key missing) 

733 continue 

734 # Old Patient record *did* contain the ID number... 

735 if PatientIdNum.__tablename__ in src_tables: 

736 # noinspection PyUnresolvedReferences 

737 src_idnum_query = ( 

738 select(func.count()) 

739 .select_from(table(PatientIdNum.__tablename__)) 

740 .where( 

741 column(PatientIdNum.patient_id.name) == old_patient.id 

742 ) 

743 .where( 

744 column(PatientIdNum._current.name) 

745 == old_patient._current 

746 ) 

747 .where( 

748 column(PatientIdNum._device_id.name) 

749 == old_patient._device_id 

750 ) 

751 .where(column(PatientIdNum._era.name) == old_patient._era) 

752 .where( 

753 column(PatientIdNum.which_idnum.name) 

754 == src_which_idnum 

755 ) 

756 ) 

757 n_present = trcon.src_session.execute(src_idnum_query).scalar() 

758 # ^^^ 

759 # ! 

760 if n_present != 0: 

761 # There was already a PatientIdNum for this which_idnum 

762 continue 

763 pidnum = PatientIdNum() 

764 # PatientIdNum fields: 

765 pidnum.id = fake_tablet_id_for_patientidnum( 

766 patient_id=old_patient.id, which_idnum=src_which_idnum 

767 ) 

768 # ... guarantees a pseudo client (tablet) PK 

769 pidnum.patient_id = old_patient.id 

770 pidnum.which_idnum = get_dest_which_idnum( 

771 src_which_idnum, trcon, oldobj 

772 ) 

773 pidnum.idnum_value = idnum_value 

774 # GenericTabletRecordMixin fields: 

775 # _pk: autogenerated 

776 # noinspection PyUnresolvedReferences 

777 pidnum._device_id = trcon.objmap[old_patient._device].id # type: ignore[attr-defined] # noqa: E501 

778 pidnum._era = old_patient._era 

779 pidnum._current = old_patient._current 

780 pidnum._when_added_exact = old_patient._when_added_exact 

781 pidnum._when_added_batch_utc = old_patient._when_added_batch_utc 

782 # noinspection PyUnresolvedReferences 

783 pidnum._adding_user_id = ( 

784 trcon.objmap[old_patient._adding_user].id # type: ignore[attr-defined] # noqa: E501 

785 if old_patient._adding_user is not None 

786 else None 

787 ) 

788 pidnum._when_removed_exact = old_patient._when_removed_exact 

789 pidnum._when_removed_batch_utc = ( 

790 old_patient._when_removed_batch_utc 

791 ) 

792 # noinspection PyUnresolvedReferences 

793 pidnum._removing_user_id = ( 

794 trcon.objmap[old_patient._removing_user].id # type: ignore[attr-defined] # noqa: E501 

795 if old_patient._removing_user is not None 

796 else None 

797 ) 

798 # noinspection PyUnresolvedReferences 

799 pidnum._preserving_user_id = ( 

800 trcon.objmap[old_patient._preserving_user].id # type: ignore[attr-defined] # noqa: E501 

801 if old_patient._preserving_user is not None 

802 else None 

803 ) 

804 pidnum._forcibly_preserved = old_patient._forcibly_preserved 

805 pidnum._predecessor_pk = None # Impossible to calculate properly 

806 pidnum._successor_pk = None # Impossible to calculate properly 

807 pidnum._manually_erased = old_patient._manually_erased 

808 pidnum._manually_erased_at = old_patient._manually_erased_at 

809 # noinspection PyUnresolvedReferences 

810 pidnum._manually_erasing_user_id = ( 

811 trcon.objmap[old_patient._manually_erasing_user].id # type: ignore[attr-defined] # noqa: E501 

812 if old_patient._manually_erasing_user is not None 

813 else None 

814 ) 

815 pidnum._camcops_version = old_patient._camcops_version 

816 pidnum._addition_pending = old_patient._addition_pending 

817 pidnum._removal_pending = old_patient._removal_pending 

818 pidnum._group_id = newobj._group_id # type: ignore[attr-defined] 

819 # ... will have been set above if it was blank 

820 

821 # OK. 

822 log.debug(f"Inserting new PatientIdNum: {pidnum}") 

823 trcon.dst_session.add(pidnum) 

824 

825 # ------------------------------------------------------------------------- 

826 # If we're inserting a PatientIdNum, make sure there is a corresponding 

827 # IdNumDefinition, and that it's valid 

828 # ------------------------------------------------------------------------- 

829 if trcon.tablename == PatientIdNum.__tablename__: 

830 src_pidnum = cast(PatientIdNum, oldobj) 

831 src_which_idnum = src_pidnum.which_idnum 

832 # Is it present? 

833 if src_which_idnum is None: 

834 raise ValueError(f"Bad PatientIdNum: {src_pidnum!r}") 

835 # Ensure the new object has an appropriate ID number FK: 

836 dst_pidnum = cast(PatientIdNum, newobj) 

837 dst_pidnum.which_idnum = get_dest_which_idnum( 

838 src_which_idnum, trcon, oldobj 

839 ) 

840 

841 # ------------------------------------------------------------------------- 

842 # If we're merging from a more modern database with the IdNumDefinition 

843 # table, skip source IdNumDefinition records; the user must set these up 

844 # manually and specify whichidnum_map, for safety 

845 # ------------------------------------------------------------------------- 

846 if trcon.tablename == IdNumDefinition.__tablename__: 

847 trcon.newobj = None # don't insert this object 

848 # ... don't set "newobj = None"; that wouldn't alter trcon 

849 # Now make sure the map is OK: 

850 src_iddef = cast(IdNumDefinition, oldobj) 

851 trcon.objmap[oldobj] = get_dst_iddef( 

852 which_idnum=get_dest_which_idnum( 

853 src_iddef.which_idnum, trcon, src_iddef 

854 ), 

855 dst_session=trcon.dst_session, 

856 ) 

857 

858 # ------------------------------------------------------------------------- 

859 # Check we're not creating duplicates for anything uploaded 

860 # ------------------------------------------------------------------------- 

861 if isinstance(oldobj, GenericTabletRecordMixin): 

862 # noinspection PyTypeChecker 

863 cls: Type[GenericTabletRecordMixin] = newobj.__class__ # type: ignore[assignment] # noqa: E501 

864 # Records uploaded from tablets must be unique on the combination of: 

865 # id = table PK 

866 # _device_id = device 

867 # _era = device era 

868 # _when_removed_exact = removal date or NULL 

869 # noinspection PyUnresolvedReferences 

870 exists_query = ( 

871 select(func.count()) 

872 .select_from(table(trcon.tablename)) 

873 .where(column(cls.id.name) == oldobj.id) 

874 .where( 

875 column(cls._device_id.name) == trcon.objmap[oldobj._device].id # type: ignore[attr-defined] # noqa: E501 

876 ) 

877 .where(column(cls._era.name) == oldobj._era) 

878 .where( 

879 column(cls._when_removed_exact.name) 

880 == oldobj._when_removed_exact 

881 ) 

882 ) 

883 # Note re NULLs... Although it's an inconvenient truth in SQL that 

884 # SELECT NULL = NULL; -- returns NULL 

885 # in this code we have a comparison of a column to a Python value. 

886 # SQLAlchemy is clever and renders "IS NULL" if the Python value is 

887 # None, or an "=" comparison otherwise. 

888 # If we were comparing a column to another column, we'd have to do 

889 # more; e.g. 

890 # 

891 # WRONG one-to-one join to self: 

892 # 

893 # SELECT a._pk, b._pk, a._when_removed_exact 

894 # FROM phq9 a 

895 # INNER JOIN phq9 b 

896 # ON a._pk = b._pk 

897 # AND a._when_removed_exact = b._when_removed_exact; 

898 # 

899 # -- drops all rows 

900 # 

901 # CORRECT one-to-one join to self: 

902 # 

903 # SELECT a._pk, b._pk, a._when_removed_exact 

904 # FROM phq9 a 

905 # INNER JOIN phq9 b 

906 # ON a._pk = b._pk 

907 # AND (a._when_removed_exact = b._when_removed_exact 

908 # OR (a._when_removed_exact IS NULL AND 

909 # b._when_removed_exact IS NULL)); 

910 # 

911 # -- returns all rows 

912 n_exists = trcon.dst_session.execute(exists_query).scalar() 

913 if n_exists > 0: 

914 # noinspection PyUnresolvedReferences 

915 existing_rec_q = ( 

916 select("*") 

917 .select_from(table(trcon.tablename)) 

918 .where(column(cls.id.name) == oldobj.id) 

919 .where( 

920 column(cls._device_id.name) 

921 == trcon.objmap[oldobj._device].id # type: ignore[attr-defined] # noqa: E501 

922 ) 

923 .where(column(cls._era.name) == oldobj._era) 

924 .where( 

925 column(cls._when_removed_exact.name) 

926 == oldobj._when_removed_exact 

927 ) 

928 ) 

929 result: Result = trcon.dst_session.execute(existing_rec_q) # type: ignore[no-redef] # noqa: E501 

930 existing_rec: List[RowMapping] = result.mappings().fetchall() # type: ignore[assignment] # noqa: E501 

931 log.critical( 

932 f"Source record, inheriting from GenericTabletRecordMixin and " 

933 f"shown below, already exists in destination database... " 

934 f"in table {trcon.tablename!r}, clashing on: " 

935 f"id={oldobj.id!r}, " 

936 f"device_id={oldobj._device_id!r}, " 

937 f"era={oldobj._era!r}, " 

938 f"_when_removed_exact={oldobj._when_removed_exact!r}.\n" 

939 f"ARE YOU TRYING TO MERGE THE SAME DATABASE IN TWICE? DON'T." 

940 ) 

941 if trcon.tablename == PatientIdNum.__tablename__ and ( 

942 oldobj.id % NUMBER_OF_IDNUMS_DEFUNCT == 0 

943 ): 

944 log.critical( 

945 f"Since this error has occurred for table " 

946 f"{trcon.tablename!r} " 

947 f"(and for id % {NUMBER_OF_IDNUMS_DEFUNCT} == 0), " 

948 f"this error may reflect a previous bug in the patient ID " 

949 f"number fix for the database upload script, in which all " 

950 f"ID numbers for patients with patient.id = n were given " 

951 f"patient_idnum.id = n * {NUMBER_OF_IDNUMS_DEFUNCT} " 

952 f"themselves (or possibly were all given " 

953 f"patient_idnum.id = 0). " 

954 f"Fix this by running, on the source database:\n\n" 

955 f" UPDATE patient_idnum SET id = _pk;\n\n" 

956 ) 

957 # Print the actual instance last; accessing them via pformat can 

958 # lead to crashes if there are missing source fields, as an 

959 # on-demand SELECT is executed sometimes (e.g. when a PatientIdNum 

960 # is printed, its Patient is selected, including the [user] 

961 # 'fullname' attribute that is absent in old databases). 

962 # Not a breaking point, since we're going to crash anyway, but 

963 # inelegant. 

964 # Since lazy loading (etc.) is configured at query time, the best 

965 # thing (as per Michael Bayer) is to detach the object from the 

966 # session: 

967 # https://groups.google.com/forum/#!topic/sqlalchemy/X_wA8K97smE 

968 trcon.src_session.expunge(oldobj) # prevent implicit queries 

969 # Then all should work: 

970 log_warning_srcobj(oldobj) 

971 log.critical( 

972 f"Existing record(s) in destination DB was/were:\n\n" 

973 f"{pformat(existing_rec)}\n\n" 

974 ) 

975 raise ValueError( 

976 "Attempt to insert duplicate record; see log message above." 

977 ) 

978 

979 

980# ============================================================================= 

981# Postprocess 

982# ============================================================================= 

983 

984 

985# noinspection PyUnusedLocal 

def postprocess(src_engine: Engine, dst_session: Session) -> None:
    """
    Run the follow-up steps needed once :func:`merge_db` has finished.

    - Reindexes the destination database.
    - Emits warnings listing the mapping tables that this tool does not
      copy, so the operator knows to copy them by hand.

    Args:
        src_engine: source database SQLAlchemy engine (not used here)
        dst_session: destination database SQLAlchemy session
    """
    log.info("Reindexing destination database")
    reindex_everything(dst_session)

    # (kind of mapping, name of the table holding it) for everything that
    # is skipped by the merge and must therefore be copied manually.
    manual_mappings = (
        ("user/group", UserGroupMembership.__tablename__),
        ("group/group", group_group_table.name),
    )
    for mapping_kind, mapping_tablename in manual_mappings:
        log.warning(
            f"NOT IMPLEMENTED AUTOMATICALLY: copying {mapping_kind} mapping "
            f"from table {mapping_tablename!r}; do this by hand."
        )

1007 

1008 

1009# ============================================================================= 

1010# Main 

1011# ============================================================================= 

1012 

1013 

def _ensure_int_mapping(name: str, mapping: Dict[int, int]) -> None:
    """
    Check that every key and value of a source-to-destination map is an
    integer.

    Args:
        name:
            name of the parameter being checked (used in the error message)
        mapping:
            the dictionary to check

    Raises:
        TypeError: if any key or value is not an ``int``
    """
    for src_value, dst_value in mapping.items():
        if not isinstance(src_value, int) or not isinstance(dst_value, int):
            raise TypeError(
                f"{name} must map integers to integers; "
                f"found {src_value!r}: {dst_value!r}"
            )


def merge_camcops_db(
    src: str,
    echo: bool,
    dummy_run: bool,
    info_only: bool,
    default_group_id: Optional[int],
    default_group_name: Optional[str],
    groupnum_map: Dict[int, int],
    whichidnum_map: Dict[int, int],
    report_every: int = 10000,
    skip_export_logs: bool = True,
    skip_audit_logs: bool = True,
    dst_url: Optional[str] = None,
) -> None:
    """
    Merge an existing database (with a pre-v2 or later structure) into a
    contemporary CamCOPS database.

    Args:
        src:
            source database SQLAlchemy URL

        echo:
            echo the SQL that is produced?

        dummy_run:
            don't alter the destination database

        info_only:
            show info, then stop

        default_group_id:
            integer group ID (in the destination database) to use for source
            records that have no group (because they come from a very old
            source database) but need one

        default_group_name:
            group name (in the destination database) to use for source
            records that have no group (because they come from a very old
            source database) but need one

        groupnum_map:
            dictionary mapping group ID values from the source database to
            the destination database

        whichidnum_map:
            dictionary mapping ``which_idnum`` values from the source database
            to the destination database

        report_every:
            provide a progress report every *n* records

        skip_export_logs:
            skip export log tables

        skip_audit_logs:
            skip audit log table

        dst_url:
            Destination SQLAlchemy URL. By default this is blank, and a
            command-line request is used to get the current CamCOPS database
            from the config file. But for debugging, you can override this.

    Raises:
        TypeError:
            if ``groupnum_map`` or ``whichidnum_map`` contains a key or value
            that is not an integer

    """
    # Validate the maps before touching either database. (This used to be
    # done with "assert" part-way through, which is skipped under "python -O"
    # and ran only after the destination session had already been used.)
    _ensure_int_mapping("groupnum_map", groupnum_map)
    _ensure_int_mapping("whichidnum_map", whichidnum_map)

    src_engine = create_engine(src, echo=echo, pool_pre_ping=True)
    log.info("SOURCE: " + get_safe_url_from_engine(src_engine))

    if dst_url:
        dst_engine = create_engine(dst_url, echo=echo, pool_pre_ping=True)
        dst_session: Session = sessionmaker()(bind=dst_engine)
    else:
        req = get_command_line_request()  # requires manual COMMIT; see below
        dst_engine = req.engine
        dst_session = req.dbsession
    log.info("DESTINATION: " + get_safe_url_from_engine(dst_engine))

    log.info(
        f"Destination ID number type map (source:destination) is: "
        f"{whichidnum_map!r}"
    )
    log.info(f"Group number type map (source:destination) is {groupnum_map!r}")

    # Delay the slow import until we've checked our syntax
    log.info("Loading all models...")
    # noinspection PyUnresolvedReferences
    import camcops_server.cc_modules.cc_all_models  # delayed import  # import side effects (ensure all models registered)  # noqa

    log.info("Models loaded.")

    # Now, any special dependencies?
    # From the point of view of translating any tablet-related fields, the
    # actual (server) PK values are irrelevant; all relationships will be
    # identical if you change any PK (not standard database practice, but
    # convenient here).
    # The dependencies that do matter are server-side things, like user_id
    # variables.

    # For debugging only, some junk:
    # test_dependencies = [
    #     TableDependency(parent_tablename="patient",
    #                     child_tablename="_dirty_tables")
    # ]

    # -------------------------------------------------------------------------
    # Tables to skip
    # -------------------------------------------------------------------------

    skip_tables = [
        # Transient stuff we don't want to copy across, or wouldn't want to
        # overwrite the destination with, or where the PK structure has
        # changed and we don't care about old data:
        TableIdentity(tablename=x)
        for x in (
            CamcopsSession.__tablename__,
            DirtyTable.__tablename__,
            ServerSettings.__tablename__,
            SecurityAccountLockout.__tablename__,
            SecurityLoginFailure.__tablename__,
            UserGroupMembership.__tablename__,
            group_group_table.name,
        )
    ]

    # Tedious and bulky stuff the user may want to skip:
    if skip_export_logs:
        skip_tables.extend(
            [
                TableIdentity(tablename=x)
                for x in (
                    Email.__tablename__,
                    ExportRecipient.__tablename__,
                    ExportedTask.__tablename__,
                    ExportedTaskEmail.__tablename__,
                    ExportedTaskFileGroup.__tablename__,
                    ExportedTaskHL7Message.__tablename__,
                )
            ]
        )
    if skip_audit_logs:
        skip_tables.append(TableIdentity(tablename=AuditEntry.__tablename__))

    # -------------------------------------------------------------------------
    # Initial operations on SOURCE database
    # -------------------------------------------------------------------------

    src_tables = get_table_names(src_engine)
    skip_tables += get_skip_tables(src_tables=src_tables)
    src_iddefs = get_src_iddefs(src_engine, src_tables)
    log.info(f"Source ID number definitions: {src_iddefs!r}")

    # -------------------------------------------------------------------------
    # Initial operations on DESTINATION database
    # -------------------------------------------------------------------------
    # So that system users get the first ID (cosmetic!):
    _ = User.get_system_user(dbsession=dst_session)
    _ = Device.get_server_device(dbsession=dst_session)

    # -------------------------------------------------------------------------
    # Set up source-to-destination mappings
    # -------------------------------------------------------------------------

    # Map source to destination ID number types
    for src_which_idnum, dest_which_idnum in whichidnum_map.items():
        if src_which_idnum not in src_iddefs:
            # A mapping for an ID number type the source doesn't have is
            # harmless; warn and move on.
            log.warning(
                f"Source ID number {src_which_idnum} is in whichidnum_map, "
                f"but is not in the source database."
            )
            continue
        src_iddef = src_iddefs[src_which_idnum]
        dst_iddef = ensure_dest_iddef_exists(dest_which_idnum, dst_session)
        ensure_no_iddef_clash(src_iddef, dst_iddef)

    # Map source to destination group numbers
    for dest_groupnum in groupnum_map.values():
        # Result deliberately discarded; the call is made for each mapped
        # destination group.
        # NOTE(review): presumably this fails if the destination group does
        # not exist -- confirm against get_dst_group().
        _ = get_dst_group(dest_groupnum, dst_session)

    # -------------------------------------------------------------------------
    # Merge
    # -------------------------------------------------------------------------

    # Merge! It's easy...
    trcon_info = dict(
        default_group_id=default_group_id,
        default_group_name=default_group_name,
        src_iddefs=src_iddefs,
        whichidnum_map=whichidnum_map,
        groupnum_map=groupnum_map,
    )
    skip_table_dependencies = [
        # "The _security_users table does NOT depend on the patient table."
        # Otherwise this is circular.
        TableDependency(
            parent_tablename="patient", child_tablename="_security_users"
        )
    ]
    merge_db(
        base_class=Base,
        src_engine=src_engine,
        dst_session=dst_session,
        allow_missing_src_tables=True,
        allow_missing_src_columns=True,
        translate_fn=camcops_mergedb_translate_fn,
        skip_tables=skip_tables,
        only_tables=None,
        tables_to_keep_pks_for=None,
        extra_table_dependencies=None,
        skip_table_dependencies=skip_table_dependencies,
        dummy_run=dummy_run,
        info_only=info_only,
        report_every=report_every,
        flush_per_table=True,
        flush_per_record=False,
        commit_with_flush=False,
        commit_at_end=True,
        prevent_eager_load=True,
        trcon_info=trcon_info,
        even_use_alter_relationships=True,
        debug_table_dependencies=False,
        debug_rewrite_relationships=False,
        use_sqlalchemy_order=False,
    )

    # -------------------------------------------------------------------------
    # Postprocess
    # -------------------------------------------------------------------------

    postprocess(src_engine=src_engine, dst_session=dst_session)

    # -------------------------------------------------------------------------
    # Done
    # -------------------------------------------------------------------------

    dst_session.commit()