Coverage for preprocess/autoimport_db.py: 44%

418 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-27 10:34 -0500

1""" 

2crate_anon/preprocess/autoimport_db.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Automatically import to a database from a collection of tabular files.** 

27 

28Efficiency is challenging here. Simple CSV/TSV files are efficiently handled 

29as file-like objects, and can be iterated in a low-memory way very fast. 

30Spreadsheet-type objects often need to be loaded "whole", so repeat iteration 

31is less sensible. We're trying to handle both, so no perfect/simple way. 

32 

33Considered but not done: 

34 

35- Track min/max for numeric types. This would allow us to refine the integer 

36 type. However, there is always the danger that we scan data and create tables 

37 for one set of files, then want to import from another, and the latter has 

38 more extreme values. So we just use a column type (BigInteger) with wide 

39 capabilities. (Having said that, we do track the maximum length of strings!) 

40 

41""" 

42 

43import argparse 

44import csv 

45import datetime 

46from itertools import zip_longest 

47import logging 

48from operator import attrgetter 

49from pathlib import Path 

50import tempfile 

51from typing import ( 

52 Any, 

53 BinaryIO, 

54 Callable, 

55 Dict, 

56 Generator, 

57 Iterable, 

58 IO, 

59 List, 

60 Optional, 

61 TextIO, 

62 Tuple, 

63 Union, 

64) 

65import zipfile 

66 

67from cardinal_pythonlib.logs import main_only_quicksetup_rootlogger 

68from cardinal_pythonlib.sqlalchemy.session import get_safe_url_from_engine 

69from rich_argparse import ArgumentDefaultsRichHelpFormatter 

70import openpyxl # xlrd might be faster... 

71from openpyxl.cell.cell import Cell 

72import pendulum 

73import pendulum.parsing.exceptions 

74import pyexcel_ods 

75from sqlalchemy.engine import create_engine, Engine 

76from sqlalchemy.exc import InvalidRequestError 

77from sqlalchemy.orm.session import sessionmaker, Session 

78from sqlalchemy.sql.expression import insert 

79from sqlalchemy.sql.schema import Column, MetaData, Table 

80from sqlalchemy.sql.sqltypes import ( 

81 BigInteger, 

82 Boolean, 

83 Date, 

84 DateTime, 

85 Float, 

86 String, 

87) 

88from sqlalchemy.sql.type_api import TypeEngine 

89 

90from crate_anon.common.future import batched 

91 

log = logging.getLogger(__name__)


# =============================================================================
# Constants
# =============================================================================

# One row of tabular data: a mapping from column name to cell value.
SPREADSHEET_DICT_ROW_TYPE = Dict[str, Any]

# Default number of records to insert per batch.
DEFAULT_CHUNKSIZE = int(1e5)

# Column type passed to ColumnTypeDetector as its fallback, i.e. for a column
# in which no non-NULL value is ever seen.
DEFAULT_COL_TYPE = String(1)

# Name of the option referred to in FIRST_SHEET_ONLY_MSG.
USE_SPREADSHEET_NAMES = "use_spreadsheet_names"

# Logged when only the first sheet of a multi-sheet file is being read.
FIRST_SHEET_ONLY_MSG = (
    f"Only reading the first sheet of this file. See --{USE_SPREADSHEET_NAMES}."
)

# Caveat for modes that may reveal actual data values.
WARNING_VALUES_VISIBLE = (
    "WARNING: not suitable for production use (may show actual data values). "
    "Use for testing only."
)

111 

112 

class SheetFiletypes:
    """
    Lower-case file extensions (including the leading dot) for the file types
    handled here, plus helpers that classify a path by its extension. All
    comparisons are case-insensitive.
    """

    CSV = ".csv"
    ODS = ".ods"
    TSV = ".tsv"
    XLSX = ".xlsx"
    ZIP = ".zip"

    @staticmethod
    def get_ext_lower(path: Path) -> str:
        """
        Returns the path's (final) extension, in lower case, e.g. ".csv".
        """
        return path.suffix.lower()

    @classmethod
    def is_single_sheet_filetype(cls, path: Path) -> bool:
        """
        Does this file extension indicate a file type holding exactly one
        sheet/table of values (CSV or TSV)?
        """
        return cls.get_ext_lower(path) in (cls.CSV, cls.TSV)

    @classmethod
    def is_multisheet_filetype(cls, path: Path) -> bool:
        """
        Does this file extension indicate a file type that can hold several
        sheets/tables of values (ODS or XLSX)?
        """
        return cls.get_ext_lower(path) in (cls.ODS, cls.XLSX)

    @classmethod
    def get_read_mode(cls, path: Path) -> str:
        """
        Returns the mode with which to open the file: "r" (text) for CSV/TSV,
        and "rb" (binary) for everything else.
        """
        if cls.get_ext_lower(path) in (cls.CSV, cls.TSV):
            return "r"
        return "rb"

    @classmethod
    def is_csv(cls, path: Path) -> bool:
        """
        Is this a CSV file, by extension?
        """
        return cls.get_ext_lower(path) == cls.CSV

    @classmethod
    def is_ods(cls, path: Path) -> bool:
        """
        Is this an ODS file, by extension?
        """
        return cls.get_ext_lower(path) == cls.ODS

    @classmethod
    def is_tsv(cls, path: Path) -> bool:
        """
        Is this a TSV file, by extension?
        """
        return cls.get_ext_lower(path) == cls.TSV

    @classmethod
    def is_xlsx(cls, path: Path) -> bool:
        """
        Is this an XLSX file, by extension?
        """
        return cls.get_ext_lower(path) == cls.XLSX

    @classmethod
    def is_zip(cls, path: Path) -> bool:
        """
        Is this a ZIP file, by extension?
        """
        return cls.get_ext_lower(path) == cls.ZIP

172 

173 

174# ============================================================================= 

175# Data type detection functions 

176# ============================================================================= 

177 

178 

def does_datetime_have_zero_time(
    d: Union[datetime.datetime, pendulum.DateTime]
) -> bool:
    """
    Are the hour, minute, second, and microsecond fields of this
    datetime-like object all zero?
    """
    time_fields = (d.hour, d.minute, d.second, d.microsecond)
    return time_fields == (0, 0, 0, 0)

186 

187 

def is_date_like_not_datetime_like(v: Any) -> bool:
    """
    Does this value look like a pure date, i.e. a date with no time-of-day
    component? Datetime-like objects, and strings that parse as datetimes,
    count only if all their time fields are zero.
    """
    if isinstance(v, datetime.date):
        # NB datetime.datetime is a subclass of datetime.date (not vice
        # versa), so a datetime arrives here and needs its time checked.
        return not isinstance(
            v, datetime.datetime
        ) or does_datetime_have_zero_time(v)
    if isinstance(v, pendulum.Date):
        # Likewise, pendulum.DateTime is an instance of pendulum.Date.
        return not isinstance(
            v, pendulum.DateTime
        ) or does_datetime_have_zero_time(v)
    if not isinstance(v, str):
        return False
    try:
        parsed = pendulum.parse(v)
    except (pendulum.parsing.exceptions.ParserError, ValueError):
        return False
    # pendulum.parse() can return a pendulum.DateTime, or a
    # pendulum.Duration; only the former can be date-like.
    return isinstance(
        parsed, pendulum.DateTime
    ) and does_datetime_have_zero_time(parsed)

218 

219 

def is_datetime_or_date_like(v: Any) -> bool:
    """
    Does this value look like a datetime or a date? True for date/datetime
    objects (standard library or pendulum), and for strings that
    pendulum.parse() turns into a pendulum.DateTime.
    """
    temporal_types = (
        datetime.date,
        datetime.datetime,
        pendulum.Date,
        pendulum.DateTime,
    )
    if isinstance(v, temporal_types):
        return True
    if not isinstance(v, str):
        return False
    try:
        parsed = pendulum.parse(v)
    except (pendulum.parsing.exceptions.ParserError, ValueError):
        return False
    return isinstance(parsed, pendulum.DateTime)

236 

237 

238# ============================================================================= 

239# Column type detection 

240# ============================================================================= 

241 

242 

def mk_columns(
    datagen: Iterable[SPREADSHEET_DICT_ROW_TYPE], verbose: bool = False
) -> List[Column]:
    """
    Scan rows of data and autodetect an SQLAlchemy column for each column
    name encountered.

    Args:
        datagen:
            Iterable of rows, each a dictionary mapping column name to value.
            It is consumed in full.
        verbose:
            Be verbose and report values? (WARNING: therefore unsuitable for
            production use.)

    Returns:
        One column per column name, in order of first appearance.

    Raises:
        ValueError: if a row has a value with no column heading (key None).
    """
    detectors = {}  # type: Dict[str, ColumnTypeDetector]
    for row in datagen:
        for colname, value in row.items():
            if colname is None:
                raise ValueError(
                    "Sheet has missing column headings; headings are: "
                    f"{list(row.keys())}"
                )
            detector = detectors.get(colname)
            if detector is None:
                # First sighting of this column: start a detector for it.
                detector = ColumnTypeDetector(
                    colname, default_type=DEFAULT_COL_TYPE, verbose=verbose
                )
                detectors[colname] = detector
            detector.inspect(value)
    return [detector.sqlalchemy_column() for detector in detectors.values()]

273 

274 

275# ============================================================================= 

276# Helper classes 

277# ============================================================================= 

278 

279 

class TabularFileInfo:
    """
    Describes one prospective database table derived from a tabular data
    source: its name, optionally its columns, and optionally its row data.
    """

    def __init__(
        self,
        tablename: str,
        metadata: MetaData,
        engine: Engine,
        datagen: Iterable[SPREADSHEET_DICT_ROW_TYPE] = None,
        with_columns_from_data: bool = False,
        with_columns_from_reflection: bool = False,
        with_data: bool = False,
        verbose: bool = False,
    ) -> None:
        """
        Args:
            tablename:
                Name of the table.
            metadata:
                Database MetaData object.
            engine:
                SQLAlchemy engine.
            datagen:
                Optional iterable to provide data. (Must be supplied if
                with_columns_from_data or with_data is True.)
            with_columns_from_data:
                Read/autodetect column information from data, for creating
                tables? Ignored if the table is found via reflection.
            with_columns_from_reflection:
                Should columns be read from the metadata?
            with_data:
                Provide data?
            verbose:
                Be verbose if the process fails? WARNING: will report values.

        Raises:
            ValueError: if reflection is requested, the table is not in the
                metadata, and columns are not to be detected from data.
        """
        self.tablename = tablename
        self.metadata = metadata
        self.engine = engine
        self.with_columns_from_reflection = with_columns_from_reflection
        self.with_data = with_data

        # If requested, a list of SQLAlchemy columns to create:
        self.columns = None  # type: Optional[List[Column]]
        # If requested, an iterable of data rows (each a dictionary mapping
        # column name to value):
        self.data_generator = (
            None
        )  # type: Optional[Iterable[SPREADSHEET_DICT_ROW_TYPE]]
        # Internal cached SQLAlchemy table:
        self._table = None  # type: Optional[Table]
        self.table_exists_in_database = False

        if with_columns_from_reflection:
            try:
                reflected = metadata.tables[tablename]
            except KeyError:
                log.info(f"Table not found in database: {tablename}")
                if not with_columns_from_data:
                    raise ValueError(
                        f"Table {tablename!r} not found in the database. "
                        f"Did you mean to turn on table creation?"
                    )
            else:
                self._table = reflected
                log.info(f"Read table from database: {tablename}")
                self.table_exists_in_database = True
                # The existing table's structure takes precedence over
                # anything we might detect from the data.
                with_columns_from_data = False
                self.columns = reflected.columns

        self.with_columns_from_data = with_columns_from_data

        if with_columns_from_data or with_data:
            assert datagen is not None
            if with_columns_from_data:
                # Column detection consumes its input. If the data is also
                # wanted afterwards, read it into a list first so that it can
                # be iterated twice.
                rows = list(datagen) if with_data else datagen
                self.columns = mk_columns(rows, verbose=verbose)
                if with_data:
                    self.data_generator = rows
            else:
                # Data only: hand over the iterable untouched.
                self.data_generator = datagen

    def has_columns(self) -> bool:
        """
        Do we have at least one column?
        """
        return bool(self.columns)

    def validate_columns(self) -> None:
        """
        Validates columns, or raises ValueError (if there is an empty set of
        columns, or if column names are duplicated). If column information
        was never obtained, just logs a warning.
        """
        cols = self.columns
        if cols is None:
            log.warning("Validating columns, but there aren't any")
            return
        if not cols:
            raise ValueError(f"Table {self.tablename} has no columns")
        colnames = [col.name for col in cols]
        if len(set(colnames)) != len(colnames):
            raise ValueError(
                f"Table {self.tablename} has duplicate columns: {colnames!r}"
            )

    def colreport(self) -> str:
        """
        A text-format report of our columns, one per line. Raises ValueError
        if column information was never obtained.
        """
        if self.columns is None:
            raise ValueError(
                f"Can't produce column report: table {self.tablename} "
                f"has no columns"
            )
        lines = [f"- {col!r}" for col in self.columns]
        return "\n".join(lines)

    def table(self) -> Table:
        """
        Returns an SQLAlchemy table object. Caches this across requests (or
        SQLAlchemy will complain that we re-assign columns to a table). Assumes
        that the MetaData object WILL NOT CHANGE ACROSS CALLS.
        """
        if self._table is not None:
            return self._table
        self._table = Table(
            self.tablename, self.metadata, *(self.columns or ())
        )
        return self._table

    def drop_table(self) -> None:
        """
        Drop a database table, if it exists.
        """
        log.info(f"Dropping table (if it exists): {self.tablename}")
        # See also crate_anon.anonymise.subset_db.drop_dst_table_if_exists().
        table = self.table()  # doesn't need columns
        table.drop(self.engine, checkfirst=True)
        # No COMMIT required after DDL.
        # Remove from the metadata; otherwise we may struggle to re-create it.
        self.metadata.remove(table)
        self.table_exists_in_database = False

    def create_table(self) -> None:
        """
        Create a database table, if it doesn't already exist.
        """
        if self.table_exists_in_database:
            log.info(f"Table already exists, not creating: {self.tablename}")
            return
        log.info(f"Creating table: {self.tablename}\n{self.colreport()}")
        table = self.table()
        try:
            table.create(self.engine, checkfirst=True)
        except InvalidRequestError:
            log.warning(
                "Table already exists, unexpectedly; not re-creating: "
                f"{self.tablename}"
            )
        # No COMMIT required after DDL.

438 

439 

class ColumnTypeDetector:
    """
    Class to inspect values from a spreadsheet column, and make a decision
    about what kind of SQLAlchemy column should be used.
    """

    def __init__(
        self,
        colname: str,
        values: Optional[Iterable[Any]] = None,
        default_type: Optional[TypeEngine] = None,
        verbose: bool = False,
    ) -> None:
        """
        Args:
            colname:
                Future column name.
            values:
                Optional values to inspect immediately.
            default_type:
                Type to use if no non-NULL data whatsoever is seen.
            verbose:
                Report values on failure. WARNING: unsuitable, for this reason,
                for production code.

        Raises:
            ValueError: if the column name is missing/empty, or contains a
                tab character.
        """
        if not colname:
            raise ValueError("Missing column name")
        if "\t" in colname:
            raise ValueError(
                f"Likely TSV file being read as CSV, because column name "
                f"includes tabs: {colname!r}"
            )

        self.colname = colname
        self.default_type = default_type
        self.verbose = verbose

        # None/NULL types:
        self.seen_none = False

        # Sanity check
        self.seen_something_not_none = False

        # Non-string types:
        self.seen_bool = False
        self.seen_int = False
        self.seen_float = False

        # String and string-derived types:
        self.seen_str = False
        self.seen_date = False
        self.seen_datetime = False
        self.seen_str_not_date_or_datetime = False
        self.max_strlen = 0

        # For verbose mode:
        self.inspected = set()

        # If created with values:
        if values is not None:
            for v in values:
                self.inspect(v)

    def __str__(self) -> str:
        """
        String representation.
        """
        try:
            c = self.sqlalchemy_column()
        except ValueError:
            c = "<no_known_column_type>"
        return f"{self.colname}: {c}"

    def inspect(self, v: Any) -> None:
        """
        Inspect a new value, and record what kind of thing it is.

        Raises:
            ValueError: if the value is of an unsupported type.
        """
        if self.verbose:
            self.inspected.add(v)
        if v is None:
            self.seen_none = True
            return
        self.seen_something_not_none = True
        # NB bool is a subclass of int, so bool must be checked first.
        if isinstance(v, bool):
            self.seen_bool = True
            return
        if isinstance(v, int):
            self.seen_int = True
            return
        if isinstance(v, float):
            self.seen_float = True
            return
        is_str = isinstance(v, str)
        if is_date_like_not_datetime_like(v):
            self.seen_date = True
            if is_str:
                # Track the length even for date-like strings, in case the
                # column turns out to need a String type after all.
                self.max_strlen = max(len(v), self.max_strlen)
        elif is_datetime_or_date_like(v):
            self.seen_datetime = True
            if is_str:
                self.max_strlen = max(len(v), self.max_strlen)
        elif is_str:
            # NB A string that does not look like a date/datetime.
            self.seen_str = True
            self.max_strlen = max(len(v), self.max_strlen)
            self.seen_str_not_date_or_datetime = True
        else:
            raise ValueError(f"Unexpected value of type {type(v)}")
            # Implausible that we will get a BLOB via a spreadsheet.

    def _values_seen_suffix(self) -> str:
        """
        In verbose mode, returns a string suffix (for error messages) showing
        what values we've seen.
        """
        return ": " + repr(self.inspected) if self.verbose else ""

    def _sqla_coltype(self) -> TypeEngine:
        """
        Returns the SQLAlchemy column type to use, or raises ValueError.

        Copes with simple types (and NULL/None values):

        - bool -> Boolean
        - int -> BigInteger
        - float -> Float
        - date -> Date
        - datetime -> DateTime
        - str (interpretable as date) -> Date
        - str (interpretable as datetime, but not date) -> DateTime
        - str (otherwise) -> String (of the maximum length seen)

        Copes with mixtures:

        - int + float -> Float
        - date and date-as-str ("date-like") -> Date
        - datetime and datetime-as-str ("datetime-like") -> DateTime
        - date-like + datetime-like -> DateTime
        - {date-like or datetime-like} and {str not interpretable as date or
          datetime} -> String

        Failure conditions (raises ValueError):

        - nothing seen, or only NULL/None values seen, with no default type
        - bool mixed with a numeric (int/float) type
        - string type and {bool, int, float} type
        - date-like/datetime-like type and {bool, int, float} type

        """
        if not self.seen_something_not_none:
            if self.default_type is not None:
                return self.default_type
            raise ValueError(
                f"Column type {self.colname}: no data seen yet and no "
                f"default type"
            )
        is_numeric = self.seen_int or self.seen_float
        n_non_string_based = sum([self.seen_bool, is_numeric])
        if n_non_string_based > 1:
            raise ValueError(
                f"Column {self.colname}: mixed non-string types"
                + self._values_seen_suffix()
            )
        if n_non_string_based > 0 and self.seen_str:
            raise ValueError(
                f"Column {self.colname}: mixed string/non-string types"
                + self._values_seen_suffix()
            )
        if n_non_string_based > 0 and (self.seen_date or self.seen_datetime):
            # Date-like values never set seen_str, so the check above would
            # miss them, and the column would silently become a
            # boolean/numeric one that cannot hold the dates.
            raise ValueError(
                f"Column {self.colname}: mixed date/datetime and "
                f"boolean/numeric types" + self._values_seen_suffix()
            )
        # If we get here, either it's string-derived, or a single non-string
        # type.
        if self.seen_bool:
            return Boolean()
        elif is_numeric:
            if self.seen_float:
                return Float()
            else:
                return BigInteger()
        elif self.seen_str_not_date_or_datetime:
            return String(length=max(1, self.max_strlen))
        elif self.seen_datetime:
            return DateTime()
        elif self.seen_date:
            return Date()
        else:
            raise AssertionError("Type analysis bug")

    def sqlalchemy_column(self, nullable: Optional[bool] = None) -> Column:
        """
        Returns an SQLAlchemy Column object (free-floating, i.e. with no table
        attached), or raises ValueError.

        Args:
            nullable:
                Should the column be NULL-capable? Use True for "NULL", False
                for "NOT NULL", and None for "NULL if NULL/None values have
                been seen, otherwise NOT NULL".
        """
        if nullable is None:
            nullable = self.seen_none
        elif not nullable and self.seen_none:
            raise ValueError(
                f"Column {self.colname}: requested nullable=False but have "
                f"seen a NULL value"
            )
        return Column(
            self.colname,
            self._sqla_coltype(),
            nullable=nullable,
        )

650 

651 

652# ============================================================================= 

653# Spreadsheet generator functions 

654# ============================================================================= 

655 

656 

def ods_row_to_list(row: Iterable[Any]) -> List[Any]:
    """
    Returns the values of an OpenOffice ODS row as a list, with each empty
    string (which is how an empty cell arrives) replaced by None.
    """
    values = []  # type: List[Any]
    for cell_value in row:
        if cell_value == "":
            values.append(None)
        else:
            values.append(cell_value)
    return values

663 

664 

def xlsx_row_to_list(row: Iterable[Cell]) -> List[Any]:
    """
    Returns the cell values of an OpenPyXL XLSX row as a list, with each
    empty-string value replaced by None.
    """
    values = []  # type: List[Any]
    for cell in row:
        cell_value = cell.value
        if cell_value == "":
            values.append(None)
        else:
            values.append(cell_value)
    return values

671 

672 

def dict_from_rows(
    row_iterator: Iterable[Iterable],
    row_to_list_fn: Callable[[Iterable], List],
) -> Generator[Dict, None, None]:
    """
    Iterate through rows (from row_iterator); apply row_to_list_fn() to each;
    yield dictionaries mapping column names to values.

    The first row supplies the column headings. Subsequent rows that are
    entirely blank (every value is None or the empty string, or there are no
    values at all) are skipped. Values such as 0 or False are real data and
    do not make a row blank.

    Args:
        row_iterator:
            Iterable of rows, the first being the heading row.
        row_to_list_fn:
            Function converting a row to a list of values.

    Yields:
        One dictionary per non-blank data row.
    """
    rows = iter(row_iterator)
    try:
        headings = row_to_list_fn(next(rows))
    except StopIteration:
        # No rows at all, not even headings.
        return
    for row in rows:
        values = row_to_list_fn(row)
        if all(v is None or v == "" for v in values):
            # skip blank rows
            continue
        # Care required here. If values is shorter than headings, zip()
        # will just discard headings that don't have a value. So use
        # zip_longest(), which pads with None.
        yield dict(zip_longest(headings, values))

697 

698 

def translate_empty_str_to_none(
    reader: Iterable[Dict[str, Any]]
) -> Generator[SPREADSHEET_DICT_ROW_TYPE, None, None]:
    """
    Yield dictionaries (mapping column name to value), but
    (a) translating blank strings (often the product of empty cells e.g. with
    csv.DictReader) to None; (b) skipping entirely blank rows, i.e. rows in
    which every value is None after that translation (or which have no
    values at all).

    Args:
        reader:
            For example, a csv.DictReader().
    """
    for row in reader:
        d = {k: (None if v == "" else v) for k, v in row.items()}
        # Test the values, not the dictionary: a dictionary with keys is
        # truthy even when every value is None.
        if any(v is not None for v in d.values()):
            yield d

715 

716 

def gen_dicts_from_csv(
    fileobj: TextIO,
) -> Generator[SPREADSHEET_DICT_ROW_TYPE, None, None]:
    """
    Generates value dictionaries (one per row, keyed by column heading) from
    a CSV file, with empty strings translated to None.
    """
    yield from translate_empty_str_to_none(csv.DictReader(fileobj))

725 

726 

def gen_dicts_from_tsv(
    fileobj: TextIO,
) -> Generator[SPREADSHEET_DICT_ROW_TYPE, None, None]:
    """
    Generates value dictionaries (one per row, keyed by column heading) from
    a TSV (tab-separated) file, with empty strings translated to None.
    """
    yield from translate_empty_str_to_none(
        csv.DictReader(fileobj, delimiter="\t")
    )

735 

736 

def gen_sheets_from_ods(
    fileobj: BinaryIO, first_sheet_only: bool = False
) -> Generator[Tuple[str, Iterable[SPREADSHEET_DICT_ROW_TYPE]], None, None]:
    """
    Generates tuples of (sheet name, iterable-of-value-dictionaries) from an
    ODS file.

    Args:
        fileobj:
            The ODS file, opened in binary mode.
        first_sheet_only:
            Stop after the first sheet (and log a warning to say so)?
    """
    # An ordered dictionary, whose keys are spreadsheet names:
    sheets = pyexcel_ods.get_data(fileobj)
    if first_sheet_only:
        log.warning(FIRST_SHEET_ONLY_MSG)
    for sheetname, sheetrows in sheets.items():
        rowdicts = dict_from_rows(sheetrows, row_to_list_fn=ods_row_to_list)
        yield sheetname, rowdicts
        if first_sheet_only:
            break

754 

755 

def gen_sheets_from_xlsx(
    fileobj: BinaryIO, first_sheet_only: bool = False
) -> Generator[Tuple[str, Iterable[SPREADSHEET_DICT_ROW_TYPE]], None, None]:
    """
    Generates tuples of (sheet name, iterable-of-value-dictionaries) from an
    Excel XLSX file.

    Args:
        fileobj:
            The XLSX file, opened in binary mode.
        first_sheet_only:
            Stop after the first sheet (and log a warning to say so)?
    """
    load_options = dict(
        read_only=True,
        keep_vba=False,
        data_only=True,
        keep_links=False,
    )
    # No obvious bug now (with openpyxl==3.0.7) with read-only mode.
    workbook = openpyxl.load_workbook(fileobj, **load_options)
    if first_sheet_only:
        log.warning(FIRST_SHEET_ONLY_MSG)
    for worksheet in workbook.worksheets:
        rowdicts = dict_from_rows(
            worksheet.iter_rows(), row_to_list_fn=xlsx_row_to_list
        )
        yield worksheet.title, rowdicts
        if first_sheet_only:
            break

779 

780 

781# ============================================================================= 

782# File-related generator functions 

783# ============================================================================= 

784 

785 

def gen_files_from_zipfile(
    zipfilename: Union[Path, str]
) -> Generator[Tuple[Path, IO], None, None]:
    """
    Iterates a ZIP file, yielding filenames and corresponding file-like
    objects from within it. Inner files are processed in filename order.
    Directory entries are skipped.

    Each inner file is extracted to a temporary directory and opened from
    there; the file object (and the temporary copy) is only valid until the
    generator is advanced.

    Args:
        zipfilename: filename of the ``.zip`` file

    Yields:
        tuple (Path, file-like object) for each inner file

    NB related to ``cardinal_pythonlib.file_io.gen_files_from_zipfiles``, but
    simpler and also provides the filenames.
    """
    with zipfile.ZipFile(zipfilename) as zf:
        infolist = sorted(
            zf.infolist(), key=attrgetter("filename")
        )  # type: List[zipfile.ZipInfo]
        for zipinfo in infolist:
            if zipinfo.is_dir():
                # A directory entry is not a data file; trying to open() it
                # would fail.
                continue
            log.info(
                f"Within zip file: {zipfilename} - "
                f"reading subfile: {zipinfo.filename}"
            )
            with tempfile.TemporaryDirectory() as tmpdir:
                # extract() returns the path it actually created, which may
                # differ from the raw member name (member names are
                # sanitized on extraction).
                diskpath = Path(zf.extract(zipinfo, tmpdir))
                mode = SheetFiletypes.get_read_mode(diskpath)
                # The csv module requires text files to be opened with
                # newline=""; binary mode must not specify a newline.
                newline = None if "b" in mode else ""
                with open(diskpath, mode, newline=newline) as subfile:
                    yield Path(zipinfo.filename), subfile

817 

818 

def gen_filename_fileobj(
    filenames: List[str],
) -> Generator[Tuple[Path, IO], None, None]:
    """
    Iterates files, yielding (filename, file-like object) tuples. If a file is
    a ZIP file, iterate within it similarly (but not recursively).

    Each file object is only valid until the generator is advanced.

    Args:
        filenames:
            Filenames to process.

    Yields:
        tuple (Path, file-like object) for each file, or each inner file of a
        ZIP file

    Raises:
        ValueError: if a filename does not refer to an existing file.
    """
    for filename in filenames:
        p = Path(filename)
        if not p.is_file():
            raise ValueError(f"Not a file: {p}")
        log.info(f">>> Processing file: {p}")
        if SheetFiletypes.is_zip(p):
            yield from gen_files_from_zipfile(p)
        else:
            mode = SheetFiletypes.get_read_mode(p)
            # The csv module requires text files to be opened with
            # newline=""; binary mode must not specify a newline.
            newline = None if "b" in mode else ""
            with open(p, mode, newline=newline) as f:
                yield p, f

843 

844 

def gen_tablename_info(
    filenames: List[str],
    metadata: MetaData,
    engine: Engine,
    use_spreadsheet_names: bool = True,
    with_columns_from_data: bool = False,
    with_columns_from_reflection: bool = False,
    with_data: bool = False,
    skip_tables: List[str] = None,
    verbose: bool = False,
) -> Generator[TabularFileInfo, None, None]:
    """
    Iterates through files (and the sheets within them, where applicable),
    generating a table description for each. Files of unknown type are
    skipped with a warning.

    Args:
        filenames:
            Filenames to iterate through.
        metadata:
            Database MetaData object.
        engine:
            SQLAlchemy engine.
        use_spreadsheet_names:
            Use spreadsheet names (where relevant) as table names, rather than
            filenames. (If False, only the first sheet in each spreadsheet
            file will be used.)
        with_columns_from_data:
            Read/autodetect column information from data, for creating
            tables?
        with_columns_from_reflection:
            Should columns be read from the metadata?
        with_data:
            Provide data?
        skip_tables:
            Optional names of tables to skip.
        verbose:
            Be verbose if the process fails? WARNING: will report values.

    Yields:
        TabularFileInfo instances.
    """
    skip_tables = skip_tables or []
    # Arguments that are the same for every TabularFileInfo we create:
    shared_kwargs = dict(
        metadata=metadata,
        engine=engine,
        with_columns_from_data=with_columns_from_data,
        with_columns_from_reflection=with_columns_from_reflection,
        with_data=with_data,
        verbose=verbose,
    )
    for path, fileobj in gen_filename_fileobj(filenames):
        if SheetFiletypes.is_single_sheet_filetype(path):
            # One file, one table; the table is named after the file.
            tablename = path.stem
            if tablename in skip_tables:
                log.warning(f"Skipping table: {tablename}")
                continue
            if SheetFiletypes.is_csv(path):
                dictgen = gen_dicts_from_csv(fileobj)
            elif SheetFiletypes.is_tsv(path):
                dictgen = gen_dicts_from_tsv(fileobj)
            else:
                raise AssertionError("Bug")
            yield TabularFileInfo(
                tablename=tablename, datagen=dictgen, **shared_kwargs
            )
        elif SheetFiletypes.is_multisheet_filetype(path):
            if SheetFiletypes.is_ods(path):
                sheetgen = gen_sheets_from_ods
            elif SheetFiletypes.is_xlsx(path):
                sheetgen = gen_sheets_from_xlsx
            else:
                raise AssertionError("Bug")
            sheets = sheetgen(
                fileobj, first_sheet_only=not use_spreadsheet_names
            )
            for sheetname, sheetdatagen in sheets:
                log.info(f"... Processing sheet: {sheetname}")
                if use_spreadsheet_names:
                    tablename = sheetname
                else:
                    tablename = path.stem
                if tablename in skip_tables:
                    log.warning(f"Skipping table: {tablename}")
                    continue
                yield TabularFileInfo(
                    tablename=tablename, datagen=sheetdatagen, **shared_kwargs
                )
        else:
            log.warning(f"Unknown file type: {path}")

934 

935 

936# ============================================================================= 

937# Database functions

938# ============================================================================= 

939 

940 

def import_table(
    ti: TabularFileInfo,
    session: Session,
    chunksize: int = DEFAULT_CHUNKSIZE,
) -> None:
    """
    Import data into a database table, inserting in batches and committing
    once at the end.

    Args:
        ti:
            Table description; its ``data_generator`` supplies the rows (each
            a dictionary mapping column name to value).
        session:
            SQLAlchemy session to insert with.
        chunksize:
            Number of records to insert at once.
    """
    log.info(f"Importing to table: {ti.tablename}")
    t = ti.table()
    # Feed the data straight into the batcher, so that at most one chunk of
    # rows is held here at a time (rather than first copying the entire
    # table's data into a list).
    # NOTE(review): assumes batched() accepts any iterable, as
    # itertools.batched() does -- confirm against crate_anon.common.future.
    for datachunk in batched(ti.data_generator, chunksize):
        log.debug(f"Inserting {len(datachunk)} rows...")
        session.execute(insert(t), datachunk)
    session.commit()

956 

957 

958# ============================================================================= 

959# Importer 

960# ============================================================================= 

961 

962 

def auto_import_db(
    url: str,
    filenames: List[str],
    use_spreadsheet_names: bool = True,
    drop_tables: bool = False,
    create_tables: bool = False,
    import_data: bool = False,
    chunksize: int = DEFAULT_CHUNKSIZE,
    skip_tables: List[str] = None,
    echo: bool = False,
    verbose: bool = False,
) -> None:
    """
    Main import function.

    Args:
        url:
            Database URL.
        filenames:
            Filenames to iterate through.
        use_spreadsheet_names:
            Use spreadsheet names (where relevant) as table names, rather than
            filenames. (If False, only the first sheet in each spreadsheet
            file will be used.)
        drop_tables:
            Drop tables first?
        create_tables:
            Create tables, if required?
        import_data:
            Do the actual import?
        chunksize:
            Number of records to insert at once.
        skip_tables:
            Optional names of tables to skip.
        echo:
            Echo SQL?
        verbose:
            Be verbose if the process fails? WARNING: will report values.

    Raises:
        ValueError: if asked to drop tables and import data without
            creating tables.
    """
    if drop_tables and not create_tables and import_data:
        raise ValueError(
            "You can't drop tables, not create them, and then hope to import "
            "data"
        )

    engine = create_engine(url, echo=echo, future=True)
    safe_url = get_safe_url_from_engine(engine)
    log.info(f"Connected to database: {safe_url}")
    session = sessionmaker(bind=engine, future=True)()  # type: Session
    metadata = MetaData()

    try:
        # Reflection:
        # - dropping doesn't need reflection
        # - creation doesn't need reflection
        # - insertion needs Table objects, either from creation or reflection
        #   ... so if we're inserting and not creating, we need reflection.
        # But if we create without reflecting, you can get exceptions when
        # creating Table objects on the metadata. So we should reflect for
        # creation too.
        reflect = create_tables or import_data
        if reflect:
            log.info("Reading table structure from database...")
            metadata.reflect(bind=engine)  # views not required, though

        log.info("Processing...")
        for ti in gen_tablename_info(
            filenames=filenames,
            metadata=metadata,
            engine=engine,
            use_spreadsheet_names=use_spreadsheet_names,
            with_columns_from_data=create_tables,
            with_columns_from_reflection=reflect,
            with_data=import_data,
            skip_tables=skip_tables,
            verbose=verbose,
        ):
            if drop_tables:
                ti.drop_table()
            if create_tables:
                if not ti.has_columns():
                    log.warning(
                        f"Skipping creation for table {ti.tablename!r}, "
                        f"which has no columns"
                    )
                    continue
                ti.validate_columns()
                ti.create_table()
            if import_data:
                if not ti.has_columns():
                    log.warning(
                        f"Skipping import for table {ti.tablename!r}, "
                        f"which has no columns"
                    )
                    continue
                import_table(ti, session, chunksize=chunksize)
        log.info("Finished.")
    finally:
        # Release the session's database resources whether or not the
        # import succeeded.
        session.close()

1058 

1059 

1060# ============================================================================= 

1061# Main 

1062# ============================================================================= 

1063 

1064 

def main() -> None:
    """
    Command-line entry point: parse arguments, configure logging, and run
    :func:`auto_import_db`.
    """
    arg_parser = argparse.ArgumentParser(
        formatter_class=ArgumentDefaultsRichHelpFormatter,
        description=f"""
Take data from one or several tabular files (e.g. CSV, ODS, TSV, XLSX), or ZIP
files containing these. Import that data to a database, if necessary creating
the tables required. Use the filename as the table name (or, with
--{USE_SPREADSHEET_NAMES}, use the names of sheets within multi-sheet
spreadsheet files). The assumption is that within each tabular set of data, the
first row contains column names. The program will attempt to autodetect column
types from the data.
""",
    )

    # ---- Database target ----
    # For testing, remember e.g.
    #   sqlite:////home/rudolf/temp.sqlite
    arg_parser.add_argument(
        "--url",
        help="SQLAlchemy database URL, to write to.",
        required=True,
    )

    # ---- Table naming: two flags writing to the same destination ----
    arg_parser.add_argument(
        f"--{USE_SPREADSHEET_NAMES}",
        dest=USE_SPREADSHEET_NAMES,
        action="store_true",
        default=True,
        help="Use spreadsheet names (where relevant) as table names, rather "
        "than filenames. (If False, only the first sheet in each spreadsheet "
        "file will be used.) This applies only to multi-sheet file formats "
        "such as XLSX; for file formats such as CSV, only filenames can be "
        "used.",
    )
    arg_parser.add_argument(
        "--use_filenames_only",
        dest=USE_SPREADSHEET_NAMES,
        action="store_false",
        default=False,
        help=f"The opposite of --{USE_SPREADSHEET_NAMES}.",
    )

    # ---- What to do ----
    arg_parser.add_argument(
        "--drop_tables",
        action="store_true",
        help="Drop tables first if these exist.",
    )
    arg_parser.add_argument(
        "--create_tables",
        action="store_true",
        help="Creates tables if these do not exist. Table creation may be "
        "IMPERFECT as it attempts to infer column types from the data.",
    )
    arg_parser.add_argument(
        "--skip_data",
        action="store_true",
        help="Skip the data import itself.",
    )
    arg_parser.add_argument(
        "--chunksize",
        type=int,
        default=DEFAULT_CHUNKSIZE,
        help="When inserting rows into the database, insert this many "
        "at a time. (A COMMIT is requested after each complete table.)",
    )
    arg_parser.add_argument(
        "--skip_tables", type=str, nargs="*", help="Named tables to skip."
    )

    # ---- Diagnostics ----
    arg_parser.add_argument(
        "--echo",
        action="store_true",
        help="Echo SQL. " + WARNING_VALUES_VISIBLE,
    )
    arg_parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Be verbose. " + WARNING_VALUES_VISIBLE,
    )

    # ---- Input files ----
    arg_parser.add_argument(
        "filename",
        type=str,
        nargs="+",
        help="Filename(s) to read. These can be tabular files (CSV, ODS, TSV, "
        "XLSX), or ZIP file(s) containing these. (Recursive ZIPs are not "
        "supported.)",
    )

    cli_args = arg_parser.parse_args()

    if cli_args.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    main_only_quicksetup_rootlogger(level=log_level)

    auto_import_db(
        url=cli_args.url,
        filenames=cli_args.filename,
        use_spreadsheet_names=cli_args.use_spreadsheet_names,
        drop_tables=cli_args.drop_tables,
        create_tables=cli_args.create_tables,
        # The CLI flag is phrased negatively ("skip"); the API is positive.
        import_data=not cli_args.skip_data,
        chunksize=cli_args.chunksize,
        skip_tables=cli_args.skip_tables,
        echo=cli_args.echo,
        verbose=cli_args.verbose,
    )

1165 

1166 

# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()