Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/cardinal_pythonlib/sqlalchemy/dump.py : 23%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2# cardinal_pythonlib/sqlalchemy/dump.py
4"""
5===============================================================================
7 Original code copyright (C) 2009-2021 Rudolf Cardinal (rudolf@pobox.com).
9 This file is part of cardinal_pythonlib.
11 Licensed under the Apache License, Version 2.0 (the "License");
12 you may not use this file except in compliance with the License.
13 You may obtain a copy of the License at
15 https://www.apache.org/licenses/LICENSE-2.0
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
23===============================================================================
25**Functions to help with large-scale dumping of data from SQLAlchemy systems.**
27"""
29import datetime
30import decimal
31import sys
32from typing import Any, Callable, Dict, TextIO, Type, Union
34import pendulum
35# noinspection PyProtectedMember
36from sqlalchemy.engine import Connectable, create_engine
37from sqlalchemy.engine.base import Engine
38from sqlalchemy.engine.default import DefaultDialect # for type hints
39from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
40from sqlalchemy.inspection import inspect
41from sqlalchemy.orm.query import Query
42from sqlalchemy.sql.base import Executable
43from sqlalchemy.sql.elements import BindParameter
44from sqlalchemy.sql.expression import select
45from sqlalchemy.sql.schema import MetaData, Table
46from sqlalchemy.sql.sqltypes import DateTime, NullType, String
48from cardinal_pythonlib.file_io import writeline_nl, writelines_nl
49from cardinal_pythonlib.logs import get_brace_style_log_with_null_handler
50from cardinal_pythonlib.sql.literals import sql_comment
51from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
52from cardinal_pythonlib.sqlalchemy.orm_inspect import walk_orm_tree
53from cardinal_pythonlib.sqlalchemy.schema import get_table_names
55log = get_brace_style_log_with_null_handler(__name__)
57SEP1 = sql_comment("=" * 76)
58SEP2 = sql_comment("-" * 76)
61# =============================================================================
62# Dump functions: get DDL and/or data as SQL commands
63# =============================================================================
65def dump_connection_info(engine: Engine, fileobj: TextIO = sys.stdout) -> None:
66 """
67 Dumps some connection info, as an SQL comment. Obscures passwords.
69 Args:
70 engine: the SQLAlchemy :class:`Engine` to dump metadata information
71 from
72 fileobj: the file-like object (default ``sys.stdout``) to write
73 information to
74 """
75 meta = MetaData(bind=engine)
76 writeline_nl(fileobj, sql_comment(f'Database info: {meta}'))
79def dump_ddl(metadata: MetaData,
80 dialect_name: str,
81 fileobj: TextIO = sys.stdout,
82 checkfirst: bool = True) -> None:
83 """
84 Sends schema-creating DDL from the metadata to the dump engine.
85 This makes ``CREATE TABLE`` statements.
87 Args:
88 metadata: SQLAlchemy :class:`MetaData`
89 dialect_name: string name of SQL dialect to generate DDL in
90 fileobj: file-like object to send DDL to
91 checkfirst: if ``True``, use ``CREATE TABLE IF NOT EXISTS`` or
92 equivalent.
93 """
94 # http://docs.sqlalchemy.org/en/rel_0_8/faq.html#how-can-i-get-the-create-table-drop-table-output-as-a-string # noqa
95 # https://stackoverflow.com/questions/870925/how-to-generate-a-file-with-ddl-in-the-engines-sql-dialect-in-sqlalchemy # noqa
96 # https://github.com/plq/scripts/blob/master/pg_dump.py
97 # noinspection PyUnusedLocal
98 def dump(querysql, *multiparams, **params):
99 compsql = querysql.compile(dialect=engine.dialect)
100 writeline_nl(fileobj, f"{compsql};")
102 writeline_nl(fileobj,
103 sql_comment(f"Schema (for dialect {dialect_name}):"))
104 engine = create_engine(f'{dialect_name}://',
105 strategy='mock', executor=dump)
106 metadata.create_all(engine, checkfirst=checkfirst)
107 # ... checkfirst doesn't seem to be working for the mock strategy...
108 # http://docs.sqlalchemy.org/en/latest/core/metadata.html
109 # ... does it implement a *real* check (impossible here), rather than
110 # issuing CREATE ... IF NOT EXISTS?
113def quick_mapper(table: Table) -> Type[DeclarativeMeta]:
114 """
115 Makes a new SQLAlchemy mapper for an existing table.
116 See
117 http://www.tylerlesmann.com/2009/apr/27/copying-databases-across-platforms-sqlalchemy/
119 Args:
120 table: SQLAlchemy :class:`Table` object
122 Returns:
123 a :class:`DeclarativeMeta` class
125 """ # noqa
126 # noinspection PyPep8Naming
127 Base = declarative_base()
129 class GenericMapper(Base):
130 __table__ = table
132 # noinspection PyTypeChecker
133 return GenericMapper
136class StringLiteral(String):
137 """
138 Teach SQLAlchemy how to literalize various things.
139 See
140 https://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query
141 """
142 def literal_processor(self,
143 dialect: DefaultDialect) -> Callable[[Any], str]:
144 super_processor = super().literal_processor(dialect)
146 def process(value: Any) -> str:
147 log.debug("process: {!r}", value)
148 if isinstance(value, int):
149 return str(value)
150 if not isinstance(value, str):
151 value = str(value)
152 result = super_processor(value)
153 if isinstance(result, bytes):
154 result = result.decode(dialect.encoding)
155 return result
156 return process
159# noinspection PyPep8Naming
160def make_literal_query_fn(dialect: DefaultDialect) -> Callable[[str], str]:
161 DialectClass = dialect.__class__
163 # noinspection PyClassHasNoInit,PyAbstractClass
164 class LiteralDialect(DialectClass):
165 # https://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query # noqa
166 colspecs = {
167 # prevent various encoding explosions
168 String: StringLiteral,
169 # teach SA about how to literalize a datetime
170 DateTime: StringLiteral,
171 # don't format py2 long integers to NULL
172 NullType: StringLiteral,
173 }
175 def literal_query(statement: str) -> str:
176 """
177 NOTE: This is entirely insecure. DO NOT execute the resulting
178 strings.
179 """
180 # https://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query # noqa
181 if isinstance(statement, Query):
182 statement = statement.statement
183 return statement.compile(
184 dialect=LiteralDialect(),
185 compile_kwargs={'literal_binds': True},
186 ).string + ";"
188 return literal_query
191# noinspection PyProtectedMember
192def get_literal_query(statement: Union[Query, Executable],
193 bind: Connectable = None) -> str:
194 """
195 Takes an SQLAlchemy statement and produces a literal SQL version, with
196 values filled in.
198 As per
199 https://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query
201 Notes:
202 - for debugging purposes *only*
203 - insecure; you should always separate queries from their values
204 - please also note that this function is quite slow
206 Args:
207 statement: the SQL statement (a SQLAlchemy object) to use
208 bind: if the statement is unbound, you will need to specify an object
209 here that supports SQL execution
211 Returns:
212 a string literal version of the query.
214 """ # noqa
215 # log.debug("statement: {!r}", statement)
216 # log.debug("statement.bind: {!r}", statement.bind)
217 if isinstance(statement, Query):
218 if bind is None:
219 bind = statement.session.get_bind(statement._mapper_zero_or_none())
220 statement = statement.statement
221 elif bind is None:
222 bind = statement.bind
223 if bind is None: # despite all that
224 raise ValueError("Attempt to call get_literal_query with an unbound "
225 "statement and no 'bind' parameter")
227 # noinspection PyUnresolvedReferences
228 dialect = bind.dialect
229 compiler = statement._compiler(dialect)
231 class LiteralCompiler(compiler.__class__):
232 # noinspection PyMethodMayBeStatic
233 def visit_bindparam(self,
234 bindparam: BindParameter,
235 within_columns_clause: bool = False,
236 literal_binds: bool = False,
237 **kwargs) -> str:
238 return super().render_literal_bindparam(
239 bindparam,
240 within_columns_clause=within_columns_clause,
241 literal_binds=literal_binds,
242 **kwargs
243 )
245 # noinspection PyUnusedLocal
246 def render_literal_value(self, value: Any, type_) -> str:
247 """Render the value of a bind parameter as a quoted literal.
249 This is used for statement sections that do not accept bind
250 paramters on the target driver/database.
252 This should be implemented by subclasses using the quoting services
253 of the DBAPI.
254 """
255 if isinstance(value, str):
256 value = value.replace("'", "''")
257 return "'%s'" % value
258 elif value is None:
259 return "NULL"
260 elif isinstance(value, (float, int)):
261 return repr(value)
262 elif isinstance(value, decimal.Decimal):
263 return str(value)
264 elif (isinstance(value, datetime.datetime) or
265 isinstance(value, datetime.date) or
266 isinstance(value, datetime.time) or
267 isinstance(value, pendulum.DateTime) or
268 isinstance(value, pendulum.Date) or
269 isinstance(value, pendulum.Time)):
270 # All have an isoformat() method.
271 return f"'{value.isoformat()}'"
272 # return (
273 # "TO_DATE('%s','YYYY-MM-DD HH24:MI:SS')"
274 # % value.strftime("%Y-%m-%d %H:%M:%S")
275 # )
276 else:
277 raise NotImplementedError(
278 "Don't know how to literal-quote value %r" % value)
280 compiler = LiteralCompiler(dialect, statement)
281 return compiler.process(statement) + ";"
284def dump_table_as_insert_sql(engine: Engine,
285 table_name: str,
286 fileobj: TextIO,
287 wheredict: Dict[str, Any] = None,
288 include_ddl: bool = False,
289 multirow: bool = False) -> None:
290 """
291 Reads a table from the database, and writes SQL to replicate the table's
292 data to the output ``fileobj``.
294 Args:
295 engine: SQLAlchemy :class:`Engine`
296 table_name: name of the table
297 fileobj: file-like object to write to
298 wheredict: optional dictionary of ``{column_name: value}`` to use as
299 ``WHERE`` filters
300 include_ddl: if ``True``, include the DDL to create the table as well
301 multirow: write multi-row ``INSERT`` statements
302 """
303 # https://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query # noqa
304 # http://docs.sqlalchemy.org/en/latest/faq/sqlexpressions.html
305 # http://www.tylerlesmann.com/2009/apr/27/copying-databases-across-platforms-sqlalchemy/ # noqa
306 # https://github.com/plq/scripts/blob/master/pg_dump.py
307 log.info("dump_data_as_insert_sql: table_name={}", table_name)
308 writelines_nl(fileobj, [
309 SEP1,
310 sql_comment(f"Data for table: {table_name}"),
311 SEP2,
312 sql_comment(f"Filters: {wheredict}"),
313 ])
314 dialect = engine.dialect
315 if not dialect.supports_multivalues_insert:
316 multirow = False
317 if multirow:
318 log.warning("dump_data_as_insert_sql: multirow parameter substitution "
319 "not working yet")
320 multirow = False
322 # literal_query = make_literal_query_fn(dialect)
324 meta = MetaData(bind=engine)
325 log.debug("... retrieving schema")
326 table = Table(table_name, meta, autoload=True)
327 if include_ddl:
328 log.debug("... producing DDL")
329 dump_ddl(table.metadata, dialect_name=engine.dialect.name,
330 fileobj=fileobj)
331 # NewRecord = quick_mapper(table)
332 # columns = table.columns.keys()
333 log.debug("... fetching records")
334 # log.debug("meta: {}", meta) # obscures password
335 # log.debug("table: {}", table)
336 # log.debug("table.columns: {!r}", table.columns)
337 # log.debug("multirow: {}", multirow)
338 query = select(table.columns)
339 if wheredict:
340 for k, v in wheredict.items():
341 col = table.columns.get(k)
342 query = query.where(col == v)
343 # log.debug("query: {}", query)
344 cursor = engine.execute(query)
345 if multirow:
346 row_dict_list = []
347 for r in cursor:
348 row_dict_list.append(dict(r))
349 # log.debug("row_dict_list: {}", row_dict_list)
350 if row_dict_list:
351 statement = table.insert().values(row_dict_list)
352 # log.debug("statement: {!r}", statement)
353 # insert_str = literal_query(statement)
354 insert_str = get_literal_query(statement, bind=engine)
355 # NOT WORKING FOR MULTIROW INSERTS. ONLY SUBSTITUTES FIRST ROW.
356 writeline_nl(fileobj, insert_str)
357 else:
358 writeline_nl(fileobj, sql_comment("No data!"))
359 else:
360 found_one = False
361 for r in cursor:
362 found_one = True
363 row_dict = dict(r)
364 statement = table.insert(values=row_dict)
365 # insert_str = literal_query(statement)
366 insert_str = get_literal_query(statement, bind=engine)
367 # log.debug("row_dict: {}", row_dict)
368 # log.debug("insert_str: {}", insert_str)
369 writeline_nl(fileobj, insert_str)
370 if not found_one:
371 writeline_nl(fileobj, sql_comment("No data!"))
372 writeline_nl(fileobj, SEP2)
373 log.debug("... done")
376def dump_database_as_insert_sql(engine: Engine,
377 fileobj: TextIO = sys.stdout,
378 include_ddl: bool = False,
379 multirow: bool = False) -> None:
380 """
381 Reads an entire database and writes SQL to replicate it to the output
382 file-like object.
384 Args:
385 engine: SQLAlchemy :class:`Engine`
386 fileobj: file-like object to write to
387 include_ddl: if ``True``, include the DDL to create the table as well
388 multirow: write multi-row ``INSERT`` statements
389 """
390 for tablename in get_table_names(engine):
391 dump_table_as_insert_sql(
392 engine=engine,
393 table_name=tablename,
394 fileobj=fileobj,
395 include_ddl=include_ddl,
396 multirow=multirow
397 )
400def dump_orm_object_as_insert_sql(engine: Engine,
401 obj: object,
402 fileobj: TextIO) -> None:
403 """
404 Takes a SQLAlchemy ORM object, and writes ``INSERT`` SQL to replicate it
405 to the output file-like object.
407 Args:
408 engine: SQLAlchemy :class:`Engine`
409 obj: SQLAlchemy ORM object to write
410 fileobj: file-like object to write to
411 """
412 # literal_query = make_literal_query_fn(engine.dialect)
413 insp = inspect(obj)
414 # insp: an InstanceState
415 # http://docs.sqlalchemy.org/en/latest/orm/internals.html#sqlalchemy.orm.state.InstanceState # noqa
416 # insp.mapper: a Mapper
417 # http://docs.sqlalchemy.org/en/latest/orm/mapping_api.html#sqlalchemy.orm.mapper.Mapper # noqa
419 # Don't do this:
420 # table = insp.mapper.mapped_table
421 # Do this instead. The method above gives you fancy data types like list
422 # and Arrow on the Python side. We want the bog-standard datatypes drawn
423 # from the database itself.
424 meta = MetaData(bind=engine)
425 table_name = insp.mapper.mapped_table.name
426 # log.debug("table_name: {}", table_name)
427 table = Table(table_name, meta, autoload=True)
428 # log.debug("table: {}", table)
430 # NewRecord = quick_mapper(table)
431 # columns = table.columns.keys()
432 query = select(table.columns)
433 # log.debug("query: {}", query)
434 for orm_pkcol in insp.mapper.primary_key:
435 core_pkcol = table.columns.get(orm_pkcol.name)
436 pkval = getattr(obj, orm_pkcol.name)
437 query = query.where(core_pkcol == pkval)
438 # log.debug("query: {}", query)
439 cursor = engine.execute(query)
440 row = cursor.fetchone() # should only be one...
441 row_dict = dict(row)
442 # log.debug("obj: {}", obj)
443 # log.debug("row_dict: {}", row_dict)
444 statement = table.insert(values=row_dict)
445 # insert_str = literal_query(statement)
446 insert_str = get_literal_query(statement, bind=engine)
447 writeline_nl(fileobj, insert_str)
450def bulk_insert_extras(dialect_name: str,
451 fileobj: TextIO,
452 start: bool) -> None:
453 """
454 Writes bulk ``INSERT`` preamble (start=True) or end (start=False).
456 For MySQL, this temporarily switches off autocommit behaviour and index/FK
457 checks, for speed, then re-enables them at the end and commits.
459 Args:
460 dialect_name: SQLAlchemy dialect name (see :class:`SqlaDialectName`)
461 fileobj: file-like object to write to
462 start: if ``True``, write preamble; if ``False``, write end
463 """
464 lines = []
465 if dialect_name == SqlaDialectName.MYSQL:
466 if start:
467 lines = [
468 "SET autocommit=0;",
469 "SET unique_checks=0;",
470 "SET foreign_key_checks=0;",
471 ]
472 else:
473 lines = [
474 "SET foreign_key_checks=1;",
475 "SET unique_checks=1;",
476 "COMMIT;",
477 ]
478 writelines_nl(fileobj, lines)
481def dump_orm_tree_as_insert_sql(engine: Engine,
482 baseobj: object,
483 fileobj: TextIO) -> None:
484 """
485 Sends an object, and all its relations (discovered via "relationship"
486 links) as ``INSERT`` commands in SQL, to ``fileobj``.
488 Args:
489 engine: SQLAlchemy :class:`Engine`
490 baseobj: starting SQLAlchemy ORM object
491 fileobj: file-like object to write to
493 Problem: foreign key constraints.
495 - MySQL/InnoDB doesn't wait to the end of a transaction to check FK
496 integrity (which it should):
497 https://stackoverflow.com/questions/5014700/in-mysql-can-i-defer-referential-integrity-checks-until-commit # noqa
498 - PostgreSQL can.
499 - Anyway, slightly ugly hacks...
500 https://dev.mysql.com/doc/refman/5.5/en/optimizing-innodb-bulk-data-loading.html
501 - Not so obvious how we can iterate through the list of ORM objects and
502 guarantee correct insertion order with respect to all FKs.
503 """ # noqa
504 writeline_nl(
505 fileobj,
506 sql_comment("Data for all objects related to the first below:"))
507 bulk_insert_extras(engine.dialect.name, fileobj, start=True)
508 for part in walk_orm_tree(baseobj):
509 dump_orm_object_as_insert_sql(engine, part, fileobj)
510 bulk_insert_extras(engine.dialect.name, fileobj, start=False)