schema_sentinel.metadata_manager.metadata

  1import json
  2import logging as log
  3import os
  4
  5import pandas as pd
  6from sqlalchemy import and_
  7from sqlalchemy.orm import sessionmaker
  8
  9from schema_sentinel.metadata_manager.model import compare_obj
 10from schema_sentinel.metadata_manager.model.column import Column
 11from schema_sentinel.metadata_manager.model.column_constraint import ColumnConstraint
 12from schema_sentinel.metadata_manager.model.constraint import Constraint
 13from schema_sentinel.metadata_manager.model.database import Database
 14from schema_sentinel.metadata_manager.model.function import Function
 15from schema_sentinel.metadata_manager.model.procedure import Procedure
 16from schema_sentinel.metadata_manager.model.schema import Schema
 17from schema_sentinel.metadata_manager.model.stream import Stream
 18from schema_sentinel.metadata_manager.model.table import Table
 19from schema_sentinel.metadata_manager.model.table_constraint import TableConstraint
 20from schema_sentinel.metadata_manager.model.task import Task
 21from schema_sentinel.metadata_manager.model.view import View
 22
 23from .engine import SfAlchemyEngine, SqLiteAqlAlchemyEngine
 24from .enums import DbObjectType
 25from .model.metadata_container import MetaData
 26from .model.metadata_utils import (
 27    get_columns,
 28    get_constraints,
 29    get_database,
 30    get_functions,
 31    get_imported_keys,
 32    get_pipes,
 33    get_procedures,
 34    get_referential_constraints,
 35    get_schemas,
 36    get_stages,
 37    get_streams,
 38    get_table_constraints,
 39    get_tables,
 40    get_tasks,
 41    get_views,
 42)
 43
 44
 45def get_default_schemas() -> tuple:
 46    """
 47    Get schemas from environment variable or return empty tuple to include all schemas.
 48    Set SNOWFLAKE_SCHEMAS environment variable as comma-separated list.
 49    Example: SNOWFLAKE_SCHEMAS="PUBLIC,INFORMATION_SCHEMA,ANALYTICS"
 50    """
 51    schemas_env = os.getenv("SNOWFLAKE_SCHEMAS", "")
 52    if schemas_env:
 53        return tuple(s.strip() for s in schemas_env.split(",") if s.strip())
 54    return ()  # Empty tuple means all schemas
 55
 56
 57def init_comparison() -> {}:
 58    comparison = {}
 59    for value in DbObjectType.list():
 60        comparison[value] = {}
 61    return comparison
 62
 63
 64def get_db_objects(
 65    database_name: str,
 66    engine: SfAlchemyEngine,
 67    meta_engine: SqLiteAqlAlchemyEngine,
 68    schemas: tuple = None,
 69    version: str = "0.1.0",
 70    environment: str = "dev",
 71) -> MetaData:
 72    if schemas is None:
 73        schemas = get_default_schemas()
 74    session = sessionmaker(bind=meta_engine)()
 75    database_df = get_database(database_name=database_name, engine=engine)
 76
 77    db = Database(
 78        version=version,
 79        environment=environment,
 80        database_name=database_name,
 81        database_owner=database_df.database_owner,
 82        is_transient=database_df.is_transient,
 83        comment=database_df.comment,
 84        created=db_timestamp_to_string(database_df.created),
 85        last_altered=db_timestamp_to_string(database_df.last_altered),
 86        retention_time=database_df.retention_time,
 87    )
 88    # TODO: add version increment
 89    db.database_id = db.__get_id__()
 90    db.save(session=session)
 91    log.info(f"Schemas to include are {schemas}")
 92    db_objects: MetaData = MetaData(
 93        database_object=db, schemas_to_include=schemas, db_timestamp_to_string=db_timestamp_to_string
 94    )
 95
 96    db_objects.save_schemas(schemas_df=get_schemas(database=db, engine=engine), session=session)
 97
 98    db_objects.constraints = get_constraints(database_name=database_name, engine=engine)
 99    db_objects.tables = get_tables(database_name=database_name, engine=engine)
100    db_objects.columns = get_columns(database_name=database_name, engine=engine)
101    db_objects.views = get_views(database_name=database_name, engine=engine)
102    db_objects.procedures = get_procedures(database_name=database_name, engine=engine)
103    db_objects.functions = get_functions(database_name=database_name, engine=engine)
104    db_objects.streams = get_streams(database_name=database_name, engine=engine)
105    db_objects.tasks = get_tasks(database_name=database_name, engine=engine)
106    db_objects.pipes = get_pipes(database_name=database_name, engine=engine)
107    db_objects.table_constraints = get_table_constraints(database_name=database_name, engine=engine)
108    db_objects.referential_constraints = get_referential_constraints(database_name=database_name, engine=engine)
109
110    db_objects.column_constraints = get_imported_keys(database_name=database_name, engine=engine)
111    db_objects.stages = get_stages(database_name=database_name, engine=engine)
112
113    db_objects.save(session=session)
114    return db_objects
115
116
def db_timestamp_to_string(db_timestamp) -> str:
    """Return ``str(db_timestamp)``, or an empty string when the value is falsy."""
    return str(db_timestamp) if db_timestamp else ""
121
122
def save_metadata(
    engine: SfAlchemyEngine,
    environment: str,
    version: str,
    meta_engine: SqLiteAqlAlchemyEngine,
    database_name: str,
    schemas: tuple = None,
) -> MetaData:
    """
    Collect and save the metadata of ``database_name`` via ``get_db_objects``.

    Args:
        engine: Engine forwarded to ``get_db_objects`` as ``engine``.
        environment: Environment label forwarded to ``get_db_objects``.
        version: Version label forwarded to ``get_db_objects``.
        meta_engine: Engine forwarded to ``get_db_objects`` as ``meta_engine``.
        database_name: Name of the database to process.
        schemas: Schemas to include; when None, ``get_default_schemas()`` is used.

    Returns:
        The ``MetaData`` object returned by ``get_db_objects``.
    """
    selected_schemas = get_default_schemas() if schemas is None else schemas
    return get_db_objects(
        database_name=database_name,
        engine=engine,
        meta_engine=meta_engine,
        schemas=selected_schemas,
        version=version,
        environment=environment,
    )
142
143
def compare_db_objects(
    source_db: MetaData,
    source_env: str,
    target_db: MetaData,
    target_env: str,
    engine: SqLiteAqlAlchemyEngine,
    schemas: list = None,
):
    """
    Diff every metadata dataframe of ``source_db`` against ``target_db`` and save the result.

    Args:
        source_db: Metadata whose rows are the left side of each diff.
        source_env: Label of the source environment (used in ids and logging).
        target_db: Metadata whose rows are subtracted from the source rows.
        target_env: Label of the target environment (used in ids and logging).
        engine: Engine the saving session is bound to.
        schemas: Schemas to include; when None, ``get_default_schemas()`` is used.

    Returns:
        A ``MetaData`` object whose dataframes hold the ``diff_df`` results
        (default ``how="left"``), after its ``save`` was called.
    """
    if schemas is None:
        schemas = get_default_schemas()
    Session = sessionmaker(bind=engine)
    session = Session()

    # NOTE(review): this is the same object as source_db.database_object, so the
    # assignments below overwrite the source's id/environment/version in place.
    database: Database = source_db.database_object
    database.database_id = f"{database.database_name} ({source_env} vs {target_env} comparison)"
    database.environment = f"{source_env}->{target_env}"
    database.version = "comparison"
    database.save(session)

    # Keyword spelled ``db_timestamp_to_string`` to match the MetaData construction
    # in get_db_objects (it was previously misspelled as ``db_timetasmp_to_string``).
    diffs: MetaData = MetaData(
        database_object=database, schemas_to_include=schemas, db_timestamp_to_string=db_timestamp_to_string
    )

    diffs.tables = diff_df(source_db.tables, target_db.tables)
    diffs.columns = diff_df(source_db.columns, target_db.columns)
    diffs.views = diff_df(source_db.views, target_db.views)
    diffs.streams = diff_df(source_db.streams, target_db.streams)
    # diffs.pipes = diff_df(source_db.pipes, target_db.pipes)
    diffs.tasks = diff_df(source_db.tasks, target_db.tasks)
    diffs.procedures = diff_df(source_db.procedures, target_db.procedures)
    diffs.functions = diff_df(source_db.functions, target_db.functions)
    diffs.table_constraints = diff_df(source_db.table_constraints, target_db.table_constraints)
    diffs.referential_constraints = diff_df(source_db.referential_constraints, target_db.referential_constraints)
    diffs.stages = diff_df(source_db.stages, target_db.stages)
    diffs.column_constraints = diff_df(source_db.column_constraints, target_db.column_constraints)
    diffs.constraints = diff_df(source_db.constraints, target_db.constraints)

    diffs.save(session=session)
    log.info(
        f"Database {source_db.database_object.database_name} comparison report [{source_env}] to [{target_env}]:\n {diffs}"
    )
    return diffs
185
186
def diff_df(df1, df2, how="left"):
    """
    Return the rows that are present in one dataframe but absent from the other.

    This is a row-wise set difference and it is not symmetric, meaning
          diff(x, y) != diff(y, x)
    however
          diff(x, y, how='left') == diff(y, x, how='right')

    Args:
        df1: First dataframe.
        df2: Second dataframe; its column labels must equal those of ``df1``,
            in the same order.
        how: "left" returns rows of ``df1`` missing from ``df2``;
            "right" returns rows of ``df2`` missing from ``df1``.

    Returns:
        A dataframe with the differing rows, each distinct row once. When
        ``df1.equals(df2)``, an empty dataframe with the columns of ``df1``.

    Raises:
        ValueError: If the column labels differ or ``how`` is not "left"/"right".

    Ref: https://stackoverflow.com/questions/18180763/set-difference-for-pandas/40209800#40209800
    """
    # Comparing label lists also handles frames with a different number of columns.
    if list(df1.columns) != list(df2.columns):
        raise ValueError("Two dataframe columns must match")

    # Validate up front so an invalid value is rejected even for equal frames.
    if how not in ("left", "right"):
        raise ValueError('how parameter supports only "left" or "right" keywords')

    if df1.equals(df2):
        return pd.DataFrame(columns=list(df1.columns))

    minuend, subtrahend = (df1, df2) if how == "left" else (df2, df1)
    # The subtrahend is concatenated twice so each of its rows occurs at least
    # twice and is removed by keep=False. The minuend is de-duplicated first;
    # otherwise rows repeated inside it would be removed as well even though
    # they never occur in the subtrahend.
    return pd.concat([minuend.drop_duplicates(), subtrahend, subtrahend]).drop_duplicates(keep=False)
209
210
def compare(src_database: Database, trg_database: Database, session):
    """
    Compare the stored metadata of two databases, object by object.

    Every object found under ``src_database`` is looked up by name under
    ``trg_database``. When the target object exists the entry is the result of
    ``compare_obj(source, target)``; when it does not, the entry is
    ``{"left": <source class name>, "right": None}`` and the children of that
    object are not visited. Objects that exist only on the target side are
    never reported, because only source objects are iterated.

    Args:
        src_database: Database record whose objects drive the comparison.
        trg_database: Database record that is searched for matching objects.
        session: Session used for all queries.

    Returns:
        The dict built by ``init_comparison()``: one sub-dict per object type,
        keyed by a readable object path that ends in ``[<db_key>]``.
    """
    comparison: {} = init_comparison()
    # db_key is the database name, extended only when exactly one of
    # environment/version differs; if both differ (or neither), it stays bare.
    db_key = f"{src_database.database_name}"
    if src_database.environment != trg_database.environment and src_database.version == trg_database.version:
        db_key = f"{db_key}:{src_database.environment}->{trg_database.environment}"

    if src_database.version != trg_database.version and src_database.environment == trg_database.environment:
        db_key = f"{db_key}:{src_database.version}->{trg_database.version}"

    comparison[DbObjectType.DATABASE.value][db_key] = compare_obj(src_database, trg_database)

    src_schemas = session.query(Schema).filter(Schema.database_id == src_database.database_id).all()
    for src_schema in src_schemas:
        schema_key = f"{src_schema.schema_name} [{db_key}]"
        # Match the target schema by name within the target database.
        trg_schema = (
            session.query(Schema)
            .filter(and_(Schema.database_id == trg_database.database_id, Schema.schema_name == src_schema.schema_name))
            .first()
        )
        if trg_schema:
            comparison[DbObjectType.SCHEMA.value][schema_key] = compare_obj(src_schema, trg_schema)
        else:
            comparison[DbObjectType.SCHEMA.value][schema_key] = {"left": src_schema.__class__.__name__, "right": None}
            # No target schema: nothing below it can be matched, skip its children.
            continue

        # ---- Tables (with their columns, column constraints and constraints) ----
        src_tables = session.query(Table).filter(Table.schema_id == src_schema.schema_id).all()
        for src_table in src_tables:
            table_key = f"{src_schema.schema_name}.{src_table.table_name} [{db_key}]"
            trg_table = (
                session.query(Table)
                .filter(and_(Table.schema_id == trg_schema.schema_id, Table.table_name == src_table.table_name))
                .first()
            )
            if trg_table:
                comp = compare_obj(src_table, trg_table)
                # Unlike the other object types, a table entry is recorded only
                # when compare_obj returns a truthy result.
                if comp:
                    comparison[DbObjectType.TABLE.value][table_key] = comp
            else:
                comparison[DbObjectType.TABLE.value][table_key] = {"left": src_table.__class__.__name__, "right": None}
                continue

            src_columns = session.query(Column).filter(Column.table_id == src_table.table_id).all()
            for src_column in src_columns:
                column_key = f"{src_schema.schema_name}.{src_table.table_name}.{src_column.column_name} [{db_key}]"
                trg_column = (
                    session.query(Column)
                    .filter(and_(Column.table_id == trg_table.table_id, Column.column_name == src_column.column_name))
                    .first()
                )
                if trg_column:
                    comparison[DbObjectType.COLUMN.value][column_key] = compare_obj(src_column, trg_column)
                else:
                    comparison[DbObjectType.COLUMN.value][column_key] = {
                        "left": src_column.__class__.__name__,
                        "right": None,
                    }
                    continue

                # Column constraints are selected by the source column acting as the pk column.
                src_column_constraints = (
                    session.query(ColumnConstraint).filter(ColumnConstraint.pk_column_id == src_column.column_id).all()
                )
                for src_column_constraint in src_column_constraints:
                    # The constraint id is parsed as JSON and its pk/fk
                    # version and environment fields are rewritten to the target's
                    # values to derive the id expected on the target side.
                    # NOTE(review): the local name ``id`` shadows the builtin.
                    id = json.loads(src_column_constraint.column_constraint_id)
                    id["pk_column_id"]["version"] = trg_database.version
                    id["fk_column_id"]["version"] = trg_database.version
                    id["pk_column_id"]["environment"] = trg_database.environment
                    id["fk_column_id"]["environment"] = trg_database.environment
                    # NOTE(review): the match depends on json.dumps reproducing the exact
                    # formatting of the stored ids - confirm against how ids are generated.
                    trg_column_constraint_id = json.dumps(id)

                    constraint_name = src_column_constraint.fk_name
                    column_constraint_key = (
                        f"{src_schema.schema_name}.{src_table.table_name}.{src_column.column_name}"
                        f".{constraint_name} [{db_key}]"
                    )
                    trg_column_constraint = (
                        session.query(ColumnConstraint)
                        .filter(ColumnConstraint.column_constraint_id == trg_column_constraint_id)
                        .first()
                    )
                    if trg_column_constraint:
                        comparison[DbObjectType.COLUMN_CONSTRAINT.value][column_constraint_key] = compare_obj(
                            src_column_constraint, trg_column_constraint
                        )
                    else:
                        comparison[DbObjectType.COLUMN_CONSTRAINT.value][column_constraint_key] = {
                            "left": src_column_constraint.__class__.__name__,
                            "right": None,
                        }
                        # Last statement of the loop body; this continue has no effect.
                        continue

            src_table_constraints = (
                session.query(TableConstraint).filter(TableConstraint.table_id == src_table.table_id).all()
            )
            for src_table_constraint in src_table_constraints:
                # Note the key order here: constraint name first, then schema.table.
                table_constraint_key = (
                    f"{src_table_constraint.table_constraint_name}.{src_schema.schema_name}"
                    f".{src_table.table_name} [{db_key}]"
                )
                trg_table_constraint = (
                    session.query(TableConstraint)
                    .filter(
                        and_(
                            TableConstraint.table_id == trg_table.table_id,
                            TableConstraint.table_constraint_name == src_table_constraint.table_constraint_name,
                        )
                    )
                    .first()
                )
                if trg_table_constraint:
                    comparison[DbObjectType.TABLE_CONSTRAINT.value][table_constraint_key] = compare_obj(
                        src_table_constraint, trg_table_constraint
                    )
                else:
                    comparison[DbObjectType.TABLE_CONSTRAINT.value][table_constraint_key] = {
                        "left": src_table_constraint.__class__.__name__,
                        "right": None,
                    }
                    continue

            src_constraints = session.query(Constraint).filter(Constraint.table_id == src_table.table_id).all()
            for src_constraint in src_constraints:
                constraint_key = (
                    f"{src_schema.schema_name}.{src_table.table_name}.{src_constraint.constraint_name} [{db_key}]"
                )
                # NOTE(review): the lookup id is the *target* table id with the target
                # version/environment substrings replaced by the *source* values, i.e.
                # the opposite direction of the column-constraint lookup above. The
                # replaced substrings also carry no quotes around the value - confirm
                # both against the actual table_id format.
                trg_constraint = (
                    session.query(Constraint)
                    .filter(
                        and_(
                            Constraint.table_id
                            == (
                                trg_table.table_id.replace(
                                    f'"version": {trg_database.version}', f'"version": {src_database.version}'
                                )
                            ).replace(
                                f'"environment": {trg_database.environment}',
                                f'"environment": {src_database.environment}',
                            ),
                            Constraint.constraint_name == src_constraint.constraint_name,
                        )
                    )
                    .first()
                )
                if trg_constraint:
                    comparison[DbObjectType.CONSTRAINT.value][constraint_key] = compare_obj(
                        src_constraint, trg_constraint
                    )
                else:
                    comparison[DbObjectType.CONSTRAINT.value][constraint_key] = {
                        "left": src_constraint.__class__.__name__,
                        "right": None,
                    }
                    continue

        # ---- Views, matched by name within the target schema ----
        src_views = session.query(View).filter(View.schema_id == src_schema.schema_id).all()
        for src_view in src_views:
            view_key = f"{src_schema.schema_name}.{src_view.view_name} [{db_key}]"
            trg_view = (
                session.query(View)
                .filter(and_(View.schema_id == trg_schema.schema_id, View.view_name == src_view.view_name))
                .first()
            )
            if trg_view:
                comparison[DbObjectType.VIEW.value][view_key] = compare_obj(src_view, trg_view)
            else:
                comparison[DbObjectType.VIEW.value][view_key] = {"left": src_view.__class__.__name__, "right": None}
                continue

        # ---- Functions, matched by name AND argument signature ----
        # The key contains only the name, so source functions that share a name
        # but differ in signature write to the same key (the last one wins).
        src_functions = session.query(Function).filter(Function.schema_id == src_schema.schema_id).all()
        for src_function in src_functions:
            function_key = f"{src_schema.schema_name}.{src_function.function_name} [{db_key}]"
            trg_function = (
                session.query(Function)
                .filter(
                    and_(
                        Function.schema_id == trg_schema.schema_id,
                        Function.function_name == src_function.function_name,
                        Function.argument_signature == src_function.argument_signature,
                    )
                )
                .first()
            )
            if trg_function:
                comparison[DbObjectType.FUNCTION.value][function_key] = compare_obj(src_function, trg_function)
            else:
                comparison[DbObjectType.FUNCTION.value][function_key] = {
                    "left": src_function.__class__.__name__,
                    "right": None,
                }
                continue

        # ---- Procedures, matched by name AND argument signature ----
        # As with functions, the key contains only the name.
        src_procedures = session.query(Procedure).filter(Procedure.schema_id == src_schema.schema_id).all()
        for src_procedure in src_procedures:
            procedure_key = f"{src_schema.schema_name}.{src_procedure.procedure_name} [{db_key}]"
            trg_procedure = (
                session.query(Procedure)
                .filter(
                    and_(
                        Procedure.schema_id == trg_schema.schema_id,
                        Procedure.procedure_name == src_procedure.procedure_name,
                        Procedure.argument_signature == src_procedure.argument_signature,
                    )
                )
                .first()
            )
            if trg_procedure:
                comparison[DbObjectType.PROCEDURE.value][procedure_key] = compare_obj(src_procedure, trg_procedure)
            else:
                comparison[DbObjectType.PROCEDURE.value][procedure_key] = {
                    "left": src_procedure.__class__.__name__,
                    "right": None,
                }
                continue

        # Stage comparison is currently disabled.
        # src_stages = session.query(Stage).filter(Stage.schema_id == src_schema.schema_id).all()
        # for src_stage in src_stages:
        #     stage_key = f"{src_schema.schema_name}.{src_stage.stage_name} [{db_key}]"
        #     trg_stage = session.query(Stage).filter(and_(
        #         Stage.schema_id == trg_schema.schema_id,
        #         Stage.stage_name == src_stage.stage_name
        #     )).first()
        #     if trg_stage:
        #         comparison[DbObjectType.STAGE.value][stage_key] = compare_obj(src_stage, trg_stage)
        #     else:
        #         comparison[DbObjectType.STAGE.value][stage_key] = {"left": src_stage.__class__.__name__,
        #                                                            "right": None}
        #         continue

        # ---- Streams, matched by name within the target schema ----
        src_streams = session.query(Stream).filter(Stream.schema_id == src_schema.schema_id).all()
        for src_stream in src_streams:
            # NOTE(review): this key is built from schema_key, so "[db_key]" sits in the
            # middle ("<schema> [<db_key>].<stream>") instead of at the end as for the
            # other object types - confirm whether that is intended.
            stream_key = f"{schema_key}.{src_stream.stream_name}"
            trg_stream = (
                session.query(Stream)
                .filter(and_(Stream.schema_id == trg_schema.schema_id, Stream.stream_name == src_stream.stream_name))
                .first()
            )
            if trg_stream:
                comparison[DbObjectType.STREAM.value][stream_key] = compare_obj(src_stream, trg_stream)
            else:
                comparison[DbObjectType.STREAM.value][stream_key] = {
                    "left": src_stream.__class__.__name__,
                    "right": None,
                }
                continue

        # Pipe comparison is currently disabled.
        # src_pipes = session.query(Pipe).filter(Pipe.schema_id == src_schema.schema_id).all()
        # for src_pipe in src_pipes:
        #     pipe_key = f"{src_schema.schema_name}.{src_pipe.pipe_name} [{db_key}]"
        #     trg_pipe = session.query(Pipe).filter(and_(
        #         Pipe.schema_id == trg_schema.schema_id,
        #         Pipe.pipe_name == src_pipe.pipe_name
        #     )).first()
        #     if trg_pipe:
        #         comparison[DbObjectType.PIPE.value][pipe_key] = compare_obj(src_pipe, trg_pipe)
        #     else:
        #         comparison[DbObjectType.PIPE.value][pipe_key] = {"left": src_pipe.__class__.__name__, "right": None}
        #         continue

        # ---- Tasks, matched by name within the target schema ----
        src_tasks = session.query(Task).filter(Task.schema_id == src_schema.schema_id).all()
        for src_task in src_tasks:
            task_key = f"{src_schema.schema_name}.{src_task.task_name} [{db_key}]"
            trg_task = (
                session.query(Task)
                .filter(and_(Task.schema_id == trg_schema.schema_id, Task.task_name == src_task.task_name))
                .first()
            )
            if trg_task:
                comparison[DbObjectType.TASK.value][task_key] = compare_obj(src_task, trg_task)
            else:
                comparison[DbObjectType.TASK.value][task_key] = {"left": src_task.__class__.__name__, "right": None}
                continue

    return comparison
def get_default_schemas() -> tuple:
def get_default_schemas() -> tuple:
    """
    Resolve the schema filter from the SNOWFLAKE_SCHEMAS environment variable.

    The variable is read as a comma-separated list, for example
    SNOWFLAKE_SCHEMAS="PUBLIC,INFORMATION_SCHEMA,ANALYTICS". Each entry is
    stripped of surrounding whitespace and blank entries are discarded.
    When the variable is unset or empty, an empty tuple is returned, which
    means "include all schemas".
    """
    raw_value = os.getenv("SNOWFLAKE_SCHEMAS", "")
    if not raw_value:
        return ()  # Empty tuple means all schemas
    stripped_names = (part.strip() for part in raw_value.split(","))
    return tuple(name for name in stripped_names if name)

Get schemas from environment variable or return empty tuple to include all schemas. Set SNOWFLAKE_SCHEMAS environment variable as comma-separated list. Example: SNOWFLAKE_SCHEMAS="PUBLIC,INFORMATION_SCHEMA,ANALYTICS"

def init_comparison() -> {}:
def init_comparison() -> dict:
    """
    Build the empty skeleton of a comparison report.

    Returns:
        A dict with one key per value returned by ``DbObjectType.list()``,
        each mapped to its own new, empty dict.
    """
    # A comprehension gives every key a distinct inner dict; the previous
    # ``-> {}`` annotation was a dict instance rather than a type.
    return {object_type: {} for object_type in DbObjectType.list()}
def get_db_objects( database_name: str, engine: schema_sentinel.metadata_manager.engine.SfAlchemyEngine, meta_engine: schema_sentinel.metadata_manager.engine.SqLiteAqlAlchemyEngine, schemas: tuple = None, version: str = '0.1.0', environment: str = 'dev') -> schema_sentinel.metadata_manager.model.metadata_container.MetaData:
 65def get_db_objects(
 66    database_name: str,
 67    engine: SfAlchemyEngine,
 68    meta_engine: SqLiteAqlAlchemyEngine,
 69    schemas: tuple = None,
 70    version: str = "0.1.0",
 71    environment: str = "dev",
 72) -> MetaData:
 73    if schemas is None:
 74        schemas = get_default_schemas()
 75    session = sessionmaker(bind=meta_engine)()
 76    database_df = get_database(database_name=database_name, engine=engine)
 77
 78    db = Database(
 79        version=version,
 80        environment=environment,
 81        database_name=database_name,
 82        database_owner=database_df.database_owner,
 83        is_transient=database_df.is_transient,
 84        comment=database_df.comment,
 85        created=db_timestamp_to_string(database_df.created),
 86        last_altered=db_timestamp_to_string(database_df.last_altered),
 87        retention_time=database_df.retention_time,
 88    )
 89    # TODO: add version increment
 90    db.database_id = db.__get_id__()
 91    db.save(session=session)
 92    log.info(f"Schemas to include are {schemas}")
 93    db_objects: MetaData = MetaData(
 94        database_object=db, schemas_to_include=schemas, db_timestamp_to_string=db_timestamp_to_string
 95    )
 96
 97    db_objects.save_schemas(schemas_df=get_schemas(database=db, engine=engine), session=session)
 98
 99    db_objects.constraints = get_constraints(database_name=database_name, engine=engine)
100    db_objects.tables = get_tables(database_name=database_name, engine=engine)
101    db_objects.columns = get_columns(database_name=database_name, engine=engine)
102    db_objects.views = get_views(database_name=database_name, engine=engine)
103    db_objects.procedures = get_procedures(database_name=database_name, engine=engine)
104    db_objects.functions = get_functions(database_name=database_name, engine=engine)
105    db_objects.streams = get_streams(database_name=database_name, engine=engine)
106    db_objects.tasks = get_tasks(database_name=database_name, engine=engine)
107    db_objects.pipes = get_pipes(database_name=database_name, engine=engine)
108    db_objects.table_constraints = get_table_constraints(database_name=database_name, engine=engine)
109    db_objects.referential_constraints = get_referential_constraints(database_name=database_name, engine=engine)
110
111    db_objects.column_constraints = get_imported_keys(database_name=database_name, engine=engine)
112    db_objects.stages = get_stages(database_name=database_name, engine=engine)
113
114    db_objects.save(session=session)
115    return db_objects
def db_timestamp_to_string(db_timestamp) -> str:
def db_timestamp_to_string(db_timestamp) -> str:
    """Return ``str(db_timestamp)``, or an empty string when the value is falsy."""
    return str(db_timestamp) if db_timestamp else ""
def save_metadata( engine: schema_sentinel.metadata_manager.engine.SfAlchemyEngine, environment: str, version: str, meta_engine: schema_sentinel.metadata_manager.engine.SqLiteAqlAlchemyEngine, database_name: str, schemas: tuple = None) -> schema_sentinel.metadata_manager.model.metadata_container.MetaData:
def save_metadata(
    engine: SfAlchemyEngine,
    environment: str,
    version: str,
    meta_engine: SqLiteAqlAlchemyEngine,
    database_name: str,
    schemas: tuple = None,
) -> MetaData:
    """
    Collect and save the metadata of ``database_name`` via ``get_db_objects``.

    Args:
        engine: Engine forwarded to ``get_db_objects`` as ``engine``.
        environment: Environment label forwarded to ``get_db_objects``.
        version: Version label forwarded to ``get_db_objects``.
        meta_engine: Engine forwarded to ``get_db_objects`` as ``meta_engine``.
        database_name: Name of the database to process.
        schemas: Schemas to include; when None, ``get_default_schemas()`` is used.

    Returns:
        The ``MetaData`` object returned by ``get_db_objects``.
    """
    selected_schemas = get_default_schemas() if schemas is None else schemas
    return get_db_objects(
        database_name=database_name,
        engine=engine,
        meta_engine=meta_engine,
        schemas=selected_schemas,
        version=version,
        environment=environment,
    )
def compare_db_objects( source_db: schema_sentinel.metadata_manager.model.metadata_container.MetaData, source_env: str, target_db: schema_sentinel.metadata_manager.model.metadata_container.MetaData, target_env: str, engine: schema_sentinel.metadata_manager.engine.SqLiteAqlAlchemyEngine, schemas: list = None):
def compare_db_objects(
    source_db: MetaData,
    source_env: str,
    target_db: MetaData,
    target_env: str,
    engine: SqLiteAqlAlchemyEngine,
    schemas: list = None,
):
    """
    Diff every metadata dataframe of ``source_db`` against ``target_db`` and save the result.

    Args:
        source_db: Metadata whose rows are the left side of each diff.
        source_env: Label of the source environment (used in ids and logging).
        target_db: Metadata whose rows are subtracted from the source rows.
        target_env: Label of the target environment (used in ids and logging).
        engine: Engine the saving session is bound to.
        schemas: Schemas to include; when None, ``get_default_schemas()`` is used.

    Returns:
        A ``MetaData`` object whose dataframes hold the ``diff_df`` results
        (default ``how="left"``), after its ``save`` was called.
    """
    if schemas is None:
        schemas = get_default_schemas()
    Session = sessionmaker(bind=engine)
    session = Session()

    # NOTE(review): this is the same object as source_db.database_object, so the
    # assignments below overwrite the source's id/environment/version in place.
    database: Database = source_db.database_object
    database.database_id = f"{database.database_name} ({source_env} vs {target_env} comparison)"
    database.environment = f"{source_env}->{target_env}"
    database.version = "comparison"
    database.save(session)

    # Keyword spelled ``db_timestamp_to_string`` to match the MetaData construction
    # in get_db_objects (it was previously misspelled as ``db_timetasmp_to_string``).
    diffs: MetaData = MetaData(
        database_object=database, schemas_to_include=schemas, db_timestamp_to_string=db_timestamp_to_string
    )

    diffs.tables = diff_df(source_db.tables, target_db.tables)
    diffs.columns = diff_df(source_db.columns, target_db.columns)
    diffs.views = diff_df(source_db.views, target_db.views)
    diffs.streams = diff_df(source_db.streams, target_db.streams)
    # diffs.pipes = diff_df(source_db.pipes, target_db.pipes)
    diffs.tasks = diff_df(source_db.tasks, target_db.tasks)
    diffs.procedures = diff_df(source_db.procedures, target_db.procedures)
    diffs.functions = diff_df(source_db.functions, target_db.functions)
    diffs.table_constraints = diff_df(source_db.table_constraints, target_db.table_constraints)
    diffs.referential_constraints = diff_df(source_db.referential_constraints, target_db.referential_constraints)
    diffs.stages = diff_df(source_db.stages, target_db.stages)
    diffs.column_constraints = diff_df(source_db.column_constraints, target_db.column_constraints)
    diffs.constraints = diff_df(source_db.constraints, target_db.constraints)

    diffs.save(session=session)
    log.info(
        f"Database {source_db.database_object.database_name} comparison report [{source_env}] to [{target_env}]:\n {diffs}"
    )
    return diffs
def diff_df(df1, df2, how='left'):
def diff_df(df1, df2, how="left"):
    """
    Return the rows that are present in one dataframe but missing from the other.

    This is a row-wise set difference and it is not symmetric, i.e.
          diff_df(x, y) != diff_df(y, x)
    however
          diff_df(x, y, how='left') == diff_df(y, x, how='right')

    Ref: https://stackoverflow.com/questions/18180763/set-difference-for-pandas/40209800#40209800

    :param df1: left dataframe
    :param df2: right dataframe; its column labels must equal those of df1, in the same order
    :param how: "left" returns rows of df1 that are not in df2,
                "right" returns rows of df2 that are not in df1
    :return: dataframe with the differing rows, each distinct row once, keeping its original
             index label; an empty dataframe with df1's column names when both frames are equal
    :raises ValueError: if ``how`` is neither "left" nor "right", or if the columns do not match
    """
    # Reject a bad `how` up front so the outcome does not depend on the data.
    if how not in ("left", "right"):
        raise ValueError('how parameter supports only "left" or "right" keywords')

    # Index.equals also copes with a different number of columns, where an
    # element-wise `!=` would fail with an unrelated "Lengths must match" error.
    if not df1.columns.equals(df2.columns):
        raise ValueError("Two dataframe columns must match")

    if df1.equals(df2):
        return pd.DataFrame(columns=list(df1.columns))

    keep, remove = (df1, df2) if how == "left" else (df2, df1)

    # `remove` is appended twice so each of its rows occurs at least twice and is dropped by
    # keep=False. `keep` is de-duplicated first: otherwise a row repeated inside `keep` but
    # absent from `remove` would be discarded as well and the difference silently lost.
    return pd.concat([keep.drop_duplicates(), remove, remove]).drop_duplicates(keep=False)

Find the rows that differ between two dataframes. This function is not symmetric, i.e. diff(x, y) != diff(y, x); however, diff(x, y, how='left') == diff(y, x, how='right').

Ref: https://stackoverflow.com/questions/18180763/set-difference-for-pandas/40209800#40209800

def _missing_in_target(src_obj) -> dict:
    """Build the entry recorded when ``src_obj`` has no counterpart on the target side."""
    return {"left": src_obj.__class__.__name__, "right": None}


def _record_comparison(comparison: dict, object_type, key: str, src_obj, trg_obj) -> bool:
    """Record ``src_obj`` vs ``trg_obj`` under ``key``; return True when a target object was found."""
    if trg_obj:
        comparison[object_type.value][key] = compare_obj(src_obj, trg_obj)
        return True
    comparison[object_type.value][key] = _missing_in_target(src_obj)
    return False


def _compare_columns(session, comparison: dict, src_table, trg_table, table_path: str, db_key: str, trg_database):
    """Compare the columns of ``src_table`` with ``trg_table``, plus the column constraints of matched columns."""
    src_columns = session.query(Column).filter(Column.table_id == src_table.table_id).all()
    for src_column in src_columns:
        column_path = f"{table_path}.{src_column.column_name}"
        column_key = f"{column_path} [{db_key}]"
        trg_column = (
            session.query(Column)
            .filter(and_(Column.table_id == trg_table.table_id, Column.column_name == src_column.column_name))
            .first()
        )
        if not _record_comparison(comparison, DbObjectType.COLUMN, column_key, src_column, trg_column):
            # No target column: its constraints are not looked up.
            continue

        src_column_constraints = (
            session.query(ColumnConstraint).filter(ColumnConstraint.pk_column_id == src_column.column_id).all()
        )
        for src_column_constraint in src_column_constraints:
            # The constraint id is a JSON document. The target id is derived from the source id by
            # swapping in the target's version and environment on both column references.
            constraint_id = json.loads(src_column_constraint.column_constraint_id)
            for side in ("pk_column_id", "fk_column_id"):
                constraint_id[side]["version"] = trg_database.version
                constraint_id[side]["environment"] = trg_database.environment
            trg_column_constraint_id = json.dumps(constraint_id)

            column_constraint_key = f"{column_path}.{src_column_constraint.fk_name} [{db_key}]"
            trg_column_constraint = (
                session.query(ColumnConstraint)
                .filter(ColumnConstraint.column_constraint_id == trg_column_constraint_id)
                .first()
            )
            _record_comparison(
                comparison,
                DbObjectType.COLUMN_CONSTRAINT,
                column_constraint_key,
                src_column_constraint,
                trg_column_constraint,
            )


def _compare_table_level_constraints(
    session, comparison: dict, src_database, trg_database, src_table, trg_table, table_path: str, db_key: str
):
    """Compare the TableConstraint and Constraint rows of ``src_table`` with those found for the target."""
    src_table_constraints = session.query(TableConstraint).filter(TableConstraint.table_id == src_table.table_id).all()
    for src_table_constraint in src_table_constraints:
        # Unlike the other keys, this one starts with the constraint name.
        table_constraint_key = f"{src_table_constraint.table_constraint_name}.{table_path} [{db_key}]"
        trg_table_constraint = (
            session.query(TableConstraint)
            .filter(
                and_(
                    TableConstraint.table_id == trg_table.table_id,
                    TableConstraint.table_constraint_name == src_table_constraint.table_constraint_name,
                )
            )
            .first()
        )
        _record_comparison(
            comparison,
            DbObjectType.TABLE_CONSTRAINT,
            table_constraint_key,
            src_table_constraint,
            trg_table_constraint,
        )

    src_constraints = session.query(Constraint).filter(Constraint.table_id == src_table.table_id).all()
    for src_constraint in src_constraints:
        constraint_key = f"{table_path}.{src_constraint.constraint_name} [{db_key}]"
        # NOTE(review): the lookup id is the *target* table id with the target's version/environment
        # text replaced by the source's. Confirm that this direction is intended.
        lookup_table_id = trg_table.table_id.replace(
            f'"version": {trg_database.version}', f'"version": {src_database.version}'
        ).replace(
            f'"environment": {trg_database.environment}',
            f'"environment": {src_database.environment}',
        )
        trg_constraint = (
            session.query(Constraint)
            .filter(
                and_(
                    Constraint.table_id == lookup_table_id,
                    Constraint.constraint_name == src_constraint.constraint_name,
                )
            )
            .first()
        )
        _record_comparison(comparison, DbObjectType.CONSTRAINT, constraint_key, src_constraint, trg_constraint)


def _compare_tables(session, comparison: dict, src_database, trg_database, src_schema, trg_schema, db_key: str):
    """Compare every table of ``src_schema`` with ``trg_schema``, including columns and constraints."""
    src_tables = session.query(Table).filter(Table.schema_id == src_schema.schema_id).all()
    for src_table in src_tables:
        table_path = f"{src_schema.schema_name}.{src_table.table_name}"
        table_key = f"{table_path} [{db_key}]"
        trg_table = (
            session.query(Table)
            .filter(and_(Table.schema_id == trg_schema.schema_id, Table.table_name == src_table.table_name))
            .first()
        )
        if not trg_table:
            comparison[DbObjectType.TABLE.value][table_key] = _missing_in_target(src_table)
            continue

        # Tables are the one object type whose result is stored only when compare_obj returns
        # something truthy.
        table_diff = compare_obj(src_table, trg_table)
        if table_diff:
            comparison[DbObjectType.TABLE.value][table_key] = table_diff

        _compare_columns(session, comparison, src_table, trg_table, table_path, db_key, trg_database)
        _compare_table_level_constraints(
            session, comparison, src_database, trg_database, src_table, trg_table, table_path, db_key
        )


def compare(src_database: Database, trg_database: Database, session):
    """
    Compare the stored metadata of two databases object by object.

    Walks the source database's schemas and, per schema, its tables (with columns, column
    constraints, table constraints and constraints), views, functions, procedures, streams and
    tasks. Each source object is looked up on the target side by name (functions and procedures
    also by argument signature) and the pair is handed to ``compare_obj``. A source object without
    a counterpart is recorded as ``{"left": <class name>, "right": None}`` and its children are
    skipped. Only source objects are iterated, so objects that exist only in the target are not
    reported. Stages and pipes are not compared.

    :param src_database: source database record
    :param trg_database: target database record
    :param session: session used for all queries
    :return: the structure returned by ``init_comparison()``, filled as
             ``comparison[<object type value>][<key>] = <result>``
    """
    comparison: dict = init_comparison()

    # The key suffix names what differs between the two sides. When both environment and
    # version differ (or neither does), the key stays the bare database name.
    db_key = f"{src_database.database_name}"
    if src_database.environment != trg_database.environment and src_database.version == trg_database.version:
        db_key = f"{db_key}:{src_database.environment}->{trg_database.environment}"

    if src_database.version != trg_database.version and src_database.environment == trg_database.environment:
        db_key = f"{db_key}:{src_database.version}->{trg_database.version}"

    comparison[DbObjectType.DATABASE.value][db_key] = compare_obj(src_database, trg_database)

    src_schemas = session.query(Schema).filter(Schema.database_id == src_database.database_id).all()
    for src_schema in src_schemas:
        schema_key = f"{src_schema.schema_name} [{db_key}]"
        trg_schema = (
            session.query(Schema)
            .filter(and_(Schema.database_id == trg_database.database_id, Schema.schema_name == src_schema.schema_name))
            .first()
        )
        if not _record_comparison(comparison, DbObjectType.SCHEMA, schema_key, src_schema, trg_schema):
            # Schema is missing in the target: nothing inside it can be matched.
            continue

        _compare_tables(session, comparison, src_database, trg_database, src_schema, trg_schema, db_key)

        src_views = session.query(View).filter(View.schema_id == src_schema.schema_id).all()
        for src_view in src_views:
            view_key = f"{src_schema.schema_name}.{src_view.view_name} [{db_key}]"
            trg_view = (
                session.query(View)
                .filter(and_(View.schema_id == trg_schema.schema_id, View.view_name == src_view.view_name))
                .first()
            )
            _record_comparison(comparison, DbObjectType.VIEW, view_key, src_view, trg_view)

        src_functions = session.query(Function).filter(Function.schema_id == src_schema.schema_id).all()
        for src_function in src_functions:
            function_key = f"{src_schema.schema_name}.{src_function.function_name} [{db_key}]"
            trg_function = (
                session.query(Function)
                .filter(
                    and_(
                        Function.schema_id == trg_schema.schema_id,
                        Function.function_name == src_function.function_name,
                        Function.argument_signature == src_function.argument_signature,
                    )
                )
                .first()
            )
            _record_comparison(comparison, DbObjectType.FUNCTION, function_key, src_function, trg_function)

        src_procedures = session.query(Procedure).filter(Procedure.schema_id == src_schema.schema_id).all()
        for src_procedure in src_procedures:
            procedure_key = f"{src_schema.schema_name}.{src_procedure.procedure_name} [{db_key}]"
            trg_procedure = (
                session.query(Procedure)
                .filter(
                    and_(
                        Procedure.schema_id == trg_schema.schema_id,
                        Procedure.procedure_name == src_procedure.procedure_name,
                        Procedure.argument_signature == src_procedure.argument_signature,
                    )
                )
                .first()
            )
            _record_comparison(comparison, DbObjectType.PROCEDURE, procedure_key, src_procedure, trg_procedure)

        # Stages are not compared (the code for them was previously commented out).

        src_streams = session.query(Stream).filter(Stream.schema_id == src_schema.schema_id).all()
        for src_stream in src_streams:
            # NOTE(review): this key is "<schema> [<db_key>].<stream>", whereas every other key
            # ends with " [<db_key>]". Kept as is because the keys are part of the output.
            stream_key = f"{schema_key}.{src_stream.stream_name}"
            trg_stream = (
                session.query(Stream)
                .filter(and_(Stream.schema_id == trg_schema.schema_id, Stream.stream_name == src_stream.stream_name))
                .first()
            )
            _record_comparison(comparison, DbObjectType.STREAM, stream_key, src_stream, trg_stream)

        # Pipes are not compared (the code for them was previously commented out).

        src_tasks = session.query(Task).filter(Task.schema_id == src_schema.schema_id).all()
        for src_task in src_tasks:
            task_key = f"{src_schema.schema_name}.{src_task.task_name} [{db_key}]"
            trg_task = (
                session.query(Task)
                .filter(and_(Task.schema_id == trg_schema.schema_id, Task.task_name == src_task.task_name))
                .first()
            )
            _record_comparison(comparison, DbObjectType.TASK, task_key, src_task, trg_task)

    return comparison