schema_sentinel.metadata_manager.model.metadata_utils
"""Snowflake metadata queries used by the metadata manager.

Each ``get_*`` helper runs one SHOW command or INFORMATION_SCHEMA query through the
engine and returns the resulting DataFrame; the ``*_sql`` helpers only build SQL text.

NOTE(review): INFORMATION_SCHEMA is referenced unqualified, so these queries read the
information schema of the session's current database; presumably the session is
already using ``database_name`` — confirm with callers.
NOTE(review): the ``success`` flag returned by ``engine.execute`` is ignored throughout;
confirm how the engine reports failures.
"""
import pandas as pd
from sqlalchemy import text

from ..engine import DBEngineStrategy, SfAlchemyEngine
from .database import Database


def _quote_identifier(identifier: str) -> str:
    """Safely quote a SQL identifier by escaping internal quotes and wrapping in double quotes."""
    # Escape double quotes by doubling them (SQL standard), then wrap in double quotes
    escaped = identifier.replace('"', '""')
    return f'"{escaped}"'


def _quote_literal(value: str) -> str:
    """Render a database name as a single-quoted Snowflake string literal.

    Double quotes are removed first, matching this module's convention of comparing bare
    names against INFORMATION_SCHEMA catalog columns. Backslashes and single quotes are
    then escaped (backslash is an escape character inside Snowflake single-quoted strings,
    and ``''`` is a literal quote), so a name such as ``O'Brien`` cannot end the literal
    early or inject SQL.
    """
    bare = value.replace('"', "")
    # Escape backslashes before quotes so the added quote escapes are not re-escaped.
    escaped = bare.replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def _execute_lowercased(statement: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Run ``statement`` and lower-case the result's column labels in place."""
    _success, result = engine.execute(statement=statement, columns=["name"])
    result.columns = result.columns.str.lower()
    return result


def get_table_constraints_old(database: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return INFORMATION_SCHEMA.TABLE_CONSTRAINTS rows for ``database`` (legacy query)."""
    statement = (
        "select * from INFORMATION_SCHEMA.TABLE_CONSTRAINTS "
        f"WHERE CONSTRAINT_CATALOG = {_quote_literal(database)}"
    )
    _success, result = engine.execute(statement=statement, columns=["name"])
    return result


def get_schemas(database: Database, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return INFORMATION_SCHEMA.SCHEMATA rows for the given Database model."""
    statement = (
        "select * from INFORMATION_SCHEMA.SCHEMATA "
        f"WHERE CATALOG_NAME = {_quote_literal(database.database_name)}"
    )
    _success, result = engine.execute(statement=statement, columns=["name"])
    return result


def to_lower_case(string_array: list) -> list:
    """Return a new list with every element lower-cased."""
    return [x.lower() for x in string_array]


def get_imported_keys(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return the output of ``SHOW IMPORTED KEYS`` for the database."""
    statement = f"show imported keys in database {_quote_identifier(database_name)}"
    _success, result = engine.execute(statement=statement, columns=["name"])
    return result


def get_stages(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return the output of ``SHOW STAGES`` for the database."""
    statement = f"show stages in database {_quote_identifier(database_name)}"
    _success, result = engine.execute(statement=statement, columns=["name"])
    return result


def get_table_constraints(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return INFORMATION_SCHEMA.TABLE_CONSTRAINTS rows for the database, deterministically ordered."""
    statement = f"""
select * from INFORMATION_SCHEMA.table_constraints where constraint_catalog={_quote_literal(database_name)}
order by constraint_catalog, constraint_schema, table_name, constraint_type, constraint_name"""
    _success, result = engine.execute(statement=statement, columns=["name"])
    return result


def get_referential_constraints(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rows with lower-cased column names."""
    statement = f"""
select * from INFORMATION_SCHEMA.referential_constraints where constraint_catalog={_quote_literal(database_name)}
order by constraint_schema, constraint_name;"""
    return _execute_lowercased(statement, engine)


def get_views(database_name: str, engine: DBEngineStrategy) -> pd.DataFrame:
    """Return ``SHOW VIEWS`` output for the database, excluding INFORMATION_SCHEMA views."""
    statement = f"show views in database {_quote_identifier(database_name)}"
    _success, result = engine.execute(statement=statement, columns=["name"])
    return result[result["schema_name"] != "INFORMATION_SCHEMA"].copy()


def get_procedures(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return stored procedures of the database with lower-cased column names."""
    return _execute_lowercased(get_procedures_sql(database=database_name), engine)


def get_functions(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return user-defined functions of the database with lower-cased column names."""
    return _execute_lowercased(get_functions_sql(database=database_name), engine)


def get_columns(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return INFORMATION_SCHEMA.COLUMNS rows of the database with lower-cased column names."""
    return _execute_lowercased(get_columns_sql(database=database_name), engine)


def get_tables(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return base tables of the database with lower-cased column names."""
    return _execute_lowercased(get_tables_sql(database=database_name), engine)


def get_columns_sql(database):
    """Build the INFORMATION_SCHEMA.COLUMNS query for ``database``, ordered by ordinal position."""
    return f"""
    select
    *
    from information_schema.columns
    WHERE table_catalog={_quote_literal(database)}
    order by table_schema, table_name, ordinal_position;"""


def get_tables_sql(database):
    """Build the INFORMATION_SCHEMA.TABLES query for base tables of ``database``."""
    return f"""
    select
        table_catalog,
        table_schema,
        table_name,
        table_owner,
        table_type,
        is_transient,
        clustering_key,
        row_count,
        bytes,
        retention_time,
        created,
        last_altered,
        last_ddl,
        last_ddl_by,
        auto_clustering_on,
        comment
    from information_schema.tables
    where table_catalog = {_quote_literal(database)}
    and table_type = 'BASE TABLE'
    order by table_schema, table_name;"""


def get_procedures_sql(database: str):
    """Build the INFORMATION_SCHEMA.PROCEDURES query; timestamps are rendered as text with offset."""
    return f"""
    SELECT
        procedure_catalog,
        procedure_schema,
        procedure_name,
        procedure_owner,
        argument_signature,
        data_type,
        character_maximum_length,
        character_octet_length,
        numeric_precision,
        numeric_precision_radix,
        numeric_scale,
        procedure_language,
        procedure_definition,
        TO_VARCHAR(CREATED::TIMESTAMP_TZ, 'YYYY-MM-DD HH:mi:SS TZHTZM') AS created,
        TO_VARCHAR(LAST_ALTERED::TIMESTAMP_TZ, 'YYYY-MM-DD HH:mi:SS TZHTZM') AS last_altered,
        comment
    FROM INFORMATION_SCHEMA.PROCEDURES
    WHERE PROCEDURE_CATALOG = {_quote_literal(database)}
    ORDER BY PROCEDURE_SCHEMA, PROCEDURE_NAME"""


def get_functions_sql(database):
    """Build the INFORMATION_SCHEMA.FUNCTIONS query; timestamps are rendered as text with offset."""
    return f"""
    select
        function_schema,
        function_name,
        function_owner,
        argument_signature,
        data_type,
        character_maximum_length,
        character_octet_length,
        numeric_precision,
        numeric_precision_radix,
        numeric_scale,
        function_language,
        function_definition,
        volatility,
        is_null_call,
        is_secure,
        to_varchar(created::timestamp_tz, 'yyyy-mm-dd hh:mi:ss tzhtzm') as created,
        to_varchar(last_altered::timestamp_tz, 'yyyy-mm-dd hh:mi:ss tzhtzm') as last_altered,
        comment,
        is_external,
        api_integration,
        context_headers,
        max_batch_rows,
        request_translator,
        response_translator,
        compression,
        packages,
        imports,
        handler,
        target_path,
        runtime_version,
        installed_packages,
        is_memoizable
    from information_schema.functions
    where function_catalog = {_quote_literal(database)}
    ORDER BY FUNCTION_SCHEMA, FUNCTION_NAME"""


def get_tasks(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return the output of ``SHOW TASKS`` for the database."""
    _success, tasks = engine.execute(
        statement=f"""show tasks in database {_quote_identifier(database_name)};""", columns=["name"]
    )
    return tasks


def get_pipes(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return INFORMATION_SCHEMA.PIPES rows for the database."""
    statement = f"""
    select
        pipe_catalog,
        pipe_schema,
        pipe_name,
        pipe_owner,
        definition,
        is_autoingest_enabled,
        notification_channel_name,
        created,
        last_altered,
        comment,
        pattern
    from INFORMATION_SCHEMA.PIPES
    WHERE PIPE_CATALOG = {_quote_literal(database_name)}"""
    _success, pipes = engine.execute(statement=statement, columns=["name"])
    return pipes


def get_streams(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return the output of ``SHOW STREAMS`` for the database."""
    _success, streams = engine.execute(
        statement=f"show streams in database {_quote_identifier(database_name)};", columns=["name"]
    )
    return streams


def get_database(database_name: str, engine: SfAlchemyEngine) -> pd.Series:
    """Return the first INFORMATION_SCHEMA.DATABASES row for ``database_name``.

    ``last_altered`` falls back to ``created`` when NULL, and ``retention_time`` is
    returned as text. Raises ``IndexError`` (from ``iloc``) when no row matches.
    """
    statement = f"""
select database_owner, is_transient, comment, created, ifnull(last_altered, created) as last_altered,
    to_varchar(retention_time) as "retention_time"
from INFORMATION_SCHEMA.databases WHERE DATABASE_NAME={_quote_literal(database_name)}"""
    _success, database_df = engine.execute(statement=statement, columns=["name"])
    return database_df.iloc[0]


# Union of every constraint kind, read from the session temporary tables that
# get_constraints() creates (primary_keys, unique_keys, default_constraints,
# imported_keys). UNION ALL matches columns by position, so every branch must
# yield the same ten columns in the same order.
GET_CONSTRAINTS_SQL = """
select
    'PRIMARY KEY' as "constraint_type",
    "database_name",
    "schema_name",
    "table_name",
    "constraint_name",
    listagg("column_name", ', ') within group(order by "key_sequence") as "constraint_details",
    NULL AS "reference_key",
    Null as "update_rule",
    Null as "delete_rule",
    max("created_on") as "created"
from primary_keys
group by "database_name",
    "schema_name",
    "table_name",
    "constraint_name"

union all

select
    'UNIQUE KEY' as "constraint_type",
    "database_name",
    "schema_name",
    "table_name",
    "constraint_name",
    listagg("column_name", ', ') within group(order by "key_sequence") as "constraint_details",
    NULL AS "reference_key",
    Null as "update_rule",
    Null as "delete_rule",
    max("created_on") as "created"
from unique_keys
group by "database_name",
    "schema_name",
    "table_name",
    "constraint_name"

union all

select
    *
from default_constraints

union all

select
    'FOREIGN KEY' as "constraint_type",
    "fk_database_name",
    "fk_schema_name",
    "fk_table_name",
    "fk_name",
    listagg("fk_column_name", ', ') within group(order by "key_sequence") as "constraint_details",
    concat("pk_database_name", '.', "pk_schema_name", '.', "pk_name") as "reference_key",
    max("update_rule") as "update_rule",
    max("delete_rule") as "delete_rule",
    max("created_on") as "created"
from imported_keys ik
group by "fk_database_name",
    "fk_schema_name",
    "fk_table_name",
    "fk_name",
    "pk_database_name",
    "pk_schema_name",
    "pk_name";
"""


def get_constraints(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Collect primary, unique, default and foreign-key constraints of a database.

    The output of each SHOW command is captured into a session temporary table with
    ``RESULT_SCAN(LAST_QUERY_ID())`` so it can be aggregated with regular SQL. Default
    constraints are derived from INFORMATION_SCHEMA.COLUMNS. GET_CONSTRAINTS_SQL then
    unions everything into one frame.

    NOTE(review): temporary tables and LAST_QUERY_ID() are per-session, so every
    statement must run on the same ``conn``. It is not closed here; confirm whether
    ``engine.get_conn()`` transfers ownership to the caller.
    """
    conn = engine.get_conn()
    database_identifier = _quote_identifier(database_name)
    # Order matters: each CREATE must run immediately after its SHOW so that
    # LAST_QUERY_ID() still refers to that SHOW.
    for show_target, temp_table in (
        ("imported keys", "imported_keys"),
        ("primary keys", "primary_keys"),
        ("unique keys", "unique_keys"),
    ):
        conn.execute(text(f"show {show_target} in database {database_identifier};"))
        conn.execute(
            text(
                f"""create or replace temporary table {temp_table} as
select * from table(result_scan(last_query_id()));"""
            )
        )
    # The catalog filter uses the bare, escaped name, consistent with the other
    # INFORMATION_SCHEMA queries (previously the raw, possibly quoted name was inlined).
    conn.execute(
        text(
            f"""
create or replace temporary table default_constraints as
select
    'DEFAULT CONSTRAINT' as constraint_type,
    table_catalog,
    table_schema,
    table_name as tn,
    'DF_' || column_name as constraint_name,
    column_default as constraint_details,
    NULL as reference_key,
    Null as "update_rule",
    Null as "delete_rule",
    NULL as created
from information_schema.columns
where column_default is not null
and table_catalog = {_quote_literal(database_name)}"""
        )
    )
    # GET_CONSTRAINTS_SQL contains no placeholders, so it is used verbatim.
    return pd.read_sql(text(GET_CONSTRAINTS_SQL), conn)
def get_table_constraints_old(database: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return INFORMATION_SCHEMA.TABLE_CONSTRAINTS rows whose catalog is ``database``.

    Double quotes are stripped from the name. Backslashes and single quotes are escaped
    so that names such as ``O'Brien`` produce a valid Snowflake string literal.
    """
    catalog = database.replace('"', "").replace("\\", "\\\\").replace("'", "''")
    statement = f"select * from INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE CONSTRAINT_CATALOG = '{catalog}'"
    _success, result = engine.execute(statement=statement, columns=["name"])
    return result
def get_schemas(database: Database, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return INFORMATION_SCHEMA.SCHEMATA rows for the given Database model.

    The name is stripped of double quotes once (the original stripped it twice), and
    backslashes and single quotes are escaped for the Snowflake string literal.
    """
    catalog = database.database_name.replace('"', "").replace("\\", "\\\\").replace("'", "''")
    statement = f"select * from INFORMATION_SCHEMA.SCHEMATA WHERE CATALOG_NAME = '{catalog}'"
    _success, result = engine.execute(statement=statement, columns=["name"])
    return result
def to_lower_case(string_array: list) -> list:
    """Produce a fresh list where each element has been passed through ``.lower()``."""
    lowered = []
    for item in string_array:
        lowered.append(item.lower())
    return lowered
def get_imported_keys(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Run ``SHOW IMPORTED KEYS`` against the (identifier-quoted) database."""
    quoted_db = _quote_identifier(database_name)
    _, imported = engine.execute(statement=f"show imported keys in database {quoted_db}", columns=["name"])
    return imported
def get_stages(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Run ``SHOW STAGES`` against the (identifier-quoted) database."""
    quoted_db = _quote_identifier(database_name)
    _, stages = engine.execute(statement=f"show stages in database {quoted_db}", columns=["name"])
    return stages
def get_table_constraints(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return INFORMATION_SCHEMA.TABLE_CONSTRAINTS rows for the database, deterministically ordered.

    Double quotes are stripped from the name. Backslashes and single quotes are escaped
    so that the catalog filter is always a valid Snowflake string literal.
    """
    catalog = database_name.replace('"', "").replace("\\", "\\\\").replace("'", "''")
    statement = f"""
select * from INFORMATION_SCHEMA.table_constraints where constraint_catalog='{catalog}'
order by constraint_catalog, constraint_schema, table_name, constraint_type, constraint_name"""
    _success, result = engine.execute(statement=statement, columns=["name"])
    return result
def get_referential_constraints(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rows with lower-cased column names.

    Double quotes are stripped from the name. Backslashes and single quotes are escaped
    so that the catalog filter is always a valid Snowflake string literal.
    """
    catalog = database_name.replace('"', "").replace("\\", "\\\\").replace("'", "''")
    statement = f"""
select * from INFORMATION_SCHEMA.referential_constraints where constraint_catalog='{catalog}'
order by constraint_schema, constraint_name;"""
    _success, result = engine.execute(statement=statement, columns=["name"])
    # Lower-case labels so callers can index columns independent of Snowflake's upper-casing.
    result.columns = result.columns.str.lower()
    return result
def get_views(database_name: str, engine: DBEngineStrategy) -> pd.DataFrame:
    """List views of the database via ``SHOW VIEWS``, dropping those in INFORMATION_SCHEMA."""
    quoted_db = _quote_identifier(database_name)
    _, views = engine.execute(statement=f"show views in database {quoted_db}", columns=["name"])
    keep = views["schema_name"] != "INFORMATION_SCHEMA"
    return views.loc[keep].copy()
def get_procedures(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Fetch stored procedures of the database; column labels are lower-cased in place."""
    query = get_procedures_sql(database=database_name)
    _, procedures = engine.execute(statement=query, columns=["name"])
    procedures.columns = procedures.columns.str.lower()
    return procedures
def get_functions(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Fetch user-defined functions of the database; column labels are lower-cased in place."""
    query = get_functions_sql(database=database_name)
    _, functions = engine.execute(statement=query, columns=["name"])
    functions.columns = functions.columns.str.lower()
    return functions
def get_columns(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Fetch INFORMATION_SCHEMA.COLUMNS rows of the database; column labels are lower-cased in place."""
    query = get_columns_sql(database=database_name)
    _, columns_df = engine.execute(statement=query, columns=["name"])
    columns_df.columns = columns_df.columns.str.lower()
    return columns_df
def get_tables(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Fetch base tables of the database; column labels are lower-cased in place."""
    query = get_tables_sql(database=database_name)
    _, tables = engine.execute(statement=query, columns=["name"])
    tables.columns = tables.columns.str.lower()
    return tables
def get_columns_sql(database):
    """Build the INFORMATION_SCHEMA.COLUMNS query for ``database``.

    Rows are ordered by schema, table and ordinal position. Double quotes are stripped
    from the name. Backslashes and single quotes are escaped so that the catalog filter
    is always a valid Snowflake string literal.
    """
    catalog = database.replace('"', "").replace("\\", "\\\\").replace("'", "''")
    return f"""
    select
    *
    from information_schema.columns
    WHERE table_catalog='{catalog}'
    order by table_schema, table_name, ordinal_position;"""
def get_tables_sql(database):
    """Build the INFORMATION_SCHEMA.TABLES query for base tables of ``database``.

    Double quotes are stripped from the name. Backslashes and single quotes are escaped
    so that the catalog filter is always a valid Snowflake string literal.
    """
    catalog = database.replace('"', "").replace("\\", "\\\\").replace("'", "''")
    return f"""
    select
        table_catalog,
        table_schema,
        table_name,
        table_owner,
        table_type,
        is_transient,
        clustering_key,
        row_count,
        bytes,
        retention_time,
        created,
        last_altered,
        last_ddl,
        last_ddl_by,
        auto_clustering_on,
        comment
    from information_schema.tables
    where table_catalog = '{catalog}'
    and table_type = 'BASE TABLE'
    order by table_schema, table_name;"""
def get_procedures_sql(database: str):
    """Build the INFORMATION_SCHEMA.PROCEDURES query for ``database``.

    ``created``/``last_altered`` are rendered as text with a timezone offset. The name is
    stripped of double quotes once (the original did it twice). Backslashes and single
    quotes are escaped for the Snowflake string literal.
    """
    catalog = database.replace('"', "").replace("\\", "\\\\").replace("'", "''")
    return f"""
    SELECT
        procedure_catalog,
        procedure_schema,
        procedure_name,
        procedure_owner,
        argument_signature,
        data_type,
        character_maximum_length,
        character_octet_length,
        numeric_precision,
        numeric_precision_radix,
        numeric_scale,
        procedure_language,
        procedure_definition,
        TO_VARCHAR(CREATED::TIMESTAMP_TZ, 'YYYY-MM-DD HH:mi:SS TZHTZM') AS created,
        TO_VARCHAR(LAST_ALTERED::TIMESTAMP_TZ, 'YYYY-MM-DD HH:mi:SS TZHTZM') AS last_altered,
        comment
    FROM INFORMATION_SCHEMA.PROCEDURES
    WHERE PROCEDURE_CATALOG = '{catalog}'
    ORDER BY PROCEDURE_SCHEMA, PROCEDURE_NAME"""
def get_functions_sql(database):
    """Build the INFORMATION_SCHEMA.FUNCTIONS query for ``database``.

    ``created``/``last_altered`` are rendered as text with a timezone offset. The name is
    stripped of double quotes once (the original did it twice). Backslashes and single
    quotes are escaped for the Snowflake string literal.
    """
    catalog = database.replace('"', "").replace("\\", "\\\\").replace("'", "''")
    return f"""
    select
        function_schema,
        function_name,
        function_owner,
        argument_signature,
        data_type,
        character_maximum_length,
        character_octet_length,
        numeric_precision,
        numeric_precision_radix,
        numeric_scale,
        function_language,
        function_definition,
        volatility,
        is_null_call,
        is_secure,
        to_varchar(created::timestamp_tz, 'yyyy-mm-dd hh:mi:ss tzhtzm') as created,
        to_varchar(last_altered::timestamp_tz, 'yyyy-mm-dd hh:mi:ss tzhtzm') as last_altered,
        comment,
        is_external,
        api_integration,
        context_headers,
        max_batch_rows,
        request_translator,
        response_translator,
        compression,
        packages,
        imports,
        handler,
        target_path,
        runtime_version,
        installed_packages,
        is_memoizable
    from information_schema.functions
    where function_catalog = '{catalog}'
    ORDER BY FUNCTION_SCHEMA, FUNCTION_NAME"""
def get_tasks(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Run ``SHOW TASKS`` against the (identifier-quoted) database."""
    show_tasks = f"show tasks in database {_quote_identifier(database_name)};"
    _, tasks = engine.execute(statement=show_tasks, columns=["name"])
    return tasks
def get_pipes(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Return INFORMATION_SCHEMA.PIPES rows for the database.

    Double quotes are stripped from the name. Backslashes and single quotes are escaped
    so that the catalog filter is always a valid Snowflake string literal.
    """
    catalog = database_name.replace('"', "").replace("\\", "\\\\").replace("'", "''")
    statement = f"""
    select
        pipe_catalog,
        pipe_schema,
        pipe_name,
        pipe_owner,
        definition,
        is_autoingest_enabled,
        notification_channel_name,
        created,
        last_altered,
        comment,
        pattern
    from INFORMATION_SCHEMA.PIPES
    WHERE PIPE_CATALOG = '{catalog}'"""
    _success, pipes = engine.execute(statement=statement, columns=["name"])
    return pipes
def get_streams(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Run ``SHOW STREAMS`` against the (identifier-quoted) database."""
    show_streams = f"show streams in database {_quote_identifier(database_name)};"
    _, streams = engine.execute(statement=show_streams, columns=["name"])
    return streams
def get_database(database_name: str, engine: SfAlchemyEngine) -> pd.Series:
    """Return the first INFORMATION_SCHEMA.DATABASES row for ``database_name``.

    ``last_altered`` falls back to ``created`` when NULL, and ``retention_time`` is
    returned as text. The result is a single row (``iloc[0]``), so the return annotation
    is ``pd.Series`` rather than ``pd.DataFrame``. Raises ``IndexError`` when no row
    matches. Double quotes are stripped from the name. Backslashes and single quotes are
    escaped for the Snowflake string literal.
    """
    catalog = database_name.replace('"', "").replace("\\", "\\\\").replace("'", "''")
    statement = f"""
select database_owner, is_transient, comment, created, ifnull(last_altered, created) as last_altered,
    to_varchar(retention_time) as "retention_time"
from INFORMATION_SCHEMA.databases WHERE DATABASE_NAME='{catalog}'"""
    _success, database_df = engine.execute(statement=statement, columns=["name"])
    return database_df.iloc[0]
# Union of every constraint kind, read from the session temporary tables that
# get_constraints() creates beforehand (primary_keys, unique_keys,
# default_constraints, imported_keys). UNION ALL matches columns by position, so
# every branch must yield the same ten columns in the same order. The string
# contains no placeholders.
GET_CONSTRAINTS_SQL =
'\nselect\n \'PRIMARY KEY\' as "constraint_type",\n "database_name",\n "schema_name",\n "table_name",\n "constraint_name",\n listagg("column_name", \', \') within group(order by "key_sequence") as "constraint_details",\n NULL AS "reference_key",\n Null as "update_rule",\n Null as "delete_rule",\n max("created_on") as "created"\nfrom primary_keys\ngroup by "database_name",\n "schema_name",\n "table_name",\n "constraint_name"\n\nunion all\n\nselect\n \'UNIQUE KEY\' as "constraint_type",\n "database_name",\n "schema_name",\n "table_name",\n "constraint_name",\n listagg("column_name", \', \') within group(order by "key_sequence") as "constraint_details",\n NULL AS "reference_key",\n Null as "update_rule",\n Null as "delete_rule",\n max("created_on") as "created"\nfrom unique_keys\ngroup by "database_name",\n "schema_name",\n "table_name",\n "constraint_name"\n\nunion all\n\nselect\n *\nfrom default_constraints\n\nunion all\n\nselect\n \'FOREIGN KEY\' as "constraint_type",\n "fk_database_name",\n "fk_schema_name",\n "fk_table_name",\n "fk_name",\n listagg("fk_column_name", \', \') within group(order by "key_sequence") as "constraint_details",\n concat("pk_database_name", \'.\', "pk_schema_name", \'.\', "pk_name") as "reference_key",\n max("update_rule") as "update_rule",\n max("delete_rule") as "delete_rule",\n max("created_on") as "created"\nfrom imported_keys ik\ngroup by "fk_database_name",\n "fk_schema_name",\n "fk_table_name",\n "fk_name",\n "pk_database_name",\n "pk_schema_name",\n "pk_name";\n'
def get_constraints(database_name: str, engine: SfAlchemyEngine) -> pd.DataFrame:
    """Collect primary, unique, default and foreign-key constraints of a database.

    The output of each SHOW command is captured into a session temporary table with
    ``RESULT_SCAN(LAST_QUERY_ID())`` so it can be aggregated with regular SQL. Default
    constraints are derived from INFORMATION_SCHEMA.COLUMNS. GET_CONSTRAINTS_SQL then
    unions everything into one frame.

    NOTE(review): temporary tables and LAST_QUERY_ID() are per-session, so every
    statement must run on the same ``conn``. It is not closed here; confirm whether
    ``engine.get_conn()`` transfers ownership to the caller.
    """
    conn = engine.get_conn()
    database_identifier = _quote_identifier(database_name)
    # Order matters: each CREATE must run immediately after its SHOW so that
    # LAST_QUERY_ID() still refers to that SHOW.
    for show_target, temp_table in (
        ("imported keys", "imported_keys"),
        ("primary keys", "primary_keys"),
        ("unique keys", "unique_keys"),
    ):
        conn.execute(text(f"show {show_target} in database {database_identifier};"))
        conn.execute(
            text(
                f"""create or replace temporary table {temp_table} as
select * from table(result_scan(last_query_id()));"""
            )
        )
    # Previously the raw (possibly double-quoted, unescaped) name was inlined here.
    # Strip quotes and escape it like the other INFORMATION_SCHEMA filters.
    catalog = database_name.replace('"', "").replace("\\", "\\\\").replace("'", "''")
    conn.execute(
        text(
            f"""
create or replace temporary table default_constraints as
select
    'DEFAULT CONSTRAINT' as constraint_type,
    table_catalog,
    table_schema,
    table_name as tn,
    'DF_' || column_name as constraint_name,
    column_default as constraint_details,
    NULL as reference_key,
    Null as "update_rule",
    Null as "delete_rule",
    NULL as created
from information_schema.columns
where column_default is not null
and table_catalog = '{catalog}'"""
        )
    )
    # GET_CONSTRAINTS_SQL has no "$DB_NAME" placeholder, so the former .replace() was a no-op.
    return pd.read_sql(text(GET_CONSTRAINTS_SQL), conn)