schema_sentinel

  1import base64
  2import getpass
  3import logging as log
  4import os
  5
  6from alembic.ddl import DefaultImpl
  7from sqlalchemy import and_
  8from sqlalchemy.dialects import registry
  9from sqlalchemy.orm import sessionmaker
 10
 11from schema_sentinel.metadata_manager.engine import SfAlchemyEngine, SqLiteAqlAlchemyEngine, get_config_dict
 12from schema_sentinel.metadata_manager.enums import ConnectMode, Environment
 13from schema_sentinel.metadata_manager.model.database import Database
 14from schema_sentinel.metadata_manager.utils import get_config
 15
 16PROJECT_NAME = "schema-sentinel"
 17TEMP_DIR = os.getenv("TEMP") if os.name == "nt" else "/tmp"
 18LOG_FILE = os.path.join(TEMP_DIR, "schema-sentinel.log")
 19LOG_LEVEL = os.getenv("LOG_LEVEL") if os.getenv("LOG_LEVEL") is not None else "INFO"
 20
 21PROJECT_HOME = os.path.dirname(os.path.join(os.path.abspath("./"), PROJECT_NAME))
 22RESOURCES_PATH = os.path.join(PROJECT_HOME, "resources")
 23META_DB_PATH = os.path.join(RESOURCES_PATH, "meta-db")
 24
 25log.basicConfig(
 26    level=LOG_LEVEL,
 27    format="%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s]  %(message)s",
 28    handlers=[log.FileHandler(LOG_FILE), log.StreamHandler()],
 29)
 30
 31
 32class SqLiteImpl(DefaultImpl):
 33    __dialect__ = "sqlite"
 34
 35
 36class SnowflakeImpl(DefaultImpl):
 37    __dialect__ = "snowflake"
 38
 39
 40def get_metadata_engine(metadata_db: str) -> SqLiteAqlAlchemyEngine:
 41    url = f"sqlite:///{META_DB_PATH}/{metadata_db}"
 42    sqlite_engine = SqLiteAqlAlchemyEngine(env=None, config={"database": metadata_db, "user": get_user(), "url": url})
 43    engine = sqlite_engine.get_engine()
 44    return engine
 45
 46
 47def get_engine(env: str, resources_path: str, alias: str) -> (SfAlchemyEngine, list, str):
 48    """
 49    Get an SqlAlchemy engine based on environment. SSO connection mode is used
 50    :param env: environment, one of the following: dev, non_prod, cert and prod
 51    :param resources_path: resources folder path where to get an environment for provided alias
 52    :param alias: system alias or database identifier for configuration lookup
 53    :return: sqlalchemy.Engine
 54    """
 55    config = get_config(env, os.path.join(resources_path, alias))
 56    user = config.get(section="DB_CONNECTION", option="username")
 57    if env == "local":
 58        env = config.get("GENERAL", "env")
 59
 60    database_name = config.get("DB_CONNECTION", "database")
 61    schemas = config.get("GENERAL", "schemas").split(",")
 62
 63    private_key = b""
 64    private_key_passphrase = None
 65
 66    if os.environ.get("PRIVATE_KEY"):
 67        private_key = base64.b64decode(os.environ.get("PRIVATE_KEY"))
 68        private_key_passphrase = os.environ.get("PRIVATE_KEY_PASSPHRASE")
 69
 70    connect_mode = ConnectMode.SSO.value if not private_key_passphrase else ConnectMode.KEY_PAIR.value
 71
 72    if connect_mode == ConnectMode.KEY_PAIR.value:
 73        config_dictionary = get_config_dict(
 74            config,
 75            private_key=private_key,
 76            password=private_key_passphrase,
 77            connect_mode=connect_mode,
 78            user=user,
 79            cache_column_metadata=True,
 80        )
 81    elif connect_mode == ConnectMode.SSO.value:
 82        config_dictionary = get_config_dict(
 83            config,
 84            private_key=private_key,
 85            password="",
 86            connect_mode=connect_mode,
 87            user=user,
 88            cache_column_metadata=True,
 89        )
 90
 91    registry.register("snowflake", "snowflake.sqlalchemy", "dialect")
 92    db_engine: SfAlchemyEngine = SfAlchemyEngine(config=config_dictionary)
 93    db_engine.database = database_name
 94    db_engine.env = env
 95
 96    return db_engine, schemas, database_name
 97
 98
 99def get_user():
100    """
101    Get the username for authentication.
102    Can be customized via SNOWFLAKE_USER environment variable,
103    or will use system username with optional email domain.
104
105    :return: Username for Snowflake authentication (e.g., user@domain.com)
106    """
107    # Check if user is explicitly set in environment
108    if os.getenv("SNOWFLAKE_USER"):
109        return os.getenv("SNOWFLAKE_USER")
110
111    # Otherwise, use system username with optional email domain
112    username = getpass.getuser()
113    email_domain = os.getenv("SNOWFLAKE_EMAIL_DOMAIN", "")
114
115    if email_domain:
116        return f"{username}@{email_domain}"
117    return username
118
119
def load_db(database_name: str, version: str, environment: str, metadata_db: str = "metadata.db") -> tuple:
    """
    Load one stored ``Database`` record from the SQLite metadata database.

    :param database_name: value matched against ``Database.database_name``
    :param version: value matched against ``Database.version``
    :param environment: value matched against ``Database.environment``
    :param metadata_db: file name of the SQLite metadata database
    :return: tuple of (matching ``Database`` row, the open session it was loaded
        with); the session is handed to the caller still open
    :raises Exception: when no row matches; the session is closed before raising
    """
    engine: SqLiteAqlAlchemyEngine = get_metadata_engine(metadata_db=metadata_db)
    session = sessionmaker(bind=engine)()

    try:
        # A single round trip: one_or_none() yields None for no match and still
        # raises when more than one row matches, as one() does.
        database = (
            session.query(Database)
            .filter(
                and_(
                    Database.database_name == database_name,
                    Database.version == version,
                    Database.environment == environment,
                )
            )
            .one_or_none()
        )
    except Exception:
        # Do not leak the session when the query itself fails.
        session.close()
        raise

    if database is None:
        session.close()
        raise Exception(f"Database {database_name} v.{version} could not be found in {environment} environment")

    return database, session
132
133
def load_comparator(
    source_env: str, target_env: str, database_name: str, metadata_db: str, src_version: str, trg_version: str
):
    """
    Load the source and target ``Database`` records that are to be compared.

    Both records are loaded through one session so that the session returned to
    the caller owns both objects and no second session is left open.

    :param source_env: environment of the source record
    :param target_env: environment of the target record
    :param database_name: database name (also validated as the alias)
    :param metadata_db: file name of the SQLite metadata database
    :param src_version: version of the source record
    :param trg_version: version of the target record
    :return: tuple of (source ``Database``, target ``Database``, open session)
    :raises Exception: when validation fails or either record cannot be found
    """
    validate(source_env, target_env, database_name, src_version, trg_version)

    one, session = load_db(
        database_name=database_name, metadata_db=metadata_db, version=src_version, environment=source_env
    )

    try:
        # Reuse the session that loaded the source record for the target lookup.
        two = (
            session.query(Database)
            .filter(
                and_(
                    Database.database_name == database_name,
                    Database.version == trg_version,
                    Database.environment == target_env,
                )
            )
            .one_or_none()
        )
        if two is None:
            raise Exception(
                f"Database {database_name} v.{trg_version} could not be found in {target_env} environment"
            )
    except Exception:
        # The caller never receives the session on failure, so close it here.
        session.close()
        raise

    return one, two, session
148
149
def validate(source_env: str, target_env: str, alias: str, src_version: str, trg_version: str) -> bool:
    """
    Check the arguments of a comparison request.

    :param source_env: source environment; must be in ``Environment.list()``
    :param target_env: target environment; must be in ``Environment.list()``
    :param alias: database alias; must contain non-whitespace characters
    :param src_version: source version
    :param trg_version: target version
    :return: True when every check passes
    :raises Exception: when the alias is blank, an environment is not supported,
        or source and target describe the same environment and version
    """
    # Validate alias is provided
    if not alias or not alias.strip():
        raise Exception("Database alias must be provided. Use --alias parameter to specify your database identifier.")

    # Both sides of the comparison have to be supported environments.
    supported = Environment.list()
    if source_env not in supported or target_env not in supported:
        raise Exception(f"{supported} are the only environments supported.")

    if source_env == target_env and src_version == trg_version:
        raise Exception("Comparing a database to itself does not make any sense!")

    return True
PROJECT_NAME = 'schema-sentinel'
TEMP_DIR = '/tmp'
LOG_FILE = '/tmp/schema-sentinel.log'
LOG_LEVEL = 'INFO'
PROJECT_HOME = $PWD
RESOURCES_PATH = '/Users/igor.gladyshev/PycharmProjects/schema-sentinel/resources'
META_DB_PATH = '/Users/igor.gladyshev/PycharmProjects/schema-sentinel/resources/meta-db'
class SqLiteImpl(alembic.ddl.impl.DefaultImpl):
class SqLiteImpl(DefaultImpl):
    """Alembic ``DefaultImpl`` subclass that declares the ``sqlite`` dialect name.

    No behavior is overridden; everything is inherited from ``DefaultImpl``.
    """

    # Dialect name this implementation is associated with.
    __dialect__ = "sqlite"

Provide the entrypoint for major migration operations, including database-specific behavioral variances.

While individual SQL/DDL constructs already provide for database-specific implementations, variances here allow for entirely different sequences of operations to take place for a particular migration, such as SQL Server's special 'IDENTITY INSERT' step for bulk inserts.

class SnowflakeImpl(alembic.ddl.impl.DefaultImpl):
class SnowflakeImpl(DefaultImpl):
    """Alembic ``DefaultImpl`` subclass that declares the ``snowflake`` dialect name.

    No behavior is overridden; everything is inherited from ``DefaultImpl``.
    """

    # Dialect name this implementation is associated with.
    __dialect__ = "snowflake"

Provide the entrypoint for major migration operations, including database-specific behavioral variances.

While individual SQL/DDL constructs already provide for database-specific implementations, variances here allow for entirely different sequences of operations to take place for a particular migration, such as SQL Server's special 'IDENTITY INSERT' step for bulk inserts.

def get_metadata_engine( metadata_db: str) -> schema_sentinel.metadata_manager.engine.SqLiteAqlAlchemyEngine:
def get_metadata_engine(metadata_db: str) -> SqLiteAqlAlchemyEngine:
    """
    Create the engine for a SQLite metadata database stored under META_DB_PATH.

    :param metadata_db: file name of the SQLite database inside META_DB_PATH
    :return: the object returned by ``SqLiteAqlAlchemyEngine.get_engine()``
        (NOTE(review): the annotation names the wrapper class, while the value
        returned is the result of ``get_engine()`` - confirm the intended type)
    """
    settings = {
        "database": metadata_db,
        "user": get_user(),
        "url": f"sqlite:///{META_DB_PATH}/{metadata_db}",
    }
    return SqLiteAqlAlchemyEngine(env=None, config=settings).get_engine()
def get_engine( env: str, resources_path: str, alias: str) -> (<class 'schema_sentinel.metadata_manager.engine.SfAlchemyEngine'>, <class 'list'>, <class 'str'>):
def get_engine(env: str, resources_path: str, alias: str) -> "tuple[SfAlchemyEngine, list, str]":
    """
    Build a Snowflake engine wrapper from the configuration stored for ``alias``.

    Key-pair mode is selected when both the PRIVATE_KEY (base64 encoded) and
    PRIVATE_KEY_PASSPHRASE environment variables are set; otherwise SSO
    connection mode is used.

    :param env: environment, one of the following: dev, non_prod, cert and prod;
        the value ``local`` is replaced by the ``env`` option of the GENERAL section
    :param resources_path: resources folder path where to get an environment for provided alias
    :param alias: system alias or database identifier for configuration lookup
    :return: tuple of (engine wrapper, list of schema names, database name)
    """
    config = get_config(env, os.path.join(resources_path, alias))
    user = config.get(section="DB_CONNECTION", option="username")
    if env == "local":
        env = config.get("GENERAL", "env")

    database_name = config.get("DB_CONNECTION", "database")
    # Accept "a, b" style lists: surrounding whitespace is not part of a schema
    # name, and empty entries (e.g. from a trailing comma) are dropped.
    schemas = [name.strip() for name in config.get("GENERAL", "schemas").split(",") if name.strip()]

    # The passphrase is only considered when a private key is supplied.
    encoded_key = os.environ.get("PRIVATE_KEY")
    private_key = base64.b64decode(encoded_key) if encoded_key else b""
    private_key_passphrase = os.environ.get("PRIVATE_KEY_PASSPHRASE") if encoded_key else None

    # NOTE(review): the mode is chosen by the presence of the passphrase, so a
    # private key supplied without a passphrase still results in SSO mode -
    # confirm this is intended.
    connect_mode = ConnectMode.KEY_PAIR.value if private_key_passphrase else ConnectMode.SSO.value

    # Both modes pass the same arguments; only the password differs (the
    # passphrase for key-pair mode, an empty string for SSO).
    config_dictionary = get_config_dict(
        config,
        private_key=private_key,
        password=private_key_passphrase or "",
        connect_mode=connect_mode,
        user=user,
        cache_column_metadata=True,
    )

    registry.register("snowflake", "snowflake.sqlalchemy", "dialect")
    db_engine: SfAlchemyEngine = SfAlchemyEngine(config=config_dictionary)
    db_engine.database = database_name
    db_engine.env = env

    return db_engine, schemas, database_name

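Get a SQLAlchemy engine wrapper based on environment. SSO connection mode is used unless both the PRIVATE_KEY and PRIVATE_KEY_PASSPHRASE environment variables are set, in which case key-pair mode is used.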

Parameters
  • env: environment, one of the following: dev, non_prod, cert and prod
  • resources_path: resources folder path where to get an environment for provided alias
  • alias: system alias or database identifier for configuration lookup
Returns

A tuple of (SfAlchemyEngine, list of schema names, database name)

def get_user():
def get_user():
    """
    Resolve the username used for authentication.

    A non-empty SNOWFLAKE_USER environment variable wins. Otherwise the system
    username is used, suffixed with ``@<domain>`` when SNOWFLAKE_EMAIL_DOMAIN
    is set to a non-empty value.

    :return: Username for Snowflake authentication (e.g., user@domain.com)
    """
    explicit_user = os.getenv("SNOWFLAKE_USER")
    if explicit_user:
        return explicit_user

    # No explicit override: derive the name from the logged-in system account.
    login = getpass.getuser()
    domain = os.getenv("SNOWFLAKE_EMAIL_DOMAIN", "")
    return f"{login}@{domain}" if domain else login

Get the username for authentication. Can be customized via SNOWFLAKE_USER environment variable, or will use system username with optional email domain.

Returns

Username for Snowflake authentication (e.g., user@domain.com)

def load_db( database_name: str, version: str, environment: str, metadata_db: str = 'metadata.db') -> schema_sentinel.metadata_manager.model.database.Database:
def load_db(database_name: str, version: str, environment: str, metadata_db: str = "metadata.db") -> tuple:
    """
    Load one stored ``Database`` record from the SQLite metadata database.

    :param database_name: value matched against ``Database.database_name``
    :param version: value matched against ``Database.version``
    :param environment: value matched against ``Database.environment``
    :param metadata_db: file name of the SQLite metadata database
    :return: tuple of (matching ``Database`` row, the open session it was loaded
        with); the session is handed to the caller still open
    :raises Exception: when no row matches; the session is closed before raising
    """
    engine: SqLiteAqlAlchemyEngine = get_metadata_engine(metadata_db=metadata_db)
    session = sessionmaker(bind=engine)()

    try:
        # A single round trip: one_or_none() yields None for no match and still
        # raises when more than one row matches, as one() does.
        database = (
            session.query(Database)
            .filter(
                and_(
                    Database.database_name == database_name,
                    Database.version == version,
                    Database.environment == environment,
                )
            )
            .one_or_none()
        )
    except Exception:
        # Do not leak the session when the query itself fails.
        session.close()
        raise

    if database is None:
        session.close()
        raise Exception(f"Database {database_name} v.{version} could not be found in {environment} environment")

    return database, session
def load_comparator( source_env: str, target_env: str, database_name: str, metadata_db: str, src_version: str, trg_version: str):
def load_comparator(
    source_env: str, target_env: str, database_name: str, metadata_db: str, src_version: str, trg_version: str
):
    """
    Load the source and target ``Database`` records that are to be compared.

    Both records are loaded through one session so that the session returned to
    the caller owns both objects and no second session is left open.

    :param source_env: environment of the source record
    :param target_env: environment of the target record
    :param database_name: database name (also validated as the alias)
    :param metadata_db: file name of the SQLite metadata database
    :param src_version: version of the source record
    :param trg_version: version of the target record
    :return: tuple of (source ``Database``, target ``Database``, open session)
    :raises Exception: when validation fails or either record cannot be found
    """
    validate(source_env, target_env, database_name, src_version, trg_version)

    one, session = load_db(
        database_name=database_name, metadata_db=metadata_db, version=src_version, environment=source_env
    )

    try:
        # Reuse the session that loaded the source record for the target lookup.
        two = (
            session.query(Database)
            .filter(
                and_(
                    Database.database_name == database_name,
                    Database.version == trg_version,
                    Database.environment == target_env,
                )
            )
            .one_or_none()
        )
        if two is None:
            raise Exception(
                f"Database {database_name} v.{trg_version} could not be found in {target_env} environment"
            )
    except Exception:
        # The caller never receives the session on failure, so close it here.
        session.close()
        raise

    return one, two, session
def validate( source_env: str, target_env: str, alias: str, src_version: str, trg_version: str) -> bool:
def validate(source_env: str, target_env: str, alias: str, src_version: str, trg_version: str) -> bool:
    """
    Check the arguments of a comparison request.

    :param source_env: source environment; must be in ``Environment.list()``
    :param target_env: target environment; must be in ``Environment.list()``
    :param alias: database alias; must contain non-whitespace characters
    :param src_version: source version
    :param trg_version: target version
    :return: True when every check passes
    :raises Exception: when the alias is blank, an environment is not supported,
        or source and target describe the same environment and version
    """
    # Validate alias is provided
    if not alias or not alias.strip():
        raise Exception("Database alias must be provided. Use --alias parameter to specify your database identifier.")

    # Both sides of the comparison have to be supported environments.
    supported = Environment.list()
    if source_env not in supported or target_env not in supported:
        raise Exception(f"{supported} are the only environments supported.")

    if source_env == target_env and src_version == trg_version:
        raise Exception("Comparing a database to itself does not make any sense!")

    return True