Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/cardinal_pythonlib/sqlalchemy/merge_db.py : 19%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2# cardinal_pythonlib/sqlalchemy/merge_db.py
4"""
5===============================================================================
7 Original code copyright (C) 2009-2021 Rudolf Cardinal (rudolf@pobox.com).
9 This file is part of cardinal_pythonlib.
11 Licensed under the Apache License, Version 2.0 (the "License");
12 you may not use this file except in compliance with the License.
13 You may obtain a copy of the License at
15 https://www.apache.org/licenses/LICENSE-2.0
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
23===============================================================================
25**Function "merge_db" to merge two databases via SQLAlchemy.**
27*Notes:*
29Note in passing: there is no common base class for SQLAlchemy ORM instances
30(it's not :class:`DeclarativeMeta`). For example, in CamCOPS:
32.. code-block:: none
34 > Phq9.__bases__
35 (<class 'camcops_server.cc_modules.cc_task.TaskHasPatientMixin'>,
36 <class 'camcops_server.cc_modules.cc_task.Task'>,
37 <class 'sqlalchemy.ext.declarative.api.Base'>)
39... and that last :class:`Base` isn't a permanent class, just a newly named
40thing; see :func:`sqlalchemy.ext.declarative.api.declarative_base`.
42Again, with the CamCOPS classes:
44.. code-block:: none
46 > issubclass(Phq9, Base)
47 True
49 > issubclass(Base, DeclarativeMeta)
50 False
52 > Base.__bases__
53 (<class 'object'>,)
55So the best type hints we have are:
57.. code-block:: none
59 class: Type
60 instance: object
62"""
64import sys
65from typing import Any, Callable, Dict, List, Tuple, Type
66import unittest
68from sqlalchemy.engine import create_engine
69from sqlalchemy.engine.base import Engine
70from sqlalchemy.ext.declarative import declarative_base
71from sqlalchemy.orm import lazyload, load_only
72from sqlalchemy.orm import relationship
73# noinspection PyProtectedMember
74from sqlalchemy.orm.session import make_transient, Session, sessionmaker
75from sqlalchemy.schema import sort_tables
76from sqlalchemy.sql.schema import Column, ForeignKey, MetaData, Table
77from sqlalchemy.sql.sqltypes import Integer, Text
79from cardinal_pythonlib.dicts import map_keys_to_values
80from cardinal_pythonlib.logs import (
81 get_brace_style_log_with_null_handler,
82 main_only_quicksetup_rootlogger,
83)
84from cardinal_pythonlib.sqlalchemy.dump import dump_database_as_insert_sql
85from cardinal_pythonlib.sqlalchemy.orm_inspect import (
86 rewrite_relationships,
87 colname_to_attrname_dict,
88 copy_sqla_object,
89 get_orm_classes_by_table_name_from_base,
90 get_pk_attrnames,
91)
92from cardinal_pythonlib.sqlalchemy.schema import (
93 get_column_names,
94 get_table_names,
95)
96from cardinal_pythonlib.sqlalchemy.session import (
97 get_engine_from_session,
98 get_safe_url_from_engine,
99 get_safe_url_from_session,
100 SQLITE_MEMORY_URL,
101)
102from cardinal_pythonlib.sqlalchemy.table_identity import TableIdentity
# Module-level logger. Log calls in this module use str.format-style "{}"
# placeholders with positional arguments, e.g. log.debug("x: {!r}", x).
log = get_brace_style_log_with_null_handler(__name__)
107# =============================================================================
108# TableDependency; get_all_dependencies
109# =============================================================================
class TableDependency(object):
    """
    One dependency between two tables, expressed in the form wanted by
    functions such as :func:`sqlalchemy.schema.sort_tables`: a
    ``(parent, child)`` pair of :class:`Table` objects in which ``child``
    depends on ``parent`` (for example, a column ``child.parent_id`` that
    refers to ``parent.id``).
    """
    def __init__(self,
                 parent_table_id: TableIdentity = None,
                 child_table_id: TableIdentity = None,
                 parent_table: Table = None,
                 child_table: Table = None,
                 parent_tablename: str = None,
                 child_tablename: str = None,
                 metadata: MetaData = None) -> None:
        """
        Each side of the dependency (parent, child) is given EITHER as a
        :class:`TableIdentity`, OR as a :class:`Table` object and/or a table
        name; in the latter case a new :class:`TableIdentity` is built from
        them together with ``metadata``. Giving a :class:`TableIdentity` and
        also a :class:`Table`/table name for the same side fails an
        assertion.
        """
        # Parent first, then child (the same order as the checks are made).
        self._parent = self._identity_for_side(
            parent_table_id, parent_table, parent_tablename, metadata)
        self._child = self._identity_for_side(
            child_table_id, child_table, child_tablename, metadata)

    @staticmethod
    def _identity_for_side(table_id: TableIdentity,
                           table: Table,
                           tablename: str,
                           metadata: MetaData) -> TableIdentity:
        """
        Returns the :class:`TableIdentity` describing one side of the
        dependency, building one from ``table``/``tablename``/``metadata``
        unless ``table_id`` was supplied.
        """
        if not table_id:
            return TableIdentity(table=table,
                                 tablename=tablename,
                                 metadata=metadata)
        assert table is None and not tablename, (
            "Don't specify table with both TableIdentity and "
            "Table/tablename"
        )
        return table_id

    def __str__(self) -> str:
        return "{} -> {}".format(self.child_tablename, self.parent_tablename)

    def __repr__(self) -> str:
        return "TableDependency({!r} depends on {!r})".format(
            self.child_tablename, self.parent_tablename)

    def set_metadata(self, metadata: MetaData) -> None:
        """
        Assigns ``metadata`` to both the parent and the child table
        identities.
        """
        for side in (self._parent, self._child):
            side.set_metadata(metadata)

    def set_metadata_if_none(self, metadata: MetaData) -> None:
        """
        Assigns ``metadata`` to the parent and child table identities, but
        only where they do not already have metadata.
        """
        for side in (self._parent, self._child):
            side.set_metadata_if_none(metadata)

    @property
    def parent_table(self) -> Table:
        """
        The parent (depended-upon) table, as a :class:`Table`.
        """
        return self._parent.table

    @property
    def child_table(self) -> Table:
        """
        The child (dependent) table, as a :class:`Table`.
        """
        return self._child.table

    @property
    def parent_tablename(self) -> str:
        """
        The name of the parent table.
        """
        return self._parent.tablename

    @property
    def child_tablename(self) -> str:
        """
        The name of the child table.
        """
        return self._child.tablename

    def sqla_tuple(self) -> Tuple[Table, Table]:
        """
        The dependency as a ``(parent_table, child_table)`` tuple of
        :class:`Table` objects, as expected by
        :func:`sqlalchemy.schema.sort_tables`.
        """
        parent = self.parent_table
        child = self.child_table
        return parent, child
def get_all_dependencies(metadata: MetaData,
                         extra_dependencies: List[TableDependency] = None,
                         sort: bool = True) \
        -> List[TableDependency]:
    """
    Works out how the tables in ``metadata`` depend on one another. (If table
    B has a foreign key to table A, then B depends on A.)

    Dependencies are collected from three places:

    - the manually specified ``extra_dependencies`` (which are given
      ``metadata`` if they have none of their own);
    - each table's foreign key constraints, ignoring constraints with
      ``use_alter=True`` and ignoring self-references;
    - any ``_extra_dependencies`` attribute attached to a table.

    Duplicate ``(parent, child)`` pairs are reported once.

    Args:
        metadata: the metadata to inspect
        extra_dependencies: additional table dependencies to specify manually
        sort: sort into alphabetical order of (parent, child) table names?

    Returns:
        a list of :class:`TableDependency` objects

    See :func:`sort_tables_and_constraints` for method.
    """
    manual_deps = extra_dependencies or []  # type: List[TableDependency]
    for manual_dep in manual_deps:
        manual_dep.set_metadata_if_none(metadata)
    # A set of (parent_table, child_table) tuples, so duplicates collapse.
    pairs = {manual_dep.sqla_tuple() for manual_dep in manual_deps}

    for child in list(metadata.tables.values()):
        for fk_constraint in child.foreign_key_constraints:
            if fk_constraint.use_alter is True:
                # Constraints created via ALTER after table creation do not
                # impose a creation order:
                # http://docs.sqlalchemy.org/en/latest/core/constraints.html#sqlalchemy.schema.ForeignKeyConstraint.params.use_alter  # noqa
                continue
            referred = fk_constraint.referred_table
            if referred is not child:  # a self-reference is not a dependency
                pairs.add((referred, child))
        # noinspection PyProtectedMember
        pairs.update(
            (parent, child)
            for parent in getattr(child, "_extra_dependencies", ())
        )

    result = [
        TableDependency(parent_table=parent, child_table=child)
        for parent, child in pairs
    ]
    if sort:
        result.sort(key=lambda dep: (dep.parent_tablename,
                                     dep.child_tablename))
    return result
260# =============================================================================
261# TableDependencyClassification; classify_tables_by_dependency_type
262# =============================================================================
class TableDependencyClassification(object):
    """
    Summarizes where one table sits in the dependency graph: the tables it
    depends on (its parents), the tables that depend on it (its children),
    and whether it takes part in a circular chain of dependencies.
    """
    def __init__(self,
                 table: Table,
                 children: List[Table] = None,
                 parents: List[Table] = None) -> None:
        """
        Args:
            table: the table being classified
            children: tables that depend on ``table``
            parents: tables that ``table`` depends on
        """
        self.table = table
        self.children = children or []  # type: List[Table]
        self.parents = parents or []  # type: List[Table]
        # Not circular unless/until set_circular() says otherwise.
        self.circular = False
        self.circular_chain = []  # type: List[Table]

    @property
    def is_child(self) -> bool:
        """
        Does this table depend on at least one other table?
        """
        return bool(self.parents)

    @property
    def is_parent(self) -> bool:
        """
        Does at least one other table depend on this table?
        """
        return bool(self.children)

    @property
    def standalone(self) -> bool:
        """
        Is this table unconnected, i.e. neither a parent nor a child?
        """
        return not (self.is_child or self.is_parent)

    @property
    def tablename(self) -> str:
        """
        The name of the classified table.
        """
        return self.table.name

    @property
    def parent_names(self) -> List[str]:
        """
        Names of the tables this table depends on.
        """
        return [parent.name for parent in self.parents]

    @property
    def child_names(self) -> List[str]:
        """
        Names of the tables that depend on this table.
        """
        return [child.name for child in self.children]

    def set_circular(self, circular: bool, chain: List[Table] = None) -> None:
        """
        Records whether this table is part of a circular dependency.

        Args:
            circular: is it circular?
            chain: if so, the tables making up the circular chain (stored as
                an empty list if not given)
        """
        self.circular = circular
        self.circular_chain = chain or []  # type: List[Table]

    @property
    def circular_description(self) -> str:
        """
        The circular chain as text, e.g. ``"a -> b -> a"``.
        """
        return " -> ".join([link.name for link in self.circular_chain])

    @property
    def description(self) -> str:
        """
        Short description: the table's role (``parent``, ``child``,
        ``parent+child`` or ``standalone``), with a ``+CIRCULAR(...)`` suffix
        if it is part of a circular chain.
        """
        role = {
            (True, True): "parent+child",
            (True, False): "parent",
            (False, True): "child",
            (False, False): "standalone",
        }[(self.is_parent, self.is_child)]
        if not self.circular:
            return role
        return role + "+CIRCULAR({})".format(self.circular_description)

    def __str__(self) -> str:
        return "{}:{}".format(self.tablename, self.description)

    def __repr__(self) -> str:
        return "TableDependencyClassification({!r}:{})".format(
            self.tablename, self.description)
def classify_tables_by_dependency_type(
        metadata: MetaData,
        extra_dependencies: List[TableDependency] = None,
        sort: bool = True) \
        -> List[TableDependencyClassification]:
    """
    Inspects a metadata object (optionally adding other specified dependencies)
    and returns a list of objects describing their dependencies.

    Each table is also checked for circularity. It is marked circular (with
    the chain of tables involved) if it can reach itself by following parent
    links; failing that, the same search is tried along child links.

    Args:
        metadata: the :class:`MetaData` to inspect
        extra_dependencies: additional dependencies
        sort: sort the results by table name?

    Returns:
        list of :class:`TableDependencyClassification` objects, one for each
        table
    """
    tables = list(metadata.tables.values())  # type: List[Table]
    all_deps = get_all_dependencies(metadata, extra_dependencies)
    tdcmap = {}  # type: Dict[Table, TableDependencyClassification]
    for table in tables:
        parents = [td.parent_table for td in all_deps
                   if td.child_table == table]
        children = [td.child_table for td in all_deps
                    if td.parent_table == table]
        tdcmap[table] = TableDependencyClassification(
            table, parents=parents, children=children
        )

    def chain_to(start: Table,
                 probe: Table,
                 get_next: Callable[[TableDependencyClassification],
                                    List[Table]],
                 visited: set) -> Tuple[bool, List[Table]]:
        # Depth-first search from "start", following the links returned by
        # "get_next" (parents or children), looking for "probe". Returns
        # (found, chain), where chain runs from start to probe.
        # "visited" prevents re-entering a table: without it, a cycle that
        # does not pass through "probe" (e.g. A -> B -> C -> B, probing for A)
        # would recurse forever.
        visited.add(start)
        neighbours = get_next(tdcmap[start])
        if probe in neighbours:
            return True, [start, probe]
        for nxt in neighbours:
            if nxt in visited:
                continue
            found, chain_ = chain_to(nxt, probe, get_next, visited)
            if found:
                return True, [start] + chain_
        return False, []

    for table in tables:
        contains, chain = chain_to(table, table,
                                   lambda tdc_: tdc_.parents, set())
        if not contains:
            contains, chain = chain_to(table, table,
                                       lambda tdc_: tdc_.children, set())
        # If nothing was found, chain is [] and the table is non-circular.
        tdcmap[table].set_circular(contains, chain)

    classifications = list(tdcmap.values())
    if sort:
        classifications.sort(key=lambda c: c.tablename)
    return classifications
444# =============================================================================
445# TranslationContext (for merge_db)
446# =============================================================================
class TranslationContext(object):
    """
    Information-passing object for user callbacks from :func:`merge_db`.

    Args:

        oldobj:
            The old SQLAlchemy ORM object from the source session.

        newobj:
            The framework's go at building a new SQLAlchemy ORM object, which
            will be inserted into the destination session.

            The sequence is:

            1. ``newobj`` is created
            2. a :class:`TranslationContext` is created, referring to
               ``newobj``
            3. The ``translate_fn`` parameter to :func:`merge_db` will be
               called with the :class:`TranslationContext` as its parameter

               - the user-supplied :func:`translate_fn` function can, at this
                 point, modify the ``newobj`` attribute
               - if the user function sets the ``newobj`` attribute to
                 ``None``, this object will be skipped

            4. If the :class:`TranslationContext`'s ``newobj`` member is not
               ``None``, the new object is inserted into the destination
               session.

        objmap:
            A dictionary mapping old (source) objects to new (destination)
            objects. :func:`merge_db` records objects from tables that other
            tables depend on (parent tables), so that children processed
            later can have their relationships re-pointed.

        table:
            SQLAlchemy ``Table`` object from the metadata. (Not necessarily
            bound to any session, but will reflect the structure of the
            destination, not necessarily the source, since the merge operation
            assumes that the metadata describes the destination.)

        tablename:
            Table name that corresponds to ``table``.

        src_session:
            The SQLAlchemy :class:`Session` object for the source.

        dst_session:
            The SQLAlchemy :class:`Session` object for the destination.

        src_engine:
            The SQLAlchemy :class:`Engine` object for the source.

        dst_engine:
            The SQLAlchemy :class:`Engine` object for the destination.

        src_table_names:
            Names of the tables present in the source database (as supplied
            by the caller; :func:`merge_db` passes the names it read from the
            source engine).

        missing_src_columns:
            Names of columns known to be present in the destination but absent
            from the source. Stored as an empty list if not given.

        info:
            Extra dictionary for additional user-specified information. Stored
            as an empty dictionary if not given.

    It is possible that ``oldobj`` and ``newobj`` are the SAME OBJECT.
    """
    def __init__(self,
                 oldobj: object,
                 newobj: object,
                 objmap: Dict[object, object],
                 table: Table,
                 tablename: str,
                 src_session: Session,
                 dst_session: Session,
                 src_engine: Engine,
                 dst_engine: Engine,
                 src_table_names: List[str],
                 missing_src_columns: List[str] = None,
                 info: Dict[str, Any] = None) -> None:
        self.oldobj = oldobj
        self.newobj = newobj
        self.objmap = objmap
        self.table = table
        self.tablename = tablename
        self.src_session = src_session
        self.dst_session = dst_session
        self.src_engine = src_engine
        self.dst_engine = dst_engine
        self.src_table_names = src_table_names
        self.missing_src_columns = missing_src_columns or []  # type: List[str]
        self.info = info or {}  # type: Dict[str, Any]
540# =============================================================================
541# merge_db
542# =============================================================================
def merge_db(base_class: Type,
             src_engine: Engine,
             dst_session: Session,
             allow_missing_src_tables: bool = True,
             allow_missing_src_columns: bool = True,
             translate_fn: Callable[[TranslationContext], None] = None,
             skip_tables: List[TableIdentity] = None,
             only_tables: List[TableIdentity] = None,
             tables_to_keep_pks_for: List[TableIdentity] = None,
             extra_table_dependencies: List[TableDependency] = None,
             dummy_run: bool = False,
             info_only: bool = False,
             report_every: int = 1000,
             flush_per_table: bool = True,
             flush_per_record: bool = False,
             commit_with_flush: bool = False,
             commit_at_end: bool = True,
             prevent_eager_load: bool = True,
             trcon_info: Dict[str, Any] = None) -> None:
    """
    Copies an entire database as far as it is described by the metadata of
    ``base_class``, from the source database (read via ``src_engine``) into
    the destination SQLAlchemy ORM session ``dst_session``, and in the
    process:

    - creates new primary keys at the destination, or raises an error if it
      doesn't know how (typically something like: ``Field 'name' doesn't have a
      default value``)

    - maintains relationships, or raises an error if it doesn't know how

    Basic method:

    - Examines the metadata for the SQLAlchemy ORM base class you provide.

    - Assumes that the tables exist (in the destination).

    - For each table/ORM class found in the metadata:

      - Queries (via the ORM) from the source.

      - For each ORM instance retrieved:

        - Writes information to the destination SQLAlchemy session.

        - If that ORM object has relationships, process them too.

    If a table is missing in the source, then that's OK if and only if
    ``allow_missing_src_tables`` is set. (Similarly with columns and
    ``allow_missing_src_columns``; we ask the ORM to perform a partial load,
    of a subset of attributes only.)

    A temporary ORM session is opened on ``src_engine`` for the duration of
    the copy, and is always closed before this function returns or raises.

    Args:
        base_class:
            your ORM base class, e.g. from ``Base = declarative_base()``

        src_engine:
            SQLALchemy :class:`Engine` for the source database

        dst_session:
            SQLAlchemy :class:`Session` for the destination database

        allow_missing_src_tables:
            proceed if tables are missing from the source (allowing you to
            import from older, incomplete databases)

        allow_missing_src_columns:
            proceed if columns are missing from the source (allowing you to
            import from older, incomplete databases)

        translate_fn:
            optional function called with each instance, so you can modify
            instances in the pipeline. Signature:

            .. code-block:: python

                def my_translate_fn(trcon: TranslationContext) -> None:
                    # We can modify trcon.newobj, or replace it (including
                    # setting trcon.newobj = None to omit this object).
                    pass

        skip_tables:
            tables to skip (specified as a list of :class:`TableIdentity`)

        only_tables:
            tables to restrict the processor to (specified as a list of
            :class:`TableIdentity`); an explicitly empty list means "do
            nothing"

        tables_to_keep_pks_for:
            tables for which PKs are guaranteed to be safe to insert into the
            destination database, without modification (specified as a list of
            :class:`TableIdentity`)

        extra_table_dependencies:
            optional list of :class:`TableDependency` objects (q.v.)

        dummy_run:
            don't alter the destination database: nothing is added to
            ``dst_session``, and it is neither flushed nor committed

        info_only:
            show info, then stop

        report_every:
            provide a progress report every *n* records (``0`` disables
            progress reports)

        flush_per_table:
            flush the session after every table (reasonable)

        flush_per_record:
            flush the session after every instance (AVOID this if tables may
            refer to themselves)

        commit_with_flush:
            ``COMMIT`` with each flush?

        commit_at_end:
            ``COMMIT`` when finished? (Ignored for a dummy run.)

        prevent_eager_load:
            disable any eager loading (use lazy loading instead)

        trcon_info:
            additional dictionary passed to ``TranslationContext.info``
            (see :class:`.TranslationContext`)

    Raises:
        AssertionError: if the source and destination URLs look identical
            (other than for in-memory SQLite), or if the tables have circular
            dependencies
        RuntimeError: if tables (when ``allow_missing_src_tables`` is false)
            or columns (when ``allow_missing_src_columns`` is false) are
            missing from the source
    """
    log.info("merge_db(): starting")
    if dummy_run:
        log.warning("Dummy run only; destination will not be changed")

    # Check parameters before we modify them: an explicitly empty
    # only_tables list means there is nothing to do.
    if only_tables is not None and not only_tables:
        log.warning("... only_tables == []; nothing to do")
        return

    # Finalize parameters
    skip_tables = skip_tables or []  # type: List[TableIdentity]
    only_tables = only_tables or []  # type: List[TableIdentity]
    tables_to_keep_pks_for = tables_to_keep_pks_for or []  # type: List[TableIdentity]  # noqa
    extra_table_dependencies = extra_table_dependencies or []  # type: List[TableDependency]  # noqa
    trcon_info = trcon_info or {}  # type: Dict[str, Any]

    # noinspection PyUnresolvedReferences
    metadata = base_class.metadata  # type: MetaData
    dst_engine = get_engine_from_session(dst_session)
    tablename_to_ormclass = get_orm_classes_by_table_name_from_base(base_class)

    # Tell all TableIdentity objects about their metadata
    for tilist in (skip_tables, only_tables, tables_to_keep_pks_for):
        for ti in tilist:
            ti.set_metadata_if_none(metadata)
    for td in extra_table_dependencies:
        td.set_metadata_if_none(metadata)

    # Get all lists of tables as their names
    skip_table_names = [ti.tablename for ti in skip_tables]  # type: List[str]
    only_table_names = [ti.tablename for ti in only_tables]  # type: List[str]
    keep_pk_table_names = [ti.tablename for ti in tables_to_keep_pks_for]  # type: List[str]  # noqa

    # Safety check: this is an imperfect check for source == destination, but
    # it is fairly easy to pass in the wrong URL, so let's try our best:
    _src_url = get_safe_url_from_engine(src_engine)
    _dst_url = get_safe_url_from_session(dst_session)
    assert _src_url != _dst_url or _src_url == SQLITE_MEMORY_URL, (
        "Source and destination databases are the same!"
    )

    # Check the right tables are present.
    src_tables = sorted(get_table_names(src_engine))
    src_table_set = set(src_tables)  # for fast membership tests
    dst_tables = sorted(tablename_to_ormclass.keys())
    log.debug("Source tables: {!r}", src_tables)
    log.debug("Destination tables: {!r}", dst_tables)
    if not allow_missing_src_tables:
        missing_tables = sorted(
            d for d in dst_tables
            if d not in src_table_set and d not in skip_table_names
        )
        if missing_tables:
            raise RuntimeError("The following tables are missing from the "
                               "source database: " + repr(missing_tables))

    table_num = 0
    overall_record_num = 0

    tables = list(metadata.tables.values())  # type: List[Table]
    # Very helpfully, MetaData.sorted_tables produces tables in order of
    # relationship dependency ("each table is preceded by all tables which
    # it references");
    # http://docs.sqlalchemy.org/en/latest/core/metadata.html
    # HOWEVER, it only works if you specify ForeignKey relationships
    # explicitly.
    # We can also add in user-specified dependencies, and therefore can do the
    # sorting in one step with sqlalchemy.schema.sort_tables:
    ordered_tables = sort_tables(
        tables,
        extra_dependencies=[td.sqla_tuple() for td in extra_table_dependencies]
    )
    # Note that the ordering is NOT NECESSARILY CONSISTENT, though (in that
    # the order of stuff it doesn't care about varies across runs).
    all_dependencies = get_all_dependencies(metadata, extra_table_dependencies)
    dep_classifications = classify_tables_by_dependency_type(
        metadata, extra_table_dependencies)
    circular = [tdc for tdc in dep_classifications if tdc.circular]
    assert not circular, f"Circular dependencies! {circular!r}"
    log.debug("All table dependencies: {}",
              "; ".join(str(td) for td in all_dependencies))
    log.debug("Table dependency classifications: {}",
              "; ".join(str(c) for c in dep_classifications))
    log.info("Processing tables in the order: {!r}",
             [table.name for table in ordered_tables])
    tdc_by_table = {
        tdc.table: tdc for tdc in dep_classifications
    }  # type: Dict[Table, TableDependencyClassification]

    # Old (source) object -> new (destination) object, for parent tables.
    objmap = {}  # type: Dict[object, object]

    # Open the source ORM session only after all the checks that can raise,
    # and always close it (in the "finally" below) so that its connection
    # resources are released.
    src_session = sessionmaker(bind=src_engine)()  # type: Session

    def flush() -> None:
        # Flush (and optionally commit) the destination, except in a dummy
        # run.
        if not dummy_run:
            log.debug("Flushing session")
            dst_session.flush()
            if commit_with_flush:
                log.debug("Committing...")
                dst_session.commit()

    def translate(oldobj_: object,
                  newobj_: object,
                  table_: Table,
                  missing_columns_: List[str]) -> object:
        # Apply the user's translate_fn, if any. Returns the (possibly
        # replaced) new object, or None if translate_fn chose to skip it.
        if translate_fn is None:
            return newobj_
        tc = TranslationContext(oldobj=oldobj_,
                                newobj=newobj_,
                                objmap=objmap,
                                table=table_,
                                tablename=table_.name,
                                src_session=src_session,
                                dst_session=dst_session,
                                src_engine=src_engine,
                                dst_engine=dst_engine,
                                missing_src_columns=missing_columns_,
                                src_table_names=src_tables,
                                info=trcon_info)
        translate_fn(tc)
        if tc.newobj is None:
            log.debug("Instance skipped by user-supplied translate_fn")
        return tc.newobj

    try:
        # ---------------------------------------------------------------------
        # Now, per table/ORM class...
        # ---------------------------------------------------------------------
        for table in ordered_tables:
            tablename = table.name

            if tablename in skip_table_names:
                log.info("... skipping table {!r} (as per skip_tables)",
                         tablename)
                continue
            if only_table_names and tablename not in only_table_names:
                log.info("... ignoring table {!r} (as per only_tables)",
                         tablename)
                continue
            if allow_missing_src_tables and tablename not in src_table_set:
                log.info("... ignoring table {!r} (not in source database)",
                         tablename)
                continue
            table_num += 1
            table_record_num = 0

            src_columns = sorted(get_column_names(src_engine, tablename))
            dst_columns = sorted(column.name for column in table.columns)
            missing_columns = sorted(set(dst_columns) - set(src_columns))

            if not allow_missing_src_columns and missing_columns:
                raise RuntimeError(
                    f"The following columns are missing from source table "
                    f"{tablename!r}: {missing_columns!r}")

            orm_class = tablename_to_ormclass[tablename]
            pk_attrs = get_pk_attrnames(orm_class)
            c2a = colname_to_attrname_dict(orm_class)
            missing_attrs = map_keys_to_values(missing_columns, c2a)
            tdc = tdc_by_table[table]

            log.info("Processing table {!r} via ORM class {!r}",
                     tablename, orm_class)
            log.debug("PK attributes: {!r}", pk_attrs)
            log.debug("Table: {!r}", table)
            log.debug("Dependencies: parents = {!r}; children = {!r}",
                      tdc.parent_names, tdc.child_names)

            if info_only:
                log.debug("info_only; skipping table contents")
                continue

            query = src_session.query(orm_class)

            if allow_missing_src_columns and missing_columns:
                src_attrs = map_keys_to_values(src_columns, c2a)
                log.info("Table {} is missing columns {} in the source",
                         tablename, missing_columns)
                log.debug("... using only columns {} via attributes {}",
                          src_columns, src_attrs)
                query = query.options(load_only(*src_attrs))
                # PROBLEM: it will not ignore the PK.

            if prevent_eager_load:
                query = query.options(lazyload("*"))

            wipe_pk = tablename not in keep_pk_table_names

            # How best to deal with relationships?
            #
            # Eager-loading relationships (joinedload) and then moving objects
            # from the source session to the destination session fails with
            # errors like:
            #   sqlalchemy.exc.InvalidRequestError: Object '<Parent at
            #   0x7f99492440b8>' is already attached to session '7' (this is
            #   '6')
            # at the point of dst_session.add(instance), when adding the
            # object on the child side of the relationship. Presumably the
            # Parent is moved from session S to session D, but eager-loading
            # the Parent from the Child makes another copy in session S, so
            # the Child's parent is in the wrong session.
            #
            # We therefore take a more interventional approach: keep the old
            # object, make a copy using copy_sqla_object, and re-assign
            # relationships via objmap.

            for instance in query.all():
                table_record_num += 1
                overall_record_num += 1
                if report_every and table_record_num % report_every == 0:
                    log.info("... progress{}: on table {} ({}); record {} "
                             "this table; overall record {}",
                             " (DUMMY RUN)" if dummy_run else "",
                             table_num, tablename,
                             table_record_num, overall_record_num)

                if tdc.standalone:
                    # Our table has neither parents nor children. We can
                    # therefore simply move the instance from one session to
                    # the other, blanking primary keys.
                    # https://stackoverflow.com/questions/14636192/sqlalchemy-modification-of-detached-object  # noqa
                    src_session.expunge(instance)
                    make_transient(instance)
                    if wipe_pk:
                        for attrname in pk_attrs:
                            setattr(instance, attrname, None)

                    instance = translate(instance, instance,
                                         table, missing_columns)
                    if instance is None:
                        continue  # translate_fn elected to skip it

                    if not dummy_run:
                        dst_session.add(instance)
                        # new PK will be created when session is flushed

                else:
                    # Our table has either parents or children. We therefore
                    # place a COPY in the destination session. If this object
                    # may be a parent, we log the old-to-new mapping in
                    # objmap; if it is a child, its relationships are
                    # re-assigned from that mapping (parent tables having been
                    # processed first, thanks to the table ordering).
                    oldobj = instance  # rename for clarity
                    newobj = copy_sqla_object(
                        oldobj, omit_pk=wipe_pk, omit_fk=True,
                        omit_attrs=missing_attrs, debug=False
                    )

                    rewrite_relationships(oldobj, newobj, objmap, debug=False,
                                          skip_table_names=skip_table_names)

                    newobj = translate(oldobj, newobj, table, missing_columns)
                    if newobj is None:
                        continue  # translate_fn elected to skip it

                    if not dummy_run:
                        dst_session.add(newobj)
                        # new PK will be created when session is flushed

                    if tdc.is_parent:
                        objmap[oldobj] = newobj  # for its children's benefit

                if flush_per_record:
                    flush()

            if flush_per_table:
                flush()

        flush()
        if commit_at_end:
            if dummy_run:
                # A dummy run must not alter the destination, which includes
                # not committing anything else pending in dst_session.
                log.debug("Dummy run; not committing")
            else:
                log.debug("Committing...")
                dst_session.commit()
    finally:
        src_session.close()
    log.info("merge_db(): finished")
950# =============================================================================
951# Unit tests
952# =============================================================================
class MergeTestMixin(object):
    """
    Test mixin supplying two in-memory SQLite databases, a source and a
    destination, each with its own engine and ORM session. It also supplies a
    fresh declarative base (``self.Base``) for the test's ORM classes, for
    unit-testing :func:`merge_db`.
    """
    def __init__(self, *args, echo: bool = False, **kwargs) -> None:
        """
        Args:
            echo: passed to :func:`create_engine` for both databases
            args: passed on to the next class in the MRO
            kwargs: passed on to the next class in the MRO
        """
        self.src_engine = create_engine(SQLITE_MEMORY_URL, echo=echo)  # type: Engine  # noqa
        self.dst_engine = create_engine(SQLITE_MEMORY_URL, echo=echo)  # type: Engine  # noqa
        src_session_factory = sessionmaker(bind=self.src_engine)
        self.src_session = src_session_factory()  # type: Session
        dst_session_factory = sessionmaker(bind=self.dst_engine)
        self.dst_session = dst_session_factory()  # type: Session

        self.Base = declarative_base()

        # noinspection PyArgumentList
        super().__init__(*args, **kwargs)

    @staticmethod
    def _dump_engine(engine: Engine, announcement: str) -> None:
        """
        Logs ``announcement`` as a warning, then writes the database behind
        ``engine`` to stdout as SQL (DDL plus multi-row INSERT statements).
        """
        log.warning(announcement)
        dump_database_as_insert_sql(
            engine=engine,
            fileobj=sys.stdout,
            include_ddl=True,
            multirow=True
        )

    def dump_source(self) -> None:
        """
        Writes the source database to stdout as SQL.
        """
        self._dump_engine(self.src_engine, "Dumping source")

    def dump_destination(self) -> None:
        """
        Writes the destination database to stdout as SQL.
        """
        self._dump_engine(self.dst_engine, "Dumping destination")

    def do_merge(self, dummy_run: bool = False) -> None:
        """
        Runs :func:`merge_db` from the source engine into the destination
        session, requiring every source table to be present but tolerating
        missing columns.
        """
        merge_db(base_class=self.Base,
                 src_engine=self.src_engine,
                 dst_session=self.dst_session,
                 allow_missing_src_tables=False,
                 allow_missing_src_columns=True,
                 translate_fn=None,
                 skip_tables=None,
                 only_tables=None,
                 extra_table_dependencies=None,
                 dummy_run=dummy_run,
                 report_every=1000)
class MergeTestPlain(MergeTestMixin, unittest.TestCase):
    """
    Unit tests for a simple merge operation.

    *Notes re unit testing:*

    - tests are found by virtue of the fact that their names start with
      "test"; see
      https://docs.python.org/3.6/library/unittest.html#basic-example

    - A separate instance of the class is created for each test, and in each
      case is called with:

      .. code-block:: python

        setUp()
        testSOMETHING()
        tearDown()

      ... see https://docs.python.org/3.6/library/unittest.html#test-cases

    - A mixin that defines ``__init__`` (like :class:`MergeTestMixin`) must go
      BEFORE :class:`unittest.TestCase` in the base-class list.
      :meth:`unittest.TestCase.__init__` does not call ``super().__init__()``.
      A mixin listed after it would therefore never be initialized, and
      ``self.Base``, the engines and the sessions would be missing. See also
      https://stackoverflow.com/questions/1323455/python-unit-test-with-base-and-sub-class
    """  # noqa
    def setUp(self) -> None:
        """
        Declare a parent/child schema on ``self.Base`` and create it in both
        databases. Then commit two parents, each with one child, to the
        source database.
        """

        class Parent(self.Base):
            __tablename__ = "parent"
            id = Column(Integer, primary_key=True, autoincrement=True)
            name = Column(Text)

        class Child(self.Base):
            __tablename__ = "child"
            id = Column(Integer, primary_key=True, autoincrement=True)
            name = Column(Text)
            parent_id = Column(Integer, ForeignKey("parent.id"))
            parent = relationship(Parent)

        # Keep the mapped classes so the tests can query either database.
        self.Parent = Parent
        self.Child = Child

        self.Base.metadata.create_all(self.src_engine)
        self.Base.metadata.create_all(self.dst_engine)

        p1 = Parent(name="Parent 1")
        p2 = Parent(name="Parent 2")
        c1 = Child(name="Child 1")
        c2 = Child(name="Child 2")
        c1.parent = p1
        c2.parent = p2
        self.src_session.add_all([p1, p2, c1, c2])
        self.src_session.commit()

    def _assert_counts(self, session: "Session",
                       n_parents: int, n_children: int) -> None:
        """
        Assert how many Parent and Child rows are visible through
        ``session``.
        """
        self.assertEqual(session.query(self.Parent).count(), n_parents)
        self.assertEqual(session.query(self.Child).count(), n_children)

    def _assert_children_linked(self, session: "Session") -> None:
        """
        Assert that every child visible through ``session`` has a parent, and
        that "Child N" is attached to "Parent N", as in the source data
        built by :meth:`setUp`.
        """
        for child in session.query(self.Child):
            self.assertIsNotNone(child.parent)
            self.assertEqual(child.parent.name,
                             child.name.replace("Child", "Parent"))

    def test_source(self) -> None:
        self.dump_source()
        self._assert_counts(self.src_session, n_parents=2, n_children=2)
        self._assert_children_linked(self.src_session)

    def test_dummy(self) -> None:
        log.info("Testing merge_db() in dummy run mode")
        self.do_merge(dummy_run=True)
        self.dst_session.commit()
        self.dump_destination()
        # A dummy run must not write anything to the destination.
        self._assert_counts(self.dst_session, n_parents=0, n_children=0)

    def test_merge_to_empty(self) -> None:
        log.info("Testing merge_db() to empty database")
        self.do_merge(dummy_run=False)
        self.dst_session.commit()
        self.dump_destination()
        self._assert_counts(self.dst_session, n_parents=2, n_children=2)
        self._assert_children_linked(self.dst_session)

    def test_merge_to_existing(self) -> None:
        log.info("Testing merge_db() to pre-populated database")
        self.do_merge(dummy_run=False)
        self.dst_session.commit()
        self.do_merge(dummy_run=False)
        self.dst_session.commit()
        self.dump_destination()
        # The second merge appends new rows rather than clashing with the
        # first. Each copied child must point at its own newly copied
        # parent, so the children reference four distinct parents.
        self._assert_counts(self.dst_session, n_parents=4, n_children=4)
        self._assert_children_linked(self.dst_session)
        parent_ids = {c.parent_id for c in self.dst_session.query(self.Child)}
        self.assertEqual(len(parent_ids), 4)
class MergeTestCircular(MergeTestMixin, unittest.TestCase):
    """
    Unit tests involving two tables whose foreign keys point at each other.

    Both tests are marked as expected failures: committing the mutually
    referencing source objects raises before :func:`merge_db` is ever
    reached.
    """

    def _build_circular_source(self) -> None:
        """
        Declare mutually referencing ``parent``/``child`` tables on
        ``self.Base`` and create them in the source and destination
        databases. Then add two parent/child pairs, each pointing at the
        other, to the source session and commit.

        The final commit is expected to raise
        ``sqlalchemy.exc.CircularDependencyError``.
        """

        class Parent(self.Base):
            __tablename__ = "parent"
            id = Column(Integer, primary_key=True, autoincrement=True)
            name = Column(Text)
            child_id = Column(Integer, ForeignKey("child.id"))
            child = relationship("Child", foreign_keys=[child_id])

        class Child(self.Base):
            __tablename__ = "child"
            id = Column(Integer, primary_key=True, autoincrement=True)
            name = Column(Text)
            parent_id = Column(Integer, ForeignKey("parent.id"))
            parent = relationship(Parent, foreign_keys=[parent_id])

        for engine in (self.src_engine, self.dst_engine):
            self.Base.metadata.create_all(engine)

        p1, p2 = Parent(name="Parent 1"), Parent(name="Parent 2")
        c1, c2 = Child(name="Child 1"), Child(name="Child 2")
        # Wire each pair in both directions, which creates the cycle.
        for kid, mum in ((c1, p1), (c2, p2)):
            kid.parent = mum
        for mum, kid in ((p1, c1), (p2, c2)):
            mum.child = kid

        session = self.src_session
        session.add_all([p1, p2, c1, c2])
        session.commit()  # raises sqlalchemy.exc.CircularDependencyError

    @unittest.expectedFailure
    def test_setup_circular(self) -> None:
        self._build_circular_source()

    @unittest.expectedFailure
    def test_circular(self) -> None:
        self._build_circular_source()  # the failure happens here...
        log.info("Testing merge_db() with circular relationship")
        # ... so the merge below, which would also fail, is never reached.
        self.do_merge(dummy_run=False)
        self.dst_session.commit()
        self.dump_destination()
1132# =============================================================================
1133# main
1134# =============================================================================
1135# run with "python merge_db.py -v" to be verbose
if __name__ == "__main__":
    # Send log output to the console, then hand over to unittest's
    # command-line runner. It discovers the test cases in this module; add
    # -v for verbose output.
    main_only_quicksetup_rootlogger()
    unittest.main(module="__main__")