Coverage for preprocess/autoimport_db.py: 44%
418 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
1"""
2crate_anon/preprocess/autoimport_db.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Automatically import to a database from a collection of tabular files.**
28Efficiency is challenging here. Simple CSV/TSV files are efficiently handled
29as file-like objects, and can be iterated in a low-memory way very fast.
30Spreadsheet-type objects often need to be loaded "whole", so repeat iteration
31is less sensible. We're trying to handle both, so no perfect/simple way.
33Considered but not done:
35- Track min/max for numeric types. This would allow us to refine the integer
36 type. However, there is always the danger that we scan data and create tables
37 for one set of files, then want to import from another, and the latter has
38 more extreme values. So we just use a column type (BigInteger) with wide
39 capabilities. (Having said that, we do track the maximum length of strings!)
41"""
43import argparse
44import csv
45import datetime
46from itertools import zip_longest
47import logging
48from operator import attrgetter
49from pathlib import Path
50import tempfile
51from typing import (
52 Any,
53 BinaryIO,
54 Callable,
55 Dict,
56 Generator,
57 Iterable,
58 IO,
59 List,
60 Optional,
61 TextIO,
62 Tuple,
63 Union,
64)
65import zipfile
67from cardinal_pythonlib.logs import main_only_quicksetup_rootlogger
68from cardinal_pythonlib.sqlalchemy.session import get_safe_url_from_engine
69from rich_argparse import ArgumentDefaultsRichHelpFormatter
70import openpyxl # xlrd might be faster...
71from openpyxl.cell.cell import Cell
72import pendulum
73import pendulum.parsing.exceptions
74import pyexcel_ods
75from sqlalchemy.engine import create_engine, Engine
76from sqlalchemy.exc import InvalidRequestError
77from sqlalchemy.orm.session import sessionmaker, Session
78from sqlalchemy.sql.expression import insert
79from sqlalchemy.sql.schema import Column, MetaData, Table
80from sqlalchemy.sql.sqltypes import (
81 BigInteger,
82 Boolean,
83 Date,
84 DateTime,
85 Float,
86 String,
87)
88from sqlalchemy.sql.type_api import TypeEngine
90from crate_anon.common.future import batched
92log = logging.getLogger(__name__)
95# =============================================================================
96# Constants
97# =============================================================================
# Type alias for one row of tabular data: a mapping of column name -> value.
SPREADSHEET_DICT_ROW_TYPE = Dict[str, Any]

# Default number of records to insert at once.
DEFAULT_CHUNKSIZE = 100_000

# Column type used when a column contains no non-NULL data at all.
DEFAULT_COL_TYPE = String(length=1)

# Name of the command-line option (and argparse "dest") that selects sheet
# names, rather than filenames, as table names.
USE_SPREADSHEET_NAMES = "use_spreadsheet_names"

# Warning logged when only the first sheet of a multi-sheet file is read.
FIRST_SHEET_ONLY_MSG = (
    "Only reading the first sheet of this file. "
    f"See --{USE_SPREADSHEET_NAMES}."
)

# Suffix for help text of options that may expose real data values.
WARNING_VALUES_VISIBLE = (
    "WARNING: not suitable for production use "
    "(may show actual data values). "
    "Use for testing only."
)
class SheetFiletypes:
    """
    The file extensions we recognize (lower case, including the leading dot),
    plus helper functions that classify a path purely by its extension.
    """

    CSV = ".csv"
    ODS = ".ods"
    TSV = ".tsv"
    XLSX = ".xlsx"
    ZIP = ".zip"

    @staticmethod
    def get_ext_lower(path: Path) -> str:
        """
        Returns the path's extension (suffix), converted to lower case.
        """
        return path.suffix.lower()

    @classmethod
    def is_single_sheet_filetype(cls, path: Path) -> bool:
        """
        Is this a file type (CSV, TSV) that holds exactly one sheet/table of
        values?
        """
        return cls.get_ext_lower(path) in (cls.CSV, cls.TSV)

    @classmethod
    def is_multisheet_filetype(cls, path: Path) -> bool:
        """
        Is this a file type (ODS, XLSX) that can hold several sheets/tables of
        values?
        """
        return cls.get_ext_lower(path) in (cls.ODS, cls.XLSX)

    @classmethod
    def get_read_mode(cls, path: Path) -> str:
        """
        Returns the mode with which to open the file: text ("r") for CSV/TSV,
        and binary ("rb") for everything else.
        """
        if cls.is_single_sheet_filetype(path):
            return "r"
        return "rb"

    @classmethod
    def is_csv(cls, path: Path) -> bool:
        """
        Does the path have a CSV extension?
        """
        return cls.get_ext_lower(path) == cls.CSV

    @classmethod
    def is_ods(cls, path: Path) -> bool:
        """
        Does the path have an ODS extension?
        """
        return cls.get_ext_lower(path) == cls.ODS

    @classmethod
    def is_tsv(cls, path: Path) -> bool:
        """
        Does the path have a TSV extension?
        """
        return cls.get_ext_lower(path) == cls.TSV

    @classmethod
    def is_xlsx(cls, path: Path) -> bool:
        """
        Does the path have an XLSX extension?
        """
        return cls.get_ext_lower(path) == cls.XLSX

    @classmethod
    def is_zip(cls, path: Path) -> bool:
        """
        Does the path have a ZIP extension?
        """
        return cls.get_ext_lower(path) == cls.ZIP
174# =============================================================================
175# Data type detection functions
176# =============================================================================
def does_datetime_have_zero_time(
    d: Union[datetime.datetime, pendulum.DateTime]
) -> bool:
    """
    Returns True if the hour, minute, second, and microsecond fields of a
    datetime-like object are all zero.
    """
    time_fields = (d.hour, d.minute, d.second, d.microsecond)
    return all(field == 0 for field in time_fields)
def is_date_like_not_datetime_like(v: Any) -> bool:
    """
    Is this value a date, or something that parses as a date, with no
    meaningful (non-zero) time component?
    """
    if isinstance(v, datetime.date):
        # A datetime.datetime is also a datetime.date (not vice versa), so a
        # datetime only counts as "date-like" if its time part is zero.
        if not isinstance(v, datetime.datetime):
            return True
        return does_datetime_have_zero_time(v)

    if isinstance(v, pendulum.Date):
        # Same relationship: pendulum.DateTime is a kind of pendulum.Date.
        if not isinstance(v, pendulum.DateTime):
            return True
        return does_datetime_have_zero_time(v)

    if not isinstance(v, str):
        return False

    try:
        parsed = pendulum.parse(v)
    except (pendulum.parsing.exceptions.ParserError, ValueError):
        return False
    # pendulum.parse() may give us something other than a DateTime (e.g. a
    # Duration); that is not a date.
    if not isinstance(parsed, pendulum.DateTime):
        return False
    return does_datetime_have_zero_time(parsed)
def is_datetime_or_date_like(v: Any) -> bool:
    """
    Is this value a date/datetime object, or a string that parses as a
    datetime?
    """
    native_types = (
        datetime.date,
        datetime.datetime,
        pendulum.Date,
        pendulum.DateTime,
    )
    if isinstance(v, native_types):
        return True
    if not isinstance(v, str):
        return False
    try:
        parsed = pendulum.parse(v)
    except (pendulum.parsing.exceptions.ParserError, ValueError):
        return False
    return isinstance(parsed, pendulum.DateTime)
238# =============================================================================
239# Column type detection
240# =============================================================================
def mk_columns(
    datagen: Iterable[SPREADSHEET_DICT_ROW_TYPE], verbose: bool = False
) -> List[Column]:
    """
    Scan rows of data and infer an SQLAlchemy column for every column name
    encountered.

    Args:
        datagen:
            Iterable of row dictionaries (column name -> value). It is
            consumed in full.
        verbose:
            Passed to each column detector, so that values are reported on
            failure. (WARNING: therefore unsuitable for production use.)

    Returns:
        One column per distinct column name, in order of first appearance.

    Raises:
        ValueError: if any row has a value with no column heading.
    """
    detectors = {}  # type: Dict[str, ColumnTypeDetector]
    for row in datagen:
        for colname, value in row.items():
            if colname is None:
                raise ValueError(
                    "Sheet has missing column headings; headings are: "
                    f"{list(row.keys())}"
                )
            detector = detectors.get(colname)
            if detector is None:
                # First sighting of this column: start a detector for it.
                detector = ColumnTypeDetector(
                    colname, default_type=DEFAULT_COL_TYPE, verbose=verbose
                )
                detectors[colname] = detector
            detector.inspect(value)
    return [detector.sqlalchemy_column() for detector in detectors.values()]
275# =============================================================================
276# Helper classes
277# =============================================================================
class TabularFileInfo:
    """
    Describes one prospective database table derived from a tabular data
    source: its name and, where requested, its columns and its data.
    """

    def __init__(
        self,
        tablename: str,
        metadata: MetaData,
        engine: Engine,
        datagen: Iterable[SPREADSHEET_DICT_ROW_TYPE] = None,
        with_columns_from_data: bool = False,
        with_columns_from_reflection: bool = False,
        with_data: bool = False,
        verbose: bool = False,
    ) -> None:
        """
        Args:
            tablename:
                Name of the table.
            metadata:
                Database MetaData object.
            engine:
                SQLAlchemy engine.
            datagen:
                Optional iterable of row dictionaries. Required if
                with_columns_from_data or with_data is True.
            with_columns_from_data:
                Autodetect column information from the data (for table
                creation)? Ignored if the table is found via reflection.
            with_columns_from_reflection:
                Look for the table (and thus its columns) in the metadata?
            with_data:
                Keep the data available for import?
            verbose:
                Be verbose if the process fails? WARNING: will report values.

        Raises:
            ValueError: if reflection was requested, the table is not in the
                metadata, and columns are not being detected from the data.
        """
        self.tablename = tablename
        self.metadata = metadata
        self.engine = engine
        self.with_columns_from_reflection = with_columns_from_reflection
        self.with_data = with_data

        # SQLAlchemy columns to create, if requested:
        self.columns = None  # type: Optional[List[Column]]
        # Row dictionaries (column name -> value), if requested:
        self.data_generator = (
            None
        )  # type: Optional[Iterable[SPREADSHEET_DICT_ROW_TYPE]]
        # Cached SQLAlchemy table; see table():
        self._table = None  # type: Optional[Table]
        self.table_exists_in_database = False

        if with_columns_from_reflection:
            try:
                reflected = metadata.tables[tablename]
            except KeyError:
                log.info(f"Table not found in database: {tablename}")
                if not with_columns_from_data:
                    raise ValueError(
                        f"Table {tablename!r} not found in the database. "
                        f"Did you mean to turn on table creation?"
                    )
            else:
                self._table = reflected
                log.info(f"Read table from database: {tablename}")
                self.table_exists_in_database = True
                # The existing table's columns take priority over detection.
                with_columns_from_data = False
                self.columns = reflected.columns

        if with_columns_from_data or with_data:
            assert datagen is not None
            rows = datagen
            if with_columns_from_data and with_data:
                # A generator can only be consumed once, and we need two
                # passes (type detection, then import), so load it all.
                rows = list(datagen)
            if with_columns_from_data:
                # Consumes "rows" if it is a generator.
                self.columns = mk_columns(rows, verbose=verbose)
            if with_data:
                self.data_generator = rows

        # Stored last, as it may have been switched off by reflection above.
        self.with_columns_from_data = with_columns_from_data

    def has_columns(self) -> bool:
        """
        Is there at least one column?
        """
        return bool(self.columns)

    def validate_columns(self) -> None:
        """
        Checks that the columns are non-empty and uniquely named; raises
        ValueError if not. If no column information was collected at all,
        just logs a warning.
        """
        if self.columns is None:
            log.warning("Validating columns, but there aren't any")
            return
        if not self.columns:
            raise ValueError(f"Table {self.tablename} has no columns")
        names = [column.name for column in self.columns]
        has_duplicates = len(set(names)) != len(names)
        if has_duplicates:
            raise ValueError(
                f"Table {self.tablename} has duplicate columns: {names!r}"
            )

    def colreport(self) -> str:
        """
        Returns a text-format report of our columns, one per line. Raises
        ValueError if no column information was collected.
        """
        if self.columns is None:
            raise ValueError(
                f"Can't produce column report: table {self.tablename} "
                f"has no columns"
            )
        lines = [f"- {column!r}" for column in self.columns]
        return "\n".join(lines)

    def table(self) -> Table:
        """
        Returns the SQLAlchemy Table object, building it on first use and
        caching it thereafter (otherwise SQLAlchemy will complain that we
        re-assign columns to a table). Assumes that the MetaData object WILL
        NOT CHANGE ACROSS CALLS.
        """
        if self._table is not None:
            return self._table
        columns = self.columns or ()
        self._table = Table(self.tablename, self.metadata, *columns)
        return self._table

    def drop_table(self) -> None:
        """
        Drops the database table, if it exists.
        """
        log.info(f"Dropping table (if it exists): {self.tablename}")
        # See also crate_anon.anonymise.subset_db.drop_dst_table_if_exists().
        table = self.table()  # doesn't need columns
        table.drop(self.engine, checkfirst=True)
        # No COMMIT required after DDL.
        # Remove from the metadata, or we may struggle to re-create it:
        self.metadata.remove(table)
        self.table_exists_in_database = False

    def create_table(self) -> None:
        """
        Creates the database table, unless it already exists.
        """
        if self.table_exists_in_database:
            log.info(f"Table already exists, not creating: {self.tablename}")
            return
        log.info(f"Creating table: {self.tablename}\n{self.colreport()}")
        table = self.table()
        try:
            table.create(self.engine, checkfirst=True)
        except InvalidRequestError:
            log.warning(
                "Table already exists, unexpectedly; not re-creating: "
                f"{self.tablename}"
            )
        # No COMMIT required after DDL.
class ColumnTypeDetector:
    """
    Class to inspect values from a spreadsheet column, and make a decision
    about what kind of SQLAlchemy column should be used.
    """

    def __init__(
        self,
        colname: str,
        values: Optional[Iterable[Any]] = None,
        default_type: Optional[TypeEngine] = None,
        verbose: bool = False,
    ) -> None:
        """
        Args:
            colname:
                Future column name.
            values:
                Optional values to inspect immediately.
            default_type:
                Type to use if no non-NULL data whatsoever is seen.
            verbose:
                Report values on failure. WARNING: unsuitable, for this reason,
                for production code.

        Raises:
            ValueError: if the column name is empty or contains a tab.
        """
        if not colname:
            raise ValueError("Missing column name")
        if "\t" in colname:
            raise ValueError(
                f"Likely TSV file being read as CSV, because column name "
                f"includes tabs: {colname!r}"
            )

        self.colname = colname
        self.default_type = default_type
        self.verbose = verbose

        # None/NULL types:
        self.seen_none = False

        # Sanity check
        self.seen_something_not_none = False

        # Non-string types:
        self.seen_bool = False
        self.seen_int = False
        self.seen_float = False

        # String and string-derived types:
        self.seen_str = False
        self.seen_date = False
        self.seen_datetime = False
        self.seen_str_not_date_or_datetime = False
        self.max_strlen = 0

        # For verbose mode:
        self.inspected = set()

        # If created with values:
        if values is not None:
            for v in values:
                self.inspect(v)

    def __str__(self) -> str:
        """
        String representation.
        """
        try:
            c = self.sqlalchemy_column()
        except ValueError:
            c = "<no_known_column_type>"
        return f"{self.colname}: {c}"

    def inspect(self, v: Any) -> None:
        """
        Inspect a new value, updating our record of the kinds of value seen.

        Raises:
            ValueError: if the value is of a type we don't handle.
        """
        if self.verbose:
            self.inspected.add(v)
        if v is None:
            self.seen_none = True
            return
        self.seen_something_not_none = True
        # Test bool before int: in Python, bool is a subclass of int.
        if isinstance(v, bool):
            self.seen_bool = True
            return
        if isinstance(v, int):
            self.seen_int = True
            return
        if isinstance(v, float):
            self.seen_float = True
            return
        is_str = isinstance(v, str)
        if is_date_like_not_datetime_like(v):
            self.seen_date = True
        elif is_datetime_or_date_like(v):
            self.seen_datetime = True
        elif is_str:
            # NB A string that does not look like a date/datetime.
            self.seen_str = True
            self.seen_str_not_date_or_datetime = True
        else:
            raise ValueError(f"Unexpected value of type {type(v)}")
            # Implausible that we will get a BLOB via a spreadsheet.
        if is_str:
            # Track the longest string, whether or not it looks like a
            # date/datetime, in case we end up with a String column.
            self.max_strlen = max(len(v), self.max_strlen)

    def _values_seen_suffix(self) -> str:
        """
        In verbose mode, returns a string suffix (for error messages) showing
        what values we've seen.
        """
        return ": " + repr(self.inspected) if self.verbose else ""

    def _sqla_coltype(self) -> TypeEngine:
        """
        Returns the SQLAlchemy column type to use, or raises ValueError.

        Copes with simple types (and NULL/None values):

        - bool -> Boolean
        - int -> BigInteger
        - float -> Float
        - date -> Date
        - datetime -> DateTime
        - str (interpretable as date) -> Date
        - str (interpretable as datetime, but not date) -> DateTime
        - str (otherwise) -> String (of the maximum length seen)

        Copes with mixtures:

        - int + float -> Float
        - date and date-as-str ("date-like") -> Date
        - datetime and datetime-as-str ("datetime-like") -> DateTime
        - date-like + datetime-like -> DateTime
        - {date-like or datetime-like} and {str not interpretable as date or
          datetime} -> String

        Failure conditions (raises ValueError):

        - nothing seen, or only NULL/None values seen (unless a default type
          was supplied, in which case that is returned)
        - more than one type among {bool, int, float}, except int + float
        - string or string-derived (date-like, datetime-like) type and
          {bool, int, float} type
        """
        if not self.seen_something_not_none:
            if self.default_type is not None:
                return self.default_type
            raise ValueError(
                f"Column type {self.colname}: no data seen yet and no "
                f"default type"
            )
        is_numeric = self.seen_int or self.seen_float
        n_non_string_based = sum([self.seen_bool, is_numeric])
        if n_non_string_based > 1:
            raise ValueError(
                f"Column {self.colname}: mixed non-string types"
                + self._values_seen_suffix()
            )
        # Date-like and datetime-like values count as string-derived here.
        # Checking seen_str alone would let e.g. a mixture of dates and
        # integers through, silently yielding a numeric column that cannot
        # hold the dates.
        seen_string_derived = (
            self.seen_str
            or self.seen_str_not_date_or_datetime
            or self.seen_date
            or self.seen_datetime
        )
        if n_non_string_based > 0 and seen_string_derived:
            raise ValueError(
                f"Column {self.colname}: mixed string/non-string types"
                + self._values_seen_suffix()
            )
        # If we get here, either it's string-derived, or a single non-string
        # type.
        if self.seen_bool:
            return Boolean()
        elif is_numeric:
            if self.seen_float:
                return Float()
            else:
                return BigInteger()
        elif self.seen_str_not_date_or_datetime:
            return String(length=max(1, self.max_strlen))
        elif self.seen_datetime:
            return DateTime()
        elif self.seen_date:
            return Date()
        else:
            raise AssertionError("Type analysis bug")

    def sqlalchemy_column(self, nullable: Optional[bool] = None) -> Column:
        """
        Returns an SQLAlchemy Column object (free-floating, i.e. with no table
        attached), or raises ValueError.

        Args:
            nullable:
                Should the column be NULL-capable? Use True for "NULL", False
                for "NOT NULL", and None for "NULL if NULL/None values have
                been seen, otherwise NOT NULL".
        """
        if nullable is None:
            nullable = self.seen_none
        elif not nullable and self.seen_none:
            raise ValueError(
                f"Column {self.colname}: requested nullable=False but have "
                f"seen a NULL value"
            )
        return Column(
            self.colname,
            self._sqla_coltype(),
            nullable=nullable,
        )
652# =============================================================================
653# Spreadsheet generator functions
654# =============================================================================
def ods_row_to_list(row: Iterable[Any]) -> List[Any]:
    """
    Turn one OpenOffice ODS row into a list of values. Empty cells arrive as
    the empty string; those are replaced by None.
    """
    return [None if value == "" else value for value in row]
def xlsx_row_to_list(row: Iterable[Cell]) -> List[Any]:
    """
    Turn one OpenPyXL XLSX row (an iterable of cells) into a list of the
    cells' values, replacing any empty-string value by None.
    """
    cell_values = (cell.value for cell in row)
    return [None if value == "" else value for value in cell_values]
def dict_from_rows(
    row_iterator: Iterable[Iterable],
    row_to_list_fn: Callable[[Iterable], List],
) -> Generator[Dict, None, None]:
    """
    Iterate through rows (from row_iterator); apply row_to_list_fn() to each;
    yield dictionaries mapping column names to values.

    The first row supplies the column headings. Subsequent rows that are
    entirely blank (every value is None or the empty string, or there are no
    values at all) are skipped. Values such as 0 or False are data, not
    blanks, so rows containing only those are still yielded.

    Args:
        row_iterator:
            Iterable of rows, the first being the heading row.
        row_to_list_fn:
            Function converting a row to a list of values.

    Yields:
        One dictionary per non-blank data row. If a row has fewer values than
        there are headings, the missing values are None; if it has more, the
        surplus is keyed by None.
    """
    rows = iter(row_iterator)
    try:
        heading_row = next(rows)
    except StopIteration:
        # No rows at all, so nothing to yield.
        return
    headings = row_to_list_fn(heading_row)  # type: List[str]
    for row in rows:
        values = row_to_list_fn(row)
        if all(v is None or v == "" for v in values):
            # skip blank rows
            continue
        # Care required here. If values is shorter than headings, zip()
        # will just discard headings that don't have a value. So use
        # zip_longest().
        yield dict(zip_longest(headings, values))
def translate_empty_str_to_none(
    reader: Iterable[Dict[str, Any]]
) -> Generator[SPREADSHEET_DICT_ROW_TYPE, None, None]:
    """
    Yield dictionaries (mapping column name to value), but
    (a) translating blank strings (often the product of empty cells e.g. with
    csv.DictReader) to None; (b) skipping entirely blank rows, i.e. rows in
    which every value is None after that translation (or which have no
    values at all).

    Args:
        reader:
            For example, a csv.DictReader().
    """
    for row in reader:
        d = {k: (None if v == "" else v) for k, v in row.items()}
        # Testing "if d" would only catch a dictionary with no keys; a row of
        # empty cells still has its keys, so check the values instead.
        if any(v is not None for v in d.values()):
            yield d
def gen_dicts_from_csv(
    fileobj: TextIO,
) -> Generator[SPREADSHEET_DICT_ROW_TYPE, None, None]:
    """
    Reads a CSV file (comma-delimited, headings in the first row) and
    generates one value dictionary per row, via
    translate_empty_str_to_none().
    """
    csv_rows = csv.DictReader(fileobj)
    for row_dict in translate_empty_str_to_none(csv_rows):
        yield row_dict
def gen_dicts_from_tsv(
    fileobj: TextIO,
) -> Generator[SPREADSHEET_DICT_ROW_TYPE, None, None]:
    """
    Reads a TSV file (tab-delimited, headings in the first row) and
    generates one value dictionary per row, via
    translate_empty_str_to_none().
    """
    tsv_rows = csv.DictReader(fileobj, delimiter="\t")
    for row_dict in translate_empty_str_to_none(tsv_rows):
        yield row_dict
def gen_sheets_from_ods(
    fileobj: BinaryIO, first_sheet_only: bool = False
) -> Generator[Tuple[str, Iterable[SPREADSHEET_DICT_ROW_TYPE]], None, None]:
    """
    Reads an ODS file and generates a (sheet name,
    iterable-of-value-dictionaries) tuple for each sheet.

    Args:
        fileobj:
            The ODS file, opened in binary mode.
        first_sheet_only:
            Stop after the first sheet (and log a warning about that)?
    """
    # An ordered dictionary, whose keys are spreadsheet names:
    sheets = pyexcel_ods.get_data(fileobj)
    if first_sheet_only:
        log.warning(FIRST_SHEET_ONLY_MSG)
    for name, rows in sheets.items():
        row_dicts = dict_from_rows(rows, row_to_list_fn=ods_row_to_list)
        yield name, row_dicts
        if first_sheet_only:
            break
def gen_sheets_from_xlsx(
    fileobj: BinaryIO, first_sheet_only: bool = False
) -> Generator[Tuple[str, Iterable[SPREADSHEET_DICT_ROW_TYPE]], None, None]:
    """
    Reads an Excel XLSX file and generates a (sheet name,
    iterable-of-value-dictionaries) tuple for each worksheet.

    Args:
        fileobj:
            The XLSX file, opened in binary mode.
        first_sheet_only:
            Stop after the first sheet (and log a warning about that)?
    """
    load_options = dict(
        read_only=True,
        keep_vba=False,
        data_only=True,
        keep_links=False,
    )
    # No obvious bug now (with openpyxl==3.0.7) with read-only mode.
    workbook = openpyxl.load_workbook(fileobj, **load_options)
    if first_sheet_only:
        log.warning(FIRST_SHEET_ONLY_MSG)
    for sheet in workbook.worksheets:
        row_dicts = dict_from_rows(
            sheet.iter_rows(), row_to_list_fn=xlsx_row_to_list
        )
        yield sheet.title, row_dicts
        if first_sheet_only:
            break
781# =============================================================================
782# File-related generator functions
783# =============================================================================
def gen_files_from_zipfile(
    zipfilename: Union[Path, str]
) -> Generator[Tuple[Path, IO], None, None]:
    """
    Iterates a ZIP file, yielding filenames and corresponding file-like
    objects from within it. Members are processed in filename order.
    Directory entries within the archive are skipped.

    Each member is extracted to its own temporary directory, which is deleted
    when the caller moves on to the next member; the yielded file object must
    therefore not be used after that point.

    Args:
        zipfilename: filename of the ``.zip`` file

    Yields:
        tuple (Path, file-like object) for each inner file; the Path is the
        member's name within the archive

    NB related to ``cardinal_pythonlib.file_io.gen_files_from_zipfiles``, but
    simpler and also provides the filenames.
    """
    with zipfile.ZipFile(zipfilename) as zf:
        infolist = sorted(
            zf.infolist(), key=attrgetter("filename")
        )  # type: List[zipfile.ZipInfo]
        for zipinfo in infolist:
            if zipinfo.is_dir():
                # A directory entry has no content; open() would fail on it.
                log.debug(
                    f"Within zip file: {zipfilename} - "
                    f"skipping directory: {zipinfo.filename}"
                )
                continue
            log.info(
                f"Within zip file: {zipfilename} - "
                f"reading subfile: {zipinfo.filename}"
            )
            with tempfile.TemporaryDirectory() as tmpdir:
                # Use the path that extract() reports, because it sanitizes
                # member names (e.g. strips leading slashes and ".."), so the
                # file may not be at tmpdir / zipinfo.filename.
                diskpath = Path(zf.extract(zipinfo, tmpdir))
                with open(
                    diskpath, SheetFiletypes.get_read_mode(diskpath)
                ) as subfile:
                    yield Path(zipinfo.filename), subfile
def gen_filename_fileobj(
    filenames: List[str],
) -> Generator[Tuple[Path, IO], None, None]:
    """
    Opens each named file in turn and yields (path, file-like object)
    tuples. A ZIP file is expanded into its inner files (one level only; not
    recursively).

    Args:
        filenames:
            Filenames to process.

    Yields:
        tuple (Path, file-like object) for each file, or for each inner file
        of a ZIP file

    Raises:
        ValueError: if a filename does not refer to a file.
    """
    for filename in filenames:
        path = Path(filename)
        if not path.is_file():
            raise ValueError(f"Not a file: {path}")
        log.info(f">>> Processing file: {path}")
        if SheetFiletypes.is_zip(path):
            yield from gen_files_from_zipfile(path)
            continue
        mode = SheetFiletypes.get_read_mode(path)
        with open(path, mode) as fileobj:
            yield path, fileobj
def gen_tablename_info(
    filenames: List[str],
    metadata: MetaData,
    engine: Engine,
    use_spreadsheet_names: bool = True,
    with_columns_from_data: bool = False,
    with_columns_from_reflection: bool = False,
    with_data: bool = False,
    skip_tables: List[str] = None,
    verbose: bool = False,
) -> Generator[TabularFileInfo, None, None]:
    """
    Works through the files (including the contents of ZIP files) and
    generates a TabularFileInfo for each table found: one per CSV/TSV file,
    and one per sheet (or first sheet) of each ODS/XLSX file. Files of
    unknown type are skipped with a warning.

    Args:
        filenames:
            Filenames to iterate through.
        metadata:
            Database MetaData object.
        engine:
            SQLAlchemy engine.
        use_spreadsheet_names:
            Use spreadsheet names (where relevant) as table names, rather than
            filenames. (If False, only the first sheet in each spreadsheet
            file will be used.)
        with_columns_from_data:
            Read/autodetect column information from data, for creating
            tables?
        with_columns_from_reflection:
            Should columns be read from the metadata?
        with_data:
            Provide data?
        skip_tables:
            Optional names of tables to skip.
        verbose:
            Be verbose if the process fails? WARNING: will report values.

    Yields:
        TabularFileInfo instances.
    """
    skip_tables = skip_tables or []

    def should_skip(name: str) -> bool:
        # Report (and signal) that this table is on the skip list.
        if name in skip_tables:
            log.warning(f"Skipping table: {name}")
            return True
        return False

    def make_info(
        name: str, rows: Iterable[SPREADSHEET_DICT_ROW_TYPE]
    ) -> TabularFileInfo:
        # All tables share every setting except their name and data.
        return TabularFileInfo(
            tablename=name,
            metadata=metadata,
            engine=engine,
            datagen=rows,
            with_columns_from_data=with_columns_from_data,
            with_columns_from_reflection=with_columns_from_reflection,
            with_data=with_data,
            verbose=verbose,
        )

    for path, fileobj in gen_filename_fileobj(filenames):
        if SheetFiletypes.is_single_sheet_filetype(path):
            # One file, one table, named after the file.
            tablename = path.stem
            if should_skip(tablename):
                continue
            if SheetFiletypes.is_csv(path):
                dictgen = gen_dicts_from_csv(fileobj)
            elif SheetFiletypes.is_tsv(path):
                dictgen = gen_dicts_from_tsv(fileobj)
            else:
                raise AssertionError("Bug")
            yield make_info(tablename, dictgen)
            continue

        if not SheetFiletypes.is_multisheet_filetype(path):
            log.warning(f"Unknown file type: {path}")
            continue

        # One file, potentially several tables.
        if SheetFiletypes.is_ods(path):
            sheetgen = gen_sheets_from_ods
        elif SheetFiletypes.is_xlsx(path):
            sheetgen = gen_sheets_from_xlsx
        else:
            raise AssertionError("Bug")
        sheets = sheetgen(fileobj, first_sheet_only=not use_spreadsheet_names)
        for sheetname, sheetdatagen in sheets:
            log.info(f"... Processing sheet: {sheetname}")
            if use_spreadsheet_names:
                tablename = sheetname
            else:
                tablename = path.stem
            if should_skip(tablename):
                continue
            yield make_info(tablename, sheetdatagen)
936# =============================================================================
937# Database functions
938# =============================================================================
def import_table(
    ti: TabularFileInfo,
    session: Session,
    chunksize: int = DEFAULT_CHUNKSIZE,
) -> None:
    """
    Inserts a table's data into the database, in batches, then commits.

    Args:
        ti:
            Table information; its data_generator supplies the rows.
        session:
            SQLAlchemy session to insert (and commit) with.
        chunksize:
            Number of records to insert at once.
    """
    log.info(f"Importing to table: {ti.tablename}")
    table = ti.table()
    # NB loads all rows into memory before batching.
    all_rows = list(ti.data_generator)
    insert_stmt = insert(table)
    for datachunk in batched(all_rows, chunksize):
        log.debug(f"Inserting {len(datachunk)} rows...")
        session.execute(insert_stmt, datachunk)
    # A single COMMIT, after the whole table:
    session.commit()
958# =============================================================================
959# Importer
960# =============================================================================
def auto_import_db(
    url: str,
    filenames: List[str],
    use_spreadsheet_names: bool = True,
    drop_tables: bool = False,
    create_tables: bool = False,
    import_data: bool = False,
    chunksize: int = DEFAULT_CHUNKSIZE,
    skip_tables: List[str] = None,
    echo: bool = False,
    verbose: bool = False,
) -> None:
    """
    Main import function. Connects to the database; then, for each table found
    in the files, optionally drops it, creates it, and imports data to it. The
    database session and engine are released on exit, including on failure.

    Args:
        url:
            Database URL.
        filenames:
            Filenames to iterate through.
        use_spreadsheet_names:
            Use spreadsheet names (where relevant) as table names, rather than
            filenames. (If False, only the first sheet in each spreadsheet
            file will be used.)
        drop_tables:
            Drop tables first?
        create_tables:
            Create tables, if required?
        import_data:
            Do the actual import?
        chunksize:
            Number of records to insert at once.
        skip_tables:
            Optional names of tables to skip.
        echo:
            Echo SQL?
        verbose:
            Be verbose?

    Raises:
        ValueError: if asked to drop tables and import data without creating
            tables.
    """
    if drop_tables and not create_tables and import_data:
        raise ValueError(
            "You can't drop tables, not create them, and then hope to import "
            "data"
        )

    engine = create_engine(url, echo=echo, future=True)
    session = None  # type: Optional[Session]
    try:
        safe_url = get_safe_url_from_engine(engine)
        log.info(f"Connected to database: {safe_url}")
        session = sessionmaker(bind=engine, future=True)()
        metadata = MetaData()

        # Reflection:
        # - dropping doesn't need reflection
        # - creation doesn't need reflection
        # - insertion needs Table objects, either from creation or reflection
        # ... so if we're inserting and not creating, we need reflection.
        # But if we create without reflecting, you can get exceptions when
        # creating Table objects on the metadata. So we should reflect for
        # creation too.
        reflect = create_tables or import_data
        if reflect:
            log.info("Reading table structure from database...")
            metadata.reflect(bind=engine)  # views not required, though

        log.info("Processing...")
        for ti in gen_tablename_info(
            filenames=filenames,
            metadata=metadata,
            engine=engine,
            use_spreadsheet_names=use_spreadsheet_names,
            with_columns_from_data=create_tables,
            with_columns_from_reflection=reflect,
            with_data=import_data,
            skip_tables=skip_tables,
            verbose=verbose,
        ):
            if drop_tables:
                ti.drop_table()
            if create_tables:
                if not ti.has_columns():
                    log.warning(
                        f"Skipping creation for table {ti.tablename!r}, "
                        f"which has no columns"
                    )
                    # ... and therefore skipping any import as well.
                    continue
                ti.validate_columns()
                ti.create_table()
            if import_data:
                if not ti.has_columns():
                    log.warning(
                        f"Skipping import for table {ti.tablename!r}, "
                        f"which has no columns"
                    )
                    continue
                import_table(ti, session, chunksize=chunksize)
        log.info("Finished.")
    finally:
        # We created the session and engine, so we release them, whether or
        # not the import succeeded.
        if session is not None:
            session.close()
        engine.dispose()
1060# =============================================================================
1061# Main
1062# =============================================================================
def main() -> None:
    """
    Command-line entry point: parse arguments, set up logging, and run
    auto_import_db().
    """
    parser = argparse.ArgumentParser(
        formatter_class=ArgumentDefaultsRichHelpFormatter,
        description=f"""
Take data from one or several tabular files (e.g. CSV, ODS, TSV, XLSX), or ZIP
files containing these. Import that data to a database, if necessary creating
the tables required. Use the filename as the table name (or, with
--{USE_SPREADSHEET_NAMES}, use the names of sheets within multi-sheet
spreadsheet files). The assumption is that within each tabular set of data, the
first row contains column names. The program will attempt to autodetect column
types from the data.
""",
    )
    parser.add_argument(
        "--url",
        help="SQLAlchemy database URL, to write to.",
        required=True,
    )
    # For testing, remember e.g.
    #   sqlite:////home/rudolf/temp.sqlite
    parser.add_argument(
        f"--{USE_SPREADSHEET_NAMES}",
        dest=USE_SPREADSHEET_NAMES,
        action="store_true",
        default=True,
        help="Use spreadsheet names (where relevant) as table names, rather "
        "than filenames. (If False, only the first sheet in each spreadsheet "
        "file will be used.) This applies only to multi-sheet file formats "
        "such as XLSX; for file formats such as CSV, only filenames can be "
        "used.",
    )
    # This option shares its destination with the one above, so it must
    # declare the same default (True). With conflicting defaults, argparse
    # applies whichever argument was registered first, making the effective
    # default depend on the order of these two calls.
    parser.add_argument(
        "--use_filenames_only",
        dest=USE_SPREADSHEET_NAMES,
        action="store_false",
        default=True,
        help=f"The opposite of --{USE_SPREADSHEET_NAMES}.",
    )
    parser.add_argument(
        "--drop_tables",
        action="store_true",
        help="Drop tables first if these exist.",
    )
    parser.add_argument(
        "--create_tables",
        action="store_true",
        help="Creates tables if these do not exist. Table creation may be "
        "IMPERFECT as it attempts to infer column types from the data.",
    )
    parser.add_argument(
        "--skip_data",
        action="store_true",
        help="Skip the data import itself.",
    )
    parser.add_argument(
        "--chunksize",
        type=int,
        default=DEFAULT_CHUNKSIZE,
        help="When inserting rows into the database, insert this many "
        "at a time. (A COMMIT is requested after each complete table.)",
    )
    parser.add_argument(
        "--skip_tables", type=str, nargs="*", help="Named tables to skip."
    )
    parser.add_argument(
        "--echo",
        action="store_true",
        help="Echo SQL. " + WARNING_VALUES_VISIBLE,
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Be verbose. " + WARNING_VALUES_VISIBLE,
    )
    parser.add_argument(
        "filename",
        type=str,
        nargs="+",
        help="Filename(s) to read. These can be tabular files (CSV, ODS, TSV, "
        "XLSX), or ZIP file(s) containing these. (Recursive ZIPs are not "
        "supported.)",
    )
    args = parser.parse_args()
    main_only_quicksetup_rootlogger(
        level=logging.DEBUG if args.verbose else logging.INFO
    )

    auto_import_db(
        url=args.url,
        filenames=args.filename,
        use_spreadsheet_names=args.use_spreadsheet_names,
        drop_tables=args.drop_tables,
        create_tables=args.create_tables,
        import_data=not args.skip_data,
        chunksize=args.chunksize,
        skip_tables=args.skip_tables,
        echo=args.echo,
        verbose=args.verbose,
    )


if __name__ == "__main__":
    main()