Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2# cardinal_pythonlib/sqlalchemy/dump.py 

3 

4""" 

5=============================================================================== 

6 

7 Original code copyright (C) 2009-2021 Rudolf Cardinal (rudolf@pobox.com). 

8 

9 This file is part of cardinal_pythonlib. 

10 

11 Licensed under the Apache License, Version 2.0 (the "License"); 

12 you may not use this file except in compliance with the License. 

13 You may obtain a copy of the License at 

14 

15 https://www.apache.org/licenses/LICENSE-2.0 

16 

17 Unless required by applicable law or agreed to in writing, software 

18 distributed under the License is distributed on an "AS IS" BASIS, 

19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

20 See the License for the specific language governing permissions and 

21 limitations under the License. 

22 

23=============================================================================== 

24 

25**Functions to help with large-scale dumping of data from SQLAlchemy systems.** 

26 

27""" 

28 

29import datetime 

30import decimal 

31import sys 

32from typing import Any, Callable, Dict, TextIO, Type, Union 

33 

34import pendulum 

35# noinspection PyProtectedMember 

36from sqlalchemy.engine import Connectable, create_engine 

37from sqlalchemy.engine.base import Engine 

38from sqlalchemy.engine.default import DefaultDialect # for type hints 

39from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta 

40from sqlalchemy.inspection import inspect 

41from sqlalchemy.orm.query import Query 

42from sqlalchemy.sql.base import Executable 

43from sqlalchemy.sql.elements import BindParameter 

44from sqlalchemy.sql.expression import select 

45from sqlalchemy.sql.schema import MetaData, Table 

46from sqlalchemy.sql.sqltypes import DateTime, NullType, String 

47 

48from cardinal_pythonlib.file_io import writeline_nl, writelines_nl 

49from cardinal_pythonlib.logs import get_brace_style_log_with_null_handler 

50from cardinal_pythonlib.sql.literals import sql_comment 

51from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName 

52from cardinal_pythonlib.sqlalchemy.orm_inspect import walk_orm_tree 

53from cardinal_pythonlib.sqlalchemy.schema import get_table_names 

54 

# Module-level logger, obtained from the project's logging helper.
log = get_brace_style_log_with_null_handler(__name__)

# Separator lines (76 characters of "=" or "-", wrapped as SQL comments) used
# to frame sections of the dump output.
SEP1, SEP2 = (sql_comment(fill * 76) for fill in ("=", "-"))

59 

60 

61# ============================================================================= 

62# Dump functions: get DDL and/or data as SQL commands 

63# ============================================================================= 

64 

def dump_connection_info(engine: Engine, fileobj: TextIO = sys.stdout) -> None:
    """
    Writes a one-line description of the database connection, formatted as an
    SQL comment.

    The description is the string form of a :class:`MetaData` object bound to
    the engine (which, per the original author's note, obscures passwords).

    Args:
        engine: the SQLAlchemy :class:`Engine` to describe
        fileobj: the file-like object (default ``sys.stdout``) to write the
            comment line to
    """
    bound_metadata = MetaData(bind=engine)
    info_line = sql_comment(f'Database info: {bound_metadata}')
    writeline_nl(fileobj, info_line)

77 

78 

def dump_ddl(metadata: MetaData,
             dialect_name: str,
             fileobj: TextIO = sys.stdout,
             checkfirst: bool = True) -> None:
    """
    Writes the schema-creating DDL (``CREATE TABLE`` statements) for the
    given metadata to ``fileobj``.

    A "mock" engine is created for the requested dialect; instead of executing
    statements, it hands each one to a local callback that compiles it for
    that dialect and writes it out, terminated by a semicolon.

    Args:
        metadata: SQLAlchemy :class:`MetaData` describing the schema
        dialect_name: string name of the SQL dialect to generate DDL in
        fileobj: file-like object to send the DDL to
        checkfirst: passed to :meth:`MetaData.create_all`; intended to give
            ``CREATE TABLE IF NOT EXISTS`` or equivalent (but see the note at
            the end of the function body)
    """
    # http://docs.sqlalchemy.org/en/rel_0_8/faq.html#how-can-i-get-the-create-table-drop-table-output-as-a-string  # noqa
    # https://stackoverflow.com/questions/870925/how-to-generate-a-file-with-ddl-in-the-engines-sql-dialect-in-sqlalchemy  # noqa
    # https://github.com/plq/scripts/blob/master/pg_dump.py

    # noinspection PyUnusedLocal
    def write_statement(ddl_element, *multiparams, **params):
        # "mock_engine" is assigned below, before this callback is ever
        # invoked (it is only called from within create_all()).
        compiled = ddl_element.compile(dialect=mock_engine.dialect)
        writeline_nl(fileobj, f"{compiled};")

    header = sql_comment(f"Schema (for dialect {dialect_name}):")
    writeline_nl(fileobj, header)
    mock_engine = create_engine(f'{dialect_name}://',
                                strategy='mock', executor=write_statement)
    metadata.create_all(mock_engine, checkfirst=checkfirst)
    # ... checkfirst doesn't seem to be working for the mock strategy...
    # http://docs.sqlalchemy.org/en/latest/core/metadata.html
    # ... does it implement a *real* check (impossible here), rather than
    # issuing CREATE ... IF NOT EXISTS?

111 

112 

def quick_mapper(table: Table) -> Type[DeclarativeMeta]:
    """
    Builds a fresh SQLAlchemy declarative class mapped onto an existing table.

    A new declarative base is created on every call, so the returned class
    does not share a registry with any other mapped classes.

    See
    http://www.tylerlesmann.com/2009/apr/27/copying-databases-across-platforms-sqlalchemy/

    Args:
        table: SQLAlchemy :class:`Table` object to map

    Returns:
        a :class:`DeclarativeMeta` class whose ``__table__`` is ``table``

    """  # noqa
    # noinspection PyPep8Naming
    DeclarativeBase = declarative_base()

    class GenericMapper(DeclarativeBase):
        __table__ = table

    # noinspection PyTypeChecker
    return GenericMapper

134 

135 

class StringLiteral(String):
    """
    A :class:`String` type whose literal processor copes with non-string
    values, so that SQLAlchemy can render them inline.
    See
    https://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query
    """
    def literal_processor(self,
                          dialect: DefaultDialect) -> Callable[[Any], str]:
        """
        Returns a function converting a Python value to its SQL literal text.

        Integers are rendered via ``str()`` without quoting. Anything else is
        converted to ``str`` if necessary and passed to the parent class's
        literal processor; a ``bytes`` result is decoded using the dialect's
        encoding.
        """
        parent_processor = super().literal_processor(dialect)

        def process(value: Any) -> str:
            log.debug("process: {!r}", value)
            if isinstance(value, int):
                return str(value)
            text = value if isinstance(value, str) else str(value)
            rendered = parent_processor(text)
            if isinstance(rendered, bytes):
                return rendered.decode(dialect.encoding)
            return rendered

        return process

157 

158 

159# noinspection PyPep8Naming 

def make_literal_query_fn(
        dialect: DefaultDialect
) -> Callable[[Union[Query, Executable]], str]:
    """
    Builds a function that renders SQLAlchemy statements as literal SQL
    strings (bound values filled in) for the given dialect.

    A subclass of the dialect's own class is created whose ``colspecs`` map
    :class:`String`, :class:`DateTime` and :class:`NullType` to
    :class:`StringLiteral`, so that values of those types are rendered inline.

    Args:
        dialect: the SQLAlchemy dialect whose class is subclassed

    Returns:
        a function taking an ORM :class:`Query` or a Core statement and
        returning the literal SQL text, terminated by a semicolon
    """
    DialectClass = dialect.__class__

    # noinspection PyClassHasNoInit,PyAbstractClass
    class LiteralDialect(DialectClass):
        # https://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query  # noqa
        colspecs = {
            # prevent various encoding explosions
            String: StringLiteral,
            # teach SA about how to literalize a datetime
            DateTime: StringLiteral,
            # don't format py2 long integers to NULL
            NullType: StringLiteral,
        }

    def literal_query(statement: Union[Query, Executable]) -> str:
        """
        NOTE: This is entirely insecure. DO NOT execute the resulting
        strings.
        """
        # https://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query  # noqa
        if isinstance(statement, Query):
            # An ORM Query wraps the Core statement that we compile.
            statement = statement.statement
        return statement.compile(
            dialect=LiteralDialect(),
            compile_kwargs={'literal_binds': True},
        ).string + ";"

    return literal_query

189 

190 

191# noinspection PyProtectedMember 

def get_literal_query(statement: Union[Query, Executable],
                      bind: Connectable = None) -> str:
    """
    Renders an SQLAlchemy statement as literal SQL, with its bound values
    substituted into the text.

    As per
    https://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query

    Notes:
        - for debugging purposes *only*
        - insecure; you should always separate queries from their values
        - please also note that this function is quite slow

    Args:
        statement: the SQL statement (a SQLAlchemy object) to use
        bind: if the statement is unbound, you will need to specify an object
            here that supports SQL execution

    Returns:
        a string literal version of the query, terminated by a semicolon.

    Raises:
        ValueError: if no bind was supplied and none can be found from the
            statement
        NotImplementedError: (during rendering) if a bound value is of a type
            that this function does not know how to quote

    """  # noqa
    if isinstance(statement, Query):
        if bind is None:
            bind = statement.session.get_bind(statement._mapper_zero_or_none())
        statement = statement.statement
    elif bind is None:
        bind = statement.bind
    if bind is None:  # despite all that
        raise ValueError("Attempt to call get_literal_query with an unbound "
                         "statement and no 'bind' parameter")

    # noinspection PyUnresolvedReferences
    dialect = bind.dialect
    # We subclass whatever compiler class the dialect uses for this statement.
    base_compiler_class = statement._compiler(dialect).__class__

    class LiteralCompiler(base_compiler_class):
        # noinspection PyMethodMayBeStatic
        def visit_bindparam(self,
                            bindparam: BindParameter,
                            within_columns_clause: bool = False,
                            literal_binds: bool = False,
                            **kwargs) -> str:
            # Always render the parameter's value inline, rather than a
            # placeholder.
            return super().render_literal_bindparam(
                bindparam,
                within_columns_clause=within_columns_clause,
                literal_binds=literal_binds,
                **kwargs
            )

        # noinspection PyUnusedLocal
        def render_literal_value(self, value: Any, type_) -> str:
            """
            Renders the value of a bind parameter as a (quoted, where
            applicable) SQL literal. The ``type_`` argument is ignored; the
            decision is made on the Python type of ``value`` alone.
            """
            if isinstance(value, str):
                # Escape embedded single quotes by doubling them.
                escaped = value.replace("'", "''")
                return "'%s'" % escaped
            if value is None:
                return "NULL"
            if isinstance(value, (float, int)):
                return repr(value)
            if isinstance(value, decimal.Decimal):
                return str(value)
            if isinstance(value, (datetime.datetime,
                                  datetime.date,
                                  datetime.time,
                                  pendulum.DateTime,
                                  pendulum.Date,
                                  pendulum.Time)):
                # All have an isoformat() method.
                return f"'{value.isoformat()}'"
            raise NotImplementedError(
                "Don't know how to literal-quote value %r" % value)

    return LiteralCompiler(dialect, statement).process(statement) + ";"

282 

283 

def dump_table_as_insert_sql(engine: Engine,
                             table_name: str,
                             fileobj: TextIO,
                             wheredict: Dict[str, Any] = None,
                             include_ddl: bool = False,
                             multirow: bool = False) -> None:
    """
    Reads a table from the database, and writes SQL to replicate the table's
    data to the output ``fileobj``.

    The table is reflected from the database, selected (optionally filtered),
    and each row is written as a literal ``INSERT`` statement, framed by
    SQL comment lines.

    Args:
        engine: SQLAlchemy :class:`Engine`
        table_name: name of the table
        fileobj: file-like object to write to
        wheredict: optional dictionary of ``{column_name: value}`` to use as
            ``WHERE`` filters (combined with ``AND``)
        include_ddl: if ``True``, include the DDL to create the table as well
        multirow: write multi-row ``INSERT`` statements (currently always
            disabled, with a warning, because literal substitution for
            multi-row inserts does not work yet)

    Raises:
        ValueError: if a key of ``wheredict`` is not a column of the table
    """
    # https://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query  # noqa
    # http://docs.sqlalchemy.org/en/latest/faq/sqlexpressions.html
    # http://www.tylerlesmann.com/2009/apr/27/copying-databases-across-platforms-sqlalchemy/  # noqa
    # https://github.com/plq/scripts/blob/master/pg_dump.py
    log.info("dump_table_as_insert_sql: table_name={}", table_name)
    writelines_nl(fileobj, [
        SEP1,
        sql_comment(f"Data for table: {table_name}"),
        SEP2,
        sql_comment(f"Filters: {wheredict}"),
    ])
    dialect = engine.dialect
    if not dialect.supports_multivalues_insert:
        multirow = False
    if multirow:
        log.warning("dump_table_as_insert_sql: multirow parameter "
                    "substitution not working yet")
        multirow = False

    meta = MetaData(bind=engine)
    log.debug("... retrieving schema")
    table = Table(table_name, meta, autoload=True)
    if include_ddl:
        log.debug("... producing DDL")
        dump_ddl(table.metadata, dialect_name=dialect.name, fileobj=fileobj)

    log.debug("... fetching records")
    query = select(table.columns)
    for colname, value in (wheredict or {}).items():
        col = table.columns.get(colname)
        if col is None:
            # Without this check, "None == value" would be an ordinary Python
            # bool, and the WHERE clause would silently become constant
            # true/false rather than filtering on anything.
            raise ValueError(
                f"dump_table_as_insert_sql: no column named {colname!r} "
                f"in table {table_name!r}")
        query = query.where(col == value)
    cursor = engine.execute(query)

    if multirow:
        # Currently unreachable (multirow is forced off above); retained for
        # when multi-row literal substitution is made to work.
        row_dict_list = [dict(r) for r in cursor]
        if row_dict_list:
            statement = table.insert().values(row_dict_list)
            insert_str = get_literal_query(statement, bind=engine)
            # NOT WORKING FOR MULTIROW INSERTS. ONLY SUBSTITUTES FIRST ROW.
            writeline_nl(fileobj, insert_str)
        else:
            writeline_nl(fileobj, sql_comment("No data!"))
    else:
        found_one = False
        for r in cursor:
            found_one = True
            statement = table.insert(values=dict(r))
            insert_str = get_literal_query(statement, bind=engine)
            writeline_nl(fileobj, insert_str)
        if not found_one:
            writeline_nl(fileobj, sql_comment("No data!"))
    writeline_nl(fileobj, SEP2)
    log.debug("... done")

374 

375 

def dump_database_as_insert_sql(engine: Engine,
                                fileobj: TextIO = sys.stdout,
                                include_ddl: bool = False,
                                multirow: bool = False) -> None:
    """
    Dumps every table in a database, in turn, as SQL that would replicate it,
    writing to the output file-like object.

    Args:
        engine: SQLAlchemy :class:`Engine`
        fileobj: file-like object to write to
        include_ddl: if ``True``, include the DDL to create each table as well
        multirow: write multi-row ``INSERT`` statements
    """
    for name in get_table_names(engine):
        dump_table_as_insert_sql(engine, name, fileobj,
                                 include_ddl=include_ddl,
                                 multirow=multirow)

398 

399 

def dump_orm_object_as_insert_sql(engine: Engine,
                                  obj: object,
                                  fileobj: TextIO) -> None:
    """
    Takes a SQLAlchemy ORM object, and writes ``INSERT`` SQL to replicate it
    to the output file-like object.

    The object's row is re-read from the database (via a reflected table and
    the object's primary key) rather than taken from the ORM attributes, so
    that values have the plain datatypes the database provides.

    Args:
        engine: SQLAlchemy :class:`Engine`
        obj: SQLAlchemy ORM object to write
        fileobj: file-like object to write to

    Raises:
        ValueError: if no row with the object's primary key is found in the
            database
    """
    insp = inspect(obj)
    # insp: an InstanceState
    # http://docs.sqlalchemy.org/en/latest/orm/internals.html#sqlalchemy.orm.state.InstanceState  # noqa
    # insp.mapper: a Mapper
    # http://docs.sqlalchemy.org/en/latest/orm/mapping_api.html#sqlalchemy.orm.mapper.Mapper  # noqa
    mapper = insp.mapper

    # Don't do this:
    #   table = insp.mapper.mapped_table
    # Do this instead. The method above gives you fancy data types like list
    # and Arrow on the Python side. We want the bog-standard datatypes drawn
    # from the database itself.
    meta = MetaData(bind=engine)
    table_name = mapper.mapped_table.name
    table = Table(table_name, meta, autoload=True)

    query = select(table.columns)
    for orm_pkcol in mapper.primary_key:
        core_pkcol = table.columns.get(orm_pkcol.name)
        # The ORM attribute may be named differently from the database column
        # (e.g. ``id = Column("pk", Integer)``), so look up the attribute name
        # via the mapper rather than assuming it equals the column name.
        attr_name = mapper.get_property_by_column(orm_pkcol).key
        pkval = getattr(obj, attr_name)
        query = query.where(core_pkcol == pkval)

    # first() fetches one row (there should only be one...) and then closes
    # the result set.
    row = engine.execute(query).first()
    if row is None:
        raise ValueError(
            f"dump_orm_object_as_insert_sql: no row found in table "
            f"{table_name!r} for object {obj!r}")
    statement = table.insert(values=dict(row))
    insert_str = get_literal_query(statement, bind=engine)
    writeline_nl(fileobj, insert_str)

448 

449 

def bulk_insert_extras(dialect_name: str,
                       fileobj: TextIO,
                       start: bool) -> None:
    """
    Writes the statements that should surround a bulk ``INSERT``: the
    preamble (``start=True``) or the ending (``start=False``).

    For MySQL, the preamble switches off autocommit, unique checks and
    foreign key checks; the ending re-enables both kinds of check and
    commits. For any other dialect, no statements are emitted.

    Args:
        dialect_name: SQLAlchemy dialect name (see :class:`SqlaDialectName`)
        fileobj: file-like object to write to
        start: if ``True``, write preamble; if ``False``, write end
    """
    if dialect_name != SqlaDialectName.MYSQL:
        statements = []
    elif start:
        statements = [
            "SET autocommit=0;",
            "SET unique_checks=0;",
            "SET foreign_key_checks=0;",
        ]
    else:
        statements = [
            "SET foreign_key_checks=1;",
            "SET unique_checks=1;",
            "COMMIT;",
        ]
    writelines_nl(fileobj, statements)

479 

480 

def dump_orm_tree_as_insert_sql(engine: Engine,
                                baseobj: object,
                                fileobj: TextIO) -> None:
    """
    Writes ``INSERT`` commands, in SQL, for an ORM object and everything
    reachable from it (as yielded by :func:`walk_orm_tree`), to ``fileobj``.
    The output is wrapped in the dialect's bulk-insert preamble and ending.

    Args:
        engine: SQLAlchemy :class:`Engine`
        baseobj: starting SQLAlchemy ORM object
        fileobj: file-like object to write to

    Problem: foreign key constraints.

    - MySQL/InnoDB doesn't wait to the end of a transaction to check FK
      integrity (which it should):
      https://stackoverflow.com/questions/5014700/in-mysql-can-i-defer-referential-integrity-checks-until-commit  # noqa
    - PostgreSQL can.
    - Anyway, slightly ugly hacks...
      https://dev.mysql.com/doc/refman/5.5/en/optimizing-innodb-bulk-data-loading.html
    - Not so obvious how we can iterate through the list of ORM objects and
      guarantee correct insertion order with respect to all FKs.
    """  # noqa
    heading = sql_comment("Data for all objects related to the first below:")
    writeline_nl(fileobj, heading)
    dialect_name = engine.dialect.name
    bulk_insert_extras(dialect_name, fileobj, start=True)
    for related_obj in walk_orm_tree(baseobj):
        dump_orm_object_as_insert_sql(engine, related_obj, fileobj)
    bulk_insert_extras(dialect_name, fileobj, start=False)

510 bulk_insert_extras(engine.dialect.name, fileobj, start=False)