schema_sentinel.metadata_manager.engine

  1import logging as log
  2import os
  3from abc import ABC, abstractmethod
  4from configparser import ConfigParser
  5
  6import pandas as pd
  7import snowflake.connector
  8from cryptography.hazmat.backends import default_backend
  9from cryptography.hazmat.primitives import serialization
 10from snowflake.sqlalchemy import URL
 11from sqlalchemy import MetaData, create_engine, text
 12from sqlalchemy.engine import Engine
 13
 14from .enums import ConnectMode
 15
 16
 17class DBEngineStrategy(ABC):
 18    engine = None
 19    conn = None
 20    metadata: MetaData = None
 21    connect_mode = ConnectMode.KEY_PAIR.value
 22    env: str = "dev"
 23    url: str = None
 24
 25    def __init__(self, config: dict, env: str = "dev"):
 26        self.account = config.get("account")
 27        self.user = config.get("user")
 28        self.warehouse = config.get("warehouse")
 29        self.database = config.get("database")
 30        self.private_key = config.get("private_key") if "private_key" in config else b""
 31        self.role = config.get("role")
 32        self.engine = None
 33        self.private_key_passphrase = config.get("private_key_passphrase") if "private_key_passphrase" in config else ""
 34        self.default_schema = config.get("schema")
 35        self.connect_mode = config.get("connect_mode")
 36        self.env = env
 37        self.url = config.get("url")
 38
 39    @abstractmethod
 40    def get_engine(self):
 41        pass
 42
 43    @abstractmethod
 44    def get_conn(self):
 45        pass
 46
 47    @abstractmethod
 48    def close(self):
 49        pass
 50
 51    def __del__(self):
 52        self.close()
 53
 54    @abstractmethod
 55    def execute(self, statement, parameters=None, columns=None, schema=None):
 56        pass
 57
 58    def decode_private_key(self, password):
 59        p_key = serialization.load_pem_private_key(
 60            self.private_key, password=password.encode(), backend=default_backend()
 61        )
 62
 63        pkb = p_key.private_bytes(
 64            encoding=serialization.Encoding.DER,
 65            format=serialization.PrivateFormat.PKCS8,
 66            encryption_algorithm=serialization.NoEncryption(),
 67        )
 68
 69        return pkb
 70
 71
 72class SqLiteAqlAlchemyEngine(DBEngineStrategy):
 73    metadata: MetaData = None
 74    engine: Engine = None
 75    url: str = None
 76
 77    def __int__(self, config: dict, env: str, url: str):
 78        config["url"] = url
 79        super().__init__(env=None, config=config)
 80
 81    def get_engine(self):
 82        if self.engine is None:
 83            self.engine = create_engine(url=self.url)
 84        return self.engine
 85
 86    def close(self):
 87        if self.engine:
 88            self.engine.dispose()
 89
 90    def execute(self, statement, parameters=None, columns=None, schema=None):
 91        log.debug(f"statement=[{statement}]")
 92        conn = self.get_conn()
 93
 94        if not columns:
 95            if parameters is not None:
 96                conn.execute(text(statement), parameters)
 97            else:
 98                conn.execute(text(statement))
 99            return True, None
100
101        if parameters is not None:
102            results = pd.read_sql(sql=text(statement), con=conn, params=parameters)
103        else:
104            results = pd.read_sql(sql=text(statement), con=conn)
105
106        return True, results
107
108    def get_conn(self):
109        return self.get_engine()
110
111
class SfAlchemyEngine(DBEngineStrategy):
    """Snowflake strategy backed by a SQLAlchemy engine and connection."""

    # Runs once, when this class body is executed: switches the Snowflake
    # connector's module-wide paramstyle to "numeric".
    snowflake.connector.paramstyle = "numeric"

    def _url_kwargs(self):
        """Return the URL arguments shared by every connect mode."""
        return {
            "account": self.account,
            "user": self.user,
            "warehouse": self.warehouse,
            "database": self.database,
            "role": self.role,
            "cache_column_metadata": True,
            "schema": self.default_schema,
        }

    def get_engine(self):
        """Create the engine for ``self.connect_mode`` on first use.

        Returns:
            The cached engine, or ``None`` when ``connect_mode`` matches none
            of the SSO, PWD or KEY_PAIR values (a warning is logged).
        """
        if self.engine is not None:
            return self.engine

        url_kwargs = self._url_kwargs()
        if self.connect_mode == ConnectMode.SSO.value:
            self.engine = create_engine(
                URL(**url_kwargs, authenticator="externalbrowser")
            )
        elif self.connect_mode == ConnectMode.PWD.value:
            # The password comes from the environment, not from the config.
            self.engine = create_engine(
                URL(**url_kwargs, password=os.getenv("SNOWFLAKE_PASSWORD"))
            )
        elif self.connect_mode == ConnectMode.KEY_PAIR.value:
            pkb = self.decode_private_key(self.private_key_passphrase)
            self.engine = create_engine(
                URL(**url_kwargs),
                connect_args={
                    "private_key": pkb,
                },
            )
        else:
            log.warning(f"unsupported connect_mode=[{self.connect_mode}]; no engine created")

        return self.engine

    def get_conn(self):
        """Open the connection on first use and return the cached one."""
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect().execution_options(autocommit=False)
        return self.conn

    def close(self):
        """Close the connection and dispose of the engine.

        Both references are cleared first, so a repeated ``close()`` (for
        example from ``__del__``) is a no-op and a later ``get_conn()`` opens
        a fresh connection instead of returning the closed one.
        """
        conn, engine = self.conn, self.engine
        self.conn = None
        self.engine = None
        try:
            if conn is not None:
                conn.close()
        finally:
            if engine is not None:
                engine.dispose()

    def execute(self, statement, parameters=None, columns=None, schema=None):
        """Run ``statement`` on the cached connection.

        Args:
            statement: SQL text; wrapped in ``sqlalchemy.text``.
            parameters: Optional bind parameters for the statement.
            columns: ``None`` means execute inside an explicit BEGIN/COMMIT
                and return no rows; any other value loads the result set
                with ``pandas.read_sql``.
            schema: When given, a ``USE SCHEMA`` is issued first.

        Returns:
            ``(True, None)`` when ``columns`` is ``None``, otherwise
            ``(True, DataFrame)``.

        Raises:
            Exception: Whatever the statement raised, after a ROLLBACK has
                been attempted.
        """
        log.debug(f"statement=[{statement}]")
        conn = self.get_conn()
        if schema is not None:
            # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
            safe_schema = schema.replace('"', '""')
            conn.execute(f'USE SCHEMA "{safe_schema}";')

        if columns is None:
            conn.execute("BEGIN")
            try:
                if parameters is not None:
                    conn.execute(text(statement), parameters)
                else:
                    conn.execute(text(statement))
            except Exception:
                # The connection is cached and reused; do not leave the
                # failed transaction open on it.
                try:
                    conn.execute("ROLLBACK")
                except Exception:
                    log.exception("ROLLBACK failed after statement error")
                raise
            conn.execute("COMMIT")
            return True, None

        if parameters is not None:
            results = pd.read_sql(sql=text(statement), con=conn, params=parameters)
        else:
            results = pd.read_sql(sql=text(statement), con=conn)

        return True, results
197
198
class SfConnectorEngine(DBEngineStrategy):
    """Snowflake strategy that talks to ``snowflake.connector`` directly."""

    def get_engine(self, schema=None):
        """Open the key-pair connection on first use and return it.

        The private key is decoded only when a connection actually has to be
        created, not on every call.

        Args:
            schema: Accepted for interface compatibility; not used. The
                connection is opened on ``self.default_schema``.

        Returns:
            The cached ``snowflake.connector`` connection (autocommit off).
        """
        if self.engine is None:
            pkb = self.decode_private_key(self.private_key_passphrase)
            snowflake.connector.paramstyle = "numeric"
            self.engine = snowflake.connector.connect(
                user=self.user,
                private_key=pkb,
                account=self.account,
                warehouse=self.warehouse,
                database=self.database,
                schema=self.default_schema,
                role=self.role,
                autocommit=False,
            )
        return self.engine

    def get_conn(self):
        """Open a connection for ``self.connect_mode`` on first use.

        Returns:
            The cached connection, or ``None`` when ``connect_mode`` matches
            none of the SSO, PWD or KEY_PAIR values.
        """
        if self.conn is not None:
            return self.conn

        # The schema lives in ``default_schema``; there is no ``schema``
        # attribute on the instance.
        common = {
            "account": self.account,
            "user": self.user,
            "role": self.role,
            "database": self.database,
            "schema": self.default_schema,
            "warehouse": self.warehouse,
        }

        if self.connect_mode == ConnectMode.SSO.value:
            snowflake.connector.paramstyle = "numeric"
            self.conn = snowflake.connector.connect(
                **common, authenticator="externalbrowser"
            )
        elif self.connect_mode == ConnectMode.PWD.value:
            # The password comes from the environment, not from the config.
            self.conn = snowflake.connector.connect(
                **common, password=os.getenv("SNOWFLAKE_PASSWORD")
            )
        elif self.connect_mode == ConnectMode.KEY_PAIR.value:
            pkb = self.decode_private_key(self.private_key_passphrase)
            self.conn = snowflake.connector.connect(**common, private_key=pkb)

        return self.conn

    def close(self):
        """Close both cached connections and forget them.

        Covers the connection opened by ``get_engine`` and the one opened by
        ``get_conn``. Clearing the references first makes repeated calls
        (for example from ``__del__``) harmless.
        """
        engine, conn = self.engine, self.conn
        self.engine = None
        self.conn = None
        try:
            if engine is not None:
                engine.close()
        finally:
            if conn is not None:
                conn.close()

    @staticmethod
    def _run(conn, statement, parameters=None):
        """Execute one statement on a short-lived cursor and close the cursor."""
        cursor = conn.cursor()
        try:
            cursor.execute(statement, parameters)
        finally:
            cursor.close()

    def execute(self, statement, parameters=None, columns=None, schema=None):
        """Run ``statement`` on the connection returned by ``get_engine``.

        Args:
            statement: SQL text passed to the connector as a plain string.
            parameters: Optional bind parameters for the statement.
            columns: ``None`` means execute and commit without returning
                rows; any other value loads the result set with
                ``pandas.read_sql``.
            schema: When given, a ``USE SCHEMA`` is issued first.

        Returns:
            ``(True, None)`` when ``columns`` is ``None``, otherwise
            ``(True, DataFrame)``.

        Raises:
            Exception: Whatever the statement raised, after the open
                transaction has been rolled back.
        """
        log.debug(f"statement=[{statement}]")
        # NOTE(review): this uses get_engine(), which always authenticates
        # with the key pair, rather than the mode-aware get_conn() - confirm
        # whether SSO/PWD modes are meant to be supported here.
        conn = self.get_engine()

        if schema is not None:
            # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
            safe_schema = schema.replace('"', '""')
            self._run(conn, f'USE SCHEMA "{safe_schema}";')

        if columns is None:
            try:
                self._run(conn, statement, parameters)
            except Exception:
                # autocommit is off; do not leave the failed work pending on
                # the cached connection.
                conn.rollback()
                raise
            conn.commit()
            return True, None

        # ``conn`` is a raw DB-API connection, so pandas needs the SQL as a
        # string, and the bind values must go through ``params`` instead of
        # being unpacked into unrelated positional arguments.
        if parameters is not None:
            results = pd.read_sql(statement, conn, params=parameters)
        else:
            results = pd.read_sql(statement, conn)

        return True, results
279
280
def get_config_dict(
    config: ConfigParser,
    *,
    private_key: bytes,
    password: str,
    connect_mode: int = ConnectMode.KEY_PAIR.value,
    user: str = None,
    cache_column_metadata=False,
):
    """Assemble the settings dictionary consumed by the engine strategies.

    Args:
        config: Parser whose ``DB_CONNECTION`` section supplies ``account``,
            ``warehouse``, ``database``, ``role`` and ``schema``.
        private_key: Key stored under ``private_key`` in KEY_PAIR mode.
        password: Stored under ``password`` in PWD mode, or under
            ``private_key_passphrase`` in KEY_PAIR mode.
        connect_mode: One of the ``ConnectMode`` values.
        user: User name copied into the result as-is.
        cache_column_metadata: Flag copied into the result as-is.

    Returns:
        A dict with the connection settings, a fixed ``timezone`` entry and
        the credentials that belong to ``connect_mode``.
    """
    section = "DB_CONNECTION"

    def option(name):
        # Every file-backed setting comes from the same section.
        return config.get(section, name)

    settings = {
        "account": option("account"),
        "warehouse": option("warehouse"),
        "user": user,
    }
    for name in ("database", "role", "schema"):
        settings[name] = option(name)
    settings.update(
        connect_mode=connect_mode,
        cache_column_metadata=cache_column_metadata,
        timezone="America/Los_Angeles",
    )

    if connect_mode == ConnectMode.PWD.value:
        settings["password"] = password
        return settings

    if connect_mode == ConnectMode.KEY_PAIR.value:
        settings["private_key"] = private_key
        settings["private_key_passphrase"] = password

    return settings
class DBEngineStrategy(abc.ABC):
18class DBEngineStrategy(ABC):
19    engine = None
20    conn = None
21    metadata: MetaData = None
22    connect_mode = ConnectMode.KEY_PAIR.value
23    env: str = "dev"
24    url: str = None
25
26    def __init__(self, config: dict, env: str = "dev"):
27        self.account = config.get("account")
28        self.user = config.get("user")
29        self.warehouse = config.get("warehouse")
30        self.database = config.get("database")
31        self.private_key = config.get("private_key") if "private_key" in config else b""
32        self.role = config.get("role")
33        self.engine = None
34        self.private_key_passphrase = config.get("private_key_passphrase") if "private_key_passphrase" in config else ""
35        self.default_schema = config.get("schema")
36        self.connect_mode = config.get("connect_mode")
37        self.env = env
38        self.url = config.get("url")
39
40    @abstractmethod
41    def get_engine(self):
42        pass
43
44    @abstractmethod
45    def get_conn(self):
46        pass
47
48    @abstractmethod
49    def close(self):
50        pass
51
52    def __del__(self):
53        self.close()
54
55    @abstractmethod
56    def execute(self, statement, parameters=None, columns=None, schema=None):
57        pass
58
59    def decode_private_key(self, password):
60        p_key = serialization.load_pem_private_key(
61            self.private_key, password=password.encode(), backend=default_backend()
62        )
63
64        pkb = p_key.private_bytes(
65            encoding=serialization.Encoding.DER,
66            format=serialization.PrivateFormat.PKCS8,
67            encryption_algorithm=serialization.NoEncryption(),
68        )
69
70        return pkb

Helper class that provides a standard way to create an ABC using inheritance.

engine = None
conn = None
metadata: sqlalchemy.sql.schema.MetaData = None
connect_mode = 2
env: str = 'dev'
url: str = None
account
user
warehouse
database
private_key
role
private_key_passphrase
default_schema
@abstractmethod
def get_engine(self):
40    @abstractmethod
41    def get_engine(self):
42        pass
@abstractmethod
def get_conn(self):
44    @abstractmethod
45    def get_conn(self):
46        pass
@abstractmethod
def close(self):
48    @abstractmethod
49    def close(self):
50        pass
@abstractmethod
def execute(self, statement, parameters=None, columns=None, schema=None):
55    @abstractmethod
56    def execute(self, statement, parameters=None, columns=None, schema=None):
57        pass
def decode_private_key(self, password):
59    def decode_private_key(self, password):
60        p_key = serialization.load_pem_private_key(
61            self.private_key, password=password.encode(), backend=default_backend()
62        )
63
64        pkb = p_key.private_bytes(
65            encoding=serialization.Encoding.DER,
66            format=serialization.PrivateFormat.PKCS8,
67            encryption_algorithm=serialization.NoEncryption(),
68        )
69
70        return pkb
class SqLiteAqlAlchemyEngine(DBEngineStrategy):
 73class SqLiteAqlAlchemyEngine(DBEngineStrategy):
 74    metadata: MetaData = None
 75    engine: Engine = None
 76    url: str = None
 77
 78    def __int__(self, config: dict, env: str, url: str):
 79        config["url"] = url
 80        super().__init__(env=None, config=config)
 81
 82    def get_engine(self):
 83        if self.engine is None:
 84            self.engine = create_engine(url=self.url)
 85        return self.engine
 86
 87    def close(self):
 88        if self.engine:
 89            self.engine.dispose()
 90
 91    def execute(self, statement, parameters=None, columns=None, schema=None):
 92        log.debug(f"statement=[{statement}]")
 93        conn = self.get_conn()
 94
 95        if not columns:
 96            if parameters is not None:
 97                conn.execute(text(statement), parameters)
 98            else:
 99                conn.execute(text(statement))
100            return True, None
101
102        if parameters is not None:
103            results = pd.read_sql(sql=text(statement), con=conn, params=parameters)
104        else:
105            results = pd.read_sql(sql=text(statement), con=conn)
106
107        return True, results
108
109    def get_conn(self):
110        return self.get_engine()

Helper class that provides a standard way to create an ABC using inheritance.

metadata: sqlalchemy.sql.schema.MetaData = None
engine: sqlalchemy.engine.base.Engine = None
url: str = None
def get_engine(self):
82    def get_engine(self):
83        if self.engine is None:
84            self.engine = create_engine(url=self.url)
85        return self.engine
def close(self):
87    def close(self):
88        if self.engine:
89            self.engine.dispose()
def execute(self, statement, parameters=None, columns=None, schema=None):
 91    def execute(self, statement, parameters=None, columns=None, schema=None):
 92        log.debug(f"statement=[{statement}]")
 93        conn = self.get_conn()
 94
 95        if not columns:
 96            if parameters is not None:
 97                conn.execute(text(statement), parameters)
 98            else:
 99                conn.execute(text(statement))
100            return True, None
101
102        if parameters is not None:
103            results = pd.read_sql(sql=text(statement), con=conn, params=parameters)
104        else:
105            results = pd.read_sql(sql=text(statement), con=conn)
106
107        return True, results
def get_conn(self):
109    def get_conn(self):
110        return self.get_engine()
class SfAlchemyEngine(DBEngineStrategy):
113class SfAlchemyEngine(DBEngineStrategy):
114    snowflake.connector.paramstyle = "numeric"
115
116    def get_engine(self):
117        if self.engine is None:
118            if self.connect_mode == ConnectMode.SSO.value:
119                self.engine = create_engine(
120                    URL(
121                        account=self.account,
122                        user=self.user,
123                        warehouse=self.warehouse,
124                        database=self.database,
125                        role=self.role,
126                        cache_column_metadata=True,
127                        schema=self.default_schema,
128                        authenticator="externalbrowser",
129                    )
130                )
131            if self.connect_mode == ConnectMode.PWD.value:
132                self.engine = create_engine(
133                    URL(
134                        account=self.account,
135                        user=self.user,
136                        warehouse=self.warehouse,
137                        database=self.database,
138                        role=self.role,
139                        cache_column_metadata=True,
140                        schema=self.default_schema,
141                        password=os.getenv("SNOWFLAKE_PASSWORD"),
142                    )
143                )
144            if self.connect_mode == ConnectMode.KEY_PAIR.value:
145                pkb = self.decode_private_key(self.private_key_passphrase)
146                self.engine = create_engine(
147                    URL(
148                        account=self.account,
149                        user=self.user,
150                        warehouse=self.warehouse,
151                        database=self.database,
152                        role=self.role,
153                        cache_column_metadata=True,
154                        schema=self.default_schema,
155                    ),
156                    connect_args={
157                        "private_key": pkb,
158                    },
159                )
160
161        return self.engine
162
163    def get_conn(self):
164        if self.conn is None:
165            engine = self.get_engine()
166            self.conn = engine.connect().execution_options(autocommit=False)
167        return self.conn
168
169    def close(self):
170        if self.conn is not None:
171            self.conn.close()
172        if self.engine is not None:
173            self.engine.dispose()
174
175    def execute(self, statement, parameters=None, columns=None, schema=None):
176        log.debug(f"statement=[{statement}]")
177        conn = self.get_conn()
178        if schema is not None:
179            # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
180            safe_schema = schema.replace('"', '""')
181            conn.execute(f'USE SCHEMA "{safe_schema}";')
182
183        if columns is None:
184            conn.execute("BEGIN")
185            if parameters is not None:
186                conn.execute(text(statement), parameters)
187            else:
188                conn.execute(text(statement))
189            conn.execute("COMMIT")
190            return True, None
191
192        if parameters is not None:
193            results = pd.read_sql(sql=text(statement), con=conn, params=parameters)
194        else:
195            results = pd.read_sql(sql=text(statement), con=conn)
196
197        return True, results

Helper class that provides a standard way to create an ABC using inheritance.

def get_engine(self):
116    def get_engine(self):
117        if self.engine is None:
118            if self.connect_mode == ConnectMode.SSO.value:
119                self.engine = create_engine(
120                    URL(
121                        account=self.account,
122                        user=self.user,
123                        warehouse=self.warehouse,
124                        database=self.database,
125                        role=self.role,
126                        cache_column_metadata=True,
127                        schema=self.default_schema,
128                        authenticator="externalbrowser",
129                    )
130                )
131            if self.connect_mode == ConnectMode.PWD.value:
132                self.engine = create_engine(
133                    URL(
134                        account=self.account,
135                        user=self.user,
136                        warehouse=self.warehouse,
137                        database=self.database,
138                        role=self.role,
139                        cache_column_metadata=True,
140                        schema=self.default_schema,
141                        password=os.getenv("SNOWFLAKE_PASSWORD"),
142                    )
143                )
144            if self.connect_mode == ConnectMode.KEY_PAIR.value:
145                pkb = self.decode_private_key(self.private_key_passphrase)
146                self.engine = create_engine(
147                    URL(
148                        account=self.account,
149                        user=self.user,
150                        warehouse=self.warehouse,
151                        database=self.database,
152                        role=self.role,
153                        cache_column_metadata=True,
154                        schema=self.default_schema,
155                    ),
156                    connect_args={
157                        "private_key": pkb,
158                    },
159                )
160
161        return self.engine
def get_conn(self):
163    def get_conn(self):
164        if self.conn is None:
165            engine = self.get_engine()
166            self.conn = engine.connect().execution_options(autocommit=False)
167        return self.conn
def close(self):
169    def close(self):
170        if self.conn is not None:
171            self.conn.close()
172        if self.engine is not None:
173            self.engine.dispose()
def execute(self, statement, parameters=None, columns=None, schema=None):
175    def execute(self, statement, parameters=None, columns=None, schema=None):
176        log.debug(f"statement=[{statement}]")
177        conn = self.get_conn()
178        if schema is not None:
179            # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
180            safe_schema = schema.replace('"', '""')
181            conn.execute(f'USE SCHEMA "{safe_schema}";')
182
183        if columns is None:
184            conn.execute("BEGIN")
185            if parameters is not None:
186                conn.execute(text(statement), parameters)
187            else:
188                conn.execute(text(statement))
189            conn.execute("COMMIT")
190            return True, None
191
192        if parameters is not None:
193            results = pd.read_sql(sql=text(statement), con=conn, params=parameters)
194        else:
195            results = pd.read_sql(sql=text(statement), con=conn)
196
197        return True, results
class SfConnectorEngine(DBEngineStrategy):
200class SfConnectorEngine(DBEngineStrategy):
201    def get_engine(self, schema=None):
202        pkb = self.decode_private_key(self.private_key_passphrase)
203
204        if self.engine is None:
205            snowflake.connector.paramstyle = "numeric"
206            self.engine = snowflake.connector.connect(
207                user=self.user,
208                private_key=pkb,
209                account=self.account,
210                warehouse=self.warehouse,
211                database=self.database,
212                schema=self.default_schema,
213                role=self.role,
214                autocommit=False,
215            )
216        return self.engine
217
218    def get_conn(self):
219        if self.conn is None:
220            if self.connect_mode == ConnectMode.SSO.value:
221                snowflake.connector.paramstyle = "numeric"
222                self.conn = snowflake.connector.connect(
223                    account=self.account,
224                    user=self.user,
225                    role=self.role,
226                    database=self.database,
227                    schema=self.schema,
228                    warehouse=self.warehouse,
229                    authenticator="externalbrowser",
230                )
231
232            if self.connect_mode == ConnectMode.PWD.value:
233                self.conn = snowflake.connector.connect(
234                    account=self.account,
235                    user=self.user,
236                    role=self.role,
237                    database=self.database,
238                    schema=self.schema,
239                    warehouse=self.warehouse,
240                    password=os.getenv("SNOWFLAKE_PASSWORD"),
241                )
242
243            if self.connect_mode == ConnectMode.KEY_PAIR.value:
244                pkb = self.decode_private_key(self.private_key_passphrase)
245                self.conn = snowflake.connector.connect(
246                    account=self.account,
247                    user=self.user,
248                    role=self.role,
249                    database=self.database,
250                    schema=self.default_schema,
251                    warehouse=self.warehouse,
252                    private_key=pkb,
253                )
254        return self.conn
255
256    def close(self):
257        if self.engine is not None:
258            self.engine.close()
259
260    def execute(self, statement, parameters=None, columns=None, schema=None):
261        log.debug(f"statement=[{statement}]")
262        conn = self.get_engine()
263
264        if schema is not None:
265            # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
266            safe_schema = schema.replace('"', '""')
267            conn.cursor().execute(f'USE SCHEMA "{safe_schema}";')
268
269        if columns is None:
270            conn.cursor().execute(statement, parameters)
271            conn.commit()
272            return True, None
273
274        if parameters is not None:
275            results = pd.read_sql(text(statement), conn, *parameters)
276        else:
277            results = pd.read_sql(text(statement), conn)
278
279        return True, results

Helper class that provides a standard way to create an ABC using inheritance.

def get_engine(self, schema=None):
201    def get_engine(self, schema=None):
202        pkb = self.decode_private_key(self.private_key_passphrase)
203
204        if self.engine is None:
205            snowflake.connector.paramstyle = "numeric"
206            self.engine = snowflake.connector.connect(
207                user=self.user,
208                private_key=pkb,
209                account=self.account,
210                warehouse=self.warehouse,
211                database=self.database,
212                schema=self.default_schema,
213                role=self.role,
214                autocommit=False,
215            )
216        return self.engine
def get_conn(self):
218    def get_conn(self):
219        if self.conn is None:
220            if self.connect_mode == ConnectMode.SSO.value:
221                snowflake.connector.paramstyle = "numeric"
222                self.conn = snowflake.connector.connect(
223                    account=self.account,
224                    user=self.user,
225                    role=self.role,
226                    database=self.database,
227                    schema=self.schema,
228                    warehouse=self.warehouse,
229                    authenticator="externalbrowser",
230                )
231
232            if self.connect_mode == ConnectMode.PWD.value:
233                self.conn = snowflake.connector.connect(
234                    account=self.account,
235                    user=self.user,
236                    role=self.role,
237                    database=self.database,
238                    schema=self.schema,
239                    warehouse=self.warehouse,
240                    password=os.getenv("SNOWFLAKE_PASSWORD"),
241                )
242
243            if self.connect_mode == ConnectMode.KEY_PAIR.value:
244                pkb = self.decode_private_key(self.private_key_passphrase)
245                self.conn = snowflake.connector.connect(
246                    account=self.account,
247                    user=self.user,
248                    role=self.role,
249                    database=self.database,
250                    schema=self.default_schema,
251                    warehouse=self.warehouse,
252                    private_key=pkb,
253                )
254        return self.conn
def close(self):
256    def close(self):
257        if self.engine is not None:
258            self.engine.close()
def execute(self, statement, parameters=None, columns=None, schema=None):
260    def execute(self, statement, parameters=None, columns=None, schema=None):
261        log.debug(f"statement=[{statement}]")
262        conn = self.get_engine()
263
264        if schema is not None:
265            # Properly quote schema identifier - escape quotes by doubling them (SQL standard)
266            safe_schema = schema.replace('"', '""')
267            conn.cursor().execute(f'USE SCHEMA "{safe_schema}";')
268
269        if columns is None:
270            conn.cursor().execute(statement, parameters)
271            conn.commit()
272            return True, None
273
274        if parameters is not None:
275            results = pd.read_sql(text(statement), conn, *parameters)
276        else:
277            results = pd.read_sql(text(statement), conn)
278
279        return True, results
def get_config_dict( config: configparser.ConfigParser, *, private_key: bytes, password: str, connect_mode: int = 2, user: str = None, cache_column_metadata=False):
282def get_config_dict(
283    config: ConfigParser,
284    *,
285    private_key: bytes,
286    password: str,
287    connect_mode: int = ConnectMode.KEY_PAIR.value,
288    user: str = None,
289    cache_column_metadata=False,
290):
291    db_section = "DB_CONNECTION"
292    config_map = {
293        "account": config.get(db_section, "account"),
294        "warehouse": config.get(db_section, "warehouse"),
295        "user": user,
296        "database": config.get(db_section, "database"),
297        "role": config.get(db_section, "role"),
298        "schema": config.get(db_section, "schema"),
299        "connect_mode": connect_mode,
300        "cache_column_metadata": cache_column_metadata,
301        "timezone": "America/Los_Angeles",
302    }
303    if connect_mode == ConnectMode.PWD.value:
304        config_map["password"] = password
305    elif connect_mode == ConnectMode.KEY_PAIR.value:
306        config_map["private_key"] = private_key
307        config_map["private_key_passphrase"] = password
308
309    return config_map