yaml_shredder.data_loader

Load generated tables into databases.

  1"""Load generated tables into databases."""
  2
  3import sqlite3
  4from pathlib import Path
  5
  6import pandas as pd
  7
  8
  9class SQLiteLoader:
 10    """Load DataFrames into SQLite database."""
 11
 12    def __init__(self, db_path: str | Path):
 13        """
 14        Initialize SQLite loader.
 15
 16        Args:
 17            db_path: Path to SQLite database file (will be created if doesn't exist)
 18        """
 19        self.db_path = Path(db_path)
 20        self.connection = None
 21        self.loaded_tables = []
 22
 23    def connect(self) -> None:
 24        """Establish connection to SQLite database."""
 25        self.connection = sqlite3.connect(str(self.db_path))
 26        print(f"Connected to SQLite database: {self.db_path}")
 27
 28    def disconnect(self) -> None:
 29        """Close database connection."""
 30        if self.connection:
 31            self.connection.close()
 32            self.connection = None
 33            print("Database connection closed")
 34
 35    def load_tables(
 36        self,
 37        tables: dict[str, pd.DataFrame],
 38        if_exists: str = "replace",
 39        create_indexes: bool = True,
 40    ) -> None:
 41        """
 42        Load multiple tables into SQLite.
 43
 44        Args:
 45            tables: Dictionary of table_name -> DataFrame
 46            if_exists: How to behave if table exists ('fail', 'replace', 'append')
 47            create_indexes: Whether to create indexes on foreign key columns
 48        """
 49        if not self.connection:
 50            self.connect()
 51
 52        self.loaded_tables = []
 53
 54        for table_name, df in tables.items():
 55            self._load_table(table_name, df, if_exists)
 56            self.loaded_tables.append(table_name)
 57
 58            if create_indexes:
 59                self._create_indexes(table_name, df)
 60
 61        print(f"\n✓ Loaded {len(tables)} tables into {self.db_path}")
 62
 63    def _load_table(self, table_name: str, df: pd.DataFrame, if_exists: str = "replace") -> None:
 64        """
 65        Load a single table into SQLite.
 66
 67        Args:
 68            table_name: Name of the table
 69            df: DataFrame to load
 70            if_exists: How to behave if table exists
 71        """
 72        # Convert boolean columns to integers for SQLite
 73        df_copy = df.copy()
 74        for col in df_copy.columns:
 75            if df_copy[col].dtype == "bool":
 76                df_copy[col] = df_copy[col].astype(int)
 77
 78        df_copy.to_sql(table_name, self.connection, if_exists=if_exists, index=False)
 79        print(f"  Loaded {len(df_copy)} rows into table: {table_name}")
 80
 81    def _create_indexes(self, table_name: str, df: pd.DataFrame) -> None:
 82        """
 83        Create indexes on key columns.
 84
 85        Args:
 86            table_name: Name of the table
 87            df: DataFrame to analyze for index creation
 88        """
 89        cursor = self.connection.cursor()
 90
 91        # Create index on id column if exists
 92        if "id" in [c.lower() for c in df.columns]:
 93            id_col = next(c for c in df.columns if c.lower() == "id")
 94            index_name = f"idx_{table_name}_id"
 95            try:
 96                cursor.execute(f'CREATE INDEX IF NOT EXISTS "{index_name}" ON "{table_name}" ("{id_col}")')
 97            except sqlite3.OperationalError:
 98                pass  # Index might already exist
 99
100        # Create indexes on foreign key columns (parent_*)
101        fk_columns = [c for c in df.columns if c.startswith("parent_")]
102        for fk_col in fk_columns:
103            index_name = f"idx_{table_name}_{fk_col}"
104            try:
105                cursor.execute(f'CREATE INDEX IF NOT EXISTS "{index_name}" ON "{table_name}" ("{fk_col}")')
106            except sqlite3.OperationalError:
107                pass
108
109        self.connection.commit()
110
111    def execute_ddl(self, ddl_statements: dict[str, str]) -> None:
112        """
113        Execute DDL statements to create tables.
114
115        Args:
116            ddl_statements: Dictionary of table_name -> CREATE TABLE statement
117        """
118        if not self.connection:
119            self.connect()
120
121        cursor = self.connection.cursor()
122
123        for table_name, ddl in ddl_statements.items():
124            # SQLite doesn't support ALTER TABLE ADD CONSTRAINT for foreign keys
125            # Skip ALTER statements
126            statements = ddl.split(";")
127            for stmt in statements:
128                stmt = stmt.strip()
129                if stmt and not stmt.startswith("ALTER TABLE"):
130                    try:
131                        cursor.execute(stmt)
132                    except sqlite3.OperationalError as e:
133                        print(f"Warning: Could not execute DDL for {table_name}: {e}")
134
135        self.connection.commit()
136        print(f"✓ Executed DDL for {len(ddl_statements)} tables")
137
138    def query(self, sql: str) -> pd.DataFrame:
139        """
140        Execute a SQL query and return results as DataFrame.
141
142        Args:
143            sql: SQL query to execute
144
145        Returns:
146            Query results as DataFrame
147        """
148        if not self.connection:
149            self.connect()
150
151        return pd.read_sql_query(sql, self.connection)
152
153    def get_table_info(self, table_name: str) -> pd.DataFrame:
154        """
155        Get schema information for a table.
156
157        Args:
158            table_name: Name of the table
159
160        Returns:
161            DataFrame with table schema information
162        """
163        return self.query(f'PRAGMA table_info("{table_name}")')
164
165    def list_tables(self) -> list[str]:
166        """
167        List all tables in the database.
168
169        Returns:
170            List of table names
171        """
172        cursor = self.connection.cursor()
173        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
174        return [row[0] for row in cursor.fetchall()]
175
176    def print_summary(self) -> None:
177        """Print summary of loaded database."""
178        if not self.connection:
179            print("Not connected to database")
180            return
181
182        print(f"\n{'=' * 60}")
183        print("DATABASE SUMMARY")
184        print(f"{'=' * 60}")
185        print(f"Database: {self.db_path}")
186        print(f"Size: {self.db_path.stat().st_size / 1024:.1f} KB")
187
188        tables = self.list_tables()
189        print(f"\nTables: {len(tables)}")
190
191        for table in tables:
192            cursor = self.connection.cursor()
193            cursor.execute(f'SELECT COUNT(*) FROM "{table}"')
194            count = cursor.fetchone()[0]
195            print(f"  - {table}: {count} rows")
196
197    def __enter__(self):
198        """Context manager entry."""
199        self.connect()
200        return self
201
202    def __exit__(self, exc_type, exc_val, exc_tb):
203        """Context manager exit."""
204        self.disconnect()
205
206
def load_to_sqlite(
    tables: dict[str, pd.DataFrame],
    db_path: str | Path,
    if_exists: str = "replace",
    create_indexes: bool = True,
) -> SQLiteLoader:
    """
    Quick function to load tables into SQLite.

    Args:
        tables: Dictionary of table_name -> DataFrame
        db_path: Path to SQLite database
        if_exists: How to behave if table exists
        create_indexes: Whether to create indexes

    Returns:
        SQLiteLoader instance (still connected); the caller is responsible
        for calling ``disconnect()`` on it.

    Raises:
        Whatever ``SQLiteLoader.load_tables`` raises. The connection is
        closed before the exception propagates.
    """
    loader = SQLiteLoader(db_path)
    loader.connect()
    try:
        loader.load_tables(tables, if_exists, create_indexes)
    except BaseException:
        # On failure the loader is never handed back, so nobody else could
        # close the connection; release it here before re-raising.
        loader.disconnect()
        raise
    return loader
class SQLiteLoader:
 10class SQLiteLoader:
 11    """Load DataFrames into SQLite database."""
 12
 13    def __init__(self, db_path: str | Path):
 14        """
 15        Initialize SQLite loader.
 16
 17        Args:
 18            db_path: Path to SQLite database file (will be created if doesn't exist)
 19        """
 20        self.db_path = Path(db_path)
 21        self.connection = None
 22        self.loaded_tables = []
 23
 24    def connect(self) -> None:
 25        """Establish connection to SQLite database."""
 26        self.connection = sqlite3.connect(str(self.db_path))
 27        print(f"Connected to SQLite database: {self.db_path}")
 28
 29    def disconnect(self) -> None:
 30        """Close database connection."""
 31        if self.connection:
 32            self.connection.close()
 33            self.connection = None
 34            print("Database connection closed")
 35
 36    def load_tables(
 37        self,
 38        tables: dict[str, pd.DataFrame],
 39        if_exists: str = "replace",
 40        create_indexes: bool = True,
 41    ) -> None:
 42        """
 43        Load multiple tables into SQLite.
 44
 45        Args:
 46            tables: Dictionary of table_name -> DataFrame
 47            if_exists: How to behave if table exists ('fail', 'replace', 'append')
 48            create_indexes: Whether to create indexes on foreign key columns
 49        """
 50        if not self.connection:
 51            self.connect()
 52
 53        self.loaded_tables = []
 54
 55        for table_name, df in tables.items():
 56            self._load_table(table_name, df, if_exists)
 57            self.loaded_tables.append(table_name)
 58
 59            if create_indexes:
 60                self._create_indexes(table_name, df)
 61
 62        print(f"\n✓ Loaded {len(tables)} tables into {self.db_path}")
 63
 64    def _load_table(self, table_name: str, df: pd.DataFrame, if_exists: str = "replace") -> None:
 65        """
 66        Load a single table into SQLite.
 67
 68        Args:
 69            table_name: Name of the table
 70            df: DataFrame to load
 71            if_exists: How to behave if table exists
 72        """
 73        # Convert boolean columns to integers for SQLite
 74        df_copy = df.copy()
 75        for col in df_copy.columns:
 76            if df_copy[col].dtype == "bool":
 77                df_copy[col] = df_copy[col].astype(int)
 78
 79        df_copy.to_sql(table_name, self.connection, if_exists=if_exists, index=False)
 80        print(f"  Loaded {len(df_copy)} rows into table: {table_name}")
 81
 82    def _create_indexes(self, table_name: str, df: pd.DataFrame) -> None:
 83        """
 84        Create indexes on key columns.
 85
 86        Args:
 87            table_name: Name of the table
 88            df: DataFrame to analyze for index creation
 89        """
 90        cursor = self.connection.cursor()
 91
 92        # Create index on id column if exists
 93        if "id" in [c.lower() for c in df.columns]:
 94            id_col = next(c for c in df.columns if c.lower() == "id")
 95            index_name = f"idx_{table_name}_id"
 96            try:
 97                cursor.execute(f'CREATE INDEX IF NOT EXISTS "{index_name}" ON "{table_name}" ("{id_col}")')
 98            except sqlite3.OperationalError:
 99                pass  # Index might already exist
100
101        # Create indexes on foreign key columns (parent_*)
102        fk_columns = [c for c in df.columns if c.startswith("parent_")]
103        for fk_col in fk_columns:
104            index_name = f"idx_{table_name}_{fk_col}"
105            try:
106                cursor.execute(f'CREATE INDEX IF NOT EXISTS "{index_name}" ON "{table_name}" ("{fk_col}")')
107            except sqlite3.OperationalError:
108                pass
109
110        self.connection.commit()
111
112    def execute_ddl(self, ddl_statements: dict[str, str]) -> None:
113        """
114        Execute DDL statements to create tables.
115
116        Args:
117            ddl_statements: Dictionary of table_name -> CREATE TABLE statement
118        """
119        if not self.connection:
120            self.connect()
121
122        cursor = self.connection.cursor()
123
124        for table_name, ddl in ddl_statements.items():
125            # SQLite doesn't support ALTER TABLE ADD CONSTRAINT for foreign keys
126            # Skip ALTER statements
127            statements = ddl.split(";")
128            for stmt in statements:
129                stmt = stmt.strip()
130                if stmt and not stmt.startswith("ALTER TABLE"):
131                    try:
132                        cursor.execute(stmt)
133                    except sqlite3.OperationalError as e:
134                        print(f"Warning: Could not execute DDL for {table_name}: {e}")
135
136        self.connection.commit()
137        print(f"✓ Executed DDL for {len(ddl_statements)} tables")
138
139    def query(self, sql: str) -> pd.DataFrame:
140        """
141        Execute a SQL query and return results as DataFrame.
142
143        Args:
144            sql: SQL query to execute
145
146        Returns:
147            Query results as DataFrame
148        """
149        if not self.connection:
150            self.connect()
151
152        return pd.read_sql_query(sql, self.connection)
153
154    def get_table_info(self, table_name: str) -> pd.DataFrame:
155        """
156        Get schema information for a table.
157
158        Args:
159            table_name: Name of the table
160
161        Returns:
162            DataFrame with table schema information
163        """
164        return self.query(f'PRAGMA table_info("{table_name}")')
165
166    def list_tables(self) -> list[str]:
167        """
168        List all tables in the database.
169
170        Returns:
171            List of table names
172        """
173        cursor = self.connection.cursor()
174        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
175        return [row[0] for row in cursor.fetchall()]
176
177    def print_summary(self) -> None:
178        """Print summary of loaded database."""
179        if not self.connection:
180            print("Not connected to database")
181            return
182
183        print(f"\n{'=' * 60}")
184        print("DATABASE SUMMARY")
185        print(f"{'=' * 60}")
186        print(f"Database: {self.db_path}")
187        print(f"Size: {self.db_path.stat().st_size / 1024:.1f} KB")
188
189        tables = self.list_tables()
190        print(f"\nTables: {len(tables)}")
191
192        for table in tables:
193            cursor = self.connection.cursor()
194            cursor.execute(f'SELECT COUNT(*) FROM "{table}"')
195            count = cursor.fetchone()[0]
196            print(f"  - {table}: {count} rows")
197
198    def __enter__(self):
199        """Context manager entry."""
200        self.connect()
201        return self
202
203    def __exit__(self, exc_type, exc_val, exc_tb):
204        """Context manager exit."""
205        self.disconnect()

Load DataFrames into SQLite database.

SQLiteLoader(db_path: str | pathlib.Path)
13    def __init__(self, db_path: str | Path):
14        """
15        Initialize SQLite loader.
16
17        Args:
18            db_path: Path to SQLite database file (will be created if doesn't exist)
19        """
20        self.db_path = Path(db_path)
21        self.connection = None
22        self.loaded_tables = []

Initialize SQLite loader.

Arguments:
  • db_path: Path to SQLite database file (will be created if it doesn't exist)
db_path
connection
loaded_tables
def connect(self) -> None:
24    def connect(self) -> None:
25        """Establish connection to SQLite database."""
26        self.connection = sqlite3.connect(str(self.db_path))
27        print(f"Connected to SQLite database: {self.db_path}")

Establish connection to SQLite database.

def disconnect(self) -> None:
29    def disconnect(self) -> None:
30        """Close database connection."""
31        if self.connection:
32            self.connection.close()
33            self.connection = None
34            print("Database connection closed")

Close database connection.

def load_tables( self, tables: dict[str, pandas.core.frame.DataFrame], if_exists: str = 'replace', create_indexes: bool = True) -> None:
36    def load_tables(
37        self,
38        tables: dict[str, pd.DataFrame],
39        if_exists: str = "replace",
40        create_indexes: bool = True,
41    ) -> None:
42        """
43        Load multiple tables into SQLite.
44
45        Args:
46            tables: Dictionary of table_name -> DataFrame
47            if_exists: How to behave if table exists ('fail', 'replace', 'append')
48            create_indexes: Whether to create indexes on foreign key columns
49        """
50        if not self.connection:
51            self.connect()
52
53        self.loaded_tables = []
54
55        for table_name, df in tables.items():
56            self._load_table(table_name, df, if_exists)
57            self.loaded_tables.append(table_name)
58
59            if create_indexes:
60                self._create_indexes(table_name, df)
61
62        print(f"\n✓ Loaded {len(tables)} tables into {self.db_path}")

Load multiple tables into SQLite.

Arguments:
  • tables: Dictionary of table_name -> DataFrame
  • if_exists: How to behave if table exists ('fail', 'replace', 'append')
  • create_indexes: Whether to create indexes on foreign key columns
def execute_ddl(self, ddl_statements: dict[str, str]) -> None:
112    def execute_ddl(self, ddl_statements: dict[str, str]) -> None:
113        """
114        Execute DDL statements to create tables.
115
116        Args:
117            ddl_statements: Dictionary of table_name -> CREATE TABLE statement
118        """
119        if not self.connection:
120            self.connect()
121
122        cursor = self.connection.cursor()
123
124        for table_name, ddl in ddl_statements.items():
125            # SQLite doesn't support ALTER TABLE ADD CONSTRAINT for foreign keys
126            # Skip ALTER statements
127            statements = ddl.split(";")
128            for stmt in statements:
129                stmt = stmt.strip()
130                if stmt and not stmt.startswith("ALTER TABLE"):
131                    try:
132                        cursor.execute(stmt)
133                    except sqlite3.OperationalError as e:
134                        print(f"Warning: Could not execute DDL for {table_name}: {e}")
135
136        self.connection.commit()
137        print(f"✓ Executed DDL for {len(ddl_statements)} tables")

Execute DDL statements to create tables.

Arguments:
  • ddl_statements: Dictionary of table_name -> CREATE TABLE statement
def query(self, sql: str) -> pandas.core.frame.DataFrame:
139    def query(self, sql: str) -> pd.DataFrame:
140        """
141        Execute a SQL query and return results as DataFrame.
142
143        Args:
144            sql: SQL query to execute
145
146        Returns:
147            Query results as DataFrame
148        """
149        if not self.connection:
150            self.connect()
151
152        return pd.read_sql_query(sql, self.connection)

Execute a SQL query and return results as DataFrame.

Arguments:
  • sql: SQL query to execute
Returns:

Query results as DataFrame

def get_table_info(self, table_name: str) -> pandas.core.frame.DataFrame:
154    def get_table_info(self, table_name: str) -> pd.DataFrame:
155        """
156        Get schema information for a table.
157
158        Args:
159            table_name: Name of the table
160
161        Returns:
162            DataFrame with table schema information
163        """
164        return self.query(f'PRAGMA table_info("{table_name}")')

Get schema information for a table.

Arguments:
  • table_name: Name of the table
Returns:

DataFrame with table schema information

def list_tables(self) -> list[str]:
166    def list_tables(self) -> list[str]:
167        """
168        List all tables in the database.
169
170        Returns:
171            List of table names
172        """
173        cursor = self.connection.cursor()
174        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
175        return [row[0] for row in cursor.fetchall()]

List all tables in the database.

Returns:

List of table names

def print_summary(self) -> None:
177    def print_summary(self) -> None:
178        """Print summary of loaded database."""
179        if not self.connection:
180            print("Not connected to database")
181            return
182
183        print(f"\n{'=' * 60}")
184        print("DATABASE SUMMARY")
185        print(f"{'=' * 60}")
186        print(f"Database: {self.db_path}")
187        print(f"Size: {self.db_path.stat().st_size / 1024:.1f} KB")
188
189        tables = self.list_tables()
190        print(f"\nTables: {len(tables)}")
191
192        for table in tables:
193            cursor = self.connection.cursor()
194            cursor.execute(f'SELECT COUNT(*) FROM "{table}"')
195            count = cursor.fetchone()[0]
196            print(f"  - {table}: {count} rows")

Print summary of loaded database.

def load_to_sqlite( tables: dict[str, pandas.core.frame.DataFrame], db_path: str | pathlib.Path, if_exists: str = 'replace', create_indexes: bool = True) -> SQLiteLoader:
def load_to_sqlite(
    tables: dict[str, pd.DataFrame],
    db_path: str | Path,
    if_exists: str = "replace",
    create_indexes: bool = True,
) -> SQLiteLoader:
    """
    Quick function to load tables into SQLite.

    Args:
        tables: Dictionary of table_name -> DataFrame
        db_path: Path to SQLite database
        if_exists: How to behave if table exists
        create_indexes: Whether to create indexes

    Returns:
        SQLiteLoader instance (still connected); the caller is responsible
        for calling ``disconnect()`` on it.

    Raises:
        Whatever ``SQLiteLoader.load_tables`` raises. The connection is
        closed before the exception propagates.
    """
    loader = SQLiteLoader(db_path)
    loader.connect()
    try:
        loader.load_tables(tables, if_exists, create_indexes)
    except BaseException:
        # On failure the loader is never handed back, so nobody else could
        # close the connection; release it here before re-raising.
        loader.disconnect()
        raise
    return loader

Quick function to load tables into SQLite.

Arguments:
  • tables: Dictionary of table_name -> DataFrame
  • db_path: Path to SQLite database
  • if_exists: How to behave if table exists
  • create_indexes: Whether to create indexes
Returns:

SQLiteLoader instance (still connected)