yaml_shredder.ddl_generator

Generate SQL DDL statements from table structures.

  1"""Generate SQL DDL statements from table structures."""
  2
  3from pathlib import Path
  4from typing import Any
  5
  6import pandas as pd
  7
  8
  9class DDLGenerator:
 10    """Generate SQL DDL (CREATE TABLE) statements from DataFrames."""
 11
 12    # Type mapping from pandas to SQL
 13    TYPE_MAPPING = {
 14        "int64": "INTEGER",
 15        "int32": "INTEGER",
 16        "float64": "FLOAT",
 17        "float32": "FLOAT",
 18        "bool": "BOOLEAN",
 19        "datetime64[ns]": "TIMESTAMP",
 20        "object": "VARCHAR(1000)",  # Default for strings
 21        "string": "VARCHAR(1000)",
 22    }
 23
 24    SNOWFLAKE_TYPE_MAPPING = {
 25        "int64": "NUMBER",
 26        "int32": "NUMBER",
 27        "float64": "FLOAT",
 28        "float32": "FLOAT",
 29        "bool": "BOOLEAN",
 30        "datetime64[ns]": "TIMESTAMP_NTZ",
 31        "object": "VARCHAR(16777216)",  # Snowflake max VARCHAR
 32        "string": "VARCHAR(16777216)",
 33    }
 34
 35    SQLITE_TYPE_MAPPING = {
 36        "int64": "INTEGER",
 37        "int32": "INTEGER",
 38        "float64": "REAL",
 39        "float32": "REAL",
 40        "bool": "INTEGER",  # SQLite uses 0/1 for boolean
 41        "datetime64[ns]": "TEXT",  # SQLite stores dates as TEXT or INTEGER
 42        "object": "TEXT",
 43        "string": "TEXT",
 44    }
 45
 46    def __init__(self, dialect: str = "snowflake"):
 47        """
 48        Initialize DDL generator.
 49
 50        Args:
 51            dialect: SQL dialect ('snowflake', 'postgres', 'mysql', 'sqlite', 'standard')
 52        """
 53        self.dialect = dialect.lower()
 54        self.ddl_statements = {}
 55
 56        # Select type mapping based on dialect
 57        if self.dialect == "snowflake":
 58            self.type_map = self.SNOWFLAKE_TYPE_MAPPING
 59        elif self.dialect == "sqlite":
 60            self.type_map = self.SQLITE_TYPE_MAPPING
 61        else:
 62            self.type_map = self.TYPE_MAPPING
 63
 64    def generate_ddl(
 65        self, tables: dict[str, pd.DataFrame], relationships: list[dict[str, Any]] | None = None
 66    ) -> dict[str, str]:
 67        """
 68        Generate DDL statements for all tables.
 69
 70        Args:
 71            tables: Dictionary of table_name -> DataFrame
 72            relationships: Optional list of relationships for foreign keys
 73
 74        Returns:
 75            Dictionary of table_name -> DDL statement
 76        """
 77        self.ddl_statements = {}
 78
 79        for table_name, df in tables.items():
 80            ddl = self._generate_table_ddl(table_name, df)
 81            self.ddl_statements[table_name] = ddl
 82
 83        # Add foreign key constraints if relationships provided
 84        if relationships:
 85            self._add_foreign_keys(relationships)
 86
 87        return self.ddl_statements
 88
 89    def _generate_table_ddl(self, table_name: str, df: pd.DataFrame) -> str:
 90        """
 91        Generate CREATE TABLE statement for a single table.
 92
 93        Args:
 94            table_name: Name of the table
 95            df: DataFrame with the data
 96
 97        Returns:
 98            CREATE TABLE DDL statement
 99        """
100        # Start DDL
101        ddl_lines = [f"CREATE TABLE {self._quote_identifier(table_name)} ("]
102
103        # Add columns
104        column_defs = []
105        for col_name in df.columns:
106            col_type = self._infer_column_type(df[col_name])
107            nullable = "NULL" if df[col_name].isna().any() else "NOT NULL"
108
109            # Special handling for certain columns
110            if col_name.lower() in ["id", "_row_index"]:
111                nullable = "NOT NULL"
112
113            column_def = f"    {self._quote_identifier(col_name)} {col_type} {nullable}"
114            column_defs.append(column_def)
115
116        ddl_lines.append(",\n".join(column_defs))
117
118        # Add primary key if obvious
119        if "id" in [c.lower() for c in df.columns]:
120            id_col = next(c for c in df.columns if c.lower() == "id")
121            ddl_lines.append(f",\n    PRIMARY KEY ({self._quote_identifier(id_col)})")
122
123        # Close statement
124        ddl_lines.append(");")
125
126        return "\n".join(ddl_lines)
127
128    def _infer_column_type(self, series: pd.Series) -> str:
129        """
130        Infer SQL type from pandas Series.
131
132        Args:
133            series: Pandas Series to analyze
134
135        Returns:
136            SQL type string
137        """
138        dtype_str = str(series.dtype)
139
140        # Check for string length if object type
141        if dtype_str == "object":
142            # Skip expensive calculation for large series
143            if len(series) > 1000:
144                sample_series = series.dropna().head(100)
145            else:
146                sample_series = series.dropna()
147
148            if len(sample_series) == 0:
149                return self.type_map.get("object", "VARCHAR(1000)")
150
151            max_length = sample_series.astype(str).str.len().max()
152            if pd.isna(max_length):
153                return self.type_map.get("object", "VARCHAR(1000)")
154
155            if self.dialect == "sqlite":
156                # SQLite TEXT has no length limit
157                return "TEXT"
158            elif self.dialect == "snowflake":
159                # Snowflake VARCHAR can be very large
160                if max_length > 16000000:
161                    return "VARCHAR(16777216)"
162                return f"VARCHAR({min(int(max_length * 1.5), 16777216)})"
163            else:
164                # Other databases - cap at reasonable size
165                return f"VARCHAR({min(int(max_length * 1.5), 4000)})"
166
167        return self.type_map.get(dtype_str, "VARCHAR(1000)")
168
169    def _add_foreign_keys(self, relationships: list[dict[str, Any]]) -> None:
170        """
171        Add foreign key constraints to DDL statements.
172
173        Args:
174            relationships: List of relationship dictionaries
175        """
176        for rel in relationships:
177            child_table = rel["child_table"]
178            parent_table = rel["parent_table"]
179            fk_columns = rel["foreign_keys"]
180
181            if child_table in self.ddl_statements:
182                # Add ALTER TABLE statement for foreign key
183                for fk_col in fk_columns:
184                    fk_name = f"FK_{child_table}_{parent_table}_{fk_col}"
185                    alter_stmt = (
186                        f"\nALTER TABLE {self._quote_identifier(child_table)} "
187                        f"ADD CONSTRAINT {self._quote_identifier(fk_name)} "
188                        f"FOREIGN KEY ({self._quote_identifier(f'parent_{fk_col}')}) "
189                        f"REFERENCES {self._quote_identifier(parent_table)}({self._quote_identifier(fk_col)});"
190                    )
191                    self.ddl_statements[child_table] += alter_stmt
192
193    def _quote_identifier(self, identifier: str) -> str:
194        """
195        Quote identifier based on dialect.
196
197        Args:
198            identifier: Identifier to quote
199
200        Returns:
201            Quoted identifier
202        """
203        if self.dialect == "mysql":
204            return f"`{identifier}`"
205        else:  # snowflake, postgres, sqlite use double quotes
206            return f'"{identifier}"'
207
208    def save_ddl(self, output_file: str | Path) -> None:
209        """
210        Save all DDL statements to a file.
211
212        Args:
213            output_file: Path to output file
214        """
215        output_file = Path(output_file)
216
217        with open(output_file, "w") as f:
218            f.write("-- DDL Statements generated by YAML Shredder\n")
219            f.write(f"-- Dialect: {self.dialect.upper()}\n")
220            f.write(f"-- Tables: {len(self.ddl_statements)}\n\n")
221
222            for table_name, ddl in self.ddl_statements.items():
223                f.write(f"-- Table: {table_name}\n")
224                f.write(ddl)
225                f.write("\n\n")
226
227        print(f"DDL saved to: {output_file}")
228
229    def print_ddl(self) -> None:
230        """Print all DDL statements to console."""
231        print(f"\n{'=' * 60}")
232        print("SQL DDL STATEMENTS")
233        print(f"{'=' * 60}")
234        print(f"Dialect: {self.dialect.upper()}")
235        print(f"Tables: {len(self.ddl_statements)}\n")
236
237        for table_name, ddl in self.ddl_statements.items():
238            print(f"-- {table_name}")
239            print(ddl)
240            print()
241
242
def generate_snowflake_ddl(
    tables: dict[str, pd.DataFrame],
    relationships: list[dict[str, Any]] | None = None,
    output_file: str | Path | None = None,
) -> dict[str, str]:
    """
    Convenience wrapper: generate Snowflake-dialect DDL in one call.

    Args:
        tables: Dictionary of table_name -> DataFrame
        relationships: Optional relationships
        output_file: Optional file to save DDL

    Returns:
        Dictionary of DDL statements
    """
    gen = DDLGenerator(dialect="snowflake")
    statements = gen.generate_ddl(tables, relationships)

    # Persist only when a destination was requested.
    if output_file:
        gen.save_ddl(output_file)

    return statements
class DDLGenerator:
 10class DDLGenerator:
 11    """Generate SQL DDL (CREATE TABLE) statements from DataFrames."""
 12
 13    # Type mapping from pandas to SQL
 14    TYPE_MAPPING = {
 15        "int64": "INTEGER",
 16        "int32": "INTEGER",
 17        "float64": "FLOAT",
 18        "float32": "FLOAT",
 19        "bool": "BOOLEAN",
 20        "datetime64[ns]": "TIMESTAMP",
 21        "object": "VARCHAR(1000)",  # Default for strings
 22        "string": "VARCHAR(1000)",
 23    }
 24
 25    SNOWFLAKE_TYPE_MAPPING = {
 26        "int64": "NUMBER",
 27        "int32": "NUMBER",
 28        "float64": "FLOAT",
 29        "float32": "FLOAT",
 30        "bool": "BOOLEAN",
 31        "datetime64[ns]": "TIMESTAMP_NTZ",
 32        "object": "VARCHAR(16777216)",  # Snowflake max VARCHAR
 33        "string": "VARCHAR(16777216)",
 34    }
 35
 36    SQLITE_TYPE_MAPPING = {
 37        "int64": "INTEGER",
 38        "int32": "INTEGER",
 39        "float64": "REAL",
 40        "float32": "REAL",
 41        "bool": "INTEGER",  # SQLite uses 0/1 for boolean
 42        "datetime64[ns]": "TEXT",  # SQLite stores dates as TEXT or INTEGER
 43        "object": "TEXT",
 44        "string": "TEXT",
 45    }
 46
 47    def __init__(self, dialect: str = "snowflake"):
 48        """
 49        Initialize DDL generator.
 50
 51        Args:
 52            dialect: SQL dialect ('snowflake', 'postgres', 'mysql', 'sqlite', 'standard')
 53        """
 54        self.dialect = dialect.lower()
 55        self.ddl_statements = {}
 56
 57        # Select type mapping based on dialect
 58        if self.dialect == "snowflake":
 59            self.type_map = self.SNOWFLAKE_TYPE_MAPPING
 60        elif self.dialect == "sqlite":
 61            self.type_map = self.SQLITE_TYPE_MAPPING
 62        else:
 63            self.type_map = self.TYPE_MAPPING
 64
 65    def generate_ddl(
 66        self, tables: dict[str, pd.DataFrame], relationships: list[dict[str, Any]] | None = None
 67    ) -> dict[str, str]:
 68        """
 69        Generate DDL statements for all tables.
 70
 71        Args:
 72            tables: Dictionary of table_name -> DataFrame
 73            relationships: Optional list of relationships for foreign keys
 74
 75        Returns:
 76            Dictionary of table_name -> DDL statement
 77        """
 78        self.ddl_statements = {}
 79
 80        for table_name, df in tables.items():
 81            ddl = self._generate_table_ddl(table_name, df)
 82            self.ddl_statements[table_name] = ddl
 83
 84        # Add foreign key constraints if relationships provided
 85        if relationships:
 86            self._add_foreign_keys(relationships)
 87
 88        return self.ddl_statements
 89
 90    def _generate_table_ddl(self, table_name: str, df: pd.DataFrame) -> str:
 91        """
 92        Generate CREATE TABLE statement for a single table.
 93
 94        Args:
 95            table_name: Name of the table
 96            df: DataFrame with the data
 97
 98        Returns:
 99            CREATE TABLE DDL statement
100        """
101        # Start DDL
102        ddl_lines = [f"CREATE TABLE {self._quote_identifier(table_name)} ("]
103
104        # Add columns
105        column_defs = []
106        for col_name in df.columns:
107            col_type = self._infer_column_type(df[col_name])
108            nullable = "NULL" if df[col_name].isna().any() else "NOT NULL"
109
110            # Special handling for certain columns
111            if col_name.lower() in ["id", "_row_index"]:
112                nullable = "NOT NULL"
113
114            column_def = f"    {self._quote_identifier(col_name)} {col_type} {nullable}"
115            column_defs.append(column_def)
116
117        ddl_lines.append(",\n".join(column_defs))
118
119        # Add primary key if obvious
120        if "id" in [c.lower() for c in df.columns]:
121            id_col = next(c for c in df.columns if c.lower() == "id")
122            ddl_lines.append(f",\n    PRIMARY KEY ({self._quote_identifier(id_col)})")
123
124        # Close statement
125        ddl_lines.append(");")
126
127        return "\n".join(ddl_lines)
128
129    def _infer_column_type(self, series: pd.Series) -> str:
130        """
131        Infer SQL type from pandas Series.
132
133        Args:
134            series: Pandas Series to analyze
135
136        Returns:
137            SQL type string
138        """
139        dtype_str = str(series.dtype)
140
141        # Check for string length if object type
142        if dtype_str == "object":
143            # Skip expensive calculation for large series
144            if len(series) > 1000:
145                sample_series = series.dropna().head(100)
146            else:
147                sample_series = series.dropna()
148
149            if len(sample_series) == 0:
150                return self.type_map.get("object", "VARCHAR(1000)")
151
152            max_length = sample_series.astype(str).str.len().max()
153            if pd.isna(max_length):
154                return self.type_map.get("object", "VARCHAR(1000)")
155
156            if self.dialect == "sqlite":
157                # SQLite TEXT has no length limit
158                return "TEXT"
159            elif self.dialect == "snowflake":
160                # Snowflake VARCHAR can be very large
161                if max_length > 16000000:
162                    return "VARCHAR(16777216)"
163                return f"VARCHAR({min(int(max_length * 1.5), 16777216)})"
164            else:
165                # Other databases - cap at reasonable size
166                return f"VARCHAR({min(int(max_length * 1.5), 4000)})"
167
168        return self.type_map.get(dtype_str, "VARCHAR(1000)")
169
170    def _add_foreign_keys(self, relationships: list[dict[str, Any]]) -> None:
171        """
172        Add foreign key constraints to DDL statements.
173
174        Args:
175            relationships: List of relationship dictionaries
176        """
177        for rel in relationships:
178            child_table = rel["child_table"]
179            parent_table = rel["parent_table"]
180            fk_columns = rel["foreign_keys"]
181
182            if child_table in self.ddl_statements:
183                # Add ALTER TABLE statement for foreign key
184                for fk_col in fk_columns:
185                    fk_name = f"FK_{child_table}_{parent_table}_{fk_col}"
186                    alter_stmt = (
187                        f"\nALTER TABLE {self._quote_identifier(child_table)} "
188                        f"ADD CONSTRAINT {self._quote_identifier(fk_name)} "
189                        f"FOREIGN KEY ({self._quote_identifier(f'parent_{fk_col}')}) "
190                        f"REFERENCES {self._quote_identifier(parent_table)}({self._quote_identifier(fk_col)});"
191                    )
192                    self.ddl_statements[child_table] += alter_stmt
193
194    def _quote_identifier(self, identifier: str) -> str:
195        """
196        Quote identifier based on dialect.
197
198        Args:
199            identifier: Identifier to quote
200
201        Returns:
202            Quoted identifier
203        """
204        if self.dialect == "mysql":
205            return f"`{identifier}`"
206        else:  # snowflake, postgres, sqlite use double quotes
207            return f'"{identifier}"'
208
209    def save_ddl(self, output_file: str | Path) -> None:
210        """
211        Save all DDL statements to a file.
212
213        Args:
214            output_file: Path to output file
215        """
216        output_file = Path(output_file)
217
218        with open(output_file, "w") as f:
219            f.write("-- DDL Statements generated by YAML Shredder\n")
220            f.write(f"-- Dialect: {self.dialect.upper()}\n")
221            f.write(f"-- Tables: {len(self.ddl_statements)}\n\n")
222
223            for table_name, ddl in self.ddl_statements.items():
224                f.write(f"-- Table: {table_name}\n")
225                f.write(ddl)
226                f.write("\n\n")
227
228        print(f"DDL saved to: {output_file}")
229
230    def print_ddl(self) -> None:
231        """Print all DDL statements to console."""
232        print(f"\n{'=' * 60}")
233        print("SQL DDL STATEMENTS")
234        print(f"{'=' * 60}")
235        print(f"Dialect: {self.dialect.upper()}")
236        print(f"Tables: {len(self.ddl_statements)}\n")
237
238        for table_name, ddl in self.ddl_statements.items():
239            print(f"-- {table_name}")
240            print(ddl)
241            print()

Generate SQL DDL (CREATE TABLE) statements from DataFrames.

DDLGenerator(dialect: str = 'snowflake')
47    def __init__(self, dialect: str = "snowflake"):
48        """
49        Initialize DDL generator.
50
51        Args:
52            dialect: SQL dialect ('snowflake', 'postgres', 'mysql', 'sqlite', 'standard')
53        """
54        self.dialect = dialect.lower()
55        self.ddl_statements = {}
56
57        # Select type mapping based on dialect
58        if self.dialect == "snowflake":
59            self.type_map = self.SNOWFLAKE_TYPE_MAPPING
60        elif self.dialect == "sqlite":
61            self.type_map = self.SQLITE_TYPE_MAPPING
62        else:
63            self.type_map = self.TYPE_MAPPING

Initialize DDL generator.

Arguments:
  • dialect: SQL dialect ('snowflake', 'postgres', 'mysql', 'sqlite', 'standard')
TYPE_MAPPING = {'int64': 'INTEGER', 'int32': 'INTEGER', 'float64': 'FLOAT', 'float32': 'FLOAT', 'bool': 'BOOLEAN', 'datetime64[ns]': 'TIMESTAMP', 'object': 'VARCHAR(1000)', 'string': 'VARCHAR(1000)'}
SNOWFLAKE_TYPE_MAPPING = {'int64': 'NUMBER', 'int32': 'NUMBER', 'float64': 'FLOAT', 'float32': 'FLOAT', 'bool': 'BOOLEAN', 'datetime64[ns]': 'TIMESTAMP_NTZ', 'object': 'VARCHAR(16777216)', 'string': 'VARCHAR(16777216)'}
SQLITE_TYPE_MAPPING = {'int64': 'INTEGER', 'int32': 'INTEGER', 'float64': 'REAL', 'float32': 'REAL', 'bool': 'INTEGER', 'datetime64[ns]': 'TEXT', 'object': 'TEXT', 'string': 'TEXT'}
dialect
ddl_statements
def generate_ddl( self, tables: dict[str, pandas.core.frame.DataFrame], relationships: list[dict[str, typing.Any]] | None = None) -> dict[str, str]:
65    def generate_ddl(
66        self, tables: dict[str, pd.DataFrame], relationships: list[dict[str, Any]] | None = None
67    ) -> dict[str, str]:
68        """
69        Generate DDL statements for all tables.
70
71        Args:
72            tables: Dictionary of table_name -> DataFrame
73            relationships: Optional list of relationships for foreign keys
74
75        Returns:
76            Dictionary of table_name -> DDL statement
77        """
78        self.ddl_statements = {}
79
80        for table_name, df in tables.items():
81            ddl = self._generate_table_ddl(table_name, df)
82            self.ddl_statements[table_name] = ddl
83
84        # Add foreign key constraints if relationships provided
85        if relationships:
86            self._add_foreign_keys(relationships)
87
88        return self.ddl_statements

Generate DDL statements for all tables.

Arguments:
  • tables: Dictionary of table_name -> DataFrame
  • relationships: Optional list of relationships for foreign keys
Returns:

Dictionary of table_name -> DDL statement

def save_ddl(self, output_file: str | pathlib.Path) -> None:
209    def save_ddl(self, output_file: str | Path) -> None:
210        """
211        Save all DDL statements to a file.
212
213        Args:
214            output_file: Path to output file
215        """
216        output_file = Path(output_file)
217
218        with open(output_file, "w") as f:
219            f.write("-- DDL Statements generated by YAML Shredder\n")
220            f.write(f"-- Dialect: {self.dialect.upper()}\n")
221            f.write(f"-- Tables: {len(self.ddl_statements)}\n\n")
222
223            for table_name, ddl in self.ddl_statements.items():
224                f.write(f"-- Table: {table_name}\n")
225                f.write(ddl)
226                f.write("\n\n")
227
228        print(f"DDL saved to: {output_file}")

Save all DDL statements to a file.

Arguments:
  • output_file: Path to output file
def print_ddl(self) -> None:
230    def print_ddl(self) -> None:
231        """Print all DDL statements to console."""
232        print(f"\n{'=' * 60}")
233        print("SQL DDL STATEMENTS")
234        print(f"{'=' * 60}")
235        print(f"Dialect: {self.dialect.upper()}")
236        print(f"Tables: {len(self.ddl_statements)}\n")
237
238        for table_name, ddl in self.ddl_statements.items():
239            print(f"-- {table_name}")
240            print(ddl)
241            print()

Print all DDL statements to console.

def generate_snowflake_ddl( tables: dict[str, pandas.core.frame.DataFrame], relationships: list[dict[str, typing.Any]] | None = None, output_file: str | pathlib.Path | None = None) -> dict[str, str]:
244def generate_snowflake_ddl(
245    tables: dict[str, pd.DataFrame],
246    relationships: list[dict[str, Any]] | None = None,
247    output_file: str | Path | None = None,
248) -> dict[str, str]:
249    """
250    Quick function to generate Snowflake DDL.
251
252    Args:
253        tables: Dictionary of table_name -> DataFrame
254        relationships: Optional relationships
255        output_file: Optional file to save DDL
256
257    Returns:
258        Dictionary of DDL statements
259    """
260    generator = DDLGenerator(dialect="snowflake")
261    ddl = generator.generate_ddl(tables, relationships)
262
263    if output_file:
264        generator.save_ddl(output_file)
265
266    return ddl

Quick function to generate Snowflake DDL.

Arguments:
  • tables: Dictionary of table_name -> DataFrame
  • relationships: Optional relationships
  • output_file: Optional file to save DDL
Returns:

Dictionary of DDL statements