schema_sentinel.metadata_manager.model.metadata_container

  1import json
  2import logging as log
  3
  4import pandas as pd
  5
  6from ..enums import DbObjectType
  7from . import CommonBase
  8from .column import Column
  9from .column_constraint import ColumnConstraint
 10from .constraint import Constraint
 11from .database import Database
 12from .function import Function
 13from .pipe import Pipe
 14from .procedure import Procedure
 15from .referential_constraint import ReferentialConstraint
 16from .schema import Schema
 17from .stage import Stage
 18from .stream import Stream
 19from .table import Table
 20from .table_constraint import TableConstraint
 21from .task import Task
 22from .view import View
 23
 24
 25class MetaData:
 26    database_object: Database
 27    schemas: dict[str, Schema] = []
 28    schemas_to_include: list[str]
 29    tables: pd.DataFrame = None
 30    columns: pd.DataFrame = None
 31    views: pd.DataFrame = None
 32    streams: pd.DataFrame = None
 33    pipes: pd.DataFrame = None
 34    tasks: pd.DataFrame = None
 35    procedures: pd.DataFrame = None
 36    functions: pd.DataFrame = None
 37    table_constraints: pd.DataFrame = None
 38    referential_constraints: pd.DataFrame = None
 39    constraints: pd.DataFrame = None
 40    stages: pd.DataFrame = None
 41    column_constraints: pd.DataFrame = None
 42    metadata: dict[str, dict[str, CommonBase]] = {}
 43
 44    def __init__(self, database_object: Database, schemas_to_include: list[str], db_timestamp_to_string):
 45        self.database_object = database_object
 46        self.metadata[DbObjectType.DATABASE] = {database_object.database_id: database_object}
 47        log.info(f"Metadata schemas to include are {schemas_to_include}")
 48        self.schemas_to_include = schemas_to_include
 49        self.db_timestamp_to_string = db_timestamp_to_string
 50
 51    def save(self, session):
 52        log.info("Save tables")
 53        self.save_tables(session=session)
 54        log.info("Save columns")
 55        self.save_columns(session=session)
 56        log.info("Save constraints")
 57        self.save_constraints(session=session)
 58        log.info("Save column constraints")
 59        self.save_column_constraints(session=session)
 60        log.info("Save functions")
 61        self.save_functions(session=session)
 62        log.info("Save pipes")
 63        self.save_pipes(session=session)
 64        log.info("Save procedures")
 65        self.save_procedures(session=session)
 66        log.info("Save referential constraints")
 67        self.save_referential_constraints(session=session)
 68        log.info("Save stages")
 69        self.save_stages(session=session)
 70        log.info("Save streams")
 71        self.save_streams(session=session)
 72        log.info("Save table_constraints")
 73        self.save_table_constraints(session=session)
 74        log.info("Save tasks")
 75        self.save_tasks(session=session)
 76        log.info("Save views")
 77        self.save_views(session=session)
 78
 79    def database_id(self) -> str:
 80        return self.database_object.__get_id__()
 81
 82    def get_schema_id(self, schema_name: str) -> str:
 83        id = json.loads(self.database_id())
 84        id["schema_name"] = schema_name
 85        return json.dumps(id)
 86
 87    def get_table_id(self, schema_name: str, table_name: str) -> str:
 88        id = json.loads(self.get_schema_id(schema_name))
 89        id["table_name"] = table_name
 90        return json.dumps(id)
 91
 92    def get_constraint_id(self, schema_name: str, constraint_name: str) -> str:
 93        id = json.loads(self.get_schema_id(schema_name))
 94        id["constraint_name"] = constraint_name
 95        return json.dumps(id)
 96
 97    def get_table_constraint_id(self, table_id: str, constraint_name: str) -> str:
 98        id = json.loads(table_id)
 99        id["constraint_name"] = constraint_name
100        return json.dumps(id)
101
102    def get_view_id(self, schema_name: str, view_name: str) -> str:
103        id = json.loads(self.get_schema_id(schema_name))
104        id["view_name"] = view_name
105        return json.dumps(id)
106
107    def get_task_id(self, schema_name: str, task_name: str) -> str:
108        id = json.loads(self.get_schema_id(schema_name))
109        id["task_name"] = task_name
110        return json.dumps(id)
111
112    def get_stream_id(self, schema_name: str, stream_name: str) -> str:
113        id = json.loads(self.get_schema_id(schema_name))
114        id["stream_name"] = stream_name
115        return json.dumps(id)
116
117    def get_pipe_id(self, schema_name: str, pipe_name: str) -> str:
118        id = json.loads(self.get_schema_id(schema_name))
119        id["pipe_name"] = pipe_name
120        return json.dumps(id)
121
122    def get_stage_id(self, schema_name: str, stage_name: str) -> str:
123        id = json.loads(self.get_schema_id(schema_name))
124        id["stage_name"] = stage_name
125        return json.dumps(id)
126
127    def get_column_id(self, schema_name: str, table_name: str, column_name: str) -> str:
128        id = json.loads(self.get_table_id(schema_name, table_name))
129        id["column_name"] = column_name
130        return json.dumps(id)
131
132    def get_procedure_id(self, schema_name: str, procedure_name: str, argument_signature: str) -> str:
133        id = json.loads(self.get_schema_id(schema_name))
134        id["procedure_name"] = procedure_name
135        id["argument_signature"] = argument_signature
136        return json.dumps(id)
137
138    def get_function_id(self, schema_name: str, function_name: str, argument_signature: str) -> str:
139        id = json.loads(self.get_schema_id(schema_name))
140        id["function_name"] = function_name
141        id["argument_signature"] = argument_signature
142        return json.dumps(id)
143
144    def save_tables(self, session) -> bool:
145        self.metadata[DbObjectType.TABLE] = {}
146        for _index, df in self.tables.iterrows():
147            table = Table(
148                schema_id=self.get_schema_id(df["table_schema"]),
149                table_id=self.get_table_id(df["table_schema"], df["table_name"]),
150                table_name=df["table_name"],
151                table_owner=df["table_owner"],
152                table_type=df["table_type"],
153                is_transient=df["is_transient"],
154                clustering_key=df["clustering_key"],
155                row_count=df["row_count"],
156                comment=df["comment"],
157                bytes=df["bytes"],
158                retention_time=df["retention_time"],
159                created=self.db_timestamp_to_string(df["created"]),
160                last_altered=self.db_timestamp_to_string(df["last_altered"]),
161                last_ddl=self.db_timestamp_to_string(df["last_ddl"]),
162                last_ddl_by=df["last_ddl_by"],
163                auto_clustering_on=df["auto_clustering_on"],
164                change_tracking="True",
165                is_external="False",
166                enable_schema_evolution="False",
167                owner_role_type="DATABASE",
168                is_event="False",
169            )
170            table.save(session=session)
171            self.metadata[DbObjectType.TABLE][table.table_id] = table
172
173        return True
174
175    def save_column_constraints(self, session) -> bool:
176        self.metadata[DbObjectType.COLUMN_CONSTRAINT] = {}
177        for _index, df in self.column_constraints.iterrows():
178            column_constraint = ColumnConstraint(
179                pk_column_id=self.get_column_id(df["pk_schema_name"], df["pk_table_name"], df["pk_column_name"]),
180                pk_constraint_id=self.get_constraint_id(df["pk_schema_name"], df["pk_name"]),
181                fk_column_id=self.get_column_id(df["fk_schema_name"], df["fk_table_name"], df["fk_column_name"]),
182                fk_constraint_id=self.get_constraint_id(df["fk_schema_name"], df["fk_name"]),
183                pk_name=df["pk_name"],
184                fk_name=df["fk_name"],
185                key_sequence=df["key_sequence"],
186                comment=df["comment"],
187                created=self.db_timestamp_to_string(df["created_on"]),
188                deferrability=df["deferrability"],
189                rely=df["rely"],
190                update_rule=df["update_rule"],
191                delete_rule=df["delete_rule"],
192            )
193            column_constraint.column_constraint_id = column_constraint.__get_id__()
194
195            column_constraint.save(session=session)
196            self.metadata[DbObjectType.COLUMN_CONSTRAINT][column_constraint.column_constraint_id] = column_constraint
197
198        return True
199
200    def save_referential_constraints(self, session) -> bool:
201        self.metadata[DbObjectType.REFERENTIAL_CONSTRAINT] = {}
202        for _index, df in self.referential_constraints.iterrows():
203            referential_constraint = ReferentialConstraint(
204                foreign_key_constraint_id=self.get_constraint_id(df["constraint_schema"], df["constraint_name"]),
205                unique_constraint_id=self.get_constraint_id(
206                    df["unique_constraint_schema"], df["unique_constraint_name"]
207                ),
208                fk_name=df["constraint_name"],
209                pk_name=df["unique_constraint_name"],
210                match_option=df["match_option"],
211                update_rule=df["update_rule"],
212                delete_rule=df["delete_rule"],
213                comment=df["comment"],
214                created=self.db_timestamp_to_string(df["created"]),
215                last_altered=self.db_timestamp_to_string(df["last_altered"]),
216            )
217            referential_constraint.referential_constraint_id = referential_constraint.__get_id__()
218            referential_constraint.save(session=session)
219            self.metadata[DbObjectType.REFERENTIAL_CONSTRAINT][referential_constraint.referential_constraint_id] = (
220                referential_constraint
221            )
222
223        return True
224
225    def save_table_constraints(self, session) -> bool:
226        self.metadata[DbObjectType.TABLE_CONSTRAINT] = {}
227        for _index, df in self.table_constraints.iterrows():
228            table_constraint = TableConstraint(
229                table_id=self.get_table_id(df["table_schema"], df["table_name"]),
230                table_constraint_name=df["constraint_name"],
231                constraint_type=df["constraint_type"],
232                is_deferrable=df["is_deferrable"],
233                initially_deferred=df["initially_deferred"],
234                enforced=df["enforced"],
235                comment=df["comment"],
236                created=self.db_timestamp_to_string(df["created"]),
237                last_altered=self.db_timestamp_to_string(df["last_altered"]),
238                rely=df["rely"],
239            )
240            table_constraint.table_constraint_id = table_constraint.__get_id__()
241            table_constraint.save(session=session)
242            self.metadata[DbObjectType.TABLE_CONSTRAINT][table_constraint.table_constraint_id] = table_constraint
243
244        return True
245
246    def save_constraints(self, session) -> bool:
247        self.metadata[DbObjectType.CONSTRAINT] = {}
248        for _index, df in self.constraints.iterrows():
249            constraint = Constraint(
250                table_id=self.get_table_id(df["schema_name"], df["table_name"]),
251                constraint_name=df["constraint_name"],
252                constraint_type=df["constraint_type"],
253                constraint_details=df["constraint_details"],
254                reference_key=df["reference_key"],
255                created=self.db_timestamp_to_string(df["created"]),
256                update_rule=df["update_rule"],
257                delete_rule=df["delete_rule"],
258            )
259            constraint.constraint_id = constraint.__get_id__()
260            constraint.save(session=session)
261            self.metadata[DbObjectType.CONSTRAINT][constraint.constraint_id] = constraint
262
263        return True
264
265    def save_tasks(self, session) -> bool:
266        self.metadata[DbObjectType.TASK] = {}
267        for _index, df in self.tasks.iterrows():
268            try:
269                task = Task(
270                    id=str(df["id"]),
271                    task_id=self.get_task_id(df["schema_name"], df["name"]),
272                    schema_id=self.get_schema_id(df["schema_name"]),
273                    task_name=df["name"],
274                    task_owner=df["owner"],
275                    warehouse=df["warehouse"],
276                    schedule=df["schedule"],
277                    predecessors=df["predecessors"],
278                    state=df["state"],
279                    definition=df["definition"],
280                    condition=df["condition"],
281                    allow_overlapping_execution=df["allow_overlapping_execution"],
282                    error_integration=df["error_integration"],
283                    comment=df["comment"],
284                    last_committed=self.db_timestamp_to_string(df["last_committed_on"]),
285                    last_suspended=self.db_timestamp_to_string(df["last_suspended_on"]),
286                    owner_role_type=df["owner_role_type"],
287                    config=df["config"],
288                    created=self.db_timestamp_to_string(df["created_on"]),
289                )
290
291                task.save(session=session)
292                self.metadata[DbObjectType.TASK][task.task_id] = task
293
294            except Exception as e:
295                log.error(f"Failed to save task: {task}, exception: {e}")
296                import ipdb
297
298                ipdb.set_trace()
299
300        return True
301
302    def save_streams(self, session) -> bool:
303        self.metadata[DbObjectType.STREAM] = {}
304        failed_rows = []
305        for _index, df in self.streams.iterrows():
306            try:
307                stream = Stream(
308                    schema_id=self.get_schema_id(df["schema_name"]),
309                    stream_id=self.get_stream_id(df["schema_name"], df["name"]),
310                    stream_name=df["name"],
311                    stream_owner=df["owner"],
312                    comment=df["comment"],
313                    table_name=df["table_name"],
314                    source_type=df["source_type"],
315                    base_tables=df["base_tables"],
316                    type=df["type"],
317                    stale=df["stale"],
318                    mode=df["mode"],
319                    stale_after=self.db_timestamp_to_string(df["stale_after"]),
320                    invalid_reason=df["invalid_reason"],
321                    owner_role_type=df["owner_role_type"],
322                    created=self.db_timestamp_to_string(df["created_on"]),
323                )
324
325                stream.save(session=session)
326                self.metadata[DbObjectType.STREAM][stream.stream_id] = stream
327
328            except Exception as e:
329                log.error(f"Exception saving column:{stream}: {e}")
330                failed_rows.append(stream.stream_id)
331                import ipdb
332
333                ipdb.set_trace()
334
335        return True
336
337    def save_stages(self, session) -> bool:
338        self.metadata[DbObjectType.STAGE] = {}
339        for _index, df in self.stages.iterrows():
340            stage = Stage(
341                schema_id=self.get_schema_id(df["schema_name"]),
342                stage_id=self.get_stage_id(df["schema_name"], df["name"]),
343                stage_name=df["name"],
344                stage_owner=df["owner"],
345                stage_url=df["url"],
346                stage_region=df["region"],
347                stage_type=df["type"],
348                comment=df["comment"],
349                created=self.db_timestamp_to_string(df["created_on"]),
350                has_credentials=df["has_credentials"],
351                has_encryption_key=df["has_encryption_key"],
352                cloud=df["cloud"],
353                notification_channel=df["notification_channel"],
354                storage_integration=df["storage_integration"],
355            )
356
357            stage.save(session=session)
358            self.metadata[DbObjectType.STAGE][stage.stage_id] = stage
359
360        return True
361
362    def save_pipes(self, session) -> bool:
363        self.metadata[DbObjectType.SCHEMA] = {}
364        for _index, df in self.pipes.iterrows():
365            pipe = Pipe(
366                schema_id=self.get_schema_id(df["pipe_schema"]),
367                pipe_id=self.get_pipe_id(df["pipe_schema"], df["pipe_name"]),
368                pipe_name=df["pipe_name"],
369                pipe_owner=df["pipe_owner"],
370                pipe_definition=df["definition"],
371                is_autoingest_enabled=df["is_autoingest_enabled"],
372                notification_channel_name=df["notification_channel_name"],
373                comment=df["comment"],
374                created=self.db_timestamp_to_string(df["created"]),
375                last_altered=self.db_timestamp_to_string(df["last_altered"]),
376                pattern=df["pattern"],
377            )
378
379            pipe.save(session=session)
380            self.metadata[DbObjectType.SCHEMA][pipe.pipe_id] = pipe
381
382        return True
383
384    def save_functions(self, session) -> bool:
385        self.metadata[DbObjectType.FUNCTION] = {}
386        for _index, df in self.functions.iterrows():
387            function = Function(
388                schema_id=self.get_schema_id(df["function_schema"]),
389                function_id=self.get_function_id(df["function_schema"], df["function_name"], df["argument_signature"]),
390                function_name=df["function_name"],
391                function_owner=df["function_owner"],
392                argument_signature=df["argument_signature"],
393                data_type=df["data_type"],
394                character_maximum_length=df["character_maximum_length"],
395                character_octet_length=df["character_octet_length"],
396                numeric_precision=df["numeric_precision"],
397                numeric_precision_radix=df["numeric_precision_radix"],
398                numeric_scale=df["numeric_scale"],
399                function_language=df["function_language"],
400                function_definition=df["function_definition"],
401                volatility=df["volatility"],
402                is_null_call=df["is_null_call"],
403                is_secure=df["is_secure"],
404                comment=df["comment"],
405                created=self.db_timestamp_to_string(df["created"]),
406                last_altered=self.db_timestamp_to_string(df["last_altered"]),
407                is_external=df["is_external"],
408                api_integration=df["api_integration"],
409                context_headers=df["context_headers"],
410                max_batch_rows=df["max_batch_rows"],
411                compression=df["compression"],
412                packages=df["packages"],
413                runtime_version=df["runtime_version"],
414                installed_packages=df["installed_packages"],
415                is_memoizable=df["is_memoizable"],
416            )
417            function.save(session=session)
418            self.metadata[DbObjectType.FUNCTION][function.function_id] = function
419
420        return True
421
422    def save_procedures(self, session) -> bool:
423        self.metadata[DbObjectType.PROCEDURE] = {}
424        for _index, df in self.procedures.iterrows():
425            procedure = Procedure(
426                procedure_id=self.get_procedure_id(
427                    df["procedure_schema"], df["procedure_name"], df["argument_signature"]
428                ),
429                schema_id=self.get_schema_id(df["procedure_schema"]),
430                procedure_name=df["procedure_name"],
431                procedure_owner=df["procedure_owner"],
432                argument_signature=df["argument_signature"],
433                data_type=df["data_type"],
434                character_maximum_length=df["character_maximum_length"],
435                character_octet_length=df["character_octet_length"],
436                numeric_precision=df["numeric_precision"],
437                numeric_precision_radix=df["numeric_precision_radix"],
438                numeric_scale=df["numeric_scale"],
439                procedure_language=df["procedure_language"],
440                procedure_definition=df["procedure_definition"],
441                comment=df["comment"],
442                created=self.db_timestamp_to_string(df["created"]),
443                last_altered=self.db_timestamp_to_string(df["last_altered"]),
444            )
445            procedure.save(session=session)
446            self.metadata[DbObjectType.PROCEDURE][procedure.procedure_id] = procedure
447
448        return True
449
450    def save_columns(self, session) -> bool:
451        self.metadata[DbObjectType.COLUMN] = {}
452        failed_rows = []
453        saved_rows = []
454        for _index, df in self.columns.iterrows():
455            column: Column = Column(
456                column_id=self.get_column_id(df["table_schema"], df["table_name"], df["column_name"]),
457                table_id=self.get_table_id(df["table_schema"], df["table_name"]),
458                column_name=df["column_name"],
459                ordinal_position=df["ordinal_position"],
460                column_default=df["column_default"],
461                is_nullable=df["is_nullable"],
462                data_type=df["data_type"],
463                character_maximum_length=df["character_maximum_length"],
464                character_octet_length=df["character_octet_length"],
465                numeric_precision=df["numeric_precision"],
466                numeric_precision_radix=df["numeric_precision_radix"],
467                numeric_scale=df["numeric_scale"],
468                datetime_precision=df["datetime_precision"],
469                is_identity=df["is_identity"],
470                identity_generation=df["identity_generation"],
471                identity_start=df["identity_start"],
472                identity_increment=df["identity_increment"],
473                comment=df["comment"],
474            )
475
476            try:
477                column.save(session=session)
478                self.metadata[DbObjectType.COLUMN][column.column_id] = column
479                saved_rows.append(column.column_id)
480            except Exception:
481                import ipdb
482
483                ipdb.set_trace()
484                failed_rows.append(column.column_id)
485
486        if len(failed_rows) > 0:
487            for x in failed_rows:
488                log.error(f"failed to save column: {x}")
489
490        return True
491
492    def save_views(self, session) -> bool:
493        self.metadata[DbObjectType.VIEW] = {}
494        for _index, df in self.views.iterrows():
495            view = View(
496                schema_id=self.get_schema_id(df["schema_name"]),
497                view_id=self.get_view_id(df["schema_name"], df["name"]),
498                view_name=df["name"],
499                view_owner=df["owner"],
500                view_definition=df["text"],
501                is_secure=df["is_secure"],
502                is_materialized=df["is_materialized"],
503                change_tracking=df["change_tracking"],
504                created=self.db_timestamp_to_string(df["created_on"]),
505                owner_role_type=df["owner_role_type"],
506                comment=df["comment"],
507            )
508            view.save(session=session)
509            self.metadata[DbObjectType.VIEW][view.view_id] = view
510        return True
511
512    def save_schemas(self, schemas_df: pd.DataFrame, session):
513        self.metadata[DbObjectType.SCHEMA] = {}
514        for _index, schema in schemas_df.iterrows():
515            schema_object = Schema(
516                database_id=self.database_object.database_id,
517                schema_id=self.get_schema_id(schema["schema_name"]),
518                schema_name=schema["schema_name"],
519                schema_owner=schema["schema_owner"],
520                is_transient=schema["is_transient"],
521                comment=schema["comment"],
522                created=self.db_timestamp_to_string(schema["created"]),
523                last_altered=self.db_timestamp_to_string(schema["last_altered"]),
524                retention_time=schema["retention_time"],
525            )
526            schema_object.save(session=session)
527            self.schemas.append(schema_object)
528            self.metadata[DbObjectType.SCHEMA][schema_object.schema_id] = schema_object
class MetaData:
 26class MetaData:
 27    database_object: Database
 28    schemas: dict[str, Schema] = []
 29    schemas_to_include: list[str]
 30    tables: pd.DataFrame = None
 31    columns: pd.DataFrame = None
 32    views: pd.DataFrame = None
 33    streams: pd.DataFrame = None
 34    pipes: pd.DataFrame = None
 35    tasks: pd.DataFrame = None
 36    procedures: pd.DataFrame = None
 37    functions: pd.DataFrame = None
 38    table_constraints: pd.DataFrame = None
 39    referential_constraints: pd.DataFrame = None
 40    constraints: pd.DataFrame = None
 41    stages: pd.DataFrame = None
 42    column_constraints: pd.DataFrame = None
 43    metadata: dict[str, dict[str, CommonBase]] = {}
 44
 45    def __init__(self, database_object: Database, schemas_to_include: list[str], db_timestamp_to_string):
 46        self.database_object = database_object
 47        self.metadata[DbObjectType.DATABASE] = {database_object.database_id: database_object}
 48        log.info(f"Metadata schemas to include are {schemas_to_include}")
 49        self.schemas_to_include = schemas_to_include
 50        self.db_timestamp_to_string = db_timestamp_to_string
 51
 52    def save(self, session):
 53        log.info("Save tables")
 54        self.save_tables(session=session)
 55        log.info("Save columns")
 56        self.save_columns(session=session)
 57        log.info("Save constraints")
 58        self.save_constraints(session=session)
 59        log.info("Save column constraints")
 60        self.save_column_constraints(session=session)
 61        log.info("Save functions")
 62        self.save_functions(session=session)
 63        log.info("Save pipes")
 64        self.save_pipes(session=session)
 65        log.info("Save procedures")
 66        self.save_procedures(session=session)
 67        log.info("Save referential constraints")
 68        self.save_referential_constraints(session=session)
 69        log.info("Save stages")
 70        self.save_stages(session=session)
 71        log.info("Save streams")
 72        self.save_streams(session=session)
 73        log.info("Save table_constraints")
 74        self.save_table_constraints(session=session)
 75        log.info("Save tasks")
 76        self.save_tasks(session=session)
 77        log.info("Save views")
 78        self.save_views(session=session)
 79
 80    def database_id(self) -> str:
 81        return self.database_object.__get_id__()
 82
 83    def get_schema_id(self, schema_name: str) -> str:
 84        id = json.loads(self.database_id())
 85        id["schema_name"] = schema_name
 86        return json.dumps(id)
 87
 88    def get_table_id(self, schema_name: str, table_name: str) -> str:
 89        id = json.loads(self.get_schema_id(schema_name))
 90        id["table_name"] = table_name
 91        return json.dumps(id)
 92
 93    def get_constraint_id(self, schema_name: str, constraint_name: str) -> str:
 94        id = json.loads(self.get_schema_id(schema_name))
 95        id["constraint_name"] = constraint_name
 96        return json.dumps(id)
 97
 98    def get_table_constraint_id(self, table_id: str, constraint_name: str) -> str:
 99        id = json.loads(table_id)
100        id["constraint_name"] = constraint_name
101        return json.dumps(id)
102
103    def get_view_id(self, schema_name: str, view_name: str) -> str:
104        id = json.loads(self.get_schema_id(schema_name))
105        id["view_name"] = view_name
106        return json.dumps(id)
107
108    def get_task_id(self, schema_name: str, task_name: str) -> str:
109        id = json.loads(self.get_schema_id(schema_name))
110        id["task_name"] = task_name
111        return json.dumps(id)
112
113    def get_stream_id(self, schema_name: str, stream_name: str) -> str:
114        id = json.loads(self.get_schema_id(schema_name))
115        id["stream_name"] = stream_name
116        return json.dumps(id)
117
118    def get_pipe_id(self, schema_name: str, pipe_name: str) -> str:
119        id = json.loads(self.get_schema_id(schema_name))
120        id["pipe_name"] = pipe_name
121        return json.dumps(id)
122
123    def get_stage_id(self, schema_name: str, stage_name: str) -> str:
124        id = json.loads(self.get_schema_id(schema_name))
125        id["stage_name"] = stage_name
126        return json.dumps(id)
127
128    def get_column_id(self, schema_name: str, table_name: str, column_name: str) -> str:
129        id = json.loads(self.get_table_id(schema_name, table_name))
130        id["column_name"] = column_name
131        return json.dumps(id)
132
133    def get_procedure_id(self, schema_name: str, procedure_name: str, argument_signature: str) -> str:
134        id = json.loads(self.get_schema_id(schema_name))
135        id["procedure_name"] = procedure_name
136        id["argument_signature"] = argument_signature
137        return json.dumps(id)
138
139    def get_function_id(self, schema_name: str, function_name: str, argument_signature: str) -> str:
140        id = json.loads(self.get_schema_id(schema_name))
141        id["function_name"] = function_name
142        id["argument_signature"] = argument_signature
143        return json.dumps(id)
144
145    def save_tables(self, session) -> bool:
146        self.metadata[DbObjectType.TABLE] = {}
147        for _index, df in self.tables.iterrows():
148            table = Table(
149                schema_id=self.get_schema_id(df["table_schema"]),
150                table_id=self.get_table_id(df["table_schema"], df["table_name"]),
151                table_name=df["table_name"],
152                table_owner=df["table_owner"],
153                table_type=df["table_type"],
154                is_transient=df["is_transient"],
155                clustering_key=df["clustering_key"],
156                row_count=df["row_count"],
157                comment=df["comment"],
158                bytes=df["bytes"],
159                retention_time=df["retention_time"],
160                created=self.db_timestamp_to_string(df["created"]),
161                last_altered=self.db_timestamp_to_string(df["last_altered"]),
162                last_ddl=self.db_timestamp_to_string(df["last_ddl"]),
163                last_ddl_by=df["last_ddl_by"],
164                auto_clustering_on=df["auto_clustering_on"],
165                change_tracking="True",
166                is_external="False",
167                enable_schema_evolution="False",
168                owner_role_type="DATABASE",
169                is_event="False",
170            )
171            table.save(session=session)
172            self.metadata[DbObjectType.TABLE][table.table_id] = table
173
174        return True
175
176    def save_column_constraints(self, session) -> bool:
177        self.metadata[DbObjectType.COLUMN_CONSTRAINT] = {}
178        for _index, df in self.column_constraints.iterrows():
179            column_constraint = ColumnConstraint(
180                pk_column_id=self.get_column_id(df["pk_schema_name"], df["pk_table_name"], df["pk_column_name"]),
181                pk_constraint_id=self.get_constraint_id(df["pk_schema_name"], df["pk_name"]),
182                fk_column_id=self.get_column_id(df["fk_schema_name"], df["fk_table_name"], df["fk_column_name"]),
183                fk_constraint_id=self.get_constraint_id(df["fk_schema_name"], df["fk_name"]),
184                pk_name=df["pk_name"],
185                fk_name=df["fk_name"],
186                key_sequence=df["key_sequence"],
187                comment=df["comment"],
188                created=self.db_timestamp_to_string(df["created_on"]),
189                deferrability=df["deferrability"],
190                rely=df["rely"],
191                update_rule=df["update_rule"],
192                delete_rule=df["delete_rule"],
193            )
194            column_constraint.column_constraint_id = column_constraint.__get_id__()
195
196            column_constraint.save(session=session)
197            self.metadata[DbObjectType.COLUMN_CONSTRAINT][column_constraint.column_constraint_id] = column_constraint
198
199        return True
200
201    def save_referential_constraints(self, session) -> bool:
202        self.metadata[DbObjectType.REFERENTIAL_CONSTRAINT] = {}
203        for _index, df in self.referential_constraints.iterrows():
204            referential_constraint = ReferentialConstraint(
205                foreign_key_constraint_id=self.get_constraint_id(df["constraint_schema"], df["constraint_name"]),
206                unique_constraint_id=self.get_constraint_id(
207                    df["unique_constraint_schema"], df["unique_constraint_name"]
208                ),
209                fk_name=df["constraint_name"],
210                pk_name=df["unique_constraint_name"],
211                match_option=df["match_option"],
212                update_rule=df["update_rule"],
213                delete_rule=df["delete_rule"],
214                comment=df["comment"],
215                created=self.db_timestamp_to_string(df["created"]),
216                last_altered=self.db_timestamp_to_string(df["last_altered"]),
217            )
218            referential_constraint.referential_constraint_id = referential_constraint.__get_id__()
219            referential_constraint.save(session=session)
220            self.metadata[DbObjectType.REFERENTIAL_CONSTRAINT][referential_constraint.referential_constraint_id] = (
221                referential_constraint
222            )
223
224        return True
225
226    def save_table_constraints(self, session) -> bool:
227        self.metadata[DbObjectType.TABLE_CONSTRAINT] = {}
228        for _index, df in self.table_constraints.iterrows():
229            table_constraint = TableConstraint(
230                table_id=self.get_table_id(df["table_schema"], df["table_name"]),
231                table_constraint_name=df["constraint_name"],
232                constraint_type=df["constraint_type"],
233                is_deferrable=df["is_deferrable"],
234                initially_deferred=df["initially_deferred"],
235                enforced=df["enforced"],
236                comment=df["comment"],
237                created=self.db_timestamp_to_string(df["created"]),
238                last_altered=self.db_timestamp_to_string(df["last_altered"]),
239                rely=df["rely"],
240            )
241            table_constraint.table_constraint_id = table_constraint.__get_id__()
242            table_constraint.save(session=session)
243            self.metadata[DbObjectType.TABLE_CONSTRAINT][table_constraint.table_constraint_id] = table_constraint
244
245        return True
246
247    def save_constraints(self, session) -> bool:
248        self.metadata[DbObjectType.CONSTRAINT] = {}
249        for _index, df in self.constraints.iterrows():
250            constraint = Constraint(
251                table_id=self.get_table_id(df["schema_name"], df["table_name"]),
252                constraint_name=df["constraint_name"],
253                constraint_type=df["constraint_type"],
254                constraint_details=df["constraint_details"],
255                reference_key=df["reference_key"],
256                created=self.db_timestamp_to_string(df["created"]),
257                update_rule=df["update_rule"],
258                delete_rule=df["delete_rule"],
259            )
260            constraint.constraint_id = constraint.__get_id__()
261            constraint.save(session=session)
262            self.metadata[DbObjectType.CONSTRAINT][constraint.constraint_id] = constraint
263
264        return True
265
266    def save_tasks(self, session) -> bool:
267        self.metadata[DbObjectType.TASK] = {}
268        for _index, df in self.tasks.iterrows():
269            try:
270                task = Task(
271                    id=str(df["id"]),
272                    task_id=self.get_task_id(df["schema_name"], df["name"]),
273                    schema_id=self.get_schema_id(df["schema_name"]),
274                    task_name=df["name"],
275                    task_owner=df["owner"],
276                    warehouse=df["warehouse"],
277                    schedule=df["schedule"],
278                    predecessors=df["predecessors"],
279                    state=df["state"],
280                    definition=df["definition"],
281                    condition=df["condition"],
282                    allow_overlapping_execution=df["allow_overlapping_execution"],
283                    error_integration=df["error_integration"],
284                    comment=df["comment"],
285                    last_committed=self.db_timestamp_to_string(df["last_committed_on"]),
286                    last_suspended=self.db_timestamp_to_string(df["last_suspended_on"]),
287                    owner_role_type=df["owner_role_type"],
288                    config=df["config"],
289                    created=self.db_timestamp_to_string(df["created_on"]),
290                )
291
292                task.save(session=session)
293                self.metadata[DbObjectType.TASK][task.task_id] = task
294
295            except Exception as e:
296                log.error(f"Failed to save task: {task}, exception: {e}")
297                import ipdb
298
299                ipdb.set_trace()
300
301        return True
302
303    def save_streams(self, session) -> bool:
304        self.metadata[DbObjectType.STREAM] = {}
305        failed_rows = []
306        for _index, df in self.streams.iterrows():
307            try:
308                stream = Stream(
309                    schema_id=self.get_schema_id(df["schema_name"]),
310                    stream_id=self.get_stream_id(df["schema_name"], df["name"]),
311                    stream_name=df["name"],
312                    stream_owner=df["owner"],
313                    comment=df["comment"],
314                    table_name=df["table_name"],
315                    source_type=df["source_type"],
316                    base_tables=df["base_tables"],
317                    type=df["type"],
318                    stale=df["stale"],
319                    mode=df["mode"],
320                    stale_after=self.db_timestamp_to_string(df["stale_after"]),
321                    invalid_reason=df["invalid_reason"],
322                    owner_role_type=df["owner_role_type"],
323                    created=self.db_timestamp_to_string(df["created_on"]),
324                )
325
326                stream.save(session=session)
327                self.metadata[DbObjectType.STREAM][stream.stream_id] = stream
328
329            except Exception as e:
330                log.error(f"Exception saving column:{stream}: {e}")
331                failed_rows.append(stream.stream_id)
332                import ipdb
333
334                ipdb.set_trace()
335
336        return True
337
338    def save_stages(self, session) -> bool:
339        self.metadata[DbObjectType.STAGE] = {}
340        for _index, df in self.stages.iterrows():
341            stage = Stage(
342                schema_id=self.get_schema_id(df["schema_name"]),
343                stage_id=self.get_stage_id(df["schema_name"], df["name"]),
344                stage_name=df["name"],
345                stage_owner=df["owner"],
346                stage_url=df["url"],
347                stage_region=df["region"],
348                stage_type=df["type"],
349                comment=df["comment"],
350                created=self.db_timestamp_to_string(df["created_on"]),
351                has_credentials=df["has_credentials"],
352                has_encryption_key=df["has_encryption_key"],
353                cloud=df["cloud"],
354                notification_channel=df["notification_channel"],
355                storage_integration=df["storage_integration"],
356            )
357
358            stage.save(session=session)
359            self.metadata[DbObjectType.STAGE][stage.stage_id] = stage
360
361        return True
362
363    def save_pipes(self, session) -> bool:
364        self.metadata[DbObjectType.SCHEMA] = {}
365        for _index, df in self.pipes.iterrows():
366            pipe = Pipe(
367                schema_id=self.get_schema_id(df["pipe_schema"]),
368                pipe_id=self.get_pipe_id(df["pipe_schema"], df["pipe_name"]),
369                pipe_name=df["pipe_name"],
370                pipe_owner=df["pipe_owner"],
371                pipe_definition=df["definition"],
372                is_autoingest_enabled=df["is_autoingest_enabled"],
373                notification_channel_name=df["notification_channel_name"],
374                comment=df["comment"],
375                created=self.db_timestamp_to_string(df["created"]),
376                last_altered=self.db_timestamp_to_string(df["last_altered"]),
377                pattern=df["pattern"],
378            )
379
380            pipe.save(session=session)
381            self.metadata[DbObjectType.SCHEMA][pipe.pipe_id] = pipe
382
383        return True
384
385    def save_functions(self, session) -> bool:
386        self.metadata[DbObjectType.FUNCTION] = {}
387        for _index, df in self.functions.iterrows():
388            function = Function(
389                schema_id=self.get_schema_id(df["function_schema"]),
390                function_id=self.get_function_id(df["function_schema"], df["function_name"], df["argument_signature"]),
391                function_name=df["function_name"],
392                function_owner=df["function_owner"],
393                argument_signature=df["argument_signature"],
394                data_type=df["data_type"],
395                character_maximum_length=df["character_maximum_length"],
396                character_octet_length=df["character_octet_length"],
397                numeric_precision=df["numeric_precision"],
398                numeric_precision_radix=df["numeric_precision_radix"],
399                numeric_scale=df["numeric_scale"],
400                function_language=df["function_language"],
401                function_definition=df["function_definition"],
402                volatility=df["volatility"],
403                is_null_call=df["is_null_call"],
404                is_secure=df["is_secure"],
405                comment=df["comment"],
406                created=self.db_timestamp_to_string(df["created"]),
407                last_altered=self.db_timestamp_to_string(df["last_altered"]),
408                is_external=df["is_external"],
409                api_integration=df["api_integration"],
410                context_headers=df["context_headers"],
411                max_batch_rows=df["max_batch_rows"],
412                compression=df["compression"],
413                packages=df["packages"],
414                runtime_version=df["runtime_version"],
415                installed_packages=df["installed_packages"],
416                is_memoizable=df["is_memoizable"],
417            )
418            function.save(session=session)
419            self.metadata[DbObjectType.FUNCTION][function.function_id] = function
420
421        return True
422
423    def save_procedures(self, session) -> bool:
424        self.metadata[DbObjectType.PROCEDURE] = {}
425        for _index, df in self.procedures.iterrows():
426            procedure = Procedure(
427                procedure_id=self.get_procedure_id(
428                    df["procedure_schema"], df["procedure_name"], df["argument_signature"]
429                ),
430                schema_id=self.get_schema_id(df["procedure_schema"]),
431                procedure_name=df["procedure_name"],
432                procedure_owner=df["procedure_owner"],
433                argument_signature=df["argument_signature"],
434                data_type=df["data_type"],
435                character_maximum_length=df["character_maximum_length"],
436                character_octet_length=df["character_octet_length"],
437                numeric_precision=df["numeric_precision"],
438                numeric_precision_radix=df["numeric_precision_radix"],
439                numeric_scale=df["numeric_scale"],
440                procedure_language=df["procedure_language"],
441                procedure_definition=df["procedure_definition"],
442                comment=df["comment"],
443                created=self.db_timestamp_to_string(df["created"]),
444                last_altered=self.db_timestamp_to_string(df["last_altered"]),
445            )
446            procedure.save(session=session)
447            self.metadata[DbObjectType.PROCEDURE][procedure.procedure_id] = procedure
448
449        return True
450
451    def save_columns(self, session) -> bool:
452        self.metadata[DbObjectType.COLUMN] = {}
453        failed_rows = []
454        saved_rows = []
455        for _index, df in self.columns.iterrows():
456            column: Column = Column(
457                column_id=self.get_column_id(df["table_schema"], df["table_name"], df["column_name"]),
458                table_id=self.get_table_id(df["table_schema"], df["table_name"]),
459                column_name=df["column_name"],
460                ordinal_position=df["ordinal_position"],
461                column_default=df["column_default"],
462                is_nullable=df["is_nullable"],
463                data_type=df["data_type"],
464                character_maximum_length=df["character_maximum_length"],
465                character_octet_length=df["character_octet_length"],
466                numeric_precision=df["numeric_precision"],
467                numeric_precision_radix=df["numeric_precision_radix"],
468                numeric_scale=df["numeric_scale"],
469                datetime_precision=df["datetime_precision"],
470                is_identity=df["is_identity"],
471                identity_generation=df["identity_generation"],
472                identity_start=df["identity_start"],
473                identity_increment=df["identity_increment"],
474                comment=df["comment"],
475            )
476
477            try:
478                column.save(session=session)
479                self.metadata[DbObjectType.COLUMN][column.column_id] = column
480                saved_rows.append(column.column_id)
481            except Exception:
482                import ipdb
483
484                ipdb.set_trace()
485                failed_rows.append(column.column_id)
486
487        if len(failed_rows) > 0:
488            for x in failed_rows:
489                log.error(f"failed to save column: {x}")
490
491        return True
492
493    def save_views(self, session) -> bool:
494        self.metadata[DbObjectType.VIEW] = {}
495        for _index, df in self.views.iterrows():
496            view = View(
497                schema_id=self.get_schema_id(df["schema_name"]),
498                view_id=self.get_view_id(df["schema_name"], df["name"]),
499                view_name=df["name"],
500                view_owner=df["owner"],
501                view_definition=df["text"],
502                is_secure=df["is_secure"],
503                is_materialized=df["is_materialized"],
504                change_tracking=df["change_tracking"],
505                created=self.db_timestamp_to_string(df["created_on"]),
506                owner_role_type=df["owner_role_type"],
507                comment=df["comment"],
508            )
509            view.save(session=session)
510            self.metadata[DbObjectType.VIEW][view.view_id] = view
511        return True
512
513    def save_schemas(self, schemas_df: pd.DataFrame, session):
514        self.metadata[DbObjectType.SCHEMA] = {}
515        for _index, schema in schemas_df.iterrows():
516            schema_object = Schema(
517                database_id=self.database_object.database_id,
518                schema_id=self.get_schema_id(schema["schema_name"]),
519                schema_name=schema["schema_name"],
520                schema_owner=schema["schema_owner"],
521                is_transient=schema["is_transient"],
522                comment=schema["comment"],
523                created=self.db_timestamp_to_string(schema["created"]),
524                last_altered=self.db_timestamp_to_string(schema["last_altered"]),
525                retention_time=schema["retention_time"],
526            )
527            schema_object.save(session=session)
528            self.schemas.append(schema_object)
529            self.metadata[DbObjectType.SCHEMA][schema_object.schema_id] = schema_object
MetaData( database_object: schema_sentinel.metadata_manager.model.database.Database, schemas_to_include: list[str], db_timestamp_to_string)
45    def __init__(self, database_object: Database, schemas_to_include: list[str], db_timestamp_to_string):
46        self.database_object = database_object
47        self.metadata[DbObjectType.DATABASE] = {database_object.database_id: database_object}
48        log.info(f"Metadata schemas to include are {schemas_to_include}")
49        self.schemas_to_include = schemas_to_include
50        self.db_timestamp_to_string = db_timestamp_to_string
schemas_to_include: list[str]
tables: pandas.core.frame.DataFrame = None
columns: pandas.core.frame.DataFrame = None
views: pandas.core.frame.DataFrame = None
streams: pandas.core.frame.DataFrame = None
pipes: pandas.core.frame.DataFrame = None
tasks: pandas.core.frame.DataFrame = None
procedures: pandas.core.frame.DataFrame = None
functions: pandas.core.frame.DataFrame = None
table_constraints: pandas.core.frame.DataFrame = None
referential_constraints: pandas.core.frame.DataFrame = None
constraints: pandas.core.frame.DataFrame = None
stages: pandas.core.frame.DataFrame = None
column_constraints: pandas.core.frame.DataFrame = None
metadata: dict[str, dict[str, schema_sentinel.metadata_manager.model.CommonBase]] = {}
db_timestamp_to_string
def save(self, session):
52    def save(self, session):
53        log.info("Save tables")
54        self.save_tables(session=session)
55        log.info("Save columns")
56        self.save_columns(session=session)
57        log.info("Save constraints")
58        self.save_constraints(session=session)
59        log.info("Save column constraints")
60        self.save_column_constraints(session=session)
61        log.info("Save functions")
62        self.save_functions(session=session)
63        log.info("Save pipes")
64        self.save_pipes(session=session)
65        log.info("Save procedures")
66        self.save_procedures(session=session)
67        log.info("Save referential constraints")
68        self.save_referential_constraints(session=session)
69        log.info("Save stages")
70        self.save_stages(session=session)
71        log.info("Save streams")
72        self.save_streams(session=session)
73        log.info("Save table_constraints")
74        self.save_table_constraints(session=session)
75        log.info("Save tasks")
76        self.save_tasks(session=session)
77        log.info("Save views")
78        self.save_views(session=session)
def database_id(self) -> str:
80    def database_id(self) -> str:
81        return self.database_object.__get_id__()
def get_schema_id(self, schema_name: str) -> str:
83    def get_schema_id(self, schema_name: str) -> str:
84        id = json.loads(self.database_id())
85        id["schema_name"] = schema_name
86        return json.dumps(id)
def get_table_id(self, schema_name: str, table_name: str) -> str:
88    def get_table_id(self, schema_name: str, table_name: str) -> str:
89        id = json.loads(self.get_schema_id(schema_name))
90        id["table_name"] = table_name
91        return json.dumps(id)
def get_constraint_id(self, schema_name: str, constraint_name: str) -> str:
93    def get_constraint_id(self, schema_name: str, constraint_name: str) -> str:
94        id = json.loads(self.get_schema_id(schema_name))
95        id["constraint_name"] = constraint_name
96        return json.dumps(id)
def get_table_constraint_id(self, table_id: str, constraint_name: str) -> str:
 98    def get_table_constraint_id(self, table_id: str, constraint_name: str) -> str:
 99        id = json.loads(table_id)
100        id["constraint_name"] = constraint_name
101        return json.dumps(id)
def get_view_id(self, schema_name: str, view_name: str) -> str:
103    def get_view_id(self, schema_name: str, view_name: str) -> str:
104        id = json.loads(self.get_schema_id(schema_name))
105        id["view_name"] = view_name
106        return json.dumps(id)
def get_task_id(self, schema_name: str, task_name: str) -> str:
108    def get_task_id(self, schema_name: str, task_name: str) -> str:
109        id = json.loads(self.get_schema_id(schema_name))
110        id["task_name"] = task_name
111        return json.dumps(id)
def get_stream_id(self, schema_name: str, stream_name: str) -> str:
113    def get_stream_id(self, schema_name: str, stream_name: str) -> str:
114        id = json.loads(self.get_schema_id(schema_name))
115        id["stream_name"] = stream_name
116        return json.dumps(id)
def get_pipe_id(self, schema_name: str, pipe_name: str) -> str:
118    def get_pipe_id(self, schema_name: str, pipe_name: str) -> str:
119        id = json.loads(self.get_schema_id(schema_name))
120        id["pipe_name"] = pipe_name
121        return json.dumps(id)
def get_stage_id(self, schema_name: str, stage_name: str) -> str:
123    def get_stage_id(self, schema_name: str, stage_name: str) -> str:
124        id = json.loads(self.get_schema_id(schema_name))
125        id["stage_name"] = stage_name
126        return json.dumps(id)
def get_column_id(self, schema_name: str, table_name: str, column_name: str) -> str:
128    def get_column_id(self, schema_name: str, table_name: str, column_name: str) -> str:
129        id = json.loads(self.get_table_id(schema_name, table_name))
130        id["column_name"] = column_name
131        return json.dumps(id)
def get_procedure_id( self, schema_name: str, procedure_name: str, argument_signature: str) -> str:
133    def get_procedure_id(self, schema_name: str, procedure_name: str, argument_signature: str) -> str:
134        id = json.loads(self.get_schema_id(schema_name))
135        id["procedure_name"] = procedure_name
136        id["argument_signature"] = argument_signature
137        return json.dumps(id)
def get_function_id( self, schema_name: str, function_name: str, argument_signature: str) -> str:
139    def get_function_id(self, schema_name: str, function_name: str, argument_signature: str) -> str:
140        id = json.loads(self.get_schema_id(schema_name))
141        id["function_name"] = function_name
142        id["argument_signature"] = argument_signature
143        return json.dumps(id)
def save_tables(self, session) -> bool:
145    def save_tables(self, session) -> bool:
146        self.metadata[DbObjectType.TABLE] = {}
147        for _index, df in self.tables.iterrows():
148            table = Table(
149                schema_id=self.get_schema_id(df["table_schema"]),
150                table_id=self.get_table_id(df["table_schema"], df["table_name"]),
151                table_name=df["table_name"],
152                table_owner=df["table_owner"],
153                table_type=df["table_type"],
154                is_transient=df["is_transient"],
155                clustering_key=df["clustering_key"],
156                row_count=df["row_count"],
157                comment=df["comment"],
158                bytes=df["bytes"],
159                retention_time=df["retention_time"],
160                created=self.db_timestamp_to_string(df["created"]),
161                last_altered=self.db_timestamp_to_string(df["last_altered"]),
162                last_ddl=self.db_timestamp_to_string(df["last_ddl"]),
163                last_ddl_by=df["last_ddl_by"],
164                auto_clustering_on=df["auto_clustering_on"],
165                change_tracking="True",
166                is_external="False",
167                enable_schema_evolution="False",
168                owner_role_type="DATABASE",
169                is_event="False",
170            )
171            table.save(session=session)
172            self.metadata[DbObjectType.TABLE][table.table_id] = table
173
174        return True
def save_column_constraints(self, session) -> bool:
176    def save_column_constraints(self, session) -> bool:
177        self.metadata[DbObjectType.COLUMN_CONSTRAINT] = {}
178        for _index, df in self.column_constraints.iterrows():
179            column_constraint = ColumnConstraint(
180                pk_column_id=self.get_column_id(df["pk_schema_name"], df["pk_table_name"], df["pk_column_name"]),
181                pk_constraint_id=self.get_constraint_id(df["pk_schema_name"], df["pk_name"]),
182                fk_column_id=self.get_column_id(df["fk_schema_name"], df["fk_table_name"], df["fk_column_name"]),
183                fk_constraint_id=self.get_constraint_id(df["fk_schema_name"], df["fk_name"]),
184                pk_name=df["pk_name"],
185                fk_name=df["fk_name"],
186                key_sequence=df["key_sequence"],
187                comment=df["comment"],
188                created=self.db_timestamp_to_string(df["created_on"]),
189                deferrability=df["deferrability"],
190                rely=df["rely"],
191                update_rule=df["update_rule"],
192                delete_rule=df["delete_rule"],
193            )
194            column_constraint.column_constraint_id = column_constraint.__get_id__()
195
196            column_constraint.save(session=session)
197            self.metadata[DbObjectType.COLUMN_CONSTRAINT][column_constraint.column_constraint_id] = column_constraint
198
199        return True
def save_referential_constraints(self, session) -> bool:
201    def save_referential_constraints(self, session) -> bool:
202        self.metadata[DbObjectType.REFERENTIAL_CONSTRAINT] = {}
203        for _index, df in self.referential_constraints.iterrows():
204            referential_constraint = ReferentialConstraint(
205                foreign_key_constraint_id=self.get_constraint_id(df["constraint_schema"], df["constraint_name"]),
206                unique_constraint_id=self.get_constraint_id(
207                    df["unique_constraint_schema"], df["unique_constraint_name"]
208                ),
209                fk_name=df["constraint_name"],
210                pk_name=df["unique_constraint_name"],
211                match_option=df["match_option"],
212                update_rule=df["update_rule"],
213                delete_rule=df["delete_rule"],
214                comment=df["comment"],
215                created=self.db_timestamp_to_string(df["created"]),
216                last_altered=self.db_timestamp_to_string(df["last_altered"]),
217            )
218            referential_constraint.referential_constraint_id = referential_constraint.__get_id__()
219            referential_constraint.save(session=session)
220            self.metadata[DbObjectType.REFERENTIAL_CONSTRAINT][referential_constraint.referential_constraint_id] = (
221                referential_constraint
222            )
223
224        return True
def save_table_constraints(self, session) -> bool:
226    def save_table_constraints(self, session) -> bool:
227        self.metadata[DbObjectType.TABLE_CONSTRAINT] = {}
228        for _index, df in self.table_constraints.iterrows():
229            table_constraint = TableConstraint(
230                table_id=self.get_table_id(df["table_schema"], df["table_name"]),
231                table_constraint_name=df["constraint_name"],
232                constraint_type=df["constraint_type"],
233                is_deferrable=df["is_deferrable"],
234                initially_deferred=df["initially_deferred"],
235                enforced=df["enforced"],
236                comment=df["comment"],
237                created=self.db_timestamp_to_string(df["created"]),
238                last_altered=self.db_timestamp_to_string(df["last_altered"]),
239                rely=df["rely"],
240            )
241            table_constraint.table_constraint_id = table_constraint.__get_id__()
242            table_constraint.save(session=session)
243            self.metadata[DbObjectType.TABLE_CONSTRAINT][table_constraint.table_constraint_id] = table_constraint
244
245        return True
def save_constraints(self, session) -> bool:
247    def save_constraints(self, session) -> bool:
248        self.metadata[DbObjectType.CONSTRAINT] = {}
249        for _index, df in self.constraints.iterrows():
250            constraint = Constraint(
251                table_id=self.get_table_id(df["schema_name"], df["table_name"]),
252                constraint_name=df["constraint_name"],
253                constraint_type=df["constraint_type"],
254                constraint_details=df["constraint_details"],
255                reference_key=df["reference_key"],
256                created=self.db_timestamp_to_string(df["created"]),
257                update_rule=df["update_rule"],
258                delete_rule=df["delete_rule"],
259            )
260            constraint.constraint_id = constraint.__get_id__()
261            constraint.save(session=session)
262            self.metadata[DbObjectType.CONSTRAINT][constraint.constraint_id] = constraint
263
264        return True
def save_tasks(self, session) -> bool:
266    def save_tasks(self, session) -> bool:
267        self.metadata[DbObjectType.TASK] = {}
268        for _index, df in self.tasks.iterrows():
269            try:
270                task = Task(
271                    id=str(df["id"]),
272                    task_id=self.get_task_id(df["schema_name"], df["name"]),
273                    schema_id=self.get_schema_id(df["schema_name"]),
274                    task_name=df["name"],
275                    task_owner=df["owner"],
276                    warehouse=df["warehouse"],
277                    schedule=df["schedule"],
278                    predecessors=df["predecessors"],
279                    state=df["state"],
280                    definition=df["definition"],
281                    condition=df["condition"],
282                    allow_overlapping_execution=df["allow_overlapping_execution"],
283                    error_integration=df["error_integration"],
284                    comment=df["comment"],
285                    last_committed=self.db_timestamp_to_string(df["last_committed_on"]),
286                    last_suspended=self.db_timestamp_to_string(df["last_suspended_on"]),
287                    owner_role_type=df["owner_role_type"],
288                    config=df["config"],
289                    created=self.db_timestamp_to_string(df["created_on"]),
290                )
291
292                task.save(session=session)
293                self.metadata[DbObjectType.TASK][task.task_id] = task
294
295            except Exception as e:
296                log.error(f"Failed to save task: {task}, exception: {e}")
297                import ipdb
298
299                ipdb.set_trace()
300
301        return True
def save_streams(self, session) -> bool:
303    def save_streams(self, session) -> bool:
304        self.metadata[DbObjectType.STREAM] = {}
305        failed_rows = []
306        for _index, df in self.streams.iterrows():
307            try:
308                stream = Stream(
309                    schema_id=self.get_schema_id(df["schema_name"]),
310                    stream_id=self.get_stream_id(df["schema_name"], df["name"]),
311                    stream_name=df["name"],
312                    stream_owner=df["owner"],
313                    comment=df["comment"],
314                    table_name=df["table_name"],
315                    source_type=df["source_type"],
316                    base_tables=df["base_tables"],
317                    type=df["type"],
318                    stale=df["stale"],
319                    mode=df["mode"],
320                    stale_after=self.db_timestamp_to_string(df["stale_after"]),
321                    invalid_reason=df["invalid_reason"],
322                    owner_role_type=df["owner_role_type"],
323                    created=self.db_timestamp_to_string(df["created_on"]),
324                )
325
326                stream.save(session=session)
327                self.metadata[DbObjectType.STREAM][stream.stream_id] = stream
328
329            except Exception as e:
330                log.error(f"Exception saving column:{stream}: {e}")
331                failed_rows.append(stream.stream_id)
332                import ipdb
333
334                ipdb.set_trace()
335
336        return True
def save_stages(self, session) -> bool:
338    def save_stages(self, session) -> bool:
339        self.metadata[DbObjectType.STAGE] = {}
340        for _index, df in self.stages.iterrows():
341            stage = Stage(
342                schema_id=self.get_schema_id(df["schema_name"]),
343                stage_id=self.get_stage_id(df["schema_name"], df["name"]),
344                stage_name=df["name"],
345                stage_owner=df["owner"],
346                stage_url=df["url"],
347                stage_region=df["region"],
348                stage_type=df["type"],
349                comment=df["comment"],
350                created=self.db_timestamp_to_string(df["created_on"]),
351                has_credentials=df["has_credentials"],
352                has_encryption_key=df["has_encryption_key"],
353                cloud=df["cloud"],
354                notification_channel=df["notification_channel"],
355                storage_integration=df["storage_integration"],
356            )
357
358            stage.save(session=session)
359            self.metadata[DbObjectType.STAGE][stage.stage_id] = stage
360
361        return True
def save_pipes(self, session) -> bool:
363    def save_pipes(self, session) -> bool:
364        self.metadata[DbObjectType.SCHEMA] = {}
365        for _index, df in self.pipes.iterrows():
366            pipe = Pipe(
367                schema_id=self.get_schema_id(df["pipe_schema"]),
368                pipe_id=self.get_pipe_id(df["pipe_schema"], df["pipe_name"]),
369                pipe_name=df["pipe_name"],
370                pipe_owner=df["pipe_owner"],
371                pipe_definition=df["definition"],
372                is_autoingest_enabled=df["is_autoingest_enabled"],
373                notification_channel_name=df["notification_channel_name"],
374                comment=df["comment"],
375                created=self.db_timestamp_to_string(df["created"]),
376                last_altered=self.db_timestamp_to_string(df["last_altered"]),
377                pattern=df["pattern"],
378            )
379
380            pipe.save(session=session)
381            self.metadata[DbObjectType.SCHEMA][pipe.pipe_id] = pipe
382
383        return True
def save_functions(self, session) -> bool:
385    def save_functions(self, session) -> bool:
386        self.metadata[DbObjectType.FUNCTION] = {}
387        for _index, df in self.functions.iterrows():
388            function = Function(
389                schema_id=self.get_schema_id(df["function_schema"]),
390                function_id=self.get_function_id(df["function_schema"], df["function_name"], df["argument_signature"]),
391                function_name=df["function_name"],
392                function_owner=df["function_owner"],
393                argument_signature=df["argument_signature"],
394                data_type=df["data_type"],
395                character_maximum_length=df["character_maximum_length"],
396                character_octet_length=df["character_octet_length"],
397                numeric_precision=df["numeric_precision"],
398                numeric_precision_radix=df["numeric_precision_radix"],
399                numeric_scale=df["numeric_scale"],
400                function_language=df["function_language"],
401                function_definition=df["function_definition"],
402                volatility=df["volatility"],
403                is_null_call=df["is_null_call"],
404                is_secure=df["is_secure"],
405                comment=df["comment"],
406                created=self.db_timestamp_to_string(df["created"]),
407                last_altered=self.db_timestamp_to_string(df["last_altered"]),
408                is_external=df["is_external"],
409                api_integration=df["api_integration"],
410                context_headers=df["context_headers"],
411                max_batch_rows=df["max_batch_rows"],
412                compression=df["compression"],
413                packages=df["packages"],
414                runtime_version=df["runtime_version"],
415                installed_packages=df["installed_packages"],
416                is_memoizable=df["is_memoizable"],
417            )
418            function.save(session=session)
419            self.metadata[DbObjectType.FUNCTION][function.function_id] = function
420
421        return True
def save_procedures(self, session) -> bool:
423    def save_procedures(self, session) -> bool:
424        self.metadata[DbObjectType.PROCEDURE] = {}
425        for _index, df in self.procedures.iterrows():
426            procedure = Procedure(
427                procedure_id=self.get_procedure_id(
428                    df["procedure_schema"], df["procedure_name"], df["argument_signature"]
429                ),
430                schema_id=self.get_schema_id(df["procedure_schema"]),
431                procedure_name=df["procedure_name"],
432                procedure_owner=df["procedure_owner"],
433                argument_signature=df["argument_signature"],
434                data_type=df["data_type"],
435                character_maximum_length=df["character_maximum_length"],
436                character_octet_length=df["character_octet_length"],
437                numeric_precision=df["numeric_precision"],
438                numeric_precision_radix=df["numeric_precision_radix"],
439                numeric_scale=df["numeric_scale"],
440                procedure_language=df["procedure_language"],
441                procedure_definition=df["procedure_definition"],
442                comment=df["comment"],
443                created=self.db_timestamp_to_string(df["created"]),
444                last_altered=self.db_timestamp_to_string(df["last_altered"]),
445            )
446            procedure.save(session=session)
447            self.metadata[DbObjectType.PROCEDURE][procedure.procedure_id] = procedure
448
449        return True
def save_columns(self, session) -> bool:
451    def save_columns(self, session) -> bool:
452        self.metadata[DbObjectType.COLUMN] = {}
453        failed_rows = []
454        saved_rows = []
455        for _index, df in self.columns.iterrows():
456            column: Column = Column(
457                column_id=self.get_column_id(df["table_schema"], df["table_name"], df["column_name"]),
458                table_id=self.get_table_id(df["table_schema"], df["table_name"]),
459                column_name=df["column_name"],
460                ordinal_position=df["ordinal_position"],
461                column_default=df["column_default"],
462                is_nullable=df["is_nullable"],
463                data_type=df["data_type"],
464                character_maximum_length=df["character_maximum_length"],
465                character_octet_length=df["character_octet_length"],
466                numeric_precision=df["numeric_precision"],
467                numeric_precision_radix=df["numeric_precision_radix"],
468                numeric_scale=df["numeric_scale"],
469                datetime_precision=df["datetime_precision"],
470                is_identity=df["is_identity"],
471                identity_generation=df["identity_generation"],
472                identity_start=df["identity_start"],
473                identity_increment=df["identity_increment"],
474                comment=df["comment"],
475            )
476
477            try:
478                column.save(session=session)
479                self.metadata[DbObjectType.COLUMN][column.column_id] = column
480                saved_rows.append(column.column_id)
481            except Exception:
482                import ipdb
483
484                ipdb.set_trace()
485                failed_rows.append(column.column_id)
486
487        if len(failed_rows) > 0:
488            for x in failed_rows:
489                log.error(f"failed to save column: {x}")
490
491        return True
def save_views(self, session) -> bool:
493    def save_views(self, session) -> bool:
494        self.metadata[DbObjectType.VIEW] = {}
495        for _index, df in self.views.iterrows():
496            view = View(
497                schema_id=self.get_schema_id(df["schema_name"]),
498                view_id=self.get_view_id(df["schema_name"], df["name"]),
499                view_name=df["name"],
500                view_owner=df["owner"],
501                view_definition=df["text"],
502                is_secure=df["is_secure"],
503                is_materialized=df["is_materialized"],
504                change_tracking=df["change_tracking"],
505                created=self.db_timestamp_to_string(df["created_on"]),
506                owner_role_type=df["owner_role_type"],
507                comment=df["comment"],
508            )
509            view.save(session=session)
510            self.metadata[DbObjectType.VIEW][view.view_id] = view
511        return True
def save_schemas(self, schemas_df: pandas.core.frame.DataFrame, session):
    def save_schemas(self, schemas_df: pd.DataFrame, session):
        """Save each row of ``schemas_df`` as a ``Schema`` and register it in ``self.metadata``.

        Args:
            schemas_df: One row per schema. The columns read are ``schema_name``,
                ``schema_owner``, ``is_transient``, ``comment``, ``created``,
                ``last_altered`` and ``retention_time``.
            session: Passed through unchanged to ``Schema.save``.
        """
        # Start from an empty schema registry on every call.
        self.metadata[DbObjectType.SCHEMA] = {}
        for _index, schema in schemas_df.iterrows():
            schema_object = Schema(
                database_id=self.database_object.database_id,
                schema_id=self.get_schema_id(schema["schema_name"]),
                schema_name=schema["schema_name"],
                schema_owner=schema["schema_owner"],
                is_transient=schema["is_transient"],
                comment=schema["comment"],
                created=self.db_timestamp_to_string(schema["created"]),
                last_altered=self.db_timestamp_to_string(schema["last_altered"]),
                retention_time=schema["retention_time"],
            )
            schema_object.save(session=session)
            # NOTE(review): ``schemas`` is declared as a class-level list and is not
            # rebound in ``__init__``, so this append appears to accumulate across
            # every MetaData instance - confirm that is intended.
            self.schemas.append(schema_object)
            self.metadata[DbObjectType.SCHEMA][schema_object.schema_id] = schema_object