schema_sentinel.metadata_manager.engine
"""Database engine strategies: a generic SQLAlchemy-URL engine and two Snowflake
engines (one built on snowflake-sqlalchemy, one on the raw snowflake.connector)."""

import logging as log
import os
from abc import ABC, abstractmethod
from configparser import ConfigParser

import pandas as pd
import snowflake.connector
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from snowflake.sqlalchemy import URL
from sqlalchemy import MetaData, create_engine, text
from sqlalchemy.engine import Engine

from .enums import ConnectMode


class DBEngineStrategy(ABC):
    """Abstract base class for database engine strategies.

    Stores the connection settings taken from a config dict and declares the
    interface every concrete strategy implements: ``get_engine``,
    ``get_conn``, ``close`` and ``execute``.
    """

    engine = None
    conn = None
    metadata: MetaData = None
    connect_mode = ConnectMode.KEY_PAIR.value
    env: str = "dev"
    url: str = None

    def __init__(self, config: dict, env: str = "dev"):
        """Read connection settings from ``config``.

        :param config: mapping with the optional keys ``account``, ``user``,
            ``warehouse``, ``database``, ``role``, ``schema``, ``url``,
            ``connect_mode``, ``private_key`` and ``private_key_passphrase``.
            Missing keys become ``None``, except ``private_key`` (``b""``),
            ``private_key_passphrase`` (``""``) and ``connect_mode``
            (``ConnectMode.KEY_PAIR.value``).
        :param env: environment label stored on the instance.
        """
        self.account = config.get("account")
        self.user = config.get("user")
        self.warehouse = config.get("warehouse")
        self.database = config.get("database")
        self.private_key = config.get("private_key", b"")
        self.role = config.get("role")
        self.engine = None
        self.private_key_passphrase = config.get("private_key_passphrase", "")
        self.default_schema = config.get("schema")
        # Fall back to the class-level default (key pair) instead of None, so a
        # config without "connect_mode" still selects an authentication method.
        self.connect_mode = config.get("connect_mode", ConnectMode.KEY_PAIR.value)
        self.env = env
        self.url = config.get("url")

    @abstractmethod
    def get_engine(self):
        """Return the engine (or driver connection), creating it on first use."""

    @abstractmethod
    def get_conn(self):
        """Return the connection/connectable that ``execute`` runs against."""

    @abstractmethod
    def close(self):
        """Release any engine/connection held by this strategy."""

    def __del__(self):
        # Release resources when the strategy object is garbage collected.
        self.close()

    @abstractmethod
    def execute(self, statement, parameters=None, columns=None, schema=None):
        """Run ``statement`` and return ``(success, results)``.

        Concrete strategies return ``(True, None)`` for statements run without
        ``columns`` and ``(True, pandas.DataFrame)`` for queries.
        """

    def decode_private_key(self, password):
        """Convert the PEM private key in ``self.private_key`` to PKCS#8 DER bytes.

        :param password: passphrase of the PEM key; ``None`` or ``""`` means the
            key is not encrypted.
        :returns: the unencrypted key as DER/PKCS#8 bytes, the form passed as
            ``private_key`` to the Snowflake connector.
        :raises ValueError, TypeError: propagated from
            ``serialization.load_pem_private_key`` for unparsable key data or a
            wrong/unexpected passphrase.
        """
        key_data = self.private_key
        if isinstance(key_data, str):
            # load_pem_private_key only accepts bytes-like PEM data.
            key_data = key_data.encode()
        # cryptography requires password=None for unencrypted keys (a supplied
        # password raises TypeError); None.encode() would fail outright.
        secret = password.encode() if password else None
        p_key = serialization.load_pem_private_key(key_data, password=secret, backend=default_backend())
        return p_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )


class SqLiteAqlAlchemyEngine(DBEngineStrategy):
    """Strategy that opens a SQLAlchemy engine on ``self.url`` (e.g. a SQLite URL)."""

    metadata: MetaData = None
    engine: Engine = None
    url: str = None

    def __init__(self, config: dict, env: str = "dev", url: str = None):
        """Initialise from ``config``, optionally overriding its ``url`` entry.

        :param config: connection settings (see ``DBEngineStrategy.__init__``).
        :param env: environment label.
        :param url: SQLAlchemy database URL; when ``None`` the ``url`` key of
            ``config`` is used.
        """
        if url is not None:
            # Copy rather than mutate the caller's dict.
            config = {**config, "url": url}
        super().__init__(config=config, env=env)

    def get_engine(self):
        """Create (once) and return the SQLAlchemy engine for ``self.url``."""
        if self.engine is None:
            self.engine = create_engine(url=self.url)
        return self.engine

    def close(self):
        """Dispose the engine's connection pool; a new engine is created on next use."""
        if self.engine:
            self.engine.dispose()
            self.engine = None

    def execute(self, statement, parameters=None, columns=None, schema=None):
        """Run ``statement``; ``schema`` is accepted for interface parity but ignored.

        :returns: ``(True, None)`` when ``columns`` is falsy (statement run in
            a committed transaction), otherwise ``(True, DataFrame)``.
        """
        log.debug(f"statement=[{statement}]")
        engine = self.get_conn()

        if not columns:
            # engine.begin() commits on success and rolls back on error, and
            # unlike connectionless Engine.execute() is not deprecated/removed.
            with engine.begin() as conn:
                if parameters is not None:
                    conn.execute(text(statement), parameters)
                else:
                    conn.execute(text(statement))
            return True, None

        if parameters is not None:
            results = pd.read_sql(sql=text(statement), con=engine, params=parameters)
        else:
            results = pd.read_sql(sql=text(statement), con=engine)
        return True, results

    def get_conn(self):
        """Return the engine itself; pandas and ``execute`` use it as a connectable."""
        return self.get_engine()


class SfAlchemyEngine(DBEngineStrategy):
    """Snowflake strategy built on SQLAlchemy engines from snowflake-sqlalchemy."""

    # Module-wide connector setting applied when the class is defined.
    snowflake.connector.paramstyle = "numeric"

    def get_engine(self):
        """Create (once) and return the SQLAlchemy engine for ``self.connect_mode``.

        SSO opens a browser login, password mode reads ``SNOWFLAKE_PASSWORD``
        from the environment, and key-pair mode decodes ``self.private_key``.

        :raises ValueError: if ``connect_mode`` is none of those modes.
        """
        if self.engine is None:
            url_args = dict(
                account=self.account,
                user=self.user,
                warehouse=self.warehouse,
                database=self.database,
                role=self.role,
                cache_column_metadata=True,
                schema=self.default_schema,
            )
            if self.connect_mode == ConnectMode.SSO.value:
                self.engine = create_engine(URL(**url_args, authenticator="externalbrowser"))
            elif self.connect_mode == ConnectMode.PWD.value:
                self.engine = create_engine(URL(**url_args, password=os.getenv("SNOWFLAKE_PASSWORD")))
            elif self.connect_mode == ConnectMode.KEY_PAIR.value:
                pkb = self.decode_private_key(self.private_key_passphrase)
                self.engine = create_engine(URL(**url_args), connect_args={"private_key": pkb})
            else:
                # Fail fast instead of returning None, which only fails later in get_conn().
                raise ValueError(f"Unsupported connect_mode: {self.connect_mode!r}")
        return self.engine

    def get_conn(self):
        """Return the cached connection, opened with autocommit disabled on first use."""
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect().execution_options(autocommit=False)
        return self.conn

    def close(self):
        """Close the connection and dispose the engine; both are recreated on next use."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None

    def execute(self, statement, parameters=None, columns=None, schema=None):
        """Run ``statement`` on the shared connection.

        :param statement: SQL text (wrapped in ``sqlalchemy.text``).
        :param parameters: bind parameters, or ``None``.
        :param columns: when ``None`` the statement runs inside an explicit
            BEGIN/COMMIT and nothing is returned; otherwise the result set is
            read into a DataFrame.
        :param schema: if given, ``USE SCHEMA`` is issued first with the name
            as a quoted identifier.
        :returns: ``(True, None)`` or ``(True, DataFrame)``.
        """
        log.debug(f"statement=[{statement}]")
        conn = self.get_conn()
        if schema is not None:
            # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
            safe_schema = schema.replace('"', '""')
            conn.execute(f'USE SCHEMA "{safe_schema}";')

        if columns is None:
            conn.execute("BEGIN")
            try:
                if parameters is not None:
                    conn.execute(text(statement), parameters)
                else:
                    conn.execute(text(statement))
            except Exception:
                # Do not leave the shared connection inside an open transaction.
                conn.execute("ROLLBACK")
                raise
            conn.execute("COMMIT")
            return True, None

        if parameters is not None:
            results = pd.read_sql(sql=text(statement), con=conn, params=parameters)
        else:
            results = pd.read_sql(sql=text(statement), con=conn)
        return True, results


class SfConnectorEngine(DBEngineStrategy):
    """Snowflake strategy built directly on ``snowflake.connector`` connections."""

    def get_engine(self, schema=None):
        """Create (once) and return a key-pair-authenticated connector connection.

        The connection is opened with ``autocommit=False``. ``schema`` is
        accepted for interface compatibility but unused.
        """
        if self.engine is None:
            # Decode the key only when a connection is actually opened.
            pkb = self.decode_private_key(self.private_key_passphrase)
            snowflake.connector.paramstyle = "numeric"
            self.engine = snowflake.connector.connect(
                user=self.user,
                private_key=pkb,
                account=self.account,
                warehouse=self.warehouse,
                database=self.database,
                schema=self.default_schema,
                role=self.role,
                autocommit=False,
            )
        return self.engine

    def get_conn(self):
        """Create (once) and return a connector connection for ``self.connect_mode``.

        :raises ValueError: if ``connect_mode`` is not SSO, password or key pair.
        """
        if self.conn is None:
            snowflake.connector.paramstyle = "numeric"
            common = dict(
                account=self.account,
                user=self.user,
                role=self.role,
                database=self.database,
                # The configured schema is stored as default_schema (there is no self.schema).
                schema=self.default_schema,
                warehouse=self.warehouse,
            )
            if self.connect_mode == ConnectMode.SSO.value:
                self.conn = snowflake.connector.connect(**common, authenticator="externalbrowser")
            elif self.connect_mode == ConnectMode.PWD.value:
                self.conn = snowflake.connector.connect(**common, password=os.getenv("SNOWFLAKE_PASSWORD"))
            elif self.connect_mode == ConnectMode.KEY_PAIR.value:
                pkb = self.decode_private_key(self.private_key_passphrase)
                self.conn = snowflake.connector.connect(**common, private_key=pkb)
            else:
                raise ValueError(f"Unsupported connect_mode: {self.connect_mode!r}")
        return self.conn

    def close(self):
        """Close both connector connections (if open); they are reopened on next use."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        if self.engine is not None:
            self.engine.close()
            self.engine = None

    def execute(self, statement, parameters=None, columns=None, schema=None):
        """Run ``statement`` on the ``get_engine()`` connection.

        :param statement: SQL string.
        :param parameters: bind parameters for the connector, or ``None``.
        :param columns: when ``None`` the statement is executed and committed
            (rolled back on error); otherwise it is read into a DataFrame.
        :param schema: if given, ``USE SCHEMA`` is issued first with the name
            as a quoted identifier.
        :returns: ``(True, None)`` or ``(True, DataFrame)``.
        """
        log.debug(f"statement=[{statement}]")
        conn = self.get_engine()

        if schema is not None:
            # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
            safe_schema = schema.replace('"', '""')
            cur = conn.cursor()
            try:
                cur.execute(f'USE SCHEMA "{safe_schema}";')
            finally:
                cur.close()

        if columns is None:
            cur = conn.cursor()
            try:
                cur.execute(statement, parameters)
                conn.commit()
            except Exception:
                conn.rollback()
                raise
            finally:
                cur.close()
            return True, None

        # A DBAPI cursor needs the raw SQL string (not a sqlalchemy TextClause),
        # and bind values must go to read_sql's ``params`` keyword.
        results = pd.read_sql(statement, conn, params=parameters)
        return True, results


def get_config_dict(
    config: ConfigParser,
    *,
    private_key: bytes,
    password: str,
    connect_mode: int = ConnectMode.KEY_PAIR.value,
    user: str = None,
    cache_column_metadata=False,
):
    """Assemble the connection settings dict consumed by the engine strategies.

    :param config: parsed configuration whose ``DB_CONNECTION`` section
        provides ``account``, ``warehouse``, ``database``, ``role`` and ``schema``.
    :param private_key: PEM private key, stored only in key-pair mode.
    :param password: account password in password mode, or the private-key
        passphrase in key-pair mode; unused otherwise.
    :param connect_mode: a ``ConnectMode`` value selecting authentication.
    :param user: user name to connect as.
    :param cache_column_metadata: stored as-is under ``cache_column_metadata``.
    :returns: a new settings dict; ``timezone`` is always ``"America/Los_Angeles"``.
    """
    db_section = "DB_CONNECTION"
    config_map = {
        "account": config.get(db_section, "account"),
        "warehouse": config.get(db_section, "warehouse"),
        "user": user,
        "database": config.get(db_section, "database"),
        "role": config.get(db_section, "role"),
        "schema": config.get(db_section, "schema"),
        "connect_mode": connect_mode,
        "cache_column_metadata": cache_column_metadata,
        "timezone": "America/Los_Angeles",
    }
    if connect_mode == ConnectMode.PWD.value:
        config_map["password"] = password
    elif connect_mode == ConnectMode.KEY_PAIR.value:
        config_map["private_key"] = private_key
        config_map["private_key_passphrase"] = password

    return config_map
class
DBEngineStrategy(abc.ABC):
class DBEngineStrategy(ABC):
    """Abstract base class for database engine strategies.

    Stores the connection settings taken from a config dict and declares the
    interface every concrete strategy implements: ``get_engine``,
    ``get_conn``, ``close`` and ``execute``.
    """

    engine = None
    conn = None
    metadata: MetaData = None
    connect_mode = ConnectMode.KEY_PAIR.value
    env: str = "dev"
    url: str = None

    def __init__(self, config: dict, env: str = "dev"):
        """Read connection settings from ``config``.

        :param config: mapping with the optional keys ``account``, ``user``,
            ``warehouse``, ``database``, ``role``, ``schema``, ``url``,
            ``connect_mode``, ``private_key`` and ``private_key_passphrase``.
            Missing keys become ``None``, except ``private_key`` (``b""``),
            ``private_key_passphrase`` (``""``) and ``connect_mode``
            (``ConnectMode.KEY_PAIR.value``).
        :param env: environment label stored on the instance.
        """
        self.account = config.get("account")
        self.user = config.get("user")
        self.warehouse = config.get("warehouse")
        self.database = config.get("database")
        self.private_key = config.get("private_key", b"")
        self.role = config.get("role")
        self.engine = None
        self.private_key_passphrase = config.get("private_key_passphrase", "")
        self.default_schema = config.get("schema")
        # Fall back to the class-level default (key pair) instead of None, so a
        # config without "connect_mode" still selects an authentication method.
        self.connect_mode = config.get("connect_mode", ConnectMode.KEY_PAIR.value)
        self.env = env
        self.url = config.get("url")

    @abstractmethod
    def get_engine(self):
        """Return the engine (or driver connection), creating it on first use."""

    @abstractmethod
    def get_conn(self):
        """Return the connection/connectable that ``execute`` runs against."""

    @abstractmethod
    def close(self):
        """Release any engine/connection held by this strategy."""

    def __del__(self):
        # Release resources when the strategy object is garbage collected.
        self.close()

    @abstractmethod
    def execute(self, statement, parameters=None, columns=None, schema=None):
        """Run ``statement`` and return ``(success, results)``.

        Concrete strategies return ``(True, None)`` for statements run without
        ``columns`` and ``(True, pandas.DataFrame)`` for queries.
        """

    def decode_private_key(self, password):
        """Convert the PEM private key in ``self.private_key`` to PKCS#8 DER bytes.

        :param password: passphrase of the PEM key; ``None`` or ``""`` means the
            key is not encrypted.
        :returns: the unencrypted key as DER/PKCS#8 bytes, the form passed as
            ``private_key`` to the Snowflake connector.
        :raises ValueError, TypeError: propagated from
            ``serialization.load_pem_private_key`` for unparsable key data or a
            wrong/unexpected passphrase.
        """
        key_data = self.private_key
        if isinstance(key_data, str):
            # load_pem_private_key only accepts bytes-like PEM data.
            key_data = key_data.encode()
        # cryptography requires password=None for unencrypted keys (a supplied
        # password raises TypeError); None.encode() would fail outright.
        secret = password.encode() if password else None
        p_key = serialization.load_pem_private_key(key_data, password=secret, backend=default_backend())
        return p_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
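Abstract base class for the engine strategies. It stores connection settings read from a config dict and declares `get_engine`, `get_conn`, `close` and `execute`. It also provides a shared helper that decodes a PEM private key into DER bytes.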
def
decode_private_key(self, password):
def decode_private_key(self, password):
    """Convert the PEM private key in ``self.private_key`` to PKCS#8 DER bytes.

    :param password: passphrase of the PEM key; ``None`` or ``""`` means the
        key is not encrypted.
    :returns: the unencrypted key as DER/PKCS#8 bytes, the form passed as
        ``private_key`` to the Snowflake connector.
    :raises ValueError, TypeError: propagated from
        ``serialization.load_pem_private_key`` for unparsable key data or a
        wrong/unexpected passphrase.
    """
    key_data = self.private_key
    if isinstance(key_data, str):
        # load_pem_private_key only accepts bytes-like PEM data.
        key_data = key_data.encode()
    # cryptography requires password=None for unencrypted keys (a supplied
    # password raises TypeError); None.encode() would fail outright.
    secret = password.encode() if password else None
    p_key = serialization.load_pem_private_key(key_data, password=secret, backend=default_backend())
    return p_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
class SqLiteAqlAlchemyEngine(DBEngineStrategy):
    """Strategy that opens a SQLAlchemy engine on ``self.url`` (e.g. a SQLite URL)."""

    metadata: MetaData = None
    engine: Engine = None
    url: str = None

    def __init__(self, config: dict, env: str = "dev", url: str = None):
        """Initialise from ``config``, optionally overriding its ``url`` entry.

        :param config: connection settings (see ``DBEngineStrategy.__init__``).
        :param env: environment label.
        :param url: SQLAlchemy database URL; when ``None`` the ``url`` key of
            ``config`` is used.
        """
        if url is not None:
            # Copy rather than mutate the caller's dict.
            config = {**config, "url": url}
        super().__init__(config=config, env=env)

    def get_engine(self):
        """Create (once) and return the SQLAlchemy engine for ``self.url``."""
        if self.engine is None:
            self.engine = create_engine(url=self.url)
        return self.engine

    def close(self):
        """Dispose the engine's connection pool; a new engine is created on next use."""
        if self.engine:
            self.engine.dispose()
            self.engine = None

    def execute(self, statement, parameters=None, columns=None, schema=None):
        """Run ``statement``; ``schema`` is accepted for interface parity but ignored.

        :returns: ``(True, None)`` when ``columns`` is falsy (statement run in
            a committed transaction), otherwise ``(True, DataFrame)``.
        """
        log.debug(f"statement=[{statement}]")
        engine = self.get_conn()

        if not columns:
            # engine.begin() commits on success and rolls back on error, and
            # unlike connectionless Engine.execute() is not deprecated/removed.
            with engine.begin() as conn:
                if parameters is not None:
                    conn.execute(text(statement), parameters)
                else:
                    conn.execute(text(statement))
            return True, None

        if parameters is not None:
            results = pd.read_sql(sql=text(statement), con=engine, params=parameters)
        else:
            results = pd.read_sql(sql=text(statement), con=engine)
        return True, results

    def get_conn(self):
        """Return the engine itself; pandas and ``execute`` use it as a connectable."""
        return self.get_engine()
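Engine strategy that opens a SQLAlchemy engine on the database URL supplied in the configuration (for example, a SQLite URL). It executes statements and queries through that engine.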
def
execute(self, statement, parameters=None, columns=None, schema=None):
def execute(self, statement, parameters=None, columns=None, schema=None):
    """Run ``statement``; ``schema`` is accepted for interface parity but ignored.

    :returns: ``(True, None)`` when ``columns`` is falsy (statement run in
        a committed transaction), otherwise ``(True, DataFrame)``.
    """
    log.debug(f"statement=[{statement}]")
    engine = self.get_conn()

    if not columns:
        # engine.begin() commits on success and rolls back on error, and
        # unlike connectionless Engine.execute() is not deprecated/removed.
        with engine.begin() as conn:
            if parameters is not None:
                conn.execute(text(statement), parameters)
            else:
                conn.execute(text(statement))
        return True, None

    if parameters is not None:
        results = pd.read_sql(sql=text(statement), con=engine, params=parameters)
    else:
        results = pd.read_sql(sql=text(statement), con=engine)
    return True, results
class SfAlchemyEngine(DBEngineStrategy):
    """Snowflake strategy built on SQLAlchemy engines from snowflake-sqlalchemy."""

    # Module-wide connector setting applied when the class is defined.
    snowflake.connector.paramstyle = "numeric"

    def get_engine(self):
        """Create (once) and return the SQLAlchemy engine for ``self.connect_mode``.

        SSO opens a browser login, password mode reads ``SNOWFLAKE_PASSWORD``
        from the environment, and key-pair mode decodes ``self.private_key``.

        :raises ValueError: if ``connect_mode`` is none of those modes.
        """
        if self.engine is None:
            url_args = dict(
                account=self.account,
                user=self.user,
                warehouse=self.warehouse,
                database=self.database,
                role=self.role,
                cache_column_metadata=True,
                schema=self.default_schema,
            )
            if self.connect_mode == ConnectMode.SSO.value:
                self.engine = create_engine(URL(**url_args, authenticator="externalbrowser"))
            elif self.connect_mode == ConnectMode.PWD.value:
                self.engine = create_engine(URL(**url_args, password=os.getenv("SNOWFLAKE_PASSWORD")))
            elif self.connect_mode == ConnectMode.KEY_PAIR.value:
                pkb = self.decode_private_key(self.private_key_passphrase)
                self.engine = create_engine(URL(**url_args), connect_args={"private_key": pkb})
            else:
                # Fail fast instead of returning None, which only fails later in get_conn().
                raise ValueError(f"Unsupported connect_mode: {self.connect_mode!r}")
        return self.engine

    def get_conn(self):
        """Return the cached connection, opened with autocommit disabled on first use."""
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect().execution_options(autocommit=False)
        return self.conn

    def close(self):
        """Close the connection and dispose the engine; both are recreated on next use."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None

    def execute(self, statement, parameters=None, columns=None, schema=None):
        """Run ``statement`` on the shared connection.

        :param statement: SQL text (wrapped in ``sqlalchemy.text``).
        :param parameters: bind parameters, or ``None``.
        :param columns: when ``None`` the statement runs inside an explicit
            BEGIN/COMMIT and nothing is returned; otherwise the result set is
            read into a DataFrame.
        :param schema: if given, ``USE SCHEMA`` is issued first with the name
            as a quoted identifier.
        :returns: ``(True, None)`` or ``(True, DataFrame)``.
        """
        log.debug(f"statement=[{statement}]")
        conn = self.get_conn()
        if schema is not None:
            # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
            safe_schema = schema.replace('"', '""')
            conn.execute(f'USE SCHEMA "{safe_schema}";')

        if columns is None:
            conn.execute("BEGIN")
            try:
                if parameters is not None:
                    conn.execute(text(statement), parameters)
                else:
                    conn.execute(text(statement))
            except Exception:
                # Do not leave the shared connection inside an open transaction.
                conn.execute("ROLLBACK")
                raise
            conn.execute("COMMIT")
            return True, None

        if parameters is not None:
            results = pd.read_sql(sql=text(statement), con=conn, params=parameters)
        else:
            results = pd.read_sql(sql=text(statement), con=conn)
        return True, results
Snowflake engine strategy built on SQLAlchemy (snowflake-sqlalchemy). It supports SSO (external browser), password and key-pair authentication, selected by `connect_mode`.
def
get_engine(self):
def get_engine(self):
    """Create (once) and return the SQLAlchemy engine for ``self.connect_mode``.

    SSO opens a browser login, password mode reads ``SNOWFLAKE_PASSWORD``
    from the environment, and key-pair mode decodes ``self.private_key``.

    :raises ValueError: if ``connect_mode`` is none of those modes.
    """
    if self.engine is None:
        # Connection arguments shared by every authentication mode.
        url_args = dict(
            account=self.account,
            user=self.user,
            warehouse=self.warehouse,
            database=self.database,
            role=self.role,
            cache_column_metadata=True,
            schema=self.default_schema,
        )
        if self.connect_mode == ConnectMode.SSO.value:
            self.engine = create_engine(URL(**url_args, authenticator="externalbrowser"))
        elif self.connect_mode == ConnectMode.PWD.value:
            self.engine = create_engine(URL(**url_args, password=os.getenv("SNOWFLAKE_PASSWORD")))
        elif self.connect_mode == ConnectMode.KEY_PAIR.value:
            pkb = self.decode_private_key(self.private_key_passphrase)
            self.engine = create_engine(URL(**url_args), connect_args={"private_key": pkb})
        else:
            # Fail fast instead of returning None, which only fails later in get_conn().
            raise ValueError(f"Unsupported connect_mode: {self.connect_mode!r}")
    return self.engine
def
execute(self, statement, parameters=None, columns=None, schema=None):
def execute(self, statement, parameters=None, columns=None, schema=None):
    """Run ``statement`` on the shared connection.

    :param statement: SQL text (wrapped in ``sqlalchemy.text``).
    :param parameters: bind parameters, or ``None``.
    :param columns: when ``None`` the statement runs inside an explicit
        BEGIN/COMMIT and nothing is returned; otherwise the result set is
        read into a DataFrame.
    :param schema: if given, ``USE SCHEMA`` is issued first with the name
        as a quoted identifier.
    :returns: ``(True, None)`` or ``(True, DataFrame)``.
    """
    log.debug(f"statement=[{statement}]")
    conn = self.get_conn()
    if schema is not None:
        # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
        safe_schema = schema.replace('"', '""')
        conn.execute(f'USE SCHEMA "{safe_schema}";')

    if columns is None:
        conn.execute("BEGIN")
        try:
            if parameters is not None:
                conn.execute(text(statement), parameters)
            else:
                conn.execute(text(statement))
        except Exception:
            # Do not leave the shared connection inside an open transaction.
            conn.execute("ROLLBACK")
            raise
        conn.execute("COMMIT")
        return True, None

    if parameters is not None:
        results = pd.read_sql(sql=text(statement), con=conn, params=parameters)
    else:
        results = pd.read_sql(sql=text(statement), con=conn)
    return True, results
class SfConnectorEngine(DBEngineStrategy):
    """Snowflake strategy built directly on ``snowflake.connector`` connections."""

    def get_engine(self, schema=None):
        """Create (once) and return a key-pair-authenticated connector connection.

        The connection is opened with ``autocommit=False``. ``schema`` is
        accepted for interface compatibility but unused.
        """
        if self.engine is None:
            # Decode the key only when a connection is actually opened.
            pkb = self.decode_private_key(self.private_key_passphrase)
            snowflake.connector.paramstyle = "numeric"
            self.engine = snowflake.connector.connect(
                user=self.user,
                private_key=pkb,
                account=self.account,
                warehouse=self.warehouse,
                database=self.database,
                schema=self.default_schema,
                role=self.role,
                autocommit=False,
            )
        return self.engine

    def get_conn(self):
        """Create (once) and return a connector connection for ``self.connect_mode``.

        :raises ValueError: if ``connect_mode`` is not SSO, password or key pair.
        """
        if self.conn is None:
            snowflake.connector.paramstyle = "numeric"
            common = dict(
                account=self.account,
                user=self.user,
                role=self.role,
                database=self.database,
                # The configured schema is stored as default_schema (there is no self.schema).
                schema=self.default_schema,
                warehouse=self.warehouse,
            )
            if self.connect_mode == ConnectMode.SSO.value:
                self.conn = snowflake.connector.connect(**common, authenticator="externalbrowser")
            elif self.connect_mode == ConnectMode.PWD.value:
                self.conn = snowflake.connector.connect(**common, password=os.getenv("SNOWFLAKE_PASSWORD"))
            elif self.connect_mode == ConnectMode.KEY_PAIR.value:
                pkb = self.decode_private_key(self.private_key_passphrase)
                self.conn = snowflake.connector.connect(**common, private_key=pkb)
            else:
                raise ValueError(f"Unsupported connect_mode: {self.connect_mode!r}")
        return self.conn

    def close(self):
        """Close both connector connections (if open); they are reopened on next use."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        if self.engine is not None:
            self.engine.close()
            self.engine = None

    def execute(self, statement, parameters=None, columns=None, schema=None):
        """Run ``statement`` on the ``get_engine()`` connection.

        :param statement: SQL string.
        :param parameters: bind parameters for the connector, or ``None``.
        :param columns: when ``None`` the statement is executed and committed
            (rolled back on error); otherwise it is read into a DataFrame.
        :param schema: if given, ``USE SCHEMA`` is issued first with the name
            as a quoted identifier.
        :returns: ``(True, None)`` or ``(True, DataFrame)``.
        """
        log.debug(f"statement=[{statement}]")
        conn = self.get_engine()

        if schema is not None:
            # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
            safe_schema = schema.replace('"', '""')
            cur = conn.cursor()
            try:
                cur.execute(f'USE SCHEMA "{safe_schema}";')
            finally:
                cur.close()

        if columns is None:
            cur = conn.cursor()
            try:
                cur.execute(statement, parameters)
                conn.commit()
            except Exception:
                conn.rollback()
                raise
            finally:
                cur.close()
            return True, None

        # A DBAPI cursor needs the raw SQL string (not a sqlalchemy TextClause),
        # and bind values must go to read_sql's ``params`` keyword.
        results = pd.read_sql(statement, conn, params=parameters)
        return True, results
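Snowflake engine strategy built directly on `snowflake.connector` connections. It supports SSO (external browser), password and key-pair authentication, selected by `connect_mode`.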
def
get_engine(self, schema=None):
def get_engine(self, schema=None):
    """Create (once) and return a key-pair-authenticated connector connection.

    The connection is opened with ``autocommit=False``. ``schema`` is
    accepted for interface compatibility but unused.
    """
    if self.engine is None:
        # Decode the key only when a connection is actually opened, not on
        # every call that returns the cached connection.
        pkb = self.decode_private_key(self.private_key_passphrase)
        snowflake.connector.paramstyle = "numeric"
        self.engine = snowflake.connector.connect(
            user=self.user,
            private_key=pkb,
            account=self.account,
            warehouse=self.warehouse,
            database=self.database,
            schema=self.default_schema,
            role=self.role,
            autocommit=False,
        )
    return self.engine
def
get_conn(self):
def get_conn(self):
    """Create (once) and return a connector connection for ``self.connect_mode``.

    SSO opens a browser login, password mode reads ``SNOWFLAKE_PASSWORD``
    from the environment, and key-pair mode decodes ``self.private_key``.

    :raises ValueError: if ``connect_mode`` is not SSO, password or key pair.
    """
    if self.conn is None:
        snowflake.connector.paramstyle = "numeric"
        common = dict(
            account=self.account,
            user=self.user,
            role=self.role,
            database=self.database,
            # The configured schema is stored as default_schema (there is no self.schema).
            schema=self.default_schema,
            warehouse=self.warehouse,
        )
        if self.connect_mode == ConnectMode.SSO.value:
            self.conn = snowflake.connector.connect(**common, authenticator="externalbrowser")
        elif self.connect_mode == ConnectMode.PWD.value:
            self.conn = snowflake.connector.connect(**common, password=os.getenv("SNOWFLAKE_PASSWORD"))
        elif self.connect_mode == ConnectMode.KEY_PAIR.value:
            pkb = self.decode_private_key(self.private_key_passphrase)
            self.conn = snowflake.connector.connect(**common, private_key=pkb)
        else:
            raise ValueError(f"Unsupported connect_mode: {self.connect_mode!r}")
    return self.conn
def
execute(self, statement, parameters=None, columns=None, schema=None):
def execute(self, statement, parameters=None, columns=None, schema=None):
    """Run ``statement`` on the ``get_engine()`` connection.

    :param statement: SQL string.
    :param parameters: bind parameters for the connector, or ``None``.
    :param columns: when ``None`` the statement is executed and committed
        (rolled back on error); otherwise it is read into a DataFrame.
    :param schema: if given, ``USE SCHEMA`` is issued first with the name
        as a quoted identifier.
    :returns: ``(True, None)`` or ``(True, DataFrame)``.
    """
    log.debug(f"statement=[{statement}]")
    conn = self.get_engine()

    if schema is not None:
        # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
        safe_schema = schema.replace('"', '""')
        cur = conn.cursor()
        try:
            cur.execute(f'USE SCHEMA "{safe_schema}";')
        finally:
            cur.close()

    if columns is None:
        cur = conn.cursor()
        try:
            cur.execute(statement, parameters)
            conn.commit()
        except Exception:
            # The connection has autocommit disabled; undo the partial work.
            conn.rollback()
            raise
        finally:
            cur.close()
        return True, None

    # A DBAPI cursor needs the raw SQL string (not a sqlalchemy TextClause),
    # and bind values must go to read_sql's ``params`` keyword rather than
    # being unpacked into its positional arguments.
    results = pd.read_sql(statement, conn, params=parameters)
    return True, results
def
get_config_dict( config: configparser.ConfigParser, *, private_key: bytes, password: str, connect_mode: int = 2, user: str = None, cache_column_metadata=False):
def get_config_dict(
    config: ConfigParser,
    *,
    private_key: bytes,
    password: str,
    connect_mode: int = ConnectMode.KEY_PAIR.value,
    user: str = None,
    cache_column_metadata=False,
):
    """Assemble the connection settings dict consumed by the engine strategies.

    :param config: parsed configuration whose ``DB_CONNECTION`` section
        provides ``account``, ``warehouse``, ``database``, ``role`` and ``schema``.
    :param private_key: PEM private key, stored only in key-pair mode.
    :param password: account password in password mode, or the private-key
        passphrase in key-pair mode; unused otherwise.
    :param connect_mode: a ``ConnectMode`` value selecting authentication.
    :param user: user name to connect as.
    :param cache_column_metadata: stored as-is under ``cache_column_metadata``.
    :returns: a new settings dict; ``timezone`` is always ``"America/Los_Angeles"``.
    """
    section_name = "DB_CONNECTION"

    def option(key):
        # Read one option from the DB_CONNECTION section.
        return config.get(section_name, key)

    settings = {
        "account": option("account"),
        "warehouse": option("warehouse"),
        "user": user,
        "database": option("database"),
        "role": option("role"),
        "schema": option("schema"),
        "connect_mode": connect_mode,
        "cache_column_metadata": cache_column_metadata,
        "timezone": "America/Los_Angeles",
    }

    # Attach only the credentials relevant to the chosen authentication mode.
    if connect_mode == ConnectMode.KEY_PAIR.value:
        settings.update(private_key=private_key, private_key_passphrase=password)
    elif connect_mode == ConnectMode.PWD.value:
        settings["password"] = password

    return settings