schema_sentinel.metadata_manager.metadata
1import json 2import logging as log 3import os 4 5import pandas as pd 6from sqlalchemy import and_ 7from sqlalchemy.orm import sessionmaker 8 9from schema_sentinel.metadata_manager.model import compare_obj 10from schema_sentinel.metadata_manager.model.column import Column 11from schema_sentinel.metadata_manager.model.column_constraint import ColumnConstraint 12from schema_sentinel.metadata_manager.model.constraint import Constraint 13from schema_sentinel.metadata_manager.model.database import Database 14from schema_sentinel.metadata_manager.model.function import Function 15from schema_sentinel.metadata_manager.model.procedure import Procedure 16from schema_sentinel.metadata_manager.model.schema import Schema 17from schema_sentinel.metadata_manager.model.stream import Stream 18from schema_sentinel.metadata_manager.model.table import Table 19from schema_sentinel.metadata_manager.model.table_constraint import TableConstraint 20from schema_sentinel.metadata_manager.model.task import Task 21from schema_sentinel.metadata_manager.model.view import View 22 23from .engine import SfAlchemyEngine, SqLiteAqlAlchemyEngine 24from .enums import DbObjectType 25from .model.metadata_container import MetaData 26from .model.metadata_utils import ( 27 get_columns, 28 get_constraints, 29 get_database, 30 get_functions, 31 get_imported_keys, 32 get_pipes, 33 get_procedures, 34 get_referential_constraints, 35 get_schemas, 36 get_stages, 37 get_streams, 38 get_table_constraints, 39 get_tables, 40 get_tasks, 41 get_views, 42) 43 44 45def get_default_schemas() -> tuple: 46 """ 47 Get schemas from environment variable or return empty tuple to include all schemas. 48 Set SNOWFLAKE_SCHEMAS environment variable as comma-separated list. 
49 Example: SNOWFLAKE_SCHEMAS="PUBLIC,INFORMATION_SCHEMA,ANALYTICS" 50 """ 51 schemas_env = os.getenv("SNOWFLAKE_SCHEMAS", "") 52 if schemas_env: 53 return tuple(s.strip() for s in schemas_env.split(",") if s.strip()) 54 return () # Empty tuple means all schemas 55 56 57def init_comparison() -> {}: 58 comparison = {} 59 for value in DbObjectType.list(): 60 comparison[value] = {} 61 return comparison 62 63 64def get_db_objects( 65 database_name: str, 66 engine: SfAlchemyEngine, 67 meta_engine: SqLiteAqlAlchemyEngine, 68 schemas: tuple = None, 69 version: str = "0.1.0", 70 environment: str = "dev", 71) -> MetaData: 72 if schemas is None: 73 schemas = get_default_schemas() 74 session = sessionmaker(bind=meta_engine)() 75 database_df = get_database(database_name=database_name, engine=engine) 76 77 db = Database( 78 version=version, 79 environment=environment, 80 database_name=database_name, 81 database_owner=database_df.database_owner, 82 is_transient=database_df.is_transient, 83 comment=database_df.comment, 84 created=db_timestamp_to_string(database_df.created), 85 last_altered=db_timestamp_to_string(database_df.last_altered), 86 retention_time=database_df.retention_time, 87 ) 88 # TODO: add version increment 89 db.database_id = db.__get_id__() 90 db.save(session=session) 91 log.info(f"Schemas to include are {schemas}") 92 db_objects: MetaData = MetaData( 93 database_object=db, schemas_to_include=schemas, db_timestamp_to_string=db_timestamp_to_string 94 ) 95 96 db_objects.save_schemas(schemas_df=get_schemas(database=db, engine=engine), session=session) 97 98 db_objects.constraints = get_constraints(database_name=database_name, engine=engine) 99 db_objects.tables = get_tables(database_name=database_name, engine=engine) 100 db_objects.columns = get_columns(database_name=database_name, engine=engine) 101 db_objects.views = get_views(database_name=database_name, engine=engine) 102 db_objects.procedures = get_procedures(database_name=database_name, engine=engine) 103 
db_objects.functions = get_functions(database_name=database_name, engine=engine) 104 db_objects.streams = get_streams(database_name=database_name, engine=engine) 105 db_objects.tasks = get_tasks(database_name=database_name, engine=engine) 106 db_objects.pipes = get_pipes(database_name=database_name, engine=engine) 107 db_objects.table_constraints = get_table_constraints(database_name=database_name, engine=engine) 108 db_objects.referential_constraints = get_referential_constraints(database_name=database_name, engine=engine) 109 110 db_objects.column_constraints = get_imported_keys(database_name=database_name, engine=engine) 111 db_objects.stages = get_stages(database_name=database_name, engine=engine) 112 113 db_objects.save(session=session) 114 return db_objects 115 116 117def db_timestamp_to_string(db_timestamp) -> str: 118 if not db_timestamp: 119 return "" 120 return str(db_timestamp) 121 122 123def save_metadata( 124 engine: SfAlchemyEngine, 125 environment: str, 126 version: str, 127 meta_engine: SqLiteAqlAlchemyEngine, 128 database_name: str, 129 schemas: tuple = None, 130) -> MetaData: 131 if schemas is None: 132 schemas = get_default_schemas() 133 metadata = get_db_objects( 134 database_name=database_name, 135 engine=engine, 136 meta_engine=meta_engine, 137 schemas=schemas, 138 version=version, 139 environment=environment, 140 ) 141 return metadata 142 143 144def compare_db_objects( 145 source_db: MetaData, 146 source_env: str, 147 target_db: MetaData, 148 target_env: str, 149 engine: SqLiteAqlAlchemyEngine, 150 schemas: list = None, 151): 152 if schemas is None: 153 schemas = get_default_schemas() 154 Session = sessionmaker(bind=engine) 155 session = Session() 156 database: Database = source_db.database_object 157 database.database_id = f"{database.database_name} ({source_env} vs {target_env} comparison)" 158 database.environment = f"{source_env}->{target_env}" 159 database.version = "comparison" 160 database.save(session) 161 162 diffs: MetaData = 
MetaData( 163 database_object=database, schemas_to_include=schemas, db_timetasmp_to_string=db_timestamp_to_string 164 ) 165 166 diffs.tables = diff_df(source_db.tables, target_db.tables) 167 diffs.columns = diff_df(source_db.columns, target_db.columns) 168 diffs.views = diff_df(source_db.views, target_db.views) 169 diffs.streams = diff_df(source_db.streams, target_db.streams) 170 # diffs.pipes = diff_df(source_db.pipes, target_db.pipes) 171 diffs.tasks = diff_df(source_db.tasks, target_db.tasks) 172 diffs.procedures = diff_df(source_db.procedures, target_db.procedures) 173 diffs.functions = diff_df(source_db.functions, target_db.functions) 174 diffs.table_constraints = diff_df(source_db.table_constraints, target_db.table_constraints) 175 diffs.referential_constraints = diff_df(source_db.referential_constraints, target_db.referential_constraints) 176 diffs.stages = diff_df(source_db.stages, target_db.stages) 177 diffs.column_constraints = diff_df(source_db.column_constraints, target_db.column_constraints) 178 diffs.constraints = diff_df(source_db.constraints, target_db.constraints) 179 180 diffs.save(session=session) 181 log.info( 182 f"Database {source_db.database_object.database_name} comparison report [{source_env}] to [{target_env}]:\n {diffs}" 183 ) 184 return diffs 185 186 187def diff_df(df1, df2, how="left"): 188 """ 189 Find Difference of rows for given two dataframes 190 this function is not symmetric, means 191 diff(x, y) != diff(y, x) 192 however 193 diff(x, y, how='left') == diff(y, x, how='right') 194 195 Ref: https://stackoverflow.com/questions/18180763/set-difference-for-pandas/40209800#40209800 196 """ 197 if (df1.columns != df2.columns).any(): 198 raise ValueError("Two dataframe columns must match") 199 200 if df1.equals(df2): 201 names = list(df1.columns) 202 return pd.DataFrame(columns=names) 203 elif how == "right": 204 return pd.concat([df2, df1, df1]).drop_duplicates(keep=False) 205 elif how == "left": 206 return pd.concat([df1, df2, 
df2]).drop_duplicates(keep=False) 207 else: 208 raise ValueError('how parameter supports only "left" or "right keywords"') 209 210 211def compare(src_database: Database, trg_database: Database, session): 212 comparison: {} = init_comparison() 213 db_key = f"{src_database.database_name}" 214 if src_database.environment != trg_database.environment and src_database.version == trg_database.version: 215 db_key = f"{db_key}:{src_database.environment}->{trg_database.environment}" 216 217 if src_database.version != trg_database.version and src_database.environment == trg_database.environment: 218 db_key = f"{db_key}:{src_database.version}->{trg_database.version}" 219 220 comparison[DbObjectType.DATABASE.value][db_key] = compare_obj(src_database, trg_database) 221 222 src_schemas = session.query(Schema).filter(Schema.database_id == src_database.database_id).all() 223 for src_schema in src_schemas: 224 schema_key = f"{src_schema.schema_name} [{db_key}]" 225 trg_schema = ( 226 session.query(Schema) 227 .filter(and_(Schema.database_id == trg_database.database_id, Schema.schema_name == src_schema.schema_name)) 228 .first() 229 ) 230 if trg_schema: 231 comparison[DbObjectType.SCHEMA.value][schema_key] = compare_obj(src_schema, trg_schema) 232 else: 233 comparison[DbObjectType.SCHEMA.value][schema_key] = {"left": src_schema.__class__.__name__, "right": None} 234 continue 235 236 src_tables = session.query(Table).filter(Table.schema_id == src_schema.schema_id).all() 237 for src_table in src_tables: 238 table_key = f"{src_schema.schema_name}.{src_table.table_name} [{db_key}]" 239 trg_table = ( 240 session.query(Table) 241 .filter(and_(Table.schema_id == trg_schema.schema_id, Table.table_name == src_table.table_name)) 242 .first() 243 ) 244 if trg_table: 245 comp = compare_obj(src_table, trg_table) 246 if comp: 247 comparison[DbObjectType.TABLE.value][table_key] = comp 248 else: 249 comparison[DbObjectType.TABLE.value][table_key] = {"left": src_table.__class__.__name__, "right": 
None} 250 continue 251 252 src_columns = session.query(Column).filter(Column.table_id == src_table.table_id).all() 253 for src_column in src_columns: 254 column_key = f"{src_schema.schema_name}.{src_table.table_name}.{src_column.column_name} [{db_key}]" 255 trg_column = ( 256 session.query(Column) 257 .filter(and_(Column.table_id == trg_table.table_id, Column.column_name == src_column.column_name)) 258 .first() 259 ) 260 if trg_column: 261 comparison[DbObjectType.COLUMN.value][column_key] = compare_obj(src_column, trg_column) 262 else: 263 comparison[DbObjectType.COLUMN.value][column_key] = { 264 "left": src_column.__class__.__name__, 265 "right": None, 266 } 267 continue 268 269 src_column_constraints = ( 270 session.query(ColumnConstraint).filter(ColumnConstraint.pk_column_id == src_column.column_id).all() 271 ) 272 for src_column_constraint in src_column_constraints: 273 id = json.loads(src_column_constraint.column_constraint_id) 274 id["pk_column_id"]["version"] = trg_database.version 275 id["fk_column_id"]["version"] = trg_database.version 276 id["pk_column_id"]["environment"] = trg_database.environment 277 id["fk_column_id"]["environment"] = trg_database.environment 278 trg_column_constraint_id = json.dumps(id) 279 280 constraint_name = src_column_constraint.fk_name 281 column_constraint_key = ( 282 f"{src_schema.schema_name}.{src_table.table_name}.{src_column.column_name}" 283 f".{constraint_name} [{db_key}]" 284 ) 285 trg_column_constraint = ( 286 session.query(ColumnConstraint) 287 .filter(ColumnConstraint.column_constraint_id == trg_column_constraint_id) 288 .first() 289 ) 290 if trg_column_constraint: 291 comparison[DbObjectType.COLUMN_CONSTRAINT.value][column_constraint_key] = compare_obj( 292 src_column_constraint, trg_column_constraint 293 ) 294 else: 295 comparison[DbObjectType.COLUMN_CONSTRAINT.value][column_constraint_key] = { 296 "left": src_column_constraint.__class__.__name__, 297 "right": None, 298 } 299 continue 300 301 src_table_constraints = 
( 302 session.query(TableConstraint).filter(TableConstraint.table_id == src_table.table_id).all() 303 ) 304 for src_table_constraint in src_table_constraints: 305 table_constraint_key = ( 306 f"{src_table_constraint.table_constraint_name}.{src_schema.schema_name}" 307 f".{src_table.table_name} [{db_key}]" 308 ) 309 trg_table_constraint = ( 310 session.query(TableConstraint) 311 .filter( 312 and_( 313 TableConstraint.table_id == trg_table.table_id, 314 TableConstraint.table_constraint_name == src_table_constraint.table_constraint_name, 315 ) 316 ) 317 .first() 318 ) 319 if trg_table_constraint: 320 comparison[DbObjectType.TABLE_CONSTRAINT.value][table_constraint_key] = compare_obj( 321 src_table_constraint, trg_table_constraint 322 ) 323 else: 324 comparison[DbObjectType.TABLE_CONSTRAINT.value][table_constraint_key] = { 325 "left": src_table_constraint.__class__.__name__, 326 "right": None, 327 } 328 continue 329 330 src_constraints = session.query(Constraint).filter(Constraint.table_id == src_table.table_id).all() 331 for src_constraint in src_constraints: 332 constraint_key = ( 333 f"{src_schema.schema_name}.{src_table.table_name}.{src_constraint.constraint_name} [{db_key}]" 334 ) 335 trg_constraint = ( 336 session.query(Constraint) 337 .filter( 338 and_( 339 Constraint.table_id 340 == ( 341 trg_table.table_id.replace( 342 f'"version": {trg_database.version}', f'"version": {src_database.version}' 343 ) 344 ).replace( 345 f'"environment": {trg_database.environment}', 346 f'"environment": {src_database.environment}', 347 ), 348 Constraint.constraint_name == src_constraint.constraint_name, 349 ) 350 ) 351 .first() 352 ) 353 if trg_constraint: 354 comparison[DbObjectType.CONSTRAINT.value][constraint_key] = compare_obj( 355 src_constraint, trg_constraint 356 ) 357 else: 358 comparison[DbObjectType.CONSTRAINT.value][constraint_key] = { 359 "left": src_constraint.__class__.__name__, 360 "right": None, 361 } 362 continue 363 364 src_views = 
session.query(View).filter(View.schema_id == src_schema.schema_id).all() 365 for src_view in src_views: 366 view_key = f"{src_schema.schema_name}.{src_view.view_name} [{db_key}]" 367 trg_view = ( 368 session.query(View) 369 .filter(and_(View.schema_id == trg_schema.schema_id, View.view_name == src_view.view_name)) 370 .first() 371 ) 372 if trg_view: 373 comparison[DbObjectType.VIEW.value][view_key] = compare_obj(src_view, trg_view) 374 else: 375 comparison[DbObjectType.VIEW.value][view_key] = {"left": src_view.__class__.__name__, "right": None} 376 continue 377 378 src_functions = session.query(Function).filter(Function.schema_id == src_schema.schema_id).all() 379 for src_function in src_functions: 380 function_key = f"{src_schema.schema_name}.{src_function.function_name} [{db_key}]" 381 trg_function = ( 382 session.query(Function) 383 .filter( 384 and_( 385 Function.schema_id == trg_schema.schema_id, 386 Function.function_name == src_function.function_name, 387 Function.argument_signature == src_function.argument_signature, 388 ) 389 ) 390 .first() 391 ) 392 if trg_function: 393 comparison[DbObjectType.FUNCTION.value][function_key] = compare_obj(src_function, trg_function) 394 else: 395 comparison[DbObjectType.FUNCTION.value][function_key] = { 396 "left": src_function.__class__.__name__, 397 "right": None, 398 } 399 continue 400 401 src_procedures = session.query(Procedure).filter(Procedure.schema_id == src_schema.schema_id).all() 402 for src_procedure in src_procedures: 403 procedure_key = f"{src_schema.schema_name}.{src_procedure.procedure_name} [{db_key}]" 404 trg_procedure = ( 405 session.query(Procedure) 406 .filter( 407 and_( 408 Procedure.schema_id == trg_schema.schema_id, 409 Procedure.procedure_name == src_procedure.procedure_name, 410 Procedure.argument_signature == src_procedure.argument_signature, 411 ) 412 ) 413 .first() 414 ) 415 if trg_procedure: 416 comparison[DbObjectType.PROCEDURE.value][procedure_key] = compare_obj(src_procedure, trg_procedure) 
417 else: 418 comparison[DbObjectType.PROCEDURE.value][procedure_key] = { 419 "left": src_procedure.__class__.__name__, 420 "right": None, 421 } 422 continue 423 424 # src_stages = session.query(Stage).filter(Stage.schema_id == src_schema.schema_id).all() 425 # for src_stage in src_stages: 426 # stage_key = f"{src_schema.schema_name}.{src_stage.stage_name} [{db_key}]" 427 # trg_stage = session.query(Stage).filter(and_( 428 # Stage.schema_id == trg_schema.schema_id, 429 # Stage.stage_name == src_stage.stage_name 430 # )).first() 431 # if trg_stage: 432 # comparison[DbObjectType.STAGE.value][stage_key] = compare_obj(src_stage, trg_stage) 433 # else: 434 # comparison[DbObjectType.STAGE.value][stage_key] = {"left": src_stage.__class__.__name__, 435 # "right": None} 436 # continue 437 438 src_streams = session.query(Stream).filter(Stream.schema_id == src_schema.schema_id).all() 439 for src_stream in src_streams: 440 stream_key = f"{schema_key}.{src_stream.stream_name}" 441 trg_stream = ( 442 session.query(Stream) 443 .filter(and_(Stream.schema_id == trg_schema.schema_id, Stream.stream_name == src_stream.stream_name)) 444 .first() 445 ) 446 if trg_stream: 447 comparison[DbObjectType.STREAM.value][stream_key] = compare_obj(src_stream, trg_stream) 448 else: 449 comparison[DbObjectType.STREAM.value][stream_key] = { 450 "left": src_stream.__class__.__name__, 451 "right": None, 452 } 453 continue 454 455 # src_pipes = session.query(Pipe).filter(Pipe.schema_id == src_schema.schema_id).all() 456 # for src_pipe in src_pipes: 457 # pipe_key = f"{src_schema.schema_name}.{src_pipe.pipe_name} [{db_key}]" 458 # trg_pipe = session.query(Pipe).filter(and_( 459 # Pipe.schema_id == trg_schema.schema_id, 460 # Pipe.pipe_name == src_pipe.pipe_name 461 # )).first() 462 # if trg_pipe: 463 # comparison[DbObjectType.PIPE.value][pipe_key] = compare_obj(src_pipe, trg_pipe) 464 # else: 465 # comparison[DbObjectType.PIPE.value][pipe_key] = {"left": src_pipe.__class__.__name__, "right": None} 466 # 
continue 467 468 src_tasks = session.query(Task).filter(Task.schema_id == src_schema.schema_id).all() 469 for src_task in src_tasks: 470 task_key = f"{src_schema.schema_name}.{src_task.task_name} [{db_key}]" 471 trg_task = ( 472 session.query(Task) 473 .filter(and_(Task.schema_id == trg_schema.schema_id, Task.task_name == src_task.task_name)) 474 .first() 475 ) 476 if trg_task: 477 comparison[DbObjectType.TASK.value][task_key] = compare_obj(src_task, trg_task) 478 else: 479 comparison[DbObjectType.TASK.value][task_key] = {"left": src_task.__class__.__name__, "right": None} 480 continue 481 482 return comparison
def
get_default_schemas() -> tuple:
def get_default_schemas() -> tuple:
    """
    Read the schema filter from the SNOWFLAKE_SCHEMAS environment variable.

    The variable holds a comma-separated list of schema names; surrounding
    whitespace is stripped and blank entries are ignored.
    Example: SNOWFLAKE_SCHEMAS="PUBLIC,INFORMATION_SCHEMA,ANALYTICS"

    :return: tuple of schema names, or an empty tuple (meaning "all schemas")
        when the variable is unset or empty.
    """
    raw_value = os.getenv("SNOWFLAKE_SCHEMAS", "")
    if not raw_value:
        return ()  # Empty tuple means all schemas

    trimmed = (piece.strip() for piece in raw_value.split(","))
    return tuple(name for name in trimmed if name)
Get schemas from environment variable or return empty tuple to include all schemas. Set SNOWFLAKE_SCHEMAS environment variable as comma-separated list. Example: SNOWFLAKE_SCHEMAS="PUBLIC,INFORMATION_SCHEMA,ANALYTICS"
def
init_comparison() -> {}:
def
get_db_objects( database_name: str, engine: schema_sentinel.metadata_manager.engine.SfAlchemyEngine, meta_engine: schema_sentinel.metadata_manager.engine.SqLiteAqlAlchemyEngine, schemas: tuple = None, version: str = '0.1.0', environment: str = 'dev') -> schema_sentinel.metadata_manager.model.metadata_container.MetaData:
def get_db_objects(
    database_name: str,
    engine: SfAlchemyEngine,
    meta_engine: SqLiteAqlAlchemyEngine,
    schemas: tuple = None,
    version: str = "0.1.0",
    environment: str = "dev",
) -> MetaData:
    """
    Load the metadata of one database and save it through ``meta_engine``.

    A ``Database`` record is built from the ``get_database`` result and saved,
    the schemas are saved next, then every object category is fetched into the
    ``MetaData`` container, which is saved last.

    :param database_name: name of the database to read metadata for
    :param engine: engine handed to the ``metadata_utils`` getters
    :param meta_engine: engine the saving session is bound to
    :param schemas: schema names to include; when ``None`` the result of
        ``get_default_schemas()`` is used
    :param version: version label stored on the ``Database`` record
    :param environment: environment label stored on the ``Database`` record
    :return: the populated and saved ``MetaData`` container
    """
    if schemas is None:
        schemas = get_default_schemas()

    session_factory = sessionmaker(bind=meta_engine)
    session = session_factory()

    # Every getter below takes the same two keyword arguments.
    source = {"database_name": database_name, "engine": engine}
    database_df = get_database(**source)

    db = Database(
        version=version,
        environment=environment,
        database_name=database_name,
        database_owner=database_df.database_owner,
        is_transient=database_df.is_transient,
        comment=database_df.comment,
        created=db_timestamp_to_string(database_df.created),
        last_altered=db_timestamp_to_string(database_df.last_altered),
        retention_time=database_df.retention_time,
    )
    # TODO: add version increment
    db.database_id = db.__get_id__()
    db.save(session=session)
    log.info(f"Schemas to include are {schemas}")

    db_objects: MetaData = MetaData(
        database_object=db, schemas_to_include=schemas, db_timestamp_to_string=db_timestamp_to_string
    )

    schemas_df = get_schemas(database=db, engine=engine)
    db_objects.save_schemas(schemas_df=schemas_df, session=session)

    # (container attribute, getter) pairs, kept in the original fetch order.
    loaders = (
        ("constraints", get_constraints),
        ("tables", get_tables),
        ("columns", get_columns),
        ("views", get_views),
        ("procedures", get_procedures),
        ("functions", get_functions),
        ("streams", get_streams),
        ("tasks", get_tasks),
        ("pipes", get_pipes),
        ("table_constraints", get_table_constraints),
        ("referential_constraints", get_referential_constraints),
        ("column_constraints", get_imported_keys),
        ("stages", get_stages),
    )
    for attribute, loader in loaders:
        setattr(db_objects, attribute, loader(**source))

    db_objects.save(session=session)
    return db_objects
def
db_timestamp_to_string(db_timestamp) -> str:
def
save_metadata( engine: schema_sentinel.metadata_manager.engine.SfAlchemyEngine, environment: str, version: str, meta_engine: schema_sentinel.metadata_manager.engine.SqLiteAqlAlchemyEngine, database_name: str, schemas: tuple = None) -> schema_sentinel.metadata_manager.model.metadata_container.MetaData:
def save_metadata(
    engine: SfAlchemyEngine,
    environment: str,
    version: str,
    meta_engine: SqLiteAqlAlchemyEngine,
    database_name: str,
    schemas: tuple = None,
) -> MetaData:
    """
    Collect and save the metadata of ``database_name`` via ``get_db_objects``.

    :param engine: engine used to read the database metadata
    :param environment: environment label forwarded to ``get_db_objects``
    :param version: version label forwarded to ``get_db_objects``
    :param meta_engine: engine forwarded to ``get_db_objects`` for saving
    :param database_name: name of the database to process
    :param schemas: schema names to include; when ``None`` the result of
        ``get_default_schemas()`` is used
    :return: the ``MetaData`` container returned by ``get_db_objects``
    """
    effective_schemas = get_default_schemas() if schemas is None else schemas
    return get_db_objects(
        database_name=database_name,
        engine=engine,
        meta_engine=meta_engine,
        schemas=effective_schemas,
        version=version,
        environment=environment,
    )
def
compare_db_objects( source_db: schema_sentinel.metadata_manager.model.metadata_container.MetaData, source_env: str, target_db: schema_sentinel.metadata_manager.model.metadata_container.MetaData, target_env: str, engine: schema_sentinel.metadata_manager.engine.SqLiteAqlAlchemyEngine, schemas: list = None):
def compare_db_objects(
    source_db: MetaData,
    source_env: str,
    target_db: MetaData,
    target_env: str,
    engine: SqLiteAqlAlchemyEngine,
    schemas: list = None,
):
    """
    Diff the metadata dataframes of two databases and save the result.

    For every object category the rows of ``source_db`` that are missing from
    ``target_db`` are computed with ``diff_df`` (default ``how="left"``) and
    stored in a new ``MetaData`` container, which is then saved and logged.

    Side effect: ``source_db.database_object`` is relabelled in place
    (``database_id``, ``environment`` and ``version`` are overwritten) and
    saved as the database record of the comparison.

    :param source_db: metadata of the source side of the comparison
    :param source_env: environment label of the source
    :param target_db: metadata of the target side of the comparison
    :param target_env: environment label of the target
    :param engine: engine the saving session is bound to
    :param schemas: schema names to include; when ``None`` the result of
        ``get_default_schemas()`` is used
    :return: ``MetaData`` container holding the per-category differences
    """
    if schemas is None:
        schemas = get_default_schemas()
    Session = sessionmaker(bind=engine)
    session = Session()

    # This is the caller's own Database object, not a copy.
    database: Database = source_db.database_object
    database.database_id = f"{database.database_name} ({source_env} vs {target_env} comparison)"
    database.environment = f"{source_env}->{target_env}"
    database.version = "comparison"
    database.save(session)

    # Keyword spelled as in get_db_objects; it was previously misspelled
    # "db_timetasmp_to_string", so the converter never reached MetaData.
    diffs: MetaData = MetaData(
        database_object=database, schemas_to_include=schemas, db_timestamp_to_string=db_timestamp_to_string
    )

    diffs.tables = diff_df(source_db.tables, target_db.tables)
    diffs.columns = diff_df(source_db.columns, target_db.columns)
    diffs.views = diff_df(source_db.views, target_db.views)
    diffs.streams = diff_df(source_db.streams, target_db.streams)
    # diffs.pipes = diff_df(source_db.pipes, target_db.pipes)
    diffs.tasks = diff_df(source_db.tasks, target_db.tasks)
    diffs.procedures = diff_df(source_db.procedures, target_db.procedures)
    diffs.functions = diff_df(source_db.functions, target_db.functions)
    diffs.table_constraints = diff_df(source_db.table_constraints, target_db.table_constraints)
    diffs.referential_constraints = diff_df(source_db.referential_constraints, target_db.referential_constraints)
    diffs.stages = diff_df(source_db.stages, target_db.stages)
    diffs.column_constraints = diff_df(source_db.column_constraints, target_db.column_constraints)
    diffs.constraints = diff_df(source_db.constraints, target_db.constraints)

    diffs.save(session=session)
    log.info(
        f"Database {source_db.database_object.database_name} comparison report [{source_env}] to [{target_env}]:\n {diffs}"
    )
    return diffs
def
diff_df(df1, df2, how='left'):
def diff_df(df1, df2, how="left"):
    """
    Return the rows that are present in one dataframe but absent from the other.

    This is a row-wise set difference and it is not symmetric:
        diff(x, y) != diff(y, x)
    however
        diff(x, y, how='left') == diff(y, x, how='right')

    Ref: https://stackoverflow.com/questions/18180763/set-difference-for-pandas/40209800#40209800

    :param df1: first dataframe
    :param df2: second dataframe; its column labels must equal those of
        ``df1``, in the same order
    :param how: ``"left"`` returns rows of ``df1`` missing from ``df2``;
        ``"right"`` returns rows of ``df2`` missing from ``df1``
    :return: dataframe with each differing row reported once; an empty
        dataframe with the columns of ``df1`` when both inputs are equal
    :raises ValueError: if ``how`` is not ``"left"`` or ``"right"``, or if the
        column labels of the two dataframes differ
    """
    # Validate up front so a bad ``how`` is reported even for equal inputs.
    if how not in ("left", "right"):
        raise ValueError('how parameter supports only "left" or "right" keywords')

    # Index.equals also copes with a differing number of columns.
    if not df1.columns.equals(df2.columns):
        raise ValueError("Two dataframe columns must match")

    if df1.equals(df2):
        return pd.DataFrame(columns=list(df1.columns))

    keep, remove = (df1, df2) if how == "left" else (df2, df1)
    # ``remove`` is concatenated twice so each of its rows occurs at least
    # twice and is discarded by keep=False. ``keep`` is de-duplicated first,
    # otherwise a row repeated inside it would be discarded as well even
    # though it is a real difference.
    return pd.concat([keep.drop_duplicates(), remove, remove]).drop_duplicates(keep=False)
Find the rows that differ between two given dataframes. This function is not symmetric, meaning diff(x, y) != diff(y, x); however, diff(x, y, how='left') == diff(y, x, how='right').
Ref: https://stackoverflow.com/questions/18180763/set-difference-for-pandas/40209800#40209800
def
compare( src_database: schema_sentinel.metadata_manager.model.database.Database, trg_database: schema_sentinel.metadata_manager.model.database.Database, session):
212def compare(src_database: Database, trg_database: Database, session): 213 comparison: {} = init_comparison() 214 db_key = f"{src_database.database_name}" 215 if src_database.environment != trg_database.environment and src_database.version == trg_database.version: 216 db_key = f"{db_key}:{src_database.environment}->{trg_database.environment}" 217 218 if src_database.version != trg_database.version and src_database.environment == trg_database.environment: 219 db_key = f"{db_key}:{src_database.version}->{trg_database.version}" 220 221 comparison[DbObjectType.DATABASE.value][db_key] = compare_obj(src_database, trg_database) 222 223 src_schemas = session.query(Schema).filter(Schema.database_id == src_database.database_id).all() 224 for src_schema in src_schemas: 225 schema_key = f"{src_schema.schema_name} [{db_key}]" 226 trg_schema = ( 227 session.query(Schema) 228 .filter(and_(Schema.database_id == trg_database.database_id, Schema.schema_name == src_schema.schema_name)) 229 .first() 230 ) 231 if trg_schema: 232 comparison[DbObjectType.SCHEMA.value][schema_key] = compare_obj(src_schema, trg_schema) 233 else: 234 comparison[DbObjectType.SCHEMA.value][schema_key] = {"left": src_schema.__class__.__name__, "right": None} 235 continue 236 237 src_tables = session.query(Table).filter(Table.schema_id == src_schema.schema_id).all() 238 for src_table in src_tables: 239 table_key = f"{src_schema.schema_name}.{src_table.table_name} [{db_key}]" 240 trg_table = ( 241 session.query(Table) 242 .filter(and_(Table.schema_id == trg_schema.schema_id, Table.table_name == src_table.table_name)) 243 .first() 244 ) 245 if trg_table: 246 comp = compare_obj(src_table, trg_table) 247 if comp: 248 comparison[DbObjectType.TABLE.value][table_key] = comp 249 else: 250 comparison[DbObjectType.TABLE.value][table_key] = {"left": src_table.__class__.__name__, "right": None} 251 continue 252 253 src_columns = session.query(Column).filter(Column.table_id == src_table.table_id).all() 254 for src_column 
in src_columns: 255 column_key = f"{src_schema.schema_name}.{src_table.table_name}.{src_column.column_name} [{db_key}]" 256 trg_column = ( 257 session.query(Column) 258 .filter(and_(Column.table_id == trg_table.table_id, Column.column_name == src_column.column_name)) 259 .first() 260 ) 261 if trg_column: 262 comparison[DbObjectType.COLUMN.value][column_key] = compare_obj(src_column, trg_column) 263 else: 264 comparison[DbObjectType.COLUMN.value][column_key] = { 265 "left": src_column.__class__.__name__, 266 "right": None, 267 } 268 continue 269 270 src_column_constraints = ( 271 session.query(ColumnConstraint).filter(ColumnConstraint.pk_column_id == src_column.column_id).all() 272 ) 273 for src_column_constraint in src_column_constraints: 274 id = json.loads(src_column_constraint.column_constraint_id) 275 id["pk_column_id"]["version"] = trg_database.version 276 id["fk_column_id"]["version"] = trg_database.version 277 id["pk_column_id"]["environment"] = trg_database.environment 278 id["fk_column_id"]["environment"] = trg_database.environment 279 trg_column_constraint_id = json.dumps(id) 280 281 constraint_name = src_column_constraint.fk_name 282 column_constraint_key = ( 283 f"{src_schema.schema_name}.{src_table.table_name}.{src_column.column_name}" 284 f".{constraint_name} [{db_key}]" 285 ) 286 trg_column_constraint = ( 287 session.query(ColumnConstraint) 288 .filter(ColumnConstraint.column_constraint_id == trg_column_constraint_id) 289 .first() 290 ) 291 if trg_column_constraint: 292 comparison[DbObjectType.COLUMN_CONSTRAINT.value][column_constraint_key] = compare_obj( 293 src_column_constraint, trg_column_constraint 294 ) 295 else: 296 comparison[DbObjectType.COLUMN_CONSTRAINT.value][column_constraint_key] = { 297 "left": src_column_constraint.__class__.__name__, 298 "right": None, 299 } 300 continue 301 302 src_table_constraints = ( 303 session.query(TableConstraint).filter(TableConstraint.table_id == src_table.table_id).all() 304 ) 305 for src_table_constraint 
in src_table_constraints: 306 table_constraint_key = ( 307 f"{src_table_constraint.table_constraint_name}.{src_schema.schema_name}" 308 f".{src_table.table_name} [{db_key}]" 309 ) 310 trg_table_constraint = ( 311 session.query(TableConstraint) 312 .filter( 313 and_( 314 TableConstraint.table_id == trg_table.table_id, 315 TableConstraint.table_constraint_name == src_table_constraint.table_constraint_name, 316 ) 317 ) 318 .first() 319 ) 320 if trg_table_constraint: 321 comparison[DbObjectType.TABLE_CONSTRAINT.value][table_constraint_key] = compare_obj( 322 src_table_constraint, trg_table_constraint 323 ) 324 else: 325 comparison[DbObjectType.TABLE_CONSTRAINT.value][table_constraint_key] = { 326 "left": src_table_constraint.__class__.__name__, 327 "right": None, 328 } 329 continue 330 331 src_constraints = session.query(Constraint).filter(Constraint.table_id == src_table.table_id).all() 332 for src_constraint in src_constraints: 333 constraint_key = ( 334 f"{src_schema.schema_name}.{src_table.table_name}.{src_constraint.constraint_name} [{db_key}]" 335 ) 336 trg_constraint = ( 337 session.query(Constraint) 338 .filter( 339 and_( 340 Constraint.table_id 341 == ( 342 trg_table.table_id.replace( 343 f'"version": {trg_database.version}', f'"version": {src_database.version}' 344 ) 345 ).replace( 346 f'"environment": {trg_database.environment}', 347 f'"environment": {src_database.environment}', 348 ), 349 Constraint.constraint_name == src_constraint.constraint_name, 350 ) 351 ) 352 .first() 353 ) 354 if trg_constraint: 355 comparison[DbObjectType.CONSTRAINT.value][constraint_key] = compare_obj( 356 src_constraint, trg_constraint 357 ) 358 else: 359 comparison[DbObjectType.CONSTRAINT.value][constraint_key] = { 360 "left": src_constraint.__class__.__name__, 361 "right": None, 362 } 363 continue 364 365 src_views = session.query(View).filter(View.schema_id == src_schema.schema_id).all() 366 for src_view in src_views: 367 view_key = 
f"{src_schema.schema_name}.{src_view.view_name} [{db_key}]" 368 trg_view = ( 369 session.query(View) 370 .filter(and_(View.schema_id == trg_schema.schema_id, View.view_name == src_view.view_name)) 371 .first() 372 ) 373 if trg_view: 374 comparison[DbObjectType.VIEW.value][view_key] = compare_obj(src_view, trg_view) 375 else: 376 comparison[DbObjectType.VIEW.value][view_key] = {"left": src_view.__class__.__name__, "right": None} 377 continue 378 379 src_functions = session.query(Function).filter(Function.schema_id == src_schema.schema_id).all() 380 for src_function in src_functions: 381 function_key = f"{src_schema.schema_name}.{src_function.function_name} [{db_key}]" 382 trg_function = ( 383 session.query(Function) 384 .filter( 385 and_( 386 Function.schema_id == trg_schema.schema_id, 387 Function.function_name == src_function.function_name, 388 Function.argument_signature == src_function.argument_signature, 389 ) 390 ) 391 .first() 392 ) 393 if trg_function: 394 comparison[DbObjectType.FUNCTION.value][function_key] = compare_obj(src_function, trg_function) 395 else: 396 comparison[DbObjectType.FUNCTION.value][function_key] = { 397 "left": src_function.__class__.__name__, 398 "right": None, 399 } 400 continue 401 402 src_procedures = session.query(Procedure).filter(Procedure.schema_id == src_schema.schema_id).all() 403 for src_procedure in src_procedures: 404 procedure_key = f"{src_schema.schema_name}.{src_procedure.procedure_name} [{db_key}]" 405 trg_procedure = ( 406 session.query(Procedure) 407 .filter( 408 and_( 409 Procedure.schema_id == trg_schema.schema_id, 410 Procedure.procedure_name == src_procedure.procedure_name, 411 Procedure.argument_signature == src_procedure.argument_signature, 412 ) 413 ) 414 .first() 415 ) 416 if trg_procedure: 417 comparison[DbObjectType.PROCEDURE.value][procedure_key] = compare_obj(src_procedure, trg_procedure) 418 else: 419 comparison[DbObjectType.PROCEDURE.value][procedure_key] = { 420 "left": src_procedure.__class__.__name__, 
421 "right": None, 422 } 423 continue 424 425 # src_stages = session.query(Stage).filter(Stage.schema_id == src_schema.schema_id).all() 426 # for src_stage in src_stages: 427 # stage_key = f"{src_schema.schema_name}.{src_stage.stage_name} [{db_key}]" 428 # trg_stage = session.query(Stage).filter(and_( 429 # Stage.schema_id == trg_schema.schema_id, 430 # Stage.stage_name == src_stage.stage_name 431 # )).first() 432 # if trg_stage: 433 # comparison[DbObjectType.STAGE.value][stage_key] = compare_obj(src_stage, trg_stage) 434 # else: 435 # comparison[DbObjectType.STAGE.value][stage_key] = {"left": src_stage.__class__.__name__, 436 # "right": None} 437 # continue 438 439 src_streams = session.query(Stream).filter(Stream.schema_id == src_schema.schema_id).all() 440 for src_stream in src_streams: 441 stream_key = f"{schema_key}.{src_stream.stream_name}" 442 trg_stream = ( 443 session.query(Stream) 444 .filter(and_(Stream.schema_id == trg_schema.schema_id, Stream.stream_name == src_stream.stream_name)) 445 .first() 446 ) 447 if trg_stream: 448 comparison[DbObjectType.STREAM.value][stream_key] = compare_obj(src_stream, trg_stream) 449 else: 450 comparison[DbObjectType.STREAM.value][stream_key] = { 451 "left": src_stream.__class__.__name__, 452 "right": None, 453 } 454 continue 455 456 # src_pipes = session.query(Pipe).filter(Pipe.schema_id == src_schema.schema_id).all() 457 # for src_pipe in src_pipes: 458 # pipe_key = f"{src_schema.schema_name}.{src_pipe.pipe_name} [{db_key}]" 459 # trg_pipe = session.query(Pipe).filter(and_( 460 # Pipe.schema_id == trg_schema.schema_id, 461 # Pipe.pipe_name == src_pipe.pipe_name 462 # )).first() 463 # if trg_pipe: 464 # comparison[DbObjectType.PIPE.value][pipe_key] = compare_obj(src_pipe, trg_pipe) 465 # else: 466 # comparison[DbObjectType.PIPE.value][pipe_key] = {"left": src_pipe.__class__.__name__, "right": None} 467 # continue 468 469 src_tasks = session.query(Task).filter(Task.schema_id == src_schema.schema_id).all() 470 for src_task 
in src_tasks: 471 task_key = f"{src_schema.schema_name}.{src_task.task_name} [{db_key}]" 472 trg_task = ( 473 session.query(Task) 474 .filter(and_(Task.schema_id == trg_schema.schema_id, Task.task_name == src_task.task_name)) 475 .first() 476 ) 477 if trg_task: 478 comparison[DbObjectType.TASK.value][task_key] = compare_obj(src_task, trg_task) 479 else: 480 comparison[DbObjectType.TASK.value][task_key] = {"left": src_task.__class__.__name__, "right": None} 481 continue 482 483 return comparison