yaml_shredder.data_loader
Load generated tables into databases.
1"""Load generated tables into databases.""" 2 3import sqlite3 4from pathlib import Path 5 6import pandas as pd 7 8 9class SQLiteLoader: 10 """Load DataFrames into SQLite database.""" 11 12 def __init__(self, db_path: str | Path): 13 """ 14 Initialize SQLite loader. 15 16 Args: 17 db_path: Path to SQLite database file (will be created if doesn't exist) 18 """ 19 self.db_path = Path(db_path) 20 self.connection = None 21 self.loaded_tables = [] 22 23 def connect(self) -> None: 24 """Establish connection to SQLite database.""" 25 self.connection = sqlite3.connect(str(self.db_path)) 26 print(f"Connected to SQLite database: {self.db_path}") 27 28 def disconnect(self) -> None: 29 """Close database connection.""" 30 if self.connection: 31 self.connection.close() 32 self.connection = None 33 print("Database connection closed") 34 35 def load_tables( 36 self, 37 tables: dict[str, pd.DataFrame], 38 if_exists: str = "replace", 39 create_indexes: bool = True, 40 ) -> None: 41 """ 42 Load multiple tables into SQLite. 43 44 Args: 45 tables: Dictionary of table_name -> DataFrame 46 if_exists: How to behave if table exists ('fail', 'replace', 'append') 47 create_indexes: Whether to create indexes on foreign key columns 48 """ 49 if not self.connection: 50 self.connect() 51 52 self.loaded_tables = [] 53 54 for table_name, df in tables.items(): 55 self._load_table(table_name, df, if_exists) 56 self.loaded_tables.append(table_name) 57 58 if create_indexes: 59 self._create_indexes(table_name, df) 60 61 print(f"\n✓ Loaded {len(tables)} tables into {self.db_path}") 62 63 def _load_table(self, table_name: str, df: pd.DataFrame, if_exists: str = "replace") -> None: 64 """ 65 Load a single table into SQLite. 
66 67 Args: 68 table_name: Name of the table 69 df: DataFrame to load 70 if_exists: How to behave if table exists 71 """ 72 # Convert boolean columns to integers for SQLite 73 df_copy = df.copy() 74 for col in df_copy.columns: 75 if df_copy[col].dtype == "bool": 76 df_copy[col] = df_copy[col].astype(int) 77 78 df_copy.to_sql(table_name, self.connection, if_exists=if_exists, index=False) 79 print(f" Loaded {len(df_copy)} rows into table: {table_name}") 80 81 def _create_indexes(self, table_name: str, df: pd.DataFrame) -> None: 82 """ 83 Create indexes on key columns. 84 85 Args: 86 table_name: Name of the table 87 df: DataFrame to analyze for index creation 88 """ 89 cursor = self.connection.cursor() 90 91 # Create index on id column if exists 92 if "id" in [c.lower() for c in df.columns]: 93 id_col = next(c for c in df.columns if c.lower() == "id") 94 index_name = f"idx_{table_name}_id" 95 try: 96 cursor.execute(f'CREATE INDEX IF NOT EXISTS "{index_name}" ON "{table_name}" ("{id_col}")') 97 except sqlite3.OperationalError: 98 pass # Index might already exist 99 100 # Create indexes on foreign key columns (parent_*) 101 fk_columns = [c for c in df.columns if c.startswith("parent_")] 102 for fk_col in fk_columns: 103 index_name = f"idx_{table_name}_{fk_col}" 104 try: 105 cursor.execute(f'CREATE INDEX IF NOT EXISTS "{index_name}" ON "{table_name}" ("{fk_col}")') 106 except sqlite3.OperationalError: 107 pass 108 109 self.connection.commit() 110 111 def execute_ddl(self, ddl_statements: dict[str, str]) -> None: 112 """ 113 Execute DDL statements to create tables. 
114 115 Args: 116 ddl_statements: Dictionary of table_name -> CREATE TABLE statement 117 """ 118 if not self.connection: 119 self.connect() 120 121 cursor = self.connection.cursor() 122 123 for table_name, ddl in ddl_statements.items(): 124 # SQLite doesn't support ALTER TABLE ADD CONSTRAINT for foreign keys 125 # Skip ALTER statements 126 statements = ddl.split(";") 127 for stmt in statements: 128 stmt = stmt.strip() 129 if stmt and not stmt.startswith("ALTER TABLE"): 130 try: 131 cursor.execute(stmt) 132 except sqlite3.OperationalError as e: 133 print(f"Warning: Could not execute DDL for {table_name}: {e}") 134 135 self.connection.commit() 136 print(f"✓ Executed DDL for {len(ddl_statements)} tables") 137 138 def query(self, sql: str) -> pd.DataFrame: 139 """ 140 Execute a SQL query and return results as DataFrame. 141 142 Args: 143 sql: SQL query to execute 144 145 Returns: 146 Query results as DataFrame 147 """ 148 if not self.connection: 149 self.connect() 150 151 return pd.read_sql_query(sql, self.connection) 152 153 def get_table_info(self, table_name: str) -> pd.DataFrame: 154 """ 155 Get schema information for a table. 156 157 Args: 158 table_name: Name of the table 159 160 Returns: 161 DataFrame with table schema information 162 """ 163 return self.query(f'PRAGMA table_info("{table_name}")') 164 165 def list_tables(self) -> list[str]: 166 """ 167 List all tables in the database. 
168 169 Returns: 170 List of table names 171 """ 172 cursor = self.connection.cursor() 173 cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") 174 return [row[0] for row in cursor.fetchall()] 175 176 def print_summary(self) -> None: 177 """Print summary of loaded database.""" 178 if not self.connection: 179 print("Not connected to database") 180 return 181 182 print(f"\n{'=' * 60}") 183 print("DATABASE SUMMARY") 184 print(f"{'=' * 60}") 185 print(f"Database: {self.db_path}") 186 print(f"Size: {self.db_path.stat().st_size / 1024:.1f} KB") 187 188 tables = self.list_tables() 189 print(f"\nTables: {len(tables)}") 190 191 for table in tables: 192 cursor = self.connection.cursor() 193 cursor.execute(f'SELECT COUNT(*) FROM "{table}"') 194 count = cursor.fetchone()[0] 195 print(f" - {table}: {count} rows") 196 197 def __enter__(self): 198 """Context manager entry.""" 199 self.connect() 200 return self 201 202 def __exit__(self, exc_type, exc_val, exc_tb): 203 """Context manager exit.""" 204 self.disconnect() 205 206 207def load_to_sqlite( 208 tables: dict[str, pd.DataFrame], 209 db_path: str | Path, 210 if_exists: str = "replace", 211 create_indexes: bool = True, 212) -> SQLiteLoader: 213 """ 214 Quick function to load tables into SQLite. 215 216 Args: 217 tables: Dictionary of table_name -> DataFrame 218 db_path: Path to SQLite database 219 if_exists: How to behave if table exists 220 create_indexes: Whether to create indexes 221 222 Returns: 223 SQLiteLoader instance (still connected) 224 """ 225 loader = SQLiteLoader(db_path) 226 loader.connect() 227 loader.load_tables(tables, if_exists, create_indexes) 228 return loader
class
SQLiteLoader:
10class SQLiteLoader: 11 """Load DataFrames into SQLite database.""" 12 13 def __init__(self, db_path: str | Path): 14 """ 15 Initialize SQLite loader. 16 17 Args: 18 db_path: Path to SQLite database file (will be created if doesn't exist) 19 """ 20 self.db_path = Path(db_path) 21 self.connection = None 22 self.loaded_tables = [] 23 24 def connect(self) -> None: 25 """Establish connection to SQLite database.""" 26 self.connection = sqlite3.connect(str(self.db_path)) 27 print(f"Connected to SQLite database: {self.db_path}") 28 29 def disconnect(self) -> None: 30 """Close database connection.""" 31 if self.connection: 32 self.connection.close() 33 self.connection = None 34 print("Database connection closed") 35 36 def load_tables( 37 self, 38 tables: dict[str, pd.DataFrame], 39 if_exists: str = "replace", 40 create_indexes: bool = True, 41 ) -> None: 42 """ 43 Load multiple tables into SQLite. 44 45 Args: 46 tables: Dictionary of table_name -> DataFrame 47 if_exists: How to behave if table exists ('fail', 'replace', 'append') 48 create_indexes: Whether to create indexes on foreign key columns 49 """ 50 if not self.connection: 51 self.connect() 52 53 self.loaded_tables = [] 54 55 for table_name, df in tables.items(): 56 self._load_table(table_name, df, if_exists) 57 self.loaded_tables.append(table_name) 58 59 if create_indexes: 60 self._create_indexes(table_name, df) 61 62 print(f"\n✓ Loaded {len(tables)} tables into {self.db_path}") 63 64 def _load_table(self, table_name: str, df: pd.DataFrame, if_exists: str = "replace") -> None: 65 """ 66 Load a single table into SQLite. 
67 68 Args: 69 table_name: Name of the table 70 df: DataFrame to load 71 if_exists: How to behave if table exists 72 """ 73 # Convert boolean columns to integers for SQLite 74 df_copy = df.copy() 75 for col in df_copy.columns: 76 if df_copy[col].dtype == "bool": 77 df_copy[col] = df_copy[col].astype(int) 78 79 df_copy.to_sql(table_name, self.connection, if_exists=if_exists, index=False) 80 print(f" Loaded {len(df_copy)} rows into table: {table_name}") 81 82 def _create_indexes(self, table_name: str, df: pd.DataFrame) -> None: 83 """ 84 Create indexes on key columns. 85 86 Args: 87 table_name: Name of the table 88 df: DataFrame to analyze for index creation 89 """ 90 cursor = self.connection.cursor() 91 92 # Create index on id column if exists 93 if "id" in [c.lower() for c in df.columns]: 94 id_col = next(c for c in df.columns if c.lower() == "id") 95 index_name = f"idx_{table_name}_id" 96 try: 97 cursor.execute(f'CREATE INDEX IF NOT EXISTS "{index_name}" ON "{table_name}" ("{id_col}")') 98 except sqlite3.OperationalError: 99 pass # Index might already exist 100 101 # Create indexes on foreign key columns (parent_*) 102 fk_columns = [c for c in df.columns if c.startswith("parent_")] 103 for fk_col in fk_columns: 104 index_name = f"idx_{table_name}_{fk_col}" 105 try: 106 cursor.execute(f'CREATE INDEX IF NOT EXISTS "{index_name}" ON "{table_name}" ("{fk_col}")') 107 except sqlite3.OperationalError: 108 pass 109 110 self.connection.commit() 111 112 def execute_ddl(self, ddl_statements: dict[str, str]) -> None: 113 """ 114 Execute DDL statements to create tables. 
115 116 Args: 117 ddl_statements: Dictionary of table_name -> CREATE TABLE statement 118 """ 119 if not self.connection: 120 self.connect() 121 122 cursor = self.connection.cursor() 123 124 for table_name, ddl in ddl_statements.items(): 125 # SQLite doesn't support ALTER TABLE ADD CONSTRAINT for foreign keys 126 # Skip ALTER statements 127 statements = ddl.split(";") 128 for stmt in statements: 129 stmt = stmt.strip() 130 if stmt and not stmt.startswith("ALTER TABLE"): 131 try: 132 cursor.execute(stmt) 133 except sqlite3.OperationalError as e: 134 print(f"Warning: Could not execute DDL for {table_name}: {e}") 135 136 self.connection.commit() 137 print(f"✓ Executed DDL for {len(ddl_statements)} tables") 138 139 def query(self, sql: str) -> pd.DataFrame: 140 """ 141 Execute a SQL query and return results as DataFrame. 142 143 Args: 144 sql: SQL query to execute 145 146 Returns: 147 Query results as DataFrame 148 """ 149 if not self.connection: 150 self.connect() 151 152 return pd.read_sql_query(sql, self.connection) 153 154 def get_table_info(self, table_name: str) -> pd.DataFrame: 155 """ 156 Get schema information for a table. 157 158 Args: 159 table_name: Name of the table 160 161 Returns: 162 DataFrame with table schema information 163 """ 164 return self.query(f'PRAGMA table_info("{table_name}")') 165 166 def list_tables(self) -> list[str]: 167 """ 168 List all tables in the database. 
169 170 Returns: 171 List of table names 172 """ 173 cursor = self.connection.cursor() 174 cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") 175 return [row[0] for row in cursor.fetchall()] 176 177 def print_summary(self) -> None: 178 """Print summary of loaded database.""" 179 if not self.connection: 180 print("Not connected to database") 181 return 182 183 print(f"\n{'=' * 60}") 184 print("DATABASE SUMMARY") 185 print(f"{'=' * 60}") 186 print(f"Database: {self.db_path}") 187 print(f"Size: {self.db_path.stat().st_size / 1024:.1f} KB") 188 189 tables = self.list_tables() 190 print(f"\nTables: {len(tables)}") 191 192 for table in tables: 193 cursor = self.connection.cursor() 194 cursor.execute(f'SELECT COUNT(*) FROM "{table}"') 195 count = cursor.fetchone()[0] 196 print(f" - {table}: {count} rows") 197 198 def __enter__(self): 199 """Context manager entry.""" 200 self.connect() 201 return self 202 203 def __exit__(self, exc_type, exc_val, exc_tb): 204 """Context manager exit.""" 205 self.disconnect()
Load DataFrames into SQLite database.
SQLiteLoader(db_path: str | pathlib.Path)
13 def __init__(self, db_path: str | Path): 14 """ 15 Initialize SQLite loader. 16 17 Args: 18 db_path: Path to SQLite database file (will be created if doesn't exist) 19 """ 20 self.db_path = Path(db_path) 21 self.connection = None 22 self.loaded_tables = []
Initialize SQLite loader.
Arguments:
- db_path: Path to SQLite database file (will be created if it doesn't exist)
def
connect(self) -> None:
def connect(self) -> None:
    """Establish connection to SQLite database.

    A connection that is already open is closed first, so calling this
    method twice does not leak the earlier connection.
    """
    if self.connection is not None:
        self.connection.close()
    self.connection = sqlite3.connect(str(self.db_path))
    print(f"Connected to SQLite database: {self.db_path}")
Establish connection to SQLite database.
def
disconnect(self) -> None:
def disconnect(self) -> None:
    """Shut down the current connection, if there is one.

    Does nothing (and prints nothing) when no connection is held.
    """
    active = self.connection
    if not active:
        return
    active.close()
    self.connection = None
    print("Database connection closed")
Close database connection.
def
load_tables( self, tables: dict[str, pandas.core.frame.DataFrame], if_exists: str = 'replace', create_indexes: bool = True) -> None:
def load_tables(
    self,
    tables: dict[str, pd.DataFrame],
    if_exists: str = "replace",
    create_indexes: bool = True,
) -> None:
    """
    Write every DataFrame in ``tables`` to the database.

    Connects first when no connection is held, then replaces
    ``loaded_tables`` with the names written by this call, in order.

    Args:
        tables: Mapping of table name to the DataFrame stored under it
        if_exists: Passed through for each table ('fail', 'replace', 'append')
        create_indexes: When true, index creation runs after each table is loaded
    """
    if not self.connection:
        self.connect()

    self.loaded_tables = []
    for name, frame in tables.items():
        self._load_table(name, frame, if_exists)
        self.loaded_tables.append(name)
        if not create_indexes:
            continue
        self._create_indexes(name, frame)

    total = len(tables)
    print(f"\n✓ Loaded {total} tables into {self.db_path}")
Load multiple tables into SQLite.
Arguments:
- tables: Dictionary of table_name -> DataFrame
- if_exists: How to behave if table exists ('fail', 'replace', 'append')
- create_indexes: Whether to create indexes on the id column and on foreign key (parent_*) columns
def
execute_ddl(self, ddl_statements: dict[str, str]) -> None:
def execute_ddl(self, ddl_statements: dict[str, str]) -> None:
    """
    Execute DDL statements to create tables.

    Each value is split on ``;`` and executed statement by statement.
    Statements starting with ``ALTER TABLE`` (any letter case, any
    whitespace between the keywords) are skipped. Statements that raise
    ``sqlite3.OperationalError`` are reported as warnings and do not stop
    processing.

    Args:
        ddl_statements: Dictionary of table_name -> CREATE TABLE statement
    """
    if not self.connection:
        self.connect()

    cursor = self.connection.cursor()

    for table_name, ddl in ddl_statements.items():
        for raw in ddl.split(";"):
            stmt = raw.strip()
            if not stmt:
                continue
            # SQLite doesn't support ALTER TABLE ADD CONSTRAINT for foreign
            # keys, so ALTER statements are skipped. Comparing the first two
            # words upper-cased also catches lower-case or multi-space DDL.
            if [word.upper() for word in stmt.split(None, 2)[:2]] == ["ALTER", "TABLE"]:
                continue
            try:
                cursor.execute(stmt)
            except sqlite3.OperationalError as e:
                print(f"Warning: Could not execute DDL for {table_name}: {e}")

    self.connection.commit()
    print(f"✓ Executed DDL for {len(ddl_statements)} tables")
Execute DDL statements to create tables.
Arguments:
- ddl_statements: Dictionary of table_name -> CREATE TABLE statement
def
query(self, sql: str) -> pandas.core.frame.DataFrame:
def query(self, sql: str) -> pd.DataFrame:
    """
    Run ``sql`` against the database and hand back the rows as a DataFrame.

    Connects first when no connection is held.

    Args:
        sql: SQL text to run

    Returns:
        DataFrame produced by ``pd.read_sql_query`` for the statement
    """
    conn = self.connection
    if not conn:
        self.connect()
        conn = self.connection
    result = pd.read_sql_query(sql, conn)
    return result
Execute a SQL query and return results as DataFrame.
Arguments:
- sql: SQL query to execute
Returns:
Query results as DataFrame
def
get_table_info(self, table_name: str) -> pandas.core.frame.DataFrame:
def get_table_info(self, table_name: str) -> pd.DataFrame:
    """
    Get schema information for a table.

    Args:
        table_name: Name of the table

    Returns:
        DataFrame with table schema information (the result of
        ``PRAGMA table_info`` for the table)
    """
    # Double any embedded double quote so a name containing '"' still forms
    # one valid quoted identifier instead of breaking the statement.
    quoted = '"' + str(table_name).replace('"', '""') + '"'
    return self.query(f"PRAGMA table_info({quoted})")
Get schema information for a table.
Arguments:
- table_name: Name of the table
Returns:
DataFrame with table schema information
def
list_tables(self) -> list[str]:
def list_tables(self) -> list[str]:
    """
    List all tables in the database.

    Connects first when no connection is held, matching ``query`` and
    ``execute_ddl``, instead of failing on a ``None`` connection.

    Returns:
        List of table names, sorted by name
    """
    if not self.connection:
        self.connect()

    cursor = self.connection.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
    return [row[0] for row in cursor.fetchall()]
List all tables in the database.
Returns:
List of table names
def
print_summary(self) -> None:
def print_summary(self) -> None:
    """Print summary of loaded database.

    Prints the database path, its on-disk size and the row count of every
    table. When no connection is held, only a notice is printed. If the
    database file cannot be stat'ed (e.g. the path is ``:memory:``), the
    size is reported as unavailable instead of raising.
    """
    if not self.connection:
        print("Not connected to database")
        return

    print(f"\n{'=' * 60}")
    print("DATABASE SUMMARY")
    print(f"{'=' * 60}")
    print(f"Database: {self.db_path}")
    try:
        size_kb = self.db_path.stat().st_size / 1024
    except OSError:
        # No file on disk to measure.
        print("Size: n/a (no database file on disk)")
    else:
        print(f"Size: {size_kb:.1f} KB")

    tables = self.list_tables()
    print(f"\nTables: {len(tables)}")

    cursor = self.connection.cursor()
    for table in tables:
        # Double embedded quotes so any table name forms a valid identifier.
        quoted = '"' + str(table).replace('"', '""') + '"'
        cursor.execute(f"SELECT COUNT(*) FROM {quoted}")
        count = cursor.fetchone()[0]
        print(f" - {table}: {count} rows")
Print summary of loaded database.
def
load_to_sqlite( tables: dict[str, pandas.core.frame.DataFrame], db_path: str | pathlib.Path, if_exists: str = 'replace', create_indexes: bool = True) -> SQLiteLoader:
def load_to_sqlite(
    tables: dict[str, pd.DataFrame],
    db_path: str | Path,
    if_exists: str = "replace",
    create_indexes: bool = True,
) -> SQLiteLoader:
    """
    Quick function to load tables into SQLite.

    Args:
        tables: Dictionary of table_name -> DataFrame
        db_path: Path to SQLite database
        if_exists: How to behave if table exists
        create_indexes: Whether to create indexes

    Returns:
        SQLiteLoader instance (still connected)

    Raises:
        Whatever ``SQLiteLoader.load_tables`` raises; the connection is
        closed before the exception propagates.
    """
    loader = SQLiteLoader(db_path)
    loader.connect()
    try:
        loader.load_tables(tables, if_exists, create_indexes)
    except BaseException:
        # The caller never receives the loader on failure, so nobody else
        # could close this connection.
        loader.disconnect()
        raise
    return loader
Quick function to load tables into SQLite.
Arguments:
- tables: Dictionary of table_name -> DataFrame
- db_path: Path to SQLite database
- if_exists: How to behave if table exists
- create_indexes: Whether to create indexes
Returns:
SQLiteLoader instance (still connected)