schema_sentinel.metadata_manager.model.metadata_container
import json
import logging as log

import pandas as pd

from ..enums import DbObjectType
from . import CommonBase
from .column import Column
from .column_constraint import ColumnConstraint
from .constraint import Constraint
from .database import Database
from .function import Function
from .pipe import Pipe
from .procedure import Procedure
from .referential_constraint import ReferentialConstraint
from .schema import Schema
from .stage import Stage
from .stream import Stream
from .table import Table
from .table_constraint import TableConstraint
from .task import Task
from .view import View


class MetaData:
    """Registry of one database's catalog objects, built from DataFrames and persisted via a session.

    The per-object-type DataFrames (``tables``, ``columns``, ``views``, ...) are assigned by the
    caller; each ``save_*`` method turns the rows of its DataFrame into a model object, calls
    ``save(session=...)`` on it and records it in ``metadata[<DbObjectType>][<object id>]``.
    Object ids are JSON strings produced by the ``get_*_id`` helpers.
    """

    database_object: Database
    # Schema objects created by save_schemas, in insertion order (per instance, see __init__).
    schemas: list[Schema]
    schemas_to_include: list[str]
    tables: pd.DataFrame = None
    columns: pd.DataFrame = None
    views: pd.DataFrame = None
    streams: pd.DataFrame = None
    pipes: pd.DataFrame = None
    tasks: pd.DataFrame = None
    procedures: pd.DataFrame = None
    functions: pd.DataFrame = None
    table_constraints: pd.DataFrame = None
    referential_constraints: pd.DataFrame = None
    constraints: pd.DataFrame = None
    stages: pd.DataFrame = None
    column_constraints: pd.DataFrame = None
    # object type -> {object id -> saved model object} (per instance, see __init__).
    metadata: dict[DbObjectType, dict[str, CommonBase]]

    def __init__(self, database_object: Database, schemas_to_include: list[str], db_timestamp_to_string):
        """Create an empty registry for ``database_object``.

        :param database_object: the database whose objects will be registered; it is stored
            under ``metadata[DbObjectType.DATABASE]`` keyed by its ``database_id``.
        :param schemas_to_include: schema names the caller wants processed (stored as-is).
        :param db_timestamp_to_string: callable used to convert every timestamp column value
            before it is handed to a model constructor.
        """
        self.database_object = database_object
        # These containers must be per-instance: as class-level defaults they were shared
        # (and mutated) by every MetaData ever created.
        self.schemas = []
        self.metadata = {DbObjectType.DATABASE: {database_object.database_id: database_object}}
        log.info(f"Metadata schemas to include are {schemas_to_include}")
        self.schemas_to_include = schemas_to_include
        self.db_timestamp_to_string = db_timestamp_to_string

    def save(self, session):
        """Persist every object type (schemas excluded, see ``save_schemas``) in a fixed order."""
        log.info("Save tables")
        self.save_tables(session=session)
        log.info("Save columns")
        self.save_columns(session=session)
        log.info("Save constraints")
        self.save_constraints(session=session)
        log.info("Save column constraints")
        self.save_column_constraints(session=session)
        log.info("Save functions")
        self.save_functions(session=session)
        log.info("Save pipes")
        self.save_pipes(session=session)
        log.info("Save procedures")
        self.save_procedures(session=session)
        log.info("Save referential constraints")
        self.save_referential_constraints(session=session)
        log.info("Save stages")
        self.save_stages(session=session)
        log.info("Save streams")
        self.save_streams(session=session)
        log.info("Save table_constraints")
        self.save_table_constraints(session=session)
        log.info("Save tasks")
        self.save_tasks(session=session)
        log.info("Save views")
        self.save_views(session=session)

    # ------------------------------------------------------------------ id helpers

    @staticmethod
    def _extend_id(base_id: str, **fields) -> str:
        """Return ``base_id`` (a JSON object string) with ``fields`` added, re-serialised as JSON."""
        key = json.loads(base_id)
        key.update(fields)
        return json.dumps(key)

    def database_id(self) -> str:
        """Return the database's JSON id string, the root of every other object id."""
        return self.database_object.__get_id__()

    def get_schema_id(self, schema_name: str) -> str:
        return self._extend_id(self.database_id(), schema_name=schema_name)

    def get_table_id(self, schema_name: str, table_name: str) -> str:
        return self._extend_id(self.get_schema_id(schema_name), table_name=table_name)

    def get_constraint_id(self, schema_name: str, constraint_name: str) -> str:
        return self._extend_id(self.get_schema_id(schema_name), constraint_name=constraint_name)

    def get_table_constraint_id(self, table_id: str, constraint_name: str) -> str:
        return self._extend_id(table_id, constraint_name=constraint_name)

    def get_view_id(self, schema_name: str, view_name: str) -> str:
        return self._extend_id(self.get_schema_id(schema_name), view_name=view_name)

    def get_task_id(self, schema_name: str, task_name: str) -> str:
        return self._extend_id(self.get_schema_id(schema_name), task_name=task_name)

    def get_stream_id(self, schema_name: str, stream_name: str) -> str:
        return self._extend_id(self.get_schema_id(schema_name), stream_name=stream_name)

    def get_pipe_id(self, schema_name: str, pipe_name: str) -> str:
        return self._extend_id(self.get_schema_id(schema_name), pipe_name=pipe_name)

    def get_stage_id(self, schema_name: str, stage_name: str) -> str:
        return self._extend_id(self.get_schema_id(schema_name), stage_name=stage_name)

    def get_column_id(self, schema_name: str, table_name: str, column_name: str) -> str:
        return self._extend_id(self.get_table_id(schema_name, table_name), column_name=column_name)

    def get_procedure_id(self, schema_name: str, procedure_name: str, argument_signature: str) -> str:
        return self._extend_id(
            self.get_schema_id(schema_name),
            procedure_name=procedure_name,
            argument_signature=argument_signature,
        )

    def get_function_id(self, schema_name: str, function_name: str, argument_signature: str) -> str:
        return self._extend_id(
            self.get_schema_id(schema_name),
            function_name=function_name,
            argument_signature=argument_signature,
        )

    # ------------------------------------------------------------------ persistence

    @staticmethod
    def _rows(frame: pd.DataFrame, label: str):
        """Yield each row of ``frame``; yield nothing (with a warning) if it was never loaded (None)."""
        if frame is None:
            log.warning(f"No {label} loaded; nothing to save")
            return
        for _index, row in frame.iterrows():
            yield row

    def save_tables(self, session) -> bool:
        self.metadata[DbObjectType.TABLE] = {}
        for row in self._rows(self.tables, "tables"):
            table = Table(
                schema_id=self.get_schema_id(row["table_schema"]),
                table_id=self.get_table_id(row["table_schema"], row["table_name"]),
                table_name=row["table_name"],
                table_owner=row["table_owner"],
                table_type=row["table_type"],
                is_transient=row["is_transient"],
                clustering_key=row["clustering_key"],
                row_count=row["row_count"],
                comment=row["comment"],
                bytes=row["bytes"],
                retention_time=row["retention_time"],
                created=self.db_timestamp_to_string(row["created"]),
                last_altered=self.db_timestamp_to_string(row["last_altered"]),
                last_ddl=self.db_timestamp_to_string(row["last_ddl"]),
                last_ddl_by=row["last_ddl_by"],
                auto_clustering_on=row["auto_clustering_on"],
                # NOTE(review): the fields below are hard-coded rather than read from the row —
                # confirm this is intended.
                change_tracking="True",
                is_external="False",
                enable_schema_evolution="False",
                owner_role_type="DATABASE",
                is_event="False",
            )
            table.save(session=session)
            self.metadata[DbObjectType.TABLE][table.table_id] = table

        return True

    def save_column_constraints(self, session) -> bool:
        self.metadata[DbObjectType.COLUMN_CONSTRAINT] = {}
        for row in self._rows(self.column_constraints, "column constraints"):
            column_constraint = ColumnConstraint(
                pk_column_id=self.get_column_id(row["pk_schema_name"], row["pk_table_name"], row["pk_column_name"]),
                pk_constraint_id=self.get_constraint_id(row["pk_schema_name"], row["pk_name"]),
                fk_column_id=self.get_column_id(row["fk_schema_name"], row["fk_table_name"], row["fk_column_name"]),
                fk_constraint_id=self.get_constraint_id(row["fk_schema_name"], row["fk_name"]),
                pk_name=row["pk_name"],
                fk_name=row["fk_name"],
                key_sequence=row["key_sequence"],
                comment=row["comment"],
                created=self.db_timestamp_to_string(row["created_on"]),
                deferrability=row["deferrability"],
                rely=row["rely"],
                update_rule=row["update_rule"],
                delete_rule=row["delete_rule"],
            )
            # The id is derived by the model itself from its populated fields.
            column_constraint.column_constraint_id = column_constraint.__get_id__()
            column_constraint.save(session=session)
            self.metadata[DbObjectType.COLUMN_CONSTRAINT][column_constraint.column_constraint_id] = column_constraint

        return True

    def save_referential_constraints(self, session) -> bool:
        self.metadata[DbObjectType.REFERENTIAL_CONSTRAINT] = {}
        for row in self._rows(self.referential_constraints, "referential constraints"):
            referential_constraint = ReferentialConstraint(
                foreign_key_constraint_id=self.get_constraint_id(row["constraint_schema"], row["constraint_name"]),
                unique_constraint_id=self.get_constraint_id(
                    row["unique_constraint_schema"], row["unique_constraint_name"]
                ),
                fk_name=row["constraint_name"],
                pk_name=row["unique_constraint_name"],
                match_option=row["match_option"],
                update_rule=row["update_rule"],
                delete_rule=row["delete_rule"],
                comment=row["comment"],
                created=self.db_timestamp_to_string(row["created"]),
                last_altered=self.db_timestamp_to_string(row["last_altered"]),
            )
            referential_constraint.referential_constraint_id = referential_constraint.__get_id__()
            referential_constraint.save(session=session)
            self.metadata[DbObjectType.REFERENTIAL_CONSTRAINT][referential_constraint.referential_constraint_id] = (
                referential_constraint
            )

        return True

    def save_table_constraints(self, session) -> bool:
        self.metadata[DbObjectType.TABLE_CONSTRAINT] = {}
        for row in self._rows(self.table_constraints, "table constraints"):
            table_constraint = TableConstraint(
                table_id=self.get_table_id(row["table_schema"], row["table_name"]),
                table_constraint_name=row["constraint_name"],
                constraint_type=row["constraint_type"],
                is_deferrable=row["is_deferrable"],
                initially_deferred=row["initially_deferred"],
                enforced=row["enforced"],
                comment=row["comment"],
                created=self.db_timestamp_to_string(row["created"]),
                last_altered=self.db_timestamp_to_string(row["last_altered"]),
                rely=row["rely"],
            )
            table_constraint.table_constraint_id = table_constraint.__get_id__()
            table_constraint.save(session=session)
            self.metadata[DbObjectType.TABLE_CONSTRAINT][table_constraint.table_constraint_id] = table_constraint

        return True

    def save_constraints(self, session) -> bool:
        self.metadata[DbObjectType.CONSTRAINT] = {}
        for row in self._rows(self.constraints, "constraints"):
            constraint = Constraint(
                table_id=self.get_table_id(row["schema_name"], row["table_name"]),
                constraint_name=row["constraint_name"],
                constraint_type=row["constraint_type"],
                constraint_details=row["constraint_details"],
                reference_key=row["reference_key"],
                created=self.db_timestamp_to_string(row["created"]),
                update_rule=row["update_rule"],
                delete_rule=row["delete_rule"],
            )
            constraint.constraint_id = constraint.__get_id__()
            constraint.save(session=session)
            self.metadata[DbObjectType.CONSTRAINT][constraint.constraint_id] = constraint

        return True

    def save_tasks(self, session) -> bool:
        """Save tasks best-effort: a failing row is logged with its traceback and skipped."""
        self.metadata[DbObjectType.TASK] = {}
        for row in self._rows(self.tasks, "tasks"):
            try:
                task = Task(
                    id=str(row["id"]),
                    task_id=self.get_task_id(row["schema_name"], row["name"]),
                    schema_id=self.get_schema_id(row["schema_name"]),
                    task_name=row["name"],
                    task_owner=row["owner"],
                    warehouse=row["warehouse"],
                    schedule=row["schedule"],
                    predecessors=row["predecessors"],
                    state=row["state"],
                    definition=row["definition"],
                    condition=row["condition"],
                    allow_overlapping_execution=row["allow_overlapping_execution"],
                    error_integration=row["error_integration"],
                    comment=row["comment"],
                    last_committed=self.db_timestamp_to_string(row["last_committed_on"]),
                    last_suspended=self.db_timestamp_to_string(row["last_suspended_on"]),
                    owner_role_type=row["owner_role_type"],
                    config=row["config"],
                    created=self.db_timestamp_to_string(row["created_on"]),
                )
                task.save(session=session)
            except Exception:
                # Identify the row, not `task`: it is unbound (or stale) if the constructor raised.
                log.exception(f"Failed to save task: {row.get('name')} in schema {row.get('schema_name')}")
                continue
            self.metadata[DbObjectType.TASK][task.task_id] = task

        return True

    def save_streams(self, session) -> bool:
        """Save streams best-effort: a failing row is logged with its traceback and skipped."""
        self.metadata[DbObjectType.STREAM] = {}
        for row in self._rows(self.streams, "streams"):
            try:
                stream = Stream(
                    schema_id=self.get_schema_id(row["schema_name"]),
                    stream_id=self.get_stream_id(row["schema_name"], row["name"]),
                    stream_name=row["name"],
                    stream_owner=row["owner"],
                    comment=row["comment"],
                    table_name=row["table_name"],
                    source_type=row["source_type"],
                    base_tables=row["base_tables"],
                    type=row["type"],
                    stale=row["stale"],
                    mode=row["mode"],
                    stale_after=self.db_timestamp_to_string(row["stale_after"]),
                    invalid_reason=row["invalid_reason"],
                    owner_role_type=row["owner_role_type"],
                    created=self.db_timestamp_to_string(row["created_on"]),
                )
                stream.save(session=session)
            except Exception:
                log.exception(f"Exception saving stream: {row.get('name')} in schema {row.get('schema_name')}")
                continue
            self.metadata[DbObjectType.STREAM][stream.stream_id] = stream

        return True

    def save_stages(self, session) -> bool:
        self.metadata[DbObjectType.STAGE] = {}
        for row in self._rows(self.stages, "stages"):
            stage = Stage(
                schema_id=self.get_schema_id(row["schema_name"]),
                stage_id=self.get_stage_id(row["schema_name"], row["name"]),
                stage_name=row["name"],
                stage_owner=row["owner"],
                stage_url=row["url"],
                stage_region=row["region"],
                stage_type=row["type"],
                comment=row["comment"],
                created=self.db_timestamp_to_string(row["created_on"]),
                has_credentials=row["has_credentials"],
                has_encryption_key=row["has_encryption_key"],
                cloud=row["cloud"],
                notification_channel=row["notification_channel"],
                storage_integration=row["storage_integration"],
            )
            stage.save(session=session)
            self.metadata[DbObjectType.STAGE][stage.stage_id] = stage

        return True

    def save_pipes(self, session) -> bool:
        # Pipes get their own registry; writing them under DbObjectType.SCHEMA wiped the schemas
        # recorded by save_schemas. NOTE(review): assumes DbObjectType defines PIPE — confirm.
        self.metadata[DbObjectType.PIPE] = {}
        for row in self._rows(self.pipes, "pipes"):
            pipe = Pipe(
                schema_id=self.get_schema_id(row["pipe_schema"]),
                pipe_id=self.get_pipe_id(row["pipe_schema"], row["pipe_name"]),
                pipe_name=row["pipe_name"],
                pipe_owner=row["pipe_owner"],
                pipe_definition=row["definition"],
                is_autoingest_enabled=row["is_autoingest_enabled"],
                notification_channel_name=row["notification_channel_name"],
                comment=row["comment"],
                created=self.db_timestamp_to_string(row["created"]),
                last_altered=self.db_timestamp_to_string(row["last_altered"]),
                pattern=row["pattern"],
            )
            pipe.save(session=session)
            self.metadata[DbObjectType.PIPE][pipe.pipe_id] = pipe

        return True

    def save_functions(self, session) -> bool:
        self.metadata[DbObjectType.FUNCTION] = {}
        for row in self._rows(self.functions, "functions"):
            function = Function(
                schema_id=self.get_schema_id(row["function_schema"]),
                function_id=self.get_function_id(row["function_schema"], row["function_name"], row["argument_signature"]),
                function_name=row["function_name"],
                function_owner=row["function_owner"],
                argument_signature=row["argument_signature"],
                data_type=row["data_type"],
                character_maximum_length=row["character_maximum_length"],
                character_octet_length=row["character_octet_length"],
                numeric_precision=row["numeric_precision"],
                numeric_precision_radix=row["numeric_precision_radix"],
                numeric_scale=row["numeric_scale"],
                function_language=row["function_language"],
                function_definition=row["function_definition"],
                volatility=row["volatility"],
                is_null_call=row["is_null_call"],
                is_secure=row["is_secure"],
                comment=row["comment"],
                created=self.db_timestamp_to_string(row["created"]),
                last_altered=self.db_timestamp_to_string(row["last_altered"]),
                is_external=row["is_external"],
                api_integration=row["api_integration"],
                context_headers=row["context_headers"],
                max_batch_rows=row["max_batch_rows"],
                compression=row["compression"],
                packages=row["packages"],
                runtime_version=row["runtime_version"],
                installed_packages=row["installed_packages"],
                is_memoizable=row["is_memoizable"],
            )
            function.save(session=session)
            self.metadata[DbObjectType.FUNCTION][function.function_id] = function

        return True

    def save_procedures(self, session) -> bool:
        self.metadata[DbObjectType.PROCEDURE] = {}
        for row in self._rows(self.procedures, "procedures"):
            procedure = Procedure(
                procedure_id=self.get_procedure_id(
                    row["procedure_schema"], row["procedure_name"], row["argument_signature"]
                ),
                schema_id=self.get_schema_id(row["procedure_schema"]),
                procedure_name=row["procedure_name"],
                procedure_owner=row["procedure_owner"],
                argument_signature=row["argument_signature"],
                data_type=row["data_type"],
                character_maximum_length=row["character_maximum_length"],
                character_octet_length=row["character_octet_length"],
                numeric_precision=row["numeric_precision"],
                numeric_precision_radix=row["numeric_precision_radix"],
                numeric_scale=row["numeric_scale"],
                procedure_language=row["procedure_language"],
                procedure_definition=row["procedure_definition"],
                comment=row["comment"],
                created=self.db_timestamp_to_string(row["created"]),
                last_altered=self.db_timestamp_to_string(row["last_altered"]),
            )
            procedure.save(session=session)
            self.metadata[DbObjectType.PROCEDURE][procedure.procedure_id] = procedure

        return True

    def save_columns(self, session) -> bool:
        """Save columns; a column whose ``save`` raises is logged with its traceback and skipped."""
        self.metadata[DbObjectType.COLUMN] = {}
        failed_rows = []
        for row in self._rows(self.columns, "columns"):
            column: Column = Column(
                column_id=self.get_column_id(row["table_schema"], row["table_name"], row["column_name"]),
                table_id=self.get_table_id(row["table_schema"], row["table_name"]),
                column_name=row["column_name"],
                ordinal_position=row["ordinal_position"],
                column_default=row["column_default"],
                is_nullable=row["is_nullable"],
                data_type=row["data_type"],
                character_maximum_length=row["character_maximum_length"],
                character_octet_length=row["character_octet_length"],
                numeric_precision=row["numeric_precision"],
                numeric_precision_radix=row["numeric_precision_radix"],
                numeric_scale=row["numeric_scale"],
                datetime_precision=row["datetime_precision"],
                is_identity=row["is_identity"],
                identity_generation=row["identity_generation"],
                identity_start=row["identity_start"],
                identity_increment=row["identity_increment"],
                comment=row["comment"],
            )

            try:
                column.save(session=session)
            except Exception:
                log.exception(f"failed to save column: {column.column_id}")
                failed_rows.append(column.column_id)
                continue
            self.metadata[DbObjectType.COLUMN][column.column_id] = column

        if failed_rows:
            log.error(f"{len(failed_rows)} column(s) failed to save")

        return True

    def save_views(self, session) -> bool:
        self.metadata[DbObjectType.VIEW] = {}
        for row in self._rows(self.views, "views"):
            view = View(
                schema_id=self.get_schema_id(row["schema_name"]),
                view_id=self.get_view_id(row["schema_name"], row["name"]),
                view_name=row["name"],
                view_owner=row["owner"],
                view_definition=row["text"],
                is_secure=row["is_secure"],
                is_materialized=row["is_materialized"],
                change_tracking=row["change_tracking"],
                created=self.db_timestamp_to_string(row["created_on"]),
                owner_role_type=row["owner_role_type"],
                comment=row["comment"],
            )
            view.save(session=session)
            self.metadata[DbObjectType.VIEW][view.view_id] = view
        return True

    def save_schemas(self, schemas_df: pd.DataFrame, session):
        """Save every schema row of ``schemas_df`` and register it in ``schemas`` and ``metadata``."""
        self.metadata[DbObjectType.SCHEMA] = {}
        for row in self._rows(schemas_df, "schemas"):
            schema_object = Schema(
                database_id=self.database_object.database_id,
                schema_id=self.get_schema_id(row["schema_name"]),
                schema_name=row["schema_name"],
                schema_owner=row["schema_owner"],
                is_transient=row["is_transient"],
                comment=row["comment"],
                created=self.db_timestamp_to_string(row["created"]),
                last_altered=self.db_timestamp_to_string(row["last_altered"]),
                retention_time=row["retention_time"],
            )
            schema_object.save(session=session)
            self.schemas.append(schema_object)
            self.metadata[DbObjectType.SCHEMA][schema_object.schema_id] = schema_object
class
MetaData:
26class MetaData: 27 database_object: Database 28 schemas: dict[str, Schema] = [] 29 schemas_to_include: list[str] 30 tables: pd.DataFrame = None 31 columns: pd.DataFrame = None 32 views: pd.DataFrame = None 33 streams: pd.DataFrame = None 34 pipes: pd.DataFrame = None 35 tasks: pd.DataFrame = None 36 procedures: pd.DataFrame = None 37 functions: pd.DataFrame = None 38 table_constraints: pd.DataFrame = None 39 referential_constraints: pd.DataFrame = None 40 constraints: pd.DataFrame = None 41 stages: pd.DataFrame = None 42 column_constraints: pd.DataFrame = None 43 metadata: dict[str, dict[str, CommonBase]] = {} 44 45 def __init__(self, database_object: Database, schemas_to_include: list[str], db_timestamp_to_string): 46 self.database_object = database_object 47 self.metadata[DbObjectType.DATABASE] = {database_object.database_id: database_object} 48 log.info(f"Metadata schemas to include are {schemas_to_include}") 49 self.schemas_to_include = schemas_to_include 50 self.db_timestamp_to_string = db_timestamp_to_string 51 52 def save(self, session): 53 log.info("Save tables") 54 self.save_tables(session=session) 55 log.info("Save columns") 56 self.save_columns(session=session) 57 log.info("Save constraints") 58 self.save_constraints(session=session) 59 log.info("Save column constraints") 60 self.save_column_constraints(session=session) 61 log.info("Save functions") 62 self.save_functions(session=session) 63 log.info("Save pipes") 64 self.save_pipes(session=session) 65 log.info("Save procedures") 66 self.save_procedures(session=session) 67 log.info("Save referential constraints") 68 self.save_referential_constraints(session=session) 69 log.info("Save stages") 70 self.save_stages(session=session) 71 log.info("Save streams") 72 self.save_streams(session=session) 73 log.info("Save table_constraints") 74 self.save_table_constraints(session=session) 75 log.info("Save tasks") 76 self.save_tasks(session=session) 77 log.info("Save views") 78 self.save_views(session=session) 79 
80 def database_id(self) -> str: 81 return self.database_object.__get_id__() 82 83 def get_schema_id(self, schema_name: str) -> str: 84 id = json.loads(self.database_id()) 85 id["schema_name"] = schema_name 86 return json.dumps(id) 87 88 def get_table_id(self, schema_name: str, table_name: str) -> str: 89 id = json.loads(self.get_schema_id(schema_name)) 90 id["table_name"] = table_name 91 return json.dumps(id) 92 93 def get_constraint_id(self, schema_name: str, constraint_name: str) -> str: 94 id = json.loads(self.get_schema_id(schema_name)) 95 id["constraint_name"] = constraint_name 96 return json.dumps(id) 97 98 def get_table_constraint_id(self, table_id: str, constraint_name: str) -> str: 99 id = json.loads(table_id) 100 id["constraint_name"] = constraint_name 101 return json.dumps(id) 102 103 def get_view_id(self, schema_name: str, view_name: str) -> str: 104 id = json.loads(self.get_schema_id(schema_name)) 105 id["view_name"] = view_name 106 return json.dumps(id) 107 108 def get_task_id(self, schema_name: str, task_name: str) -> str: 109 id = json.loads(self.get_schema_id(schema_name)) 110 id["task_name"] = task_name 111 return json.dumps(id) 112 113 def get_stream_id(self, schema_name: str, stream_name: str) -> str: 114 id = json.loads(self.get_schema_id(schema_name)) 115 id["stream_name"] = stream_name 116 return json.dumps(id) 117 118 def get_pipe_id(self, schema_name: str, pipe_name: str) -> str: 119 id = json.loads(self.get_schema_id(schema_name)) 120 id["pipe_name"] = pipe_name 121 return json.dumps(id) 122 123 def get_stage_id(self, schema_name: str, stage_name: str) -> str: 124 id = json.loads(self.get_schema_id(schema_name)) 125 id["stage_name"] = stage_name 126 return json.dumps(id) 127 128 def get_column_id(self, schema_name: str, table_name: str, column_name: str) -> str: 129 id = json.loads(self.get_table_id(schema_name, table_name)) 130 id["column_name"] = column_name 131 return json.dumps(id) 132 133 def get_procedure_id(self, schema_name: str, 
procedure_name: str, argument_signature: str) -> str: 134 id = json.loads(self.get_schema_id(schema_name)) 135 id["procedure_name"] = procedure_name 136 id["argument_signature"] = argument_signature 137 return json.dumps(id) 138 139 def get_function_id(self, schema_name: str, function_name: str, argument_signature: str) -> str: 140 id = json.loads(self.get_schema_id(schema_name)) 141 id["function_name"] = function_name 142 id["argument_signature"] = argument_signature 143 return json.dumps(id) 144 145 def save_tables(self, session) -> bool: 146 self.metadata[DbObjectType.TABLE] = {} 147 for _index, df in self.tables.iterrows(): 148 table = Table( 149 schema_id=self.get_schema_id(df["table_schema"]), 150 table_id=self.get_table_id(df["table_schema"], df["table_name"]), 151 table_name=df["table_name"], 152 table_owner=df["table_owner"], 153 table_type=df["table_type"], 154 is_transient=df["is_transient"], 155 clustering_key=df["clustering_key"], 156 row_count=df["row_count"], 157 comment=df["comment"], 158 bytes=df["bytes"], 159 retention_time=df["retention_time"], 160 created=self.db_timestamp_to_string(df["created"]), 161 last_altered=self.db_timestamp_to_string(df["last_altered"]), 162 last_ddl=self.db_timestamp_to_string(df["last_ddl"]), 163 last_ddl_by=df["last_ddl_by"], 164 auto_clustering_on=df["auto_clustering_on"], 165 change_tracking="True", 166 is_external="False", 167 enable_schema_evolution="False", 168 owner_role_type="DATABASE", 169 is_event="False", 170 ) 171 table.save(session=session) 172 self.metadata[DbObjectType.TABLE][table.table_id] = table 173 174 return True 175 176 def save_column_constraints(self, session) -> bool: 177 self.metadata[DbObjectType.COLUMN_CONSTRAINT] = {} 178 for _index, df in self.column_constraints.iterrows(): 179 column_constraint = ColumnConstraint( 180 pk_column_id=self.get_column_id(df["pk_schema_name"], df["pk_table_name"], df["pk_column_name"]), 181 pk_constraint_id=self.get_constraint_id(df["pk_schema_name"], 
df["pk_name"]), 182 fk_column_id=self.get_column_id(df["fk_schema_name"], df["fk_table_name"], df["fk_column_name"]), 183 fk_constraint_id=self.get_constraint_id(df["fk_schema_name"], df["fk_name"]), 184 pk_name=df["pk_name"], 185 fk_name=df["fk_name"], 186 key_sequence=df["key_sequence"], 187 comment=df["comment"], 188 created=self.db_timestamp_to_string(df["created_on"]), 189 deferrability=df["deferrability"], 190 rely=df["rely"], 191 update_rule=df["update_rule"], 192 delete_rule=df["delete_rule"], 193 ) 194 column_constraint.column_constraint_id = column_constraint.__get_id__() 195 196 column_constraint.save(session=session) 197 self.metadata[DbObjectType.COLUMN_CONSTRAINT][column_constraint.column_constraint_id] = column_constraint 198 199 return True 200 201 def save_referential_constraints(self, session) -> bool: 202 self.metadata[DbObjectType.REFERENTIAL_CONSTRAINT] = {} 203 for _index, df in self.referential_constraints.iterrows(): 204 referential_constraint = ReferentialConstraint( 205 foreign_key_constraint_id=self.get_constraint_id(df["constraint_schema"], df["constraint_name"]), 206 unique_constraint_id=self.get_constraint_id( 207 df["unique_constraint_schema"], df["unique_constraint_name"] 208 ), 209 fk_name=df["constraint_name"], 210 pk_name=df["unique_constraint_name"], 211 match_option=df["match_option"], 212 update_rule=df["update_rule"], 213 delete_rule=df["delete_rule"], 214 comment=df["comment"], 215 created=self.db_timestamp_to_string(df["created"]), 216 last_altered=self.db_timestamp_to_string(df["last_altered"]), 217 ) 218 referential_constraint.referential_constraint_id = referential_constraint.__get_id__() 219 referential_constraint.save(session=session) 220 self.metadata[DbObjectType.REFERENTIAL_CONSTRAINT][referential_constraint.referential_constraint_id] = ( 221 referential_constraint 222 ) 223 224 return True 225 226 def save_table_constraints(self, session) -> bool: 227 self.metadata[DbObjectType.TABLE_CONSTRAINT] = {} 228 for _index, 
df in self.table_constraints.iterrows(): 229 table_constraint = TableConstraint( 230 table_id=self.get_table_id(df["table_schema"], df["table_name"]), 231 table_constraint_name=df["constraint_name"], 232 constraint_type=df["constraint_type"], 233 is_deferrable=df["is_deferrable"], 234 initially_deferred=df["initially_deferred"], 235 enforced=df["enforced"], 236 comment=df["comment"], 237 created=self.db_timestamp_to_string(df["created"]), 238 last_altered=self.db_timestamp_to_string(df["last_altered"]), 239 rely=df["rely"], 240 ) 241 table_constraint.table_constraint_id = table_constraint.__get_id__() 242 table_constraint.save(session=session) 243 self.metadata[DbObjectType.TABLE_CONSTRAINT][table_constraint.table_constraint_id] = table_constraint 244 245 return True 246 247 def save_constraints(self, session) -> bool: 248 self.metadata[DbObjectType.CONSTRAINT] = {} 249 for _index, df in self.constraints.iterrows(): 250 constraint = Constraint( 251 table_id=self.get_table_id(df["schema_name"], df["table_name"]), 252 constraint_name=df["constraint_name"], 253 constraint_type=df["constraint_type"], 254 constraint_details=df["constraint_details"], 255 reference_key=df["reference_key"], 256 created=self.db_timestamp_to_string(df["created"]), 257 update_rule=df["update_rule"], 258 delete_rule=df["delete_rule"], 259 ) 260 constraint.constraint_id = constraint.__get_id__() 261 constraint.save(session=session) 262 self.metadata[DbObjectType.CONSTRAINT][constraint.constraint_id] = constraint 263 264 return True 265 266 def save_tasks(self, session) -> bool: 267 self.metadata[DbObjectType.TASK] = {} 268 for _index, df in self.tasks.iterrows(): 269 try: 270 task = Task( 271 id=str(df["id"]), 272 task_id=self.get_task_id(df["schema_name"], df["name"]), 273 schema_id=self.get_schema_id(df["schema_name"]), 274 task_name=df["name"], 275 task_owner=df["owner"], 276 warehouse=df["warehouse"], 277 schedule=df["schedule"], 278 predecessors=df["predecessors"], 279 state=df["state"], 
280 definition=df["definition"], 281 condition=df["condition"], 282 allow_overlapping_execution=df["allow_overlapping_execution"], 283 error_integration=df["error_integration"], 284 comment=df["comment"], 285 last_committed=self.db_timestamp_to_string(df["last_committed_on"]), 286 last_suspended=self.db_timestamp_to_string(df["last_suspended_on"]), 287 owner_role_type=df["owner_role_type"], 288 config=df["config"], 289 created=self.db_timestamp_to_string(df["created_on"]), 290 ) 291 292 task.save(session=session) 293 self.metadata[DbObjectType.TASK][task.task_id] = task 294 295 except Exception as e: 296 log.error(f"Failed to save task: {task}, exception: {e}") 297 import ipdb 298 299 ipdb.set_trace() 300 301 return True 302 303 def save_streams(self, session) -> bool: 304 self.metadata[DbObjectType.STREAM] = {} 305 failed_rows = [] 306 for _index, df in self.streams.iterrows(): 307 try: 308 stream = Stream( 309 schema_id=self.get_schema_id(df["schema_name"]), 310 stream_id=self.get_stream_id(df["schema_name"], df["name"]), 311 stream_name=df["name"], 312 stream_owner=df["owner"], 313 comment=df["comment"], 314 table_name=df["table_name"], 315 source_type=df["source_type"], 316 base_tables=df["base_tables"], 317 type=df["type"], 318 stale=df["stale"], 319 mode=df["mode"], 320 stale_after=self.db_timestamp_to_string(df["stale_after"]), 321 invalid_reason=df["invalid_reason"], 322 owner_role_type=df["owner_role_type"], 323 created=self.db_timestamp_to_string(df["created_on"]), 324 ) 325 326 stream.save(session=session) 327 self.metadata[DbObjectType.STREAM][stream.stream_id] = stream 328 329 except Exception as e: 330 log.error(f"Exception saving column:{stream}: {e}") 331 failed_rows.append(stream.stream_id) 332 import ipdb 333 334 ipdb.set_trace() 335 336 return True 337 338 def save_stages(self, session) -> bool: 339 self.metadata[DbObjectType.STAGE] = {} 340 for _index, df in self.stages.iterrows(): 341 stage = Stage( 342 
schema_id=self.get_schema_id(df["schema_name"]), 343 stage_id=self.get_stage_id(df["schema_name"], df["name"]), 344 stage_name=df["name"], 345 stage_owner=df["owner"], 346 stage_url=df["url"], 347 stage_region=df["region"], 348 stage_type=df["type"], 349 comment=df["comment"], 350 created=self.db_timestamp_to_string(df["created_on"]), 351 has_credentials=df["has_credentials"], 352 has_encryption_key=df["has_encryption_key"], 353 cloud=df["cloud"], 354 notification_channel=df["notification_channel"], 355 storage_integration=df["storage_integration"], 356 ) 357 358 stage.save(session=session) 359 self.metadata[DbObjectType.STAGE][stage.stage_id] = stage 360 361 return True 362 363 def save_pipes(self, session) -> bool: 364 self.metadata[DbObjectType.SCHEMA] = {} 365 for _index, df in self.pipes.iterrows(): 366 pipe = Pipe( 367 schema_id=self.get_schema_id(df["pipe_schema"]), 368 pipe_id=self.get_pipe_id(df["pipe_schema"], df["pipe_name"]), 369 pipe_name=df["pipe_name"], 370 pipe_owner=df["pipe_owner"], 371 pipe_definition=df["definition"], 372 is_autoingest_enabled=df["is_autoingest_enabled"], 373 notification_channel_name=df["notification_channel_name"], 374 comment=df["comment"], 375 created=self.db_timestamp_to_string(df["created"]), 376 last_altered=self.db_timestamp_to_string(df["last_altered"]), 377 pattern=df["pattern"], 378 ) 379 380 pipe.save(session=session) 381 self.metadata[DbObjectType.SCHEMA][pipe.pipe_id] = pipe 382 383 return True 384 385 def save_functions(self, session) -> bool: 386 self.metadata[DbObjectType.FUNCTION] = {} 387 for _index, df in self.functions.iterrows(): 388 function = Function( 389 schema_id=self.get_schema_id(df["function_schema"]), 390 function_id=self.get_function_id(df["function_schema"], df["function_name"], df["argument_signature"]), 391 function_name=df["function_name"], 392 function_owner=df["function_owner"], 393 argument_signature=df["argument_signature"], 394 data_type=df["data_type"], 395 
character_maximum_length=df["character_maximum_length"], 396 character_octet_length=df["character_octet_length"], 397 numeric_precision=df["numeric_precision"], 398 numeric_precision_radix=df["numeric_precision_radix"], 399 numeric_scale=df["numeric_scale"], 400 function_language=df["function_language"], 401 function_definition=df["function_definition"], 402 volatility=df["volatility"], 403 is_null_call=df["is_null_call"], 404 is_secure=df["is_secure"], 405 comment=df["comment"], 406 created=self.db_timestamp_to_string(df["created"]), 407 last_altered=self.db_timestamp_to_string(df["last_altered"]), 408 is_external=df["is_external"], 409 api_integration=df["api_integration"], 410 context_headers=df["context_headers"], 411 max_batch_rows=df["max_batch_rows"], 412 compression=df["compression"], 413 packages=df["packages"], 414 runtime_version=df["runtime_version"], 415 installed_packages=df["installed_packages"], 416 is_memoizable=df["is_memoizable"], 417 ) 418 function.save(session=session) 419 self.metadata[DbObjectType.FUNCTION][function.function_id] = function 420 421 return True 422 423 def save_procedures(self, session) -> bool: 424 self.metadata[DbObjectType.PROCEDURE] = {} 425 for _index, df in self.procedures.iterrows(): 426 procedure = Procedure( 427 procedure_id=self.get_procedure_id( 428 df["procedure_schema"], df["procedure_name"], df["argument_signature"] 429 ), 430 schema_id=self.get_schema_id(df["procedure_schema"]), 431 procedure_name=df["procedure_name"], 432 procedure_owner=df["procedure_owner"], 433 argument_signature=df["argument_signature"], 434 data_type=df["data_type"], 435 character_maximum_length=df["character_maximum_length"], 436 character_octet_length=df["character_octet_length"], 437 numeric_precision=df["numeric_precision"], 438 numeric_precision_radix=df["numeric_precision_radix"], 439 numeric_scale=df["numeric_scale"], 440 procedure_language=df["procedure_language"], 441 procedure_definition=df["procedure_definition"], 442 
comment=df["comment"], 443 created=self.db_timestamp_to_string(df["created"]), 444 last_altered=self.db_timestamp_to_string(df["last_altered"]), 445 ) 446 procedure.save(session=session) 447 self.metadata[DbObjectType.PROCEDURE][procedure.procedure_id] = procedure 448 449 return True 450 451 def save_columns(self, session) -> bool: 452 self.metadata[DbObjectType.COLUMN] = {} 453 failed_rows = [] 454 saved_rows = [] 455 for _index, df in self.columns.iterrows(): 456 column: Column = Column( 457 column_id=self.get_column_id(df["table_schema"], df["table_name"], df["column_name"]), 458 table_id=self.get_table_id(df["table_schema"], df["table_name"]), 459 column_name=df["column_name"], 460 ordinal_position=df["ordinal_position"], 461 column_default=df["column_default"], 462 is_nullable=df["is_nullable"], 463 data_type=df["data_type"], 464 character_maximum_length=df["character_maximum_length"], 465 character_octet_length=df["character_octet_length"], 466 numeric_precision=df["numeric_precision"], 467 numeric_precision_radix=df["numeric_precision_radix"], 468 numeric_scale=df["numeric_scale"], 469 datetime_precision=df["datetime_precision"], 470 is_identity=df["is_identity"], 471 identity_generation=df["identity_generation"], 472 identity_start=df["identity_start"], 473 identity_increment=df["identity_increment"], 474 comment=df["comment"], 475 ) 476 477 try: 478 column.save(session=session) 479 self.metadata[DbObjectType.COLUMN][column.column_id] = column 480 saved_rows.append(column.column_id) 481 except Exception: 482 import ipdb 483 484 ipdb.set_trace() 485 failed_rows.append(column.column_id) 486 487 if len(failed_rows) > 0: 488 for x in failed_rows: 489 log.error(f"failed to save column: {x}") 490 491 return True 492 493 def save_views(self, session) -> bool: 494 self.metadata[DbObjectType.VIEW] = {} 495 for _index, df in self.views.iterrows(): 496 view = View( 497 schema_id=self.get_schema_id(df["schema_name"]), 498 view_id=self.get_view_id(df["schema_name"], 
df["name"]), 499 view_name=df["name"], 500 view_owner=df["owner"], 501 view_definition=df["text"], 502 is_secure=df["is_secure"], 503 is_materialized=df["is_materialized"], 504 change_tracking=df["change_tracking"], 505 created=self.db_timestamp_to_string(df["created_on"]), 506 owner_role_type=df["owner_role_type"], 507 comment=df["comment"], 508 ) 509 view.save(session=session) 510 self.metadata[DbObjectType.VIEW][view.view_id] = view 511 return True 512 513 def save_schemas(self, schemas_df: pd.DataFrame, session): 514 self.metadata[DbObjectType.SCHEMA] = {} 515 for _index, schema in schemas_df.iterrows(): 516 schema_object = Schema( 517 database_id=self.database_object.database_id, 518 schema_id=self.get_schema_id(schema["schema_name"]), 519 schema_name=schema["schema_name"], 520 schema_owner=schema["schema_owner"], 521 is_transient=schema["is_transient"], 522 comment=schema["comment"], 523 created=self.db_timestamp_to_string(schema["created"]), 524 last_altered=self.db_timestamp_to_string(schema["last_altered"]), 525 retention_time=schema["retention_time"], 526 ) 527 schema_object.save(session=session) 528 self.schemas.append(schema_object) 529 self.metadata[DbObjectType.SCHEMA][schema_object.schema_id] = schema_object
MetaData( database_object: schema_sentinel.metadata_manager.model.database.Database, schemas_to_include: list[str], db_timestamp_to_string)
def __init__(self, database_object: Database, schemas_to_include: list[str], db_timestamp_to_string):
    """Create a metadata container rooted at *database_object*.

    Args:
        database_object: the Database model this metadata belongs to. It is
            registered under ``DbObjectType.DATABASE``, keyed by its
            ``database_id``.
        schemas_to_include: schema names to include; stored as given.
        db_timestamp_to_string: callable used by the ``save_*`` methods to turn
            timestamp values read from the source rows into strings.
    """
    self.database_object = database_object
    # ``metadata`` and ``schemas`` are declared on the class with mutable
    # defaults ({} and []). Writing into those would share one registry across
    # every MetaData instance, so each instance gets its own containers here.
    self.metadata = {DbObjectType.DATABASE: {database_object.database_id: database_object}}
    self.schemas = []
    log.info(f"Metadata schemas to include are {schemas_to_include}")
    self.schemas_to_include = schemas_to_include
    self.db_timestamp_to_string = db_timestamp_to_string
database_object: schema_sentinel.metadata_manager.model.database.Database
def
save(self, session):
def save(self, session):
    """Persist every object category held by this container, one category at a time.

    Each step writes a progress message to the log and then calls the matching
    ``save_*`` method with *session*. ``save_schemas`` is not part of this
    sequence. Step return values are ignored. An exception raised by a step
    propagates and stops the remaining steps.

    Args:
        session: forwarded unchanged to every ``save_*`` call.
    """
    # (log message, method name) pairs, executed in this exact order.
    pipeline = (
        ("Save tables", "save_tables"),
        ("Save columns", "save_columns"),
        ("Save constraints", "save_constraints"),
        ("Save column constraints", "save_column_constraints"),
        ("Save functions", "save_functions"),
        ("Save pipes", "save_pipes"),
        ("Save procedures", "save_procedures"),
        ("Save referential constraints", "save_referential_constraints"),
        ("Save stages", "save_stages"),
        ("Save streams", "save_streams"),
        ("Save table_constraints", "save_table_constraints"),
        ("Save tasks", "save_tasks"),
        ("Save views", "save_views"),
    )
    for message, method_name in pipeline:
        log.info(message)
        # Look the method up only when its turn comes, as a direct call would.
        getattr(self, method_name)(session=session)
def
save_tables(self, session) -> bool:
def save_tables(self, session) -> bool:
    """Persist one ``Table`` per row of ``self.tables`` and index them by id.

    The ``DbObjectType.TABLE`` bucket of ``self.metadata`` is replaced with a
    fresh dict before any row is processed. Each table is saved, then stored
    in that bucket under its ``table_id``.

    Args:
        session: passed straight through to ``Table.save``.

    Returns:
        ``True``. Exceptions raised while building or saving a row propagate.
    """
    tables_by_id = {}
    self.metadata[DbObjectType.TABLE] = tables_by_id
    to_text = self.db_timestamp_to_string

    for _, row in self.tables.iterrows():
        schema_name = row["table_schema"]
        table_name = row["table_name"]
        table = Table(
            schema_id=self.get_schema_id(schema_name),
            table_id=self.get_table_id(schema_name, table_name),
            table_name=table_name,
            table_owner=row["table_owner"],
            table_type=row["table_type"],
            is_transient=row["is_transient"],
            clustering_key=row["clustering_key"],
            row_count=row["row_count"],
            comment=row["comment"],
            bytes=row["bytes"],
            retention_time=row["retention_time"],
            created=to_text(row["created"]),
            last_altered=to_text(row["last_altered"]),
            last_ddl=to_text(row["last_ddl"]),
            last_ddl_by=row["last_ddl_by"],
            auto_clustering_on=row["auto_clustering_on"],
            # NOTE(review): the fields below are fixed strings, not read from the
            # row -- confirm these defaults are intended for every table.
            change_tracking="True",
            is_external="False",
            enable_schema_evolution="False",
            owner_role_type="DATABASE",
            is_event="False",
        )
        table.save(session=session)
        tables_by_id[table.table_id] = table

    return True
def
save_column_constraints(self, session) -> bool:
def save_column_constraints(self, session) -> bool:
    """Persist one ``ColumnConstraint`` per row of ``self.column_constraints``.

    Each row links a primary-key column/constraint (``pk_*`` fields) to a
    foreign-key column/constraint (``fk_*`` fields). The model's id is computed
    by its own ``__get_id__`` after construction. Saved objects are indexed by
    that id in a freshly reset ``DbObjectType.COLUMN_CONSTRAINT`` bucket.

    Args:
        session: passed straight through to ``ColumnConstraint.save``.

    Returns:
        ``True``. Exceptions raised while building or saving a row propagate.
    """
    by_id = {}
    self.metadata[DbObjectType.COLUMN_CONSTRAINT] = by_id
    to_text = self.db_timestamp_to_string

    for _, row in self.column_constraints.iterrows():
        pk_schema = row["pk_schema_name"]
        fk_schema = row["fk_schema_name"]
        link = ColumnConstraint(
            pk_column_id=self.get_column_id(pk_schema, row["pk_table_name"], row["pk_column_name"]),
            pk_constraint_id=self.get_constraint_id(pk_schema, row["pk_name"]),
            fk_column_id=self.get_column_id(fk_schema, row["fk_table_name"], row["fk_column_name"]),
            fk_constraint_id=self.get_constraint_id(fk_schema, row["fk_name"]),
            pk_name=row["pk_name"],
            fk_name=row["fk_name"],
            key_sequence=row["key_sequence"],
            comment=row["comment"],
            created=to_text(row["created_on"]),
            deferrability=row["deferrability"],
            rely=row["rely"],
            update_rule=row["update_rule"],
            delete_rule=row["delete_rule"],
        )
        # The id is not a constructor argument; it is computed from the built model.
        link.column_constraint_id = link.__get_id__()
        link.save(session=session)
        by_id[link.column_constraint_id] = link

    return True
def
save_referential_constraints(self, session) -> bool:
def save_referential_constraints(self, session) -> bool:
    """Persist one ``ReferentialConstraint`` per row of ``self.referential_constraints``.

    The foreign-key side is identified by ``constraint_schema`` /
    ``constraint_name``. The referenced side is identified by
    ``unique_constraint_schema`` / ``unique_constraint_name``, which is stored
    as ``pk_name``. Each model's id comes from its own ``__get_id__`` after
    construction. Saved objects are indexed by that id in a freshly reset
    ``DbObjectType.REFERENTIAL_CONSTRAINT`` bucket.

    Args:
        session: passed straight through to ``ReferentialConstraint.save``.

    Returns:
        ``True``. Exceptions raised while building or saving a row propagate.
    """
    by_id = {}
    self.metadata[DbObjectType.REFERENTIAL_CONSTRAINT] = by_id
    to_text = self.db_timestamp_to_string

    for _, row in self.referential_constraints.iterrows():
        fk_name = row["constraint_name"]
        pk_name = row["unique_constraint_name"]
        ref = ReferentialConstraint(
            foreign_key_constraint_id=self.get_constraint_id(row["constraint_schema"], fk_name),
            unique_constraint_id=self.get_constraint_id(row["unique_constraint_schema"], pk_name),
            fk_name=fk_name,
            pk_name=pk_name,
            match_option=row["match_option"],
            update_rule=row["update_rule"],
            delete_rule=row["delete_rule"],
            comment=row["comment"],
            created=to_text(row["created"]),
            last_altered=to_text(row["last_altered"]),
        )
        ref.referential_constraint_id = ref.__get_id__()
        ref.save(session=session)
        by_id[ref.referential_constraint_id] = ref

    return True
def
save_table_constraints(self, session) -> bool:
def save_table_constraints(self, session) -> bool:
    """Persist one ``TableConstraint`` per row of ``self.table_constraints``.

    The owning table is resolved from ``table_schema`` / ``table_name``. The
    model's id is computed by ``__get_id__`` after construction. Saved objects
    are indexed by that id in a freshly reset ``DbObjectType.TABLE_CONSTRAINT``
    bucket.

    Args:
        session: passed straight through to ``TableConstraint.save``.

    Returns:
        ``True``. Exceptions raised while building or saving a row propagate.
    """
    registry = {}
    self.metadata[DbObjectType.TABLE_CONSTRAINT] = registry
    to_text = self.db_timestamp_to_string

    for _, row in self.table_constraints.iterrows():
        tc = TableConstraint(
            table_id=self.get_table_id(row["table_schema"], row["table_name"]),
            table_constraint_name=row["constraint_name"],
            constraint_type=row["constraint_type"],
            is_deferrable=row["is_deferrable"],
            initially_deferred=row["initially_deferred"],
            enforced=row["enforced"],
            comment=row["comment"],
            created=to_text(row["created"]),
            last_altered=to_text(row["last_altered"]),
            rely=row["rely"],
        )
        tc.table_constraint_id = tc.__get_id__()
        tc.save(session=session)
        registry[tc.table_constraint_id] = tc

    return True
def
save_constraints(self, session) -> bool:
def save_constraints(self, session) -> bool:
    """Persist one ``Constraint`` per row of ``self.constraints``.

    The owning table is resolved from ``schema_name`` / ``table_name``. The
    model's id is computed by ``__get_id__`` after construction. Saved objects
    are indexed by that id in a freshly reset ``DbObjectType.CONSTRAINT``
    bucket.

    Args:
        session: passed straight through to ``Constraint.save``.

    Returns:
        ``True``. Exceptions raised while building or saving a row propagate.
    """
    saved = {}
    self.metadata[DbObjectType.CONSTRAINT] = saved

    for _, row in self.constraints.iterrows():
        item = Constraint(
            table_id=self.get_table_id(row["schema_name"], row["table_name"]),
            constraint_name=row["constraint_name"],
            constraint_type=row["constraint_type"],
            constraint_details=row["constraint_details"],
            reference_key=row["reference_key"],
            created=self.db_timestamp_to_string(row["created"]),
            update_rule=row["update_rule"],
            delete_rule=row["delete_rule"],
        )
        item.constraint_id = item.__get_id__()
        item.save(session=session)
        saved[item.constraint_id] = item

    return True
def
save_tasks(self, session) -> bool:
def save_tasks(self, session) -> bool:
    """Persist one ``Task`` per row of ``self.tasks`` and index them by ``task_id``.

    The ``DbObjectType.TASK`` bucket of ``self.metadata`` is reset first. A row
    that fails to build or save is logged with its traceback and skipped, so
    the remaining tasks are still processed. Failed rows are listed again at
    the end.

    Args:
        session: passed straight through to ``Task.save``.

    Returns:
        Always ``True``. Failures are reported through the log only.
    """
    self.metadata[DbObjectType.TASK] = {}
    failed_rows = []
    for _index, row in self.tasks.iterrows():
        # Build the label from the raw row (``Series.get`` never raises) so it
        # is available even when ``Task(...)`` itself fails. Referring to the
        # ``task`` variable in the handler would be unbound on the first row,
        # or stale from an earlier row.
        row_label = f"{row.get('schema_name')}.{row.get('name')}"
        try:
            task = Task(
                id=str(row["id"]),
                task_id=self.get_task_id(row["schema_name"], row["name"]),
                schema_id=self.get_schema_id(row["schema_name"]),
                task_name=row["name"],
                task_owner=row["owner"],
                warehouse=row["warehouse"],
                schedule=row["schedule"],
                predecessors=row["predecessors"],
                state=row["state"],
                definition=row["definition"],
                condition=row["condition"],
                allow_overlapping_execution=row["allow_overlapping_execution"],
                error_integration=row["error_integration"],
                comment=row["comment"],
                last_committed=self.db_timestamp_to_string(row["last_committed_on"]),
                last_suspended=self.db_timestamp_to_string(row["last_suspended_on"]),
                owner_role_type=row["owner_role_type"],
                config=row["config"],
                created=self.db_timestamp_to_string(row["created_on"]),
            )
            task.save(session=session)
            self.metadata[DbObjectType.TASK][task.task_id] = task
        except Exception as e:
            # No interactive debugger here: it would block unattended runs.
            # Keep the traceback in the log and move on to the next task.
            log.exception(f"Failed to save task: {row_label}, exception: {e}")
            failed_rows.append(row_label)

    for row_label in failed_rows:
        log.error(f"failed to save task: {row_label}")

    return True
def
save_streams(self, session) -> bool:
def save_streams(self, session) -> bool:
    """Persist one ``Stream`` per row of ``self.streams`` and index them by ``stream_id``.

    The ``DbObjectType.STREAM`` bucket of ``self.metadata`` is reset first. A
    row that fails to build or save is logged with its traceback and skipped,
    so the remaining streams are still processed. Failed rows are listed again
    at the end.

    Args:
        session: passed straight through to ``Stream.save``.

    Returns:
        Always ``True``. Failures are reported through the log only.
    """
    self.metadata[DbObjectType.STREAM] = {}
    failed_rows = []
    for _index, row in self.streams.iterrows():
        # Label taken from the raw row (``Series.get`` never raises) so that a
        # failure inside the ``Stream(...)`` constructor can still be reported.
        row_label = f"{row.get('schema_name')}.{row.get('name')}"
        try:
            stream = Stream(
                schema_id=self.get_schema_id(row["schema_name"]),
                stream_id=self.get_stream_id(row["schema_name"], row["name"]),
                stream_name=row["name"],
                stream_owner=row["owner"],
                comment=row["comment"],
                table_name=row["table_name"],
                source_type=row["source_type"],
                base_tables=row["base_tables"],
                type=row["type"],
                stale=row["stale"],
                mode=row["mode"],
                stale_after=self.db_timestamp_to_string(row["stale_after"]),
                invalid_reason=row["invalid_reason"],
                owner_role_type=row["owner_role_type"],
                created=self.db_timestamp_to_string(row["created_on"]),
            )
            stream.save(session=session)
            self.metadata[DbObjectType.STREAM][stream.stream_id] = stream
        except Exception:
            # No interactive debugger: log with traceback and continue.
            log.exception(f"Exception saving stream: {row_label}")
            failed_rows.append(row_label)

    for row_label in failed_rows:
        log.error(f"failed to save stream: {row_label}")

    return True
def
save_stages(self, session) -> bool:
def save_stages(self, session) -> bool:
    """Persist one ``Stage`` per row of ``self.stages`` and index them by ``stage_id``.

    The ``DbObjectType.STAGE`` bucket of ``self.metadata`` is replaced with a
    fresh dict first. Row columns ``name``, ``owner``, ``url``, ``region`` and
    ``type`` map onto the ``stage_*`` fields of the model.

    Args:
        session: passed straight through to ``Stage.save``.

    Returns:
        ``True``. Exceptions raised while building or saving a row propagate.
    """
    stages_by_id = {}
    self.metadata[DbObjectType.STAGE] = stages_by_id

    for _, row in self.stages.iterrows():
        schema_name = row["schema_name"]
        stage_name = row["name"]
        stage = Stage(
            schema_id=self.get_schema_id(schema_name),
            stage_id=self.get_stage_id(schema_name, stage_name),
            stage_name=stage_name,
            stage_owner=row["owner"],
            stage_url=row["url"],
            stage_region=row["region"],
            stage_type=row["type"],
            comment=row["comment"],
            created=self.db_timestamp_to_string(row["created_on"]),
            has_credentials=row["has_credentials"],
            has_encryption_key=row["has_encryption_key"],
            cloud=row["cloud"],
            notification_channel=row["notification_channel"],
            storage_integration=row["storage_integration"],
        )
        stage.save(session=session)
        stages_by_id[stage.stage_id] = stage

    return True
def
save_pipes(self, session) -> bool:
def save_pipes(self, session) -> bool:
    """Persist one ``Pipe`` per row of ``self.pipes`` and index them by ``pipe_id``.

    Pipes are registered in their own ``DbObjectType.PIPE`` bucket of
    ``self.metadata``, which is reset first.

    Args:
        session: passed straight through to ``Pipe.save``.

    Returns:
        ``True``. Exceptions raised while building or saving a row propagate.
    """
    # Pipes must not go into the DbObjectType.SCHEMA bucket: resetting that
    # bucket here would discard the schemas registered by save_schemas and mix
    # pipe objects into the schema registry.
    # NOTE(review): assumes DbObjectType defines PIPE (a Pipe model and
    # get_pipe_id exist alongside the other per-type members) -- confirm.
    self.metadata[DbObjectType.PIPE] = {}
    for _index, row in self.pipes.iterrows():
        pipe = Pipe(
            schema_id=self.get_schema_id(row["pipe_schema"]),
            pipe_id=self.get_pipe_id(row["pipe_schema"], row["pipe_name"]),
            pipe_name=row["pipe_name"],
            pipe_owner=row["pipe_owner"],
            pipe_definition=row["definition"],
            is_autoingest_enabled=row["is_autoingest_enabled"],
            notification_channel_name=row["notification_channel_name"],
            comment=row["comment"],
            created=self.db_timestamp_to_string(row["created"]),
            last_altered=self.db_timestamp_to_string(row["last_altered"]),
            pattern=row["pattern"],
        )

        pipe.save(session=session)
        self.metadata[DbObjectType.PIPE][pipe.pipe_id] = pipe

    return True
def
save_functions(self, session) -> bool:
def save_functions(self, session) -> bool:
    """Persist one ``Function`` per row of ``self.functions``, indexed by ``function_id``.

    A function's id is derived from its schema, name and argument signature.
    The ``DbObjectType.FUNCTION`` bucket of ``self.metadata`` is replaced with
    a fresh dict first.

    Args:
        session: passed straight through to ``Function.save``.

    Returns:
        ``True``. Exceptions raised while building or saving a row propagate.
    """
    functions_by_id = {}
    self.metadata[DbObjectType.FUNCTION] = functions_by_id
    to_text = self.db_timestamp_to_string

    for _, row in self.functions.iterrows():
        schema_name = row["function_schema"]
        func_name = row["function_name"]
        signature = row["argument_signature"]
        fn = Function(
            schema_id=self.get_schema_id(schema_name),
            function_id=self.get_function_id(schema_name, func_name, signature),
            function_name=func_name,
            function_owner=row["function_owner"],
            argument_signature=signature,
            data_type=row["data_type"],
            character_maximum_length=row["character_maximum_length"],
            character_octet_length=row["character_octet_length"],
            numeric_precision=row["numeric_precision"],
            numeric_precision_radix=row["numeric_precision_radix"],
            numeric_scale=row["numeric_scale"],
            function_language=row["function_language"],
            function_definition=row["function_definition"],
            volatility=row["volatility"],
            is_null_call=row["is_null_call"],
            is_secure=row["is_secure"],
            comment=row["comment"],
            created=to_text(row["created"]),
            last_altered=to_text(row["last_altered"]),
            is_external=row["is_external"],
            api_integration=row["api_integration"],
            context_headers=row["context_headers"],
            max_batch_rows=row["max_batch_rows"],
            compression=row["compression"],
            packages=row["packages"],
            runtime_version=row["runtime_version"],
            installed_packages=row["installed_packages"],
            is_memoizable=row["is_memoizable"],
        )
        fn.save(session=session)
        functions_by_id[fn.function_id] = fn

    return True
def
save_procedures(self, session) -> bool:
def save_procedures(self, session) -> bool:
    """Persist one ``Procedure`` per row of ``self.procedures``, indexed by ``procedure_id``.

    A procedure's id is derived from its schema, name and argument signature.
    The ``DbObjectType.PROCEDURE`` bucket of ``self.metadata`` is replaced with
    a fresh dict first.

    Args:
        session: passed straight through to ``Procedure.save``.

    Returns:
        ``True``. Exceptions raised while building or saving a row propagate.
    """
    procs_by_id = {}
    self.metadata[DbObjectType.PROCEDURE] = procs_by_id
    to_text = self.db_timestamp_to_string

    for _, row in self.procedures.iterrows():
        schema_name = row["procedure_schema"]
        proc_name = row["procedure_name"]
        signature = row["argument_signature"]
        proc = Procedure(
            procedure_id=self.get_procedure_id(schema_name, proc_name, signature),
            schema_id=self.get_schema_id(schema_name),
            procedure_name=proc_name,
            procedure_owner=row["procedure_owner"],
            argument_signature=signature,
            data_type=row["data_type"],
            character_maximum_length=row["character_maximum_length"],
            character_octet_length=row["character_octet_length"],
            numeric_precision=row["numeric_precision"],
            numeric_precision_radix=row["numeric_precision_radix"],
            numeric_scale=row["numeric_scale"],
            procedure_language=row["procedure_language"],
            procedure_definition=row["procedure_definition"],
            comment=row["comment"],
            created=to_text(row["created"]),
            last_altered=to_text(row["last_altered"]),
        )
        proc.save(session=session)
        procs_by_id[proc.procedure_id] = proc

    return True
def
save_columns(self, session) -> bool:
def save_columns(self, session) -> bool:
    """Persist one ``Column`` per row of ``self.columns`` and index them by ``column_id``.

    The ``DbObjectType.COLUMN`` bucket of ``self.metadata`` is reset first.
    Building a ``Column`` from a row is not guarded, so errors there propagate.
    A failure in ``Column.save`` is logged with its traceback and the column is
    skipped. The ids of all skipped columns are logged again at the end.

    Args:
        session: passed straight through to ``Column.save``.

    Returns:
        Always ``True``. Save failures are reported through the log only.
    """
    self.metadata[DbObjectType.COLUMN] = {}
    failed_rows = []
    for _index, row in self.columns.iterrows():
        column: Column = Column(
            column_id=self.get_column_id(row["table_schema"], row["table_name"], row["column_name"]),
            table_id=self.get_table_id(row["table_schema"], row["table_name"]),
            column_name=row["column_name"],
            ordinal_position=row["ordinal_position"],
            column_default=row["column_default"],
            is_nullable=row["is_nullable"],
            data_type=row["data_type"],
            character_maximum_length=row["character_maximum_length"],
            character_octet_length=row["character_octet_length"],
            numeric_precision=row["numeric_precision"],
            numeric_precision_radix=row["numeric_precision_radix"],
            numeric_scale=row["numeric_scale"],
            datetime_precision=row["datetime_precision"],
            is_identity=row["is_identity"],
            identity_generation=row["identity_generation"],
            identity_start=row["identity_start"],
            identity_increment=row["identity_increment"],
            comment=row["comment"],
        )

        try:
            column.save(session=session)
        except Exception:
            # No interactive debugger here: record the traceback and carry on
            # with the remaining columns.
            log.exception(f"Exception saving column: {column.column_id}")
            failed_rows.append(column.column_id)
        else:
            self.metadata[DbObjectType.COLUMN][column.column_id] = column

    for column_id in failed_rows:
        log.error(f"failed to save column: {column_id}")

    return True
def
save_views(self, session) -> bool:
def save_views(self, session) -> bool:
    """Persist one ``View`` per row of ``self.views`` and index them by ``view_id``.

    The view SQL is taken from the row's ``text`` column. The
    ``DbObjectType.VIEW`` bucket of ``self.metadata`` is replaced with a fresh
    dict first.

    Args:
        session: passed straight through to ``View.save``.

    Returns:
        ``True``. Exceptions raised while building or saving a row propagate.
    """
    views_by_id = {}
    self.metadata[DbObjectType.VIEW] = views_by_id

    for _, row in self.views.iterrows():
        schema_name = row["schema_name"]
        view_name = row["name"]
        view = View(
            schema_id=self.get_schema_id(schema_name),
            view_id=self.get_view_id(schema_name, view_name),
            view_name=view_name,
            view_owner=row["owner"],
            view_definition=row["text"],
            is_secure=row["is_secure"],
            is_materialized=row["is_materialized"],
            change_tracking=row["change_tracking"],
            created=self.db_timestamp_to_string(row["created_on"]),
            owner_role_type=row["owner_role_type"],
            comment=row["comment"],
        )
        view.save(session=session)
        views_by_id[view.view_id] = view

    return True
def
save_schemas(self, schemas_df: pandas.core.frame.DataFrame, session):
def save_schemas(self, schemas_df: pd.DataFrame, session):
    """Persist one ``Schema`` per row of *schemas_df* and register each one.

    Both ``self.schemas`` and the ``DbObjectType.SCHEMA`` bucket of
    ``self.metadata`` are rebuilt from scratch on every call. Every schema is
    attached to ``self.database_object``.

    Args:
        schemas_df: rows providing ``schema_name``, ``schema_owner``,
            ``is_transient``, ``comment``, ``created``, ``last_altered`` and
            ``retention_time``.
        session: passed straight through to ``Schema.save``.
    """
    self.metadata[DbObjectType.SCHEMA] = {}
    # ``schemas`` has a class-level list default. Appending to it would mutate
    # state shared by every MetaData instance and accumulate across calls, so
    # bind a fresh per-instance list, mirroring the metadata bucket reset above.
    self.schemas = []
    for _index, row in schemas_df.iterrows():
        schema_object = Schema(
            database_id=self.database_object.database_id,
            schema_id=self.get_schema_id(row["schema_name"]),
            schema_name=row["schema_name"],
            schema_owner=row["schema_owner"],
            is_transient=row["is_transient"],
            comment=row["comment"],
            created=self.db_timestamp_to_string(row["created"]),
            last_altered=self.db_timestamp_to_string(row["last_altered"]),
            retention_time=row["retention_time"],
        )
        schema_object.save(session=session)
        self.schemas.append(schema_object)
        self.metadata[DbObjectType.SCHEMA][schema_object.schema_id] = schema_object