Coverage for cc_modules/merge_db.py: 18%
283 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
1"""
2camcops_server/cc_modules/merge_db.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Tool to merge data from one CamCOPS database into another.**
28Has special code to deal with old databases.
30"""
32import logging
33from pprint import pformat
34from typing import Any, cast, Dict, List, Optional, Type
36from cardinal_pythonlib.sqlalchemy.merge_db import (
37 merge_db,
38 TableDependency,
39 TranslationContext,
40)
41from cardinal_pythonlib.sqlalchemy.schema import get_table_names
42from cardinal_pythonlib.sqlalchemy.session import get_safe_url_from_engine
43from cardinal_pythonlib.sqlalchemy.table_identity import TableIdentity
44from sqlalchemy.engine import create_engine
45from sqlalchemy.engine.base import Engine
46from sqlalchemy.engine.result import Result
47from sqlalchemy.engine.row import RowMapping
48from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
49from sqlalchemy.orm.session import Session, sessionmaker
50from sqlalchemy.sql.expression import column, func, select, table, text
52from camcops_server.cc_modules.cc_audit import AuditEntry
53from camcops_server.cc_modules.cc_constants import (
54 FP_ID_NUM,
55 NUMBER_OF_IDNUMS_DEFUNCT,
56)
57from camcops_server.cc_modules.cc_db import GenericTabletRecordMixin
58from camcops_server.cc_modules.cc_device import Device
59from camcops_server.cc_modules.cc_dirtytables import DirtyTable
60from camcops_server.cc_modules.cc_email import Email
61from camcops_server.cc_modules.cc_exportmodels import (
62 ExportedTask,
63 ExportedTaskEmail,
64 ExportedTaskFileGroup,
65 ExportedTaskHL7Message,
66)
67from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
68from camcops_server.cc_modules.cc_group import Group, group_group_table
69from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition
70from camcops_server.cc_modules.cc_membership import UserGroupMembership
71from camcops_server.cc_modules.cc_patient import Patient
72from camcops_server.cc_modules.cc_patientidnum import (
73 fake_tablet_id_for_patientidnum,
74 PatientIdNum,
75)
76from camcops_server.cc_modules.cc_request import get_command_line_request
77from camcops_server.cc_modules.cc_session import CamcopsSession
78from camcops_server.cc_modules.cc_serversettings import (
79 server_stored_var_table_defunct,
80 ServerSettings,
81 ServerStoredVarNamesDefunct as StoredVarDefunct,
82)
83from camcops_server.cc_modules.cc_sqlalchemy import Base
84from camcops_server.cc_modules.cc_taskindex import reindex_everything
85from camcops_server.cc_modules.cc_user import (
86 SecurityAccountLockout,
87 SecurityLoginFailure,
88 User,
89)
# Module-level logger for this merge tool.
log = logging.getLogger(__name__)

# Debugging switch, off by default. NOTE(review): presumably consulted
# elsewhere in this module (not visible here) to decide whether to drop into
# the pdb debugger on error — confirm against the merge entry point.
DEBUG_VIA_PDB = False
96# =============================================================================
97# Information relating to the source database
98# =============================================================================
def get_skip_tables(src_tables: List[str]) -> List[TableIdentity]:
    """
    Work out which tables in the metadata should be skipped because the
    source database does not contain them.

    As a sanity check, this also insists that certain core CamCOPS tables
    exist in the source, raising :exc:`ValueError` if one is absent.

    Args:
        src_tables: names of every table in the source database

    Returns:
        list of
        :class:`cardinal_pythonlib.sqlalchemy.table_identity.TableIdentity`
        objects, one per table to skip

    Further tables to skip are specified in :func:`merge_camcops_db`.
    """
    # Without these core tables the source cannot be any kind of CamCOPS
    # database, so refuse to continue. The first missing one is reported.
    first_missing = next(
        (
            core_name
            for core_name in (Patient.__tablename__, User.__tablename__)
            if core_name not in src_tables
        ),
        None,
    )
    if first_missing is not None:
        raise ValueError(
            f"Cannot proceed; table {first_missing!r} missing from source; "
            f"unlikely that the source is any sort of old CamCOPS "
            f"database!"
        )

    # Missing source tables are generally tolerated. The exception would be
    # tables eager-loaded via relationships (in this merge, PatientIdNum).
    # Rather than creating that table in the source database, eager loading
    # is disabled instead, so nothing needs doing here.

    group_tablename = Group.__tablename__
    if group_tablename in src_tables:
        return []
    log.warning(
        f"No Group information in source database; skipping source "
        f"table {group_tablename!r}; will create a default group"
    )
    return [TableIdentity(tablename=group_tablename)]
def get_src_iddefs(
    src_engine: Engine, src_tables: List[str]
) -> Dict[int, IdNumDefinition]:
    """
    Read the ID number definitions held in the source database.

    Two layouts are understood:

    - a more modern database, with an :class:`IdNumDefinition` table;
    - an older database, where descriptions are stored as name/value rows in
      the defunct server-stored-variable table. In this case a definition is
      produced for every ID number type from 1 to
      ``NUMBER_OF_IDNUMS_DEFUNCT``, using ``None`` for any missing
      description.

    If neither table is present, a warning is logged and the result is
    empty.

    Args:
        src_engine: source SQLAlchemy :class:`Engine`
        src_tables: list of all table names in the source database

    Returns:
        dictionary: ``{which_idnum: idnumdef}`` mappings, where each
        ``idnumdef`` is a
        :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` not
        attached to any database session
    """
    ssv_table = server_stored_var_table_defunct
    found: Dict[int, IdNumDefinition] = {}

    def _stored_value_text(conn, varname):
        # valueText of the first stored variable with this name, or None.
        matches = conn.execute(
            select(ssv_table.columns.valueText)
            .select_from(ssv_table)
            .where(ssv_table.columns.name == varname)
        ).fetchall()
        return matches[0][0] if matches else None

    with src_engine.connect() as conn:
        if IdNumDefinition.__tablename__ in src_tables:
            # More modern source: read the IdNumDefinition table directly.
            log.info(
                f"Fetching source ID number definitions from "
                f"{IdNumDefinition.__tablename__!r} table"
            )
            # noinspection PyUnresolvedReferences
            iddef_query = (
                select(
                    IdNumDefinition.which_idnum,
                    IdNumDefinition.description,
                    IdNumDefinition.short_description,
                )
                .select_from(IdNumDefinition.__table__)
                .order_by(IdNumDefinition.which_idnum)
            )
            for idnum, desc, short_desc in conn.execute(
                iddef_query
            ).fetchall():
                found[idnum] = IdNumDefinition(
                    which_idnum=idnum,
                    description=desc,
                    short_description=short_desc,
                )
        elif ssv_table.name in src_tables:
            # Older source: descriptions live in stored variables whose
            # names are a prefix plus the ID number type.
            log.info(
                f"Fetching source ID number definitions from "
                f"{ssv_table.name!r} table"
            )
            for idnum in range(1, NUMBER_OF_IDNUMS_DEFUNCT + 1):
                suffix = str(idnum)
                desc = _stored_value_text(
                    conn, StoredVarDefunct.ID_DESCRIPTION_PREFIX + suffix
                )
                short_desc = _stored_value_text(
                    conn, StoredVarDefunct.ID_SHORT_DESCRIPTION_PREFIX + suffix
                )
                found[idnum] = IdNumDefinition(
                    which_idnum=idnum,
                    description=desc,
                    short_description=short_desc,
                )
        else:
            log.warning(
                "No information available on source ID number descriptions"
            )
    return found
238# =============================================================================
239# Information relating to the destination database
240# =============================================================================
def group_exists(group_id: int, dst_session: Session) -> bool:
    """
    Report whether a group with the given ID exists in the destination
    session.

    Args:
        group_id: integer group ID
        dst_session: destination SQLAlchemy :class:`Session`

    Returns:
        the result of :meth:`Group.group_exists` for that ID
    """
    exists = Group.group_exists(dbsession=dst_session, group_id=group_id)
    return exists
def fetch_group_id_by_name(group_name: str, dst_session: Session) -> int:
    """
    Look up, in the destination session, the ID of the group with the given
    name, creating that group first if it does not yet exist.

    More than one group with the same name indicates a bug; in that case
    :exc:`MultipleResultsFound` is logged and re-raised.

    Args:
        group_name: group name
        dst_session: destination SQLAlchemy :class:`Session`

    Returns:
        group ID in the destination database
    """
    try:
        group: Optional[Group] = (
            dst_session.query(Group)
            .filter(Group.name == group_name)
            .one_or_none()
        )
        # ... None if absent; raises if there are >1 results
    except MultipleResultsFound:
        log.critical(
            f"Nasty bug: can't have two groups with the same name! "
            f"Group name was {group_name!r}"
        )
        raise

    if group is None:
        log.info(f"Creating new group named {group_name!r}")
        group = Group()
        group.name = group_name
        dst_session.add(group)
        # Flushing makes the database assign the new group's PK; see
        # https://stackoverflow.com/questions/1316952/sqlalchemy-flush-and-get-inserted-id  # noqa
        flush_session(dst_session)
        log.info(f"... new group has ID {group.id!r}")
    return group.id
def get_dst_group(dest_groupnum: int, dst_session: Session) -> Group:
    """
    Return the destination-database group with the given number, checking
    that it exists.

    Args:
        dest_groupnum: group number
        dst_session: SQLAlchemy session for the destination database

    Returns:
        the group

    Raises:
        :exc:`ValueError` if no such group exists;
        :exc:`MultipleResultsFound` (after logging) if more than one does
    """
    group_query = dst_session.query(Group).filter(Group.id == dest_groupnum)
    try:
        group: Optional[Group] = group_query.one_or_none()
    except MultipleResultsFound:
        log.critical(
            f"Nasty bug: can't have two groups with the same ID! "
            f"Group ID was {dest_groupnum!r}"
        )
        raise
    if group is None:
        raise ValueError(
            f"Group with ID {dest_groupnum} missing from "
            f"destination database"
        )
    return group
def ensure_dest_iddef_exists(
    which_idnum: int, dst_session: Session
) -> IdNumDefinition:
    """
    Check that the destination database defines the given ID number type,
    and return that definition.

    Args:
        which_idnum: ID number type
        dst_session: SQLAlchemy session for the destination database

    Returns:
        the matching
        :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`

    Raises:
        :exc:`ValueError` if the ID number type is missing;
        :exc:`MultipleResultsFound` (after logging) if it is duplicated
    """
    iddef_query = dst_session.query(IdNumDefinition).filter(
        IdNumDefinition.which_idnum == which_idnum
    )
    try:
        iddef: Optional[IdNumDefinition] = iddef_query.one_or_none()
    except MultipleResultsFound:
        log.critical(
            f"Nasty bug: can't have two ID number types with the same "
            f"which_idnum! which_idnum was {which_idnum!r}"
        )
        raise
    if iddef is None:
        raise ValueError(
            f"ID number type with which_idnum={which_idnum} "
            f"missing from destination database"
        )
    return iddef
def get_dst_iddef(
    dst_session: Session, which_idnum: int
) -> Optional[IdNumDefinition]:
    """
    Fetches an ID number definition from the destination database, if there
    is one.

    This does NOT ensure that the definition exists. If it is missing, the
    return value is ``None`` and no exception is raised. (Compare
    :func:`ensure_dest_iddef_exists`, which raises :exc:`ValueError`.)

    Args:
        dst_session: destination SQLAlchemy :class:`Session`
        which_idnum: integer expressing which ID number type to look up

    Returns:
        the first matching
        :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`, or
        ``None`` if none was found
    """
    return (
        dst_session.query(IdNumDefinition)
        .filter(IdNumDefinition.which_idnum == which_idnum)
        .first()
    )
# =============================================================================
# Extra translation to be applied to individual objects
# =============================================================================
# Helper functions for the CamCOPS-specific translation logic that is applied
# to each object as it is copied from the source to the destination database.
def flush_session(dst_session: Session) -> None:
    """
    Flush pending changes in the destination SQLAlchemy session, logging a
    debug message first.

    Args:
        dst_session: destination SQLAlchemy :class:`Session`
    """
    log.debug("Flushing session")
    session = dst_session
    session.flush()
def ensure_default_group_id(trcon: TranslationContext) -> None:
    """
    Ensure that the :class:`TranslationContext` has a usable
    ``default_group_id`` key in its ``info`` dictionary. This is the ID, in
    the destination database, of the group in which to put records that come
    from an older, pre-group-based CamCOPS database.

    - If ``info["default_group_id"]`` is set (e.g. on the command line),
      check that a group with that ID exists in the destination database.
    - Otherwise, use ``info["default_group_name"]``. Look up the ID of the
      group with that name, creating the group if necessary, and store it in
      ``info["default_group_id"]`` for subsequent calls.
    - If neither is set, raise :exc:`AssertionError`, because a group is
      needed at this point.

    The checks are explicit ``raise`` statements rather than ``assert``
    statements, so they still run under ``python -O``. Without that, a
    missing group name would go on to create a group named ``None``.

    Args:
        trcon: the :class:`TranslationContext`

    Raises:
        AssertionError: if the specified default group ID does not exist in
            the destination database, or if neither a default group ID nor a
            default group name was specified
    """
    default_group_id: Optional[int] = trcon.info["default_group_id"]
    if default_group_id is not None:
        # The user specified a group ID to use for records without one.
        if not group_exists(
            group_id=default_group_id, dst_session=trcon.dst_session
        ):
            raise AssertionError(
                "User specified default_group_id={!r}, and object {!r} needs "
                "a _group_id (directly or indirectly), but that ID doesn't "
                "exist in the {!r} table of the destination database".format(
                    default_group_id, trcon.oldobj, Group.__tablename__
                )
            )
    else:
        default_group_name: Optional[str] = trcon.info["default_group_name"]
        if not default_group_name:
            raise AssertionError(
                "User specified neither default_group_id or "
                "default_group_name, but object {!r} needs a "
                "_group_id, directly or indirectly".format(trcon.oldobj)
            )
        default_group_id = fetch_group_id_by_name(
            group_name=default_group_name, dst_session=trcon.dst_session
        )
    trcon.info["default_group_id"] = default_group_id  # for next time!
def ensure_no_iddef_clash(
    src_iddef: IdNumDefinition, dst_iddef: IdNumDefinition
) -> None:
    """
    Check that a source and destination ID number definition agree on both
    ``description`` and ``short_description``. Raise :exc:`ValueError` on
    the first field that differs, with ``description`` checked first.

    The two definitions' ``which_idnum`` values are deliberately NOT
    compared, because a database merge may map one ID number type onto a
    different one.

    Args:
        src_iddef: source
            :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`
        dst_iddef: destination
            :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`
    """
    for field_name in ("description", "short_description"):
        src_value = getattr(src_iddef, field_name)
        dst_value = getattr(dst_iddef, field_name)
        if src_value == dst_value:
            continue
        raise ValueError(
            f"ID {field_name} mismatch for ID#{src_iddef.which_idnum}: "
            f"source {src_value!r}, destination {dst_value!r}"
        )
def log_warning_srcobj(srcobj: Any) -> None:
    """
    Log, at WARNING level, a pretty-printed dump of a source (old) object's
    ``__dict__``.

    Args:
        srcobj: the source object
    """
    attributes_text = pformat(srcobj.__dict__)
    log.warning(f"Source was:\n\n{attributes_text}\n\n")
def get_dest_groupnum(
    src_groupnum: int, trcon: TranslationContext, oldobj: Any
) -> int:
    """
    For a given source group number, returns the corresponding destination
    group number, as given by ``trcon.info["groupnum_map"]``.

    Args:
        src_groupnum: the group number in the source database
        trcon: the :class:`TranslationContext`
        oldobj: the source object; logged if the mapping fails

    Returns:
        the corresponding group number in the destination database

    Raises:
        :exc:`ValueError` if the source group number is not in the map
    """
    # NB: the local name "groupnum_map" appears in the log message below,
    # via the f-string "=" specifier.
    groupnum_map: Dict[int, int] = trcon.info["groupnum_map"]
    if src_groupnum not in groupnum_map:
        log_warning_srcobj(oldobj)
        log.critical(
            f"Old database contains group number {src_groupnum} and "
            f"equivalent group in destination not known; {groupnum_map=}"
        )
        raise ValueError("Bad group mapping")
    return groupnum_map[src_groupnum]
def get_dest_which_idnum(
    src_which_idnum: int, trcon: TranslationContext, oldobj: Any
) -> int:
    """
    Translate a source ID number type into the equivalent destination ID
    number type, using ``trcon.info["whichidnum_map"]``.

    Args:
        src_which_idnum: which_idnum in the source database
        trcon: the :class:`TranslationContext`
        oldobj: the source object; logged if no mapping is known

    Returns:
        the corresponding which_idnum in the destination database

    Raises:
        :exc:`ValueError` if the source ID number type is not in the map
    """
    # The local name "whichidnum_map" is rendered in the log message below
    # via the f-string "=" specifier, so it must keep this name.
    whichidnum_map: Dict[int, int] = trcon.info["whichidnum_map"]
    if src_which_idnum in whichidnum_map:
        return whichidnum_map[src_which_idnum]

    log_warning_srcobj(oldobj)
    log.critical(
        f"Old database contains ID number definitions of type "
        f"{src_which_idnum} and equivalent ID number type in destination "
        f"not known; {whichidnum_map=}"
    )
    raise ValueError("Bad ID number type mapping")
551# noinspection PyProtectedMember
552def camcops_mergedb_translate_fn(trcon: TranslationContext) -> None:
553 """
554 Function to translate source objects to their destination counterparts,
555 where special processing is required. Called as a callback from
556 :func:`cardinal_pythonlib.sqlalchemy.merge_db.merge_db`.
558 Args:
559 trcon: the :class:`TranslationContext`; all the relevant information is
560 in here, and our function modifies its members.
562 This function does the following things:
564 - For any records uploaded from tablets: set ``_group_id``, if it's blank.
566 - For :class:`camcops_server.cc_modules.cc_user.User` objects: if an
567 identical user is found in the destination database, merge on it rather
568 than creating a new one. Users with matching usernames are considered to
569 be identical.
571 - For :class:`Device` objects: if an identical device is found, merge on it
572 rather than creating a new one. Devices with matching names are
573 considered to be identical.
575 - For :class:`camcops_server.cc_modules.cc_group.Group` objects: if an
576 identical group is found, merge on it rather than creating a new one.
577 Groups with matching names are considered to be identical.
579 - For :class:`camcops_server.cc_modules.cc_patient.Patient` objects: if any
580 have ID numbers in the old format (as columns in the Patient table),
581 convert them to the :class:`PatientIdNum` system.
583 - If we're inserting a :class:`PatientIdNum`, make sure there is a
584 corresponding
585 :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`, and that
586 it's valid.
588 - If we're merging from a more modern database with the
589 :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` table,
590 check our ID number definitions don't conflict.
592 - Check we're not creating duplicates for anything uploaded.
594 """
595 log.debug(f"Translating object from table: {trcon.tablename!r}")
596 oldobj = trcon.oldobj
597 newobj = trcon.newobj
599 # -------------------------------------------------------------------------
600 # Set _group_id correctly for tablet records
601 # -------------------------------------------------------------------------
602 if isinstance(oldobj, GenericTabletRecordMixin):
603 if (
604 "_group_id" in trcon.missing_src_columns
605 or oldobj._group_id is None
606 ):
607 # ... order that "if" statement carefully; if the _group_id column
608 # is missing from the source, don't touch oldobj._group_id or
609 # it'll trigger a DB query that fails.
610 #
611 # Set _group_id because it's blank
612 #
613 ensure_default_group_id(trcon)
614 default_group_id: int = trcon.info["default_group_id"]
615 log.debug(f"Assigning new _group_id of {default_group_id!r}")
616 newobj._group_id = default_group_id # type: ignore[attr-defined]
617 else:
618 #
619 # Re-map _group_id
620 #
621 newobj._group_id = get_dest_groupnum( # type: ignore[attr-defined]
622 oldobj._group_id, trcon, oldobj
623 )
625 # -------------------------------------------------------------------------
626 # If an identical user is found, merge on it rather than creating a new
627 # one. Users with matching usernames are considered to be identical.
628 # -------------------------------------------------------------------------
629 if trcon.tablename == User.__tablename__:
630 src_user = cast(User, oldobj)
631 src_username = src_user.username
632 matching_user: Optional[User] = (
633 trcon.dst_session.query(User)
634 .filter(User.username == src_username)
635 .one_or_none()
636 )
637 if matching_user is not None:
638 log.debug(
639 f"Matching User (username {matching_user.username!r}) found; "
640 f"merging"
641 )
642 trcon.newobj = matching_user # so that related records will work
644 # -------------------------------------------------------------------------
645 # If an identical device is found, merge on it rather than creating a
646 # new one. Devices with matching names are considered to be identical.
647 # -------------------------------------------------------------------------
648 if trcon.tablename == Device.__tablename__:
649 # log.debug(f"considering: {trcon=}")
650 src_device = cast(Device, oldobj)
651 src_devicename = src_device.name
652 matching_device: Optional[Device] = (
653 trcon.dst_session.query(Device)
654 .filter(Device.name == src_devicename)
655 .one_or_none()
656 )
657 if matching_device is not None:
658 log.debug(
659 f"Matching Device (name {matching_device.name!r}) found; "
660 f"merging"
661 )
662 trcon.newobj = matching_device
664 # BUT BEWARE, BECAUSE IF YOU MERGE THE SAME DATABASE TWICE (even if
665 # that's a silly thing to do...), MERGING DEVICES WILL BREAK THE KEY
666 # RELATIONSHIPS. For example,
667 # source:
668 # pk = 1, id = 1, device = 100, era = 'NOW', current = 1
669 # dest after first merge:
670 # pk = 1, id = 1, device = 100, era = 'NOW', current = 1
671 # dest after second merge:
672 # pk = 1, id = 1, device = 100, era = 'NOW', current = 1
673 # pk = 2, id = 1, device = 100, era = 'NOW', current = 1
674 # ... so you get a clash/duplicate.
675 # Mind you, that's fair, because there is a duplicate.
676 # SO WE DO SEPARATE DUPLICATE CHECKING, below.
678 # -------------------------------------------------------------------------
679 # Don't copy Group records; the user must set these up manually and specify
680 # groupnum_map, for safety
681 # -------------------------------------------------------------------------
682 if trcon.tablename == Group.__tablename__:
683 trcon.newobj = None # don't insert this object
684 # ... don't set "newobj = None"; that wouldn't alter trcon
685 # Now make sure the map is OK:
686 src_group = cast(Group, oldobj)
687 trcon.objmap[oldobj] = get_dst_group(
688 dest_groupnum=get_dest_groupnum(src_group.id, trcon, src_group),
689 dst_session=trcon.dst_session,
690 )
692 # -------------------------------------------------------------------------
693 # If there are any patient numbers in the old format (as a set of
694 # columns in the Patient table) which were not properly converted
695 # to the new format (as individual records in the PatientIdNum
696 # table), create new entries.
697 # Only worth bothering with for _current entries.
698 # (More explicitly: do not create new PatientIdNum entries for non-current
699 # patients; it's very fiddly if there might be asynchrony between
700 # Patient and PatientIdNum objects for that patient.)
701 # -------------------------------------------------------------------------
702 if trcon.tablename == Patient.__tablename__:
703 # (a) Find old patient numbers
704 old_patient = cast(Patient, oldobj)
705 # noinspection PyUnresolvedReferences
706 src_pt_query = (
707 select(text("*"))
708 .select_from(table(trcon.tablename))
709 .where(column(Patient.id.name) == old_patient.id)
710 .where(column(Patient._current.name) == True) # noqa: E712
711 .where(column(Patient._device_id.name) == old_patient._device_id)
712 .where(column(Patient._era.name) == old_patient._era)
713 )
715 result: Result = trcon.src_session.execute(src_pt_query)
716 list_of_dicts: List[RowMapping] = result.mappings().fetchall() # type: ignore[assignment] # noqa: E501
717 assert (
718 len(list_of_dicts) == 1
719 ), "Failed to fetch old patient IDs correctly; bug?"
720 old_patient_dict = list_of_dicts[0]
722 # (b) If any don't exist in the new database, create them.
723 # -- no, that's not right; we will be processing Patient before
724 # PatientIdNum, so that should be: if any don't exist in the *source*
725 # database, create them.
726 src_tables = trcon.src_table_names
727 for src_which_idnum in range(1, NUMBER_OF_IDNUMS_DEFUNCT + 1):
728 old_fieldname = FP_ID_NUM + str(src_which_idnum)
729 idnum_value = old_patient_dict.get(old_fieldname)
730 if idnum_value is None:
731 # Old Patient record didn't contain this ID number
732 # (value None -- or key missing)
733 continue
734 # Old Patient record *did* contain the ID number...
735 if PatientIdNum.__tablename__ in src_tables:
736 # noinspection PyUnresolvedReferences
737 src_idnum_query = (
738 select(func.count())
739 .select_from(table(PatientIdNum.__tablename__))
740 .where(
741 column(PatientIdNum.patient_id.name) == old_patient.id
742 )
743 .where(
744 column(PatientIdNum._current.name)
745 == old_patient._current
746 )
747 .where(
748 column(PatientIdNum._device_id.name)
749 == old_patient._device_id
750 )
751 .where(column(PatientIdNum._era.name) == old_patient._era)
752 .where(
753 column(PatientIdNum.which_idnum.name)
754 == src_which_idnum
755 )
756 )
757 n_present = trcon.src_session.execute(src_idnum_query).scalar()
758 # ^^^
759 # !
760 if n_present != 0:
761 # There was already a PatientIdNum for this which_idnum
762 continue
763 pidnum = PatientIdNum()
764 # PatientIdNum fields:
765 pidnum.id = fake_tablet_id_for_patientidnum(
766 patient_id=old_patient.id, which_idnum=src_which_idnum
767 )
768 # ... guarantees a pseudo client (tablet) PK
769 pidnum.patient_id = old_patient.id
770 pidnum.which_idnum = get_dest_which_idnum(
771 src_which_idnum, trcon, oldobj
772 )
773 pidnum.idnum_value = idnum_value
774 # GenericTabletRecordMixin fields:
775 # _pk: autogenerated
776 # noinspection PyUnresolvedReferences
777 pidnum._device_id = trcon.objmap[old_patient._device].id # type: ignore[attr-defined] # noqa: E501
778 pidnum._era = old_patient._era
779 pidnum._current = old_patient._current
780 pidnum._when_added_exact = old_patient._when_added_exact
781 pidnum._when_added_batch_utc = old_patient._when_added_batch_utc
782 # noinspection PyUnresolvedReferences
783 pidnum._adding_user_id = (
784 trcon.objmap[old_patient._adding_user].id # type: ignore[attr-defined] # noqa: E501
785 if old_patient._adding_user is not None
786 else None
787 )
788 pidnum._when_removed_exact = old_patient._when_removed_exact
789 pidnum._when_removed_batch_utc = (
790 old_patient._when_removed_batch_utc
791 )
792 # noinspection PyUnresolvedReferences
793 pidnum._removing_user_id = (
794 trcon.objmap[old_patient._removing_user].id # type: ignore[attr-defined] # noqa: E501
795 if old_patient._removing_user is not None
796 else None
797 )
798 # noinspection PyUnresolvedReferences
799 pidnum._preserving_user_id = (
800 trcon.objmap[old_patient._preserving_user].id # type: ignore[attr-defined] # noqa: E501
801 if old_patient._preserving_user is not None
802 else None
803 )
804 pidnum._forcibly_preserved = old_patient._forcibly_preserved
805 pidnum._predecessor_pk = None # Impossible to calculate properly
806 pidnum._successor_pk = None # Impossible to calculate properly
807 pidnum._manually_erased = old_patient._manually_erased
808 pidnum._manually_erased_at = old_patient._manually_erased_at
809 # noinspection PyUnresolvedReferences
810 pidnum._manually_erasing_user_id = (
811 trcon.objmap[old_patient._manually_erasing_user].id # type: ignore[attr-defined] # noqa: E501
812 if old_patient._manually_erasing_user is not None
813 else None
814 )
815 pidnum._camcops_version = old_patient._camcops_version
816 pidnum._addition_pending = old_patient._addition_pending
817 pidnum._removal_pending = old_patient._removal_pending
818 pidnum._group_id = newobj._group_id # type: ignore[attr-defined]
819 # ... will have been set above if it was blank
821 # OK.
822 log.debug(f"Inserting new PatientIdNum: {pidnum}")
823 trcon.dst_session.add(pidnum)
825 # -------------------------------------------------------------------------
826 # If we're inserting a PatientIdNum, make sure there is a corresponding
827 # IdNumDefinition, and that it's valid
828 # -------------------------------------------------------------------------
829 if trcon.tablename == PatientIdNum.__tablename__:
830 src_pidnum = cast(PatientIdNum, oldobj)
831 src_which_idnum = src_pidnum.which_idnum
832 # Is it present?
833 if src_which_idnum is None:
834 raise ValueError(f"Bad PatientIdNum: {src_pidnum!r}")
835 # Ensure the new object has an appropriate ID number FK:
836 dst_pidnum = cast(PatientIdNum, newobj)
837 dst_pidnum.which_idnum = get_dest_which_idnum(
838 src_which_idnum, trcon, oldobj
839 )
841 # -------------------------------------------------------------------------
842 # If we're merging from a more modern database with the IdNumDefinition
843 # table, skip source IdNumDefinition records; the user must set these up
844 # manually and specify whichidnum_map, for safety
845 # -------------------------------------------------------------------------
846 if trcon.tablename == IdNumDefinition.__tablename__:
847 trcon.newobj = None # don't insert this object
848 # ... don't set "newobj = None"; that wouldn't alter trcon
849 # Now make sure the map is OK:
850 src_iddef = cast(IdNumDefinition, oldobj)
851 trcon.objmap[oldobj] = get_dst_iddef(
852 which_idnum=get_dest_which_idnum(
853 src_iddef.which_idnum, trcon, src_iddef
854 ),
855 dst_session=trcon.dst_session,
856 )
858 # -------------------------------------------------------------------------
859 # Check we're not creating duplicates for anything uploaded
860 # -------------------------------------------------------------------------
861 if isinstance(oldobj, GenericTabletRecordMixin):
862 # noinspection PyTypeChecker
863 cls: Type[GenericTabletRecordMixin] = newobj.__class__ # type: ignore[assignment] # noqa: E501
864 # Records uploaded from tablets must be unique on the combination of:
865 # id = table PK
866 # _device_id = device
867 # _era = device era
868 # _when_removed_exact = removal date or NULL
869 # noinspection PyUnresolvedReferences
870 exists_query = (
871 select(func.count())
872 .select_from(table(trcon.tablename))
873 .where(column(cls.id.name) == oldobj.id)
874 .where(
875 column(cls._device_id.name) == trcon.objmap[oldobj._device].id # type: ignore[attr-defined] # noqa: E501
876 )
877 .where(column(cls._era.name) == oldobj._era)
878 .where(
879 column(cls._when_removed_exact.name)
880 == oldobj._when_removed_exact
881 )
882 )
883 # Note re NULLs... Although it's an inconvenient truth in SQL that
884 # SELECT NULL = NULL; -- returns NULL
885 # in this code we have a comparison of a column to a Python value.
886 # SQLAlchemy is clever and renders "IS NULL" if the Python value is
887 # None, or an "=" comparison otherwise.
888 # If we were comparing a column to another column, we'd have to do
889 # more; e.g.
890 #
891 # WRONG one-to-one join to self:
892 #
893 # SELECT a._pk, b._pk, a._when_removed_exact
894 # FROM phq9 a
895 # INNER JOIN phq9 b
896 # ON a._pk = b._pk
897 # AND a._when_removed_exact = b._when_removed_exact;
898 #
899 # -- drops all rows
900 #
901 # CORRECT one-to-one join to self:
902 #
903 # SELECT a._pk, b._pk, a._when_removed_exact
904 # FROM phq9 a
905 # INNER JOIN phq9 b
906 # ON a._pk = b._pk
907 # AND (a._when_removed_exact = b._when_removed_exact
908 # OR (a._when_removed_exact IS NULL AND
909 # b._when_removed_exact IS NULL));
910 #
911 # -- returns all rows
912 n_exists = trcon.dst_session.execute(exists_query).scalar()
913 if n_exists > 0:
914 # noinspection PyUnresolvedReferences
915 existing_rec_q = (
916 select("*")
917 .select_from(table(trcon.tablename))
918 .where(column(cls.id.name) == oldobj.id)
919 .where(
920 column(cls._device_id.name)
921 == trcon.objmap[oldobj._device].id # type: ignore[attr-defined] # noqa: E501
922 )
923 .where(column(cls._era.name) == oldobj._era)
924 .where(
925 column(cls._when_removed_exact.name)
926 == oldobj._when_removed_exact
927 )
928 )
929 result: Result = trcon.dst_session.execute(existing_rec_q) # type: ignore[no-redef] # noqa: E501
930 existing_rec: List[RowMapping] = result.mappings().fetchall() # type: ignore[assignment] # noqa: E501
931 log.critical(
932 f"Source record, inheriting from GenericTabletRecordMixin and "
933 f"shown below, already exists in destination database... "
934 f"in table {trcon.tablename!r}, clashing on: "
935 f"id={oldobj.id!r}, "
936 f"device_id={oldobj._device_id!r}, "
937 f"era={oldobj._era!r}, "
938 f"_when_removed_exact={oldobj._when_removed_exact!r}.\n"
939 f"ARE YOU TRYING TO MERGE THE SAME DATABASE IN TWICE? DON'T."
940 )
941 if trcon.tablename == PatientIdNum.__tablename__ and (
942 oldobj.id % NUMBER_OF_IDNUMS_DEFUNCT == 0
943 ):
944 log.critical(
945 f"Since this error has occurred for table "
946 f"{trcon.tablename!r} "
947 f"(and for id % {NUMBER_OF_IDNUMS_DEFUNCT} == 0), "
948 f"this error may reflect a previous bug in the patient ID "
949 f"number fix for the database upload script, in which all "
950 f"ID numbers for patients with patient.id = n were given "
951 f"patient_idnum.id = n * {NUMBER_OF_IDNUMS_DEFUNCT} "
952 f"themselves (or possibly were all given "
953 f"patient_idnum.id = 0). "
954 f"Fix this by running, on the source database:\n\n"
955 f" UPDATE patient_idnum SET id = _pk;\n\n"
956 )
957 # Print the actual instance last; accessing them via pformat can
958 # lead to crashes if there are missing source fields, as an
959 # on-demand SELECT is executed sometimes (e.g. when a PatientIdNum
960 # is printed, its Patient is selected, including the [user]
961 # 'fullname' attribute that is absent in old databases).
962 # Not a breaking point, since we're going to crash anyway, but
963 # inelegant.
964 # Since lazy loading (etc.) is configured at query time, the best
965 # thing (as per Michael Bayer) is to detach the object from the
966 # session:
967 # https://groups.google.com/forum/#!topic/sqlalchemy/X_wA8K97smE
968 trcon.src_session.expunge(oldobj) # prevent implicit queries
969 # Then all should work:
970 log_warning_srcobj(oldobj)
971 log.critical(
972 f"Existing record(s) in destination DB was/were:\n\n"
973 f"{pformat(existing_rec)}\n\n"
974 )
975 raise ValueError(
976 "Attempt to insert duplicate record; see log message above."
977 )
980# =============================================================================
981# Postprocess
982# =============================================================================
# noinspection PyUnusedLocal
def postprocess(src_engine: Engine, dst_session: Session) -> None:
    """
    Carry out extra work once :func:`merge_db` has finished.

    - Reindexes tasks in the destination database (via
      ``reindex_everything``).
    - Logs a warning for each mapping table that is not copied
      automatically and so must be dealt with by hand.

    Args:
        src_engine: source database SQLAlchemy engine (not used here)
        dst_session: destination database SQLAlchemy session
    """
    log.info("Reindexing destination database")
    reindex_everything(dst_session)

    # Pairs of (description of the mapping, name of the table holding it).
    manual_mappings = (
        ("user/group", UserGroupMembership.__tablename__),
        ("group/group", group_group_table.name),
    )
    for what, tablename in manual_mappings:
        log.warning(
            f"NOT IMPLEMENTED AUTOMATICALLY: copying {what} mapping "
            f"from table {tablename!r}; do this by hand."
        )
1009# =============================================================================
1010# Main
1011# =============================================================================
def _require_int_to_int_map(
    mapping: Dict[int, int], description: str
) -> None:
    """
    Check that a source-to-destination number map contains only integers.

    An explicit check is used rather than ``assert``, because assertions are
    removed when Python runs with optimization (``-O``).

    Args:
        mapping:
            dictionary to check
        description:
            name of the mapping, used in the error message

    Raises:
        TypeError:
            if any key or value of ``mapping`` is not an ``int``
    """
    for key, value in mapping.items():
        if not isinstance(key, int) or not isinstance(value, int):
            raise TypeError(
                f"{description} must map int to int; got {key!r}: {value!r}"
            )


def merge_camcops_db(
    src: str,
    echo: bool,
    dummy_run: bool,
    info_only: bool,
    default_group_id: Optional[int],
    default_group_name: Optional[str],
    groupnum_map: Dict[int, int],
    whichidnum_map: Dict[int, int],
    report_every: int = 10000,
    skip_export_logs: bool = True,
    skip_audit_logs: bool = True,
    dst_url: Optional[str] = None,
) -> None:
    """
    Merge an existing database (with a pre-v2 or later structure) into a
    contemporary CamCOPS database.

    Args:
        src:
            source database SQLAlchemy URL

        echo:
            echo the SQL that is produced?

        dummy_run:
            don't alter the destination database: the final commit is
            suppressed, postprocessing (reindexing) is skipped, and the
            destination session is rolled back at the end

        info_only:
            show info, then stop; as for ``dummy_run``, nothing is committed
            and postprocessing is skipped

        default_group_id:
            integer group ID (in the destination database) to use for source
            records that have no group (because they come from a very old
            source database) but need one

        default_group_name:
            group name (in the destination database) to use for source
            records that have no group (because they come from a very old
            source database) but need one

        groupnum_map:
            dictionary mapping group ID values from the source database to
            the destination database

        whichidnum_map:
            dictionary mapping ``which_idnum`` values from the source database
            to the destination database

        report_every:
            provide a progress report every *n* records

        skip_export_logs:
            skip export log tables

        skip_audit_logs:
            skip audit log table

        dst_url:
            Destination SQLAlchemy URL. By default this is blank, and a
            command-line request is used to get the current CamCOPS database
            from the config file. But for debugging, you can override this.
            If given, the session and engine created for it are closed and
            disposed when this function exits (normally or via an exception).

    Raises:
        TypeError:
            if ``whichidnum_map`` or ``groupnum_map`` has a key or value that
            is not an ``int`` (checked before any database is touched)
    """
    # Check arguments before touching any database.
    _require_int_to_int_map(whichidnum_map, "whichidnum_map")
    _require_int_to_int_map(groupnum_map, "groupnum_map")

    src_engine = create_engine(src, echo=echo, pool_pre_ping=True)
    log.info("SOURCE: " + get_safe_url_from_engine(src_engine))

    # If we create the destination engine/session ourselves, we clean them up
    # at the end; a session obtained from the command-line request is left
    # alone.
    owns_dst_session = False
    if dst_url:
        dst_engine = create_engine(dst_url, echo=echo, pool_pre_ping=True)
        dst_session: Session = sessionmaker()(bind=dst_engine)
        owns_dst_session = True
    else:
        req = get_command_line_request()  # requires manual COMMIT; see below
        dst_engine = req.engine
        dst_session = req.dbsession
    log.info("DESTINATION: " + get_safe_url_from_engine(dst_engine))

    log.info(
        f"Destination ID number type map (source:destination) is: "
        f"{whichidnum_map!r}"
    )
    log.info(f"Group number type map (source:destination) is {groupnum_map!r}")

    # In dummy-run or info-only mode, nothing may be committed to the
    # destination database.
    write_to_dst = not (dummy_run or info_only)

    try:
        # Delay the slow import until we've checked our syntax
        log.info("Loading all models...")
        # noinspection PyUnresolvedReferences
        import camcops_server.cc_modules.cc_all_models  # delayed import # import side effects (ensure all models registered) # noqa

        log.info("Models loaded.")

        # Now, any special dependencies?
        # From the point of view of translating any tablet-related fields,
        # the actual (server) PK values are irrelevant; all relationships
        # will be identical if you change any PK (not standard database
        # practice, but convenient here).
        # The dependencies that do matter are server-side things, like
        # user_id variables.

        # -----------------------------------------------------------------
        # Tables to skip
        # -----------------------------------------------------------------

        skip_tables = [
            # Transient stuff we don't want to copy across, or wouldn't want
            # to overwrite the destination with, or where the PK structure
            # has changed and we don't care about old data:
            TableIdentity(tablename=x)
            for x in (
                CamcopsSession.__tablename__,
                DirtyTable.__tablename__,
                ServerSettings.__tablename__,
                SecurityAccountLockout.__tablename__,
                SecurityLoginFailure.__tablename__,
                UserGroupMembership.__tablename__,
                group_group_table.name,
            )
        ]

        # Tedious and bulky stuff the user may want to skip:
        if skip_export_logs:
            skip_tables.extend(
                [
                    TableIdentity(tablename=x)
                    for x in (
                        Email.__tablename__,
                        ExportRecipient.__tablename__,
                        ExportedTask.__tablename__,
                        ExportedTaskEmail.__tablename__,
                        ExportedTaskFileGroup.__tablename__,
                        ExportedTaskHL7Message.__tablename__,
                    )
                ]
            )
        if skip_audit_logs:
            skip_tables.append(
                TableIdentity(tablename=AuditEntry.__tablename__)
            )

        # -----------------------------------------------------------------
        # Initial operations on SOURCE database
        # -----------------------------------------------------------------

        src_tables = get_table_names(src_engine)
        skip_tables += get_skip_tables(src_tables=src_tables)
        src_iddefs = get_src_iddefs(src_engine, src_tables)
        log.info(f"Source ID number definitions: {src_iddefs!r}")

        # -----------------------------------------------------------------
        # Initial operations on DESTINATION database
        # -----------------------------------------------------------------
        # So that system users get the first ID (cosmetic!):
        _ = User.get_system_user(dbsession=dst_session)
        _ = Device.get_server_device(dbsession=dst_session)

        # -----------------------------------------------------------------
        # Set up source-to-destination mappings
        # -----------------------------------------------------------------

        # Map source to destination ID number types
        for src_which_idnum, dest_which_idnum in whichidnum_map.items():
            if src_which_idnum not in src_iddefs:
                log.warning(
                    f"Source ID number {src_which_idnum} is in "
                    f"whichidnum_map, but is not in the source database."
                )
                continue
            src_iddef = src_iddefs[src_which_idnum]
            dst_iddef = ensure_dest_iddef_exists(
                dest_which_idnum, dst_session
            )
            ensure_no_iddef_clash(src_iddef, dst_iddef)

        # Look up each destination group named in groupnum_map (only the
        # destination numbers are needed here).
        for dest_groupnum in groupnum_map.values():
            _ = get_dst_group(dest_groupnum, dst_session)

        # -----------------------------------------------------------------
        # Merge
        # -----------------------------------------------------------------

        # Merge! It's easy...
        trcon_info = dict(
            default_group_id=default_group_id,
            default_group_name=default_group_name,
            src_iddefs=src_iddefs,
            whichidnum_map=whichidnum_map,
            groupnum_map=groupnum_map,
        )
        skip_table_dependencies = [
            # "The _security_users table does NOT depend on the patient
            # table." Otherwise this is circular.
            TableDependency(
                parent_tablename="patient", child_tablename="_security_users"
            )
        ]
        merge_db(
            base_class=Base,
            src_engine=src_engine,
            dst_session=dst_session,
            allow_missing_src_tables=True,
            allow_missing_src_columns=True,
            translate_fn=camcops_mergedb_translate_fn,
            skip_tables=skip_tables,
            only_tables=None,
            tables_to_keep_pks_for=None,
            extra_table_dependencies=None,
            skip_table_dependencies=skip_table_dependencies,
            dummy_run=dummy_run,
            info_only=info_only,
            report_every=report_every,
            flush_per_table=True,
            flush_per_record=False,
            commit_with_flush=False,
            # Never commit from inside merge_db() for a dummy/info-only run.
            commit_at_end=write_to_dst,
            prevent_eager_load=True,
            trcon_info=trcon_info,
            even_use_alter_relationships=True,
            debug_table_dependencies=False,
            debug_rewrite_relationships=False,
            use_sqlalchemy_order=False,
        )

        # -----------------------------------------------------------------
        # Postprocess and commit (or, for a dummy/info-only run, roll back)
        # -----------------------------------------------------------------

        if write_to_dst:
            postprocess(src_engine=src_engine, dst_session=dst_session)
            dst_session.commit()
        else:
            log.info(
                "Dummy run or info-only mode: skipping postprocessing and "
                "rolling back the destination session"
            )
            dst_session.rollback()
    finally:
        if owns_dst_session:
            dst_session.close()
            dst_engine.dispose()
        src_engine.dispose()