schema_sentinel
import base64
import getpass
import logging as log
import os
import tempfile

from alembic.ddl import DefaultImpl
from sqlalchemy import and_
from sqlalchemy.dialects import registry
from sqlalchemy.orm import sessionmaker

from schema_sentinel.metadata_manager.engine import SfAlchemyEngine, SqLiteAqlAlchemyEngine, get_config_dict
from schema_sentinel.metadata_manager.enums import ConnectMode, Environment
from schema_sentinel.metadata_manager.model.database import Database
from schema_sentinel.metadata_manager.utils import get_config

PROJECT_NAME = "schema-sentinel"
# On Windows fall back to the platform temp directory when TEMP is unset; otherwise
# TEMP_DIR would be None and building LOG_FILE would fail at import time.
TEMP_DIR = (os.getenv("TEMP") or tempfile.gettempdir()) if os.name == "nt" else "/tmp"
LOG_FILE = os.path.join(TEMP_DIR, "schema-sentinel.log")
# An unset *or empty* LOG_LEVEL falls back to INFO (an empty level name is not valid for logging).
LOG_LEVEL = os.getenv("LOG_LEVEL") or "INFO"

# Resolves to the absolute path of the current working directory.
PROJECT_HOME = os.path.dirname(os.path.join(os.path.abspath("./"), PROJECT_NAME))
RESOURCES_PATH = os.path.join(PROJECT_HOME, "resources")
META_DB_PATH = os.path.join(RESOURCES_PATH, "meta-db")

log.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s",
    handlers=[log.FileHandler(LOG_FILE), log.StreamHandler()],
)


class SqLiteImpl(DefaultImpl):
    # Alembic DefaultImpl subclass declared for the "sqlite" dialect name.
    __dialect__ = "sqlite"


class SnowflakeImpl(DefaultImpl):
    # Alembic DefaultImpl subclass declared for the "snowflake" dialect name.
    __dialect__ = "snowflake"


def get_metadata_engine(metadata_db: str) -> SqLiteAqlAlchemyEngine:
    """
    Create the engine for a SQLite metadata database stored under META_DB_PATH.

    :param metadata_db: file name of the metadata database inside META_DB_PATH
    :return: the value produced by ``SqLiteAqlAlchemyEngine.get_engine()``
    """
    # NOTE(review): the annotation names the wrapper class, but the value returned is
    # whatever get_engine() yields -- confirm against SqLiteAqlAlchemyEngine.
    url = f"sqlite:///{META_DB_PATH}/{metadata_db}"
    sqlite_engine = SqLiteAqlAlchemyEngine(env=None, config={"database": metadata_db, "user": get_user(), "url": url})
    return sqlite_engine.get_engine()


def get_engine(env: str, resources_path: str, alias: str) -> "tuple[SfAlchemyEngine, list[str], str]":
    """
    Build a Snowflake SqlAlchemy engine wrapper for the given environment and alias.

    Key-pair connection mode is selected when the PRIVATE_KEY (base64-encoded) and
    PRIVATE_KEY_PASSPHRASE environment variables are both set; otherwise SSO
    connection mode is used.

    :param env: environment name; when it is "local" the effective environment is
        read from the GENERAL/env option of the configuration
    :param resources_path: resources folder path where to get an environment for provided alias
    :param alias: system alias or database identifier for configuration lookup
    :return: tuple of (engine wrapper, schema names from GENERAL/schemas, database name)
    """
    config = get_config(env, os.path.join(resources_path, alias))
    user = config.get(section="DB_CONNECTION", option="username")
    if env == "local":
        env = config.get("GENERAL", "env")

    database_name = config.get("DB_CONNECTION", "database")
    # Tolerate "A, B" style lists: surrounding whitespace is not part of a schema name.
    schemas = [name.strip() for name in config.get("GENERAL", "schemas").split(",")]

    private_key = b""
    private_key_passphrase = None
    encoded_key = os.environ.get("PRIVATE_KEY")
    if encoded_key:
        private_key = base64.b64decode(encoded_key)
        private_key_passphrase = os.environ.get("PRIVATE_KEY_PASSPHRASE")

    # The passphrase (not the key alone) is what switches the connection to key-pair mode.
    connect_mode = ConnectMode.KEY_PAIR.value if private_key_passphrase else ConnectMode.SSO.value

    config_dictionary = get_config_dict(
        config,
        private_key=private_key,
        # SSO mode always sends an empty password; key-pair mode sends the passphrase.
        password=private_key_passphrase or "",
        connect_mode=connect_mode,
        user=user,
        cache_column_metadata=True,
    )

    registry.register("snowflake", "snowflake.sqlalchemy", "dialect")
    db_engine: SfAlchemyEngine = SfAlchemyEngine(config=config_dictionary)
    db_engine.database = database_name
    db_engine.env = env

    return db_engine, schemas, database_name


def get_user():
    """
    Get the username for authentication.
    Can be customized via SNOWFLAKE_USER environment variable,
    or will use system username with optional email domain.

    :return: Username for Snowflake authentication (e.g., user@domain.com)
    """
    # An explicitly configured user always wins.
    explicit_user = os.getenv("SNOWFLAKE_USER")
    if explicit_user:
        return explicit_user

    # Otherwise, use system username with optional email domain
    username = getpass.getuser()
    email_domain = os.getenv("SNOWFLAKE_EMAIL_DOMAIN", "")

    if email_domain:
        return f"{username}@{email_domain}"
    return username


def _find_database(session, database_name: str, version: str, environment: str) -> Database:
    """Fetch the single matching Database row; closes *session* before raising on any failure."""
    query = session.query(Database).filter(
        and_(Database.database_name == database_name, Database.version == version, Database.environment == environment)
    )
    try:
        # One round trip: None for "no row", an SQLAlchemy error for several rows.
        database = query.one_or_none()
    except Exception:
        session.close()
        raise
    if database is None:
        session.close()
        raise Exception(f"Database {database_name} v.{version} could not be found in {environment} environment")
    return database


def load_db(
    database_name: str, version: str, environment: str, metadata_db: str = "metadata.db"
) -> "tuple[Database, Session]":
    """
    Load one Database record from the metadata database.

    :param database_name: name of the database to look up
    :param version: version of the stored metadata
    :param environment: environment the metadata was captured in
    :param metadata_db: file name of the SQLite metadata database
    :return: tuple of (Database record, open session the record is attached to);
        the caller owns the session and is responsible for closing it
    :raises Exception: when no matching record exists
    """
    engine: SqLiteAqlAlchemyEngine = get_metadata_engine(metadata_db=metadata_db)
    Session = sessionmaker(bind=engine)
    session = Session()
    return _find_database(session, database_name, version, environment), session


def load_comparator(
    source_env: str, target_env: str, database_name: str, metadata_db: str, src_version: str, trg_version: str
):
    """
    Load the two Database records that are to be compared.

    Both records are loaded through the same session, so the single session that is
    returned keeps both of them attached and is the only one the caller has to close.

    :return: tuple of (source Database, target Database, open session)
    :raises Exception: when validation fails or either record cannot be found
    """
    validate(source_env, target_env, database_name, src_version, trg_version)

    one, session = load_db(
        database_name=database_name, metadata_db=metadata_db, version=src_version, environment=source_env
    )
    two = _find_database(session, database_name, trg_version, target_env)

    return one, two, session


def validate(source_env: str, target_env: str, alias: str, src_version: str, trg_version: str) -> bool:
    """
    Check the arguments of a comparison request.

    :return: True when every check passes
    :raises Exception: when the alias is blank, an environment is not listed by
        ``Environment.list()``, or source and target denote the same environment and version
    """
    # Validate alias is provided
    if not alias or not alias.strip():
        raise Exception("Database alias must be provided. Use --alias parameter to specify your database identifier.")

    supported = Environment.list()
    if source_env not in supported or target_env not in supported:
        raise Exception(f"{supported} are the only environments supported.")

    if source_env == target_env and src_version == trg_version:
        raise Exception("Comparing a database to itself does not make any sense!")

    return True
Provide the entrypoint for major migration operations, including database-specific behavioral variances.
While individual SQL/DDL constructs already provide for database-specific implementations, variances here allow for entirely different sequences of operations to take place for a particular migration, such as SQL Server's special 'IDENTITY INSERT' step for bulk inserts.
Provide the entrypoint for major migration operations, including database-specific behavioral variances.
While individual SQL/DDL constructs already provide for database-specific implementations, variances here allow for entirely different sequences of operations to take place for a particular migration, such as SQL Server's special 'IDENTITY INSERT' step for bulk inserts.
def get_engine(env: str, resources_path: str, alias: str) -> "tuple[SfAlchemyEngine, list[str], str]":
    """
    Build a Snowflake SqlAlchemy engine wrapper for the given environment and alias.

    Key-pair connection mode is selected when the PRIVATE_KEY (base64-encoded) and
    PRIVATE_KEY_PASSPHRASE environment variables are both set; otherwise SSO
    connection mode is used.

    :param env: environment name; when it is "local" the effective environment is
        read from the GENERAL/env option of the configuration
    :param resources_path: resources folder path where to get an environment for provided alias
    :param alias: system alias or database identifier for configuration lookup
    :return: tuple of (engine wrapper, schema names from GENERAL/schemas, database name)
    """
    config = get_config(env, os.path.join(resources_path, alias))
    user = config.get(section="DB_CONNECTION", option="username")
    if env == "local":
        env = config.get("GENERAL", "env")

    database_name = config.get("DB_CONNECTION", "database")
    # Tolerate "A, B" style lists: surrounding whitespace is not part of a schema name.
    schemas = [name.strip() for name in config.get("GENERAL", "schemas").split(",")]

    private_key = b""
    private_key_passphrase = None
    encoded_key = os.environ.get("PRIVATE_KEY")
    if encoded_key:
        private_key = base64.b64decode(encoded_key)
        private_key_passphrase = os.environ.get("PRIVATE_KEY_PASSPHRASE")

    # The passphrase (not the key alone) is what switches the connection to key-pair mode.
    connect_mode = ConnectMode.KEY_PAIR.value if private_key_passphrase else ConnectMode.SSO.value

    config_dictionary = get_config_dict(
        config,
        private_key=private_key,
        # SSO mode always sends an empty password; key-pair mode sends the passphrase.
        password=private_key_passphrase or "",
        connect_mode=connect_mode,
        user=user,
        cache_column_metadata=True,
    )

    registry.register("snowflake", "snowflake.sqlalchemy", "dialect")
    db_engine: SfAlchemyEngine = SfAlchemyEngine(config=config_dictionary)
    db_engine.database = database_name
    db_engine.env = env

    return db_engine, schemas, database_name
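Get an SqlAlchemy engine based on environment. SSO connection mode is used unless the PRIVATE_KEY and PRIVATE_KEY_PASSPHRASE environment variables are both set, in which case key-pair connection mode is used.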
Parameters
- env: environment, one of the following: dev, non_prod, cert and prod
- resources_path: path of the resources folder that holds the configuration for the provided alias
- alias: system alias or database identifier for configuration lookup
Returns
A tuple of (SfAlchemyEngine, list of schema names, database name)
def get_user():
    """
    Resolve the username used for authentication.

    A non-empty SNOWFLAKE_USER environment variable is returned as-is. Otherwise the
    system login name is used, suffixed with "@<domain>" when the
    SNOWFLAKE_EMAIL_DOMAIN environment variable holds a non-empty value.

    :return: Username for Snowflake authentication (e.g., user@domain.com)
    """
    # An explicitly configured user always wins.
    explicit_user = os.getenv("SNOWFLAKE_USER")
    if explicit_user:
        return explicit_user

    # Fall back to the system login, optionally qualified with an email domain.
    login = getpass.getuser()
    domain = os.getenv("SNOWFLAKE_EMAIL_DOMAIN", "")
    return f"{login}@{domain}" if domain else login
Get the username for authentication. Can be customized via SNOWFLAKE_USER environment variable, or will use system username with optional email domain.
Returns
Username for Snowflake authentication (e.g., user@domain.com)
def load_db(
    database_name: str, version: str, environment: str, metadata_db: str = "metadata.db"
) -> "tuple[Database, Session]":
    """
    Load one Database record from the metadata database.

    :param database_name: name of the database to look up
    :param version: version of the stored metadata
    :param environment: environment the metadata was captured in
    :param metadata_db: file name of the SQLite metadata database
    :return: tuple of (Database record, open session the record is attached to);
        the caller owns the session and is responsible for closing it
    :raises Exception: when no matching record exists
    """
    engine: SqLiteAqlAlchemyEngine = get_metadata_engine(metadata_db=metadata_db)
    Session = sessionmaker(bind=engine)
    session = Session()

    query = session.query(Database).filter(
        and_(Database.database_name == database_name, Database.version == version, Database.environment == environment)
    )
    try:
        # One round trip instead of COUNT + SELECT: None for "no row",
        # an SQLAlchemy error when several rows match.
        database = query.one_or_none()
    except Exception:
        # Nobody receives the session on failure, so release it here.
        session.close()
        raise

    if database is None:
        session.close()
        raise Exception(f"Database {database_name} v.{version} could not be found in {environment} environment")

    return database, session
def load_comparator(
    source_env: str, target_env: str, database_name: str, metadata_db: str, src_version: str, trg_version: str
):
    """
    Load the two Database records that are to be compared.

    Both records are loaded through the same session. Loading each one with its own
    ``load_db`` call would overwrite the first session: it would never be closed and
    the source record would be left attached to a session nobody references.

    :param source_env: environment of the source metadata
    :param target_env: environment of the target metadata
    :param database_name: database identifier shared by both records
    :param metadata_db: file name of the SQLite metadata database
    :param src_version: version of the source metadata
    :param trg_version: version of the target metadata
    :return: tuple of (source Database, target Database, open session); the caller
        is responsible for closing the session
    :raises Exception: when validation fails or either record cannot be found
    """
    validate(source_env, target_env, database_name, src_version, trg_version)

    one, session = load_db(
        database_name=database_name, metadata_db=metadata_db, version=src_version, environment=source_env
    )

    # Look the target up in the session that already holds the source record.
    target_query = session.query(Database).filter(
        and_(
            Database.database_name == database_name,
            Database.version == trg_version,
            Database.environment == target_env,
        )
    )
    try:
        two = target_query.one_or_none()
    except Exception:
        # Nobody receives the session on failure, so release it here.
        session.close()
        raise

    if two is None:
        session.close()
        raise Exception(f"Database {database_name} v.{trg_version} could not be found in {target_env} environment")

    return one, two, session
def validate(source_env: str, target_env: str, alias: str, src_version: str, trg_version: str) -> bool:
    """
    Check the arguments of a comparison request.

    :param source_env: environment of the source metadata
    :param target_env: environment of the target metadata
    :param alias: database alias / identifier; must not be blank
    :param src_version: version of the source metadata
    :param trg_version: version of the target metadata
    :return: True when every check passes
    :raises Exception: when the alias is blank, an environment is not listed by
        ``Environment.list()``, or source and target denote the same environment and version
    """
    # Validate alias is provided
    if not alias or not alias.strip():
        raise Exception("Database alias must be provided. Use --alias parameter to specify your database identifier.")

    # Both sides of the comparison must name a supported environment.
    supported = Environment.list()
    if source_env not in supported or target_env not in supported:
        raise Exception(f"{supported} are the only environments supported.")

    if source_env == target_env and src_version == trg_version:
        raise Exception("Comparing a database to itself does not make any sense!")

    return True