geronimo.data_sources
Geronimo Data Layer.
The data_sources module provides a unified abstraction for connecting to, querying, and ingesting data from various upstream sources. It decouples the ML modeling logic from the underlying data infrastructure.
Key components:
- DataSource: Represents a table or view in a database.
- Query: A composable query object for retrieving data.
- DatabaseConnection: Protocol for database adapters.
Supported connections:
- Snowflake
- PostgreSQL
- SQL Server
- Google BigQuery (via generic interface)
This layer handles connection pooling, query generation, and data type mapping.
    """Geronimo Data Layer.

    The data_sources module provides a unified abstraction for connecting to, querying,
    and ingesting data from various upstream sources. It decouples the ML modeling logic
    from the underlying data infrastructure.

    Key components:
    - DataSource: Represents a table or view in a database.
    - Query: A composable query object for retrieving data.
    - DatabaseConnection: Protocol for database adapters.

    Supported connections:
    - Snowflake
    - PostgreSQL
    - SQL Server
    - Google BigQuery (via generic interface)

    This layer handles connection pooling, query generation, and data type mapping.
    """

    from geronimo.data_sources.source import DataSource, JoinSpec, collect_data_sources
    from geronimo.data_sources.query import Query
    from geronimo.data_sources.connection import (
        DatabaseConnection,
        BaseDatabaseConnection,
        SnowflakeConnection,
        PostgresConnection,
        SQLServerConnection,
        get_connection,
    )

    __all__ = [
        "DataSource",
        "JoinSpec",
        "collect_data_sources",
        "Query",
        "DatabaseConnection",
        "BaseDatabaseConnection",
        "SnowflakeConnection",
        "PostgresConnection",
        "SQLServerConnection",
        "get_connection",
    ]

    __docformat__ = "google"
    class DataSource:
        """Abstraction for loading data from various backends.

        Provides a unified interface for querying data from databases,
        loading from files, or calling custom functions.

        Example (database):
            ```python
            from geronimo.data_sources import DataSource, Query

            training_data = DataSource(
                name="customer_features",
                source="snowflake",
                query=Query.from_file("queries/training_data.sql"),
            )
            df = training_data.load(start_date="2024-01-01")
            ```

        Example (function):
            ```python
            from geronimo.data_sources import DataSource
            from sklearn.datasets import load_iris
            import pandas as pd

            def load_iris_data() -> pd.DataFrame:
                iris = load_iris()
                return pd.DataFrame(iris.data, columns=iris.feature_names)

            training_data = DataSource(
                name="iris",
                source="function",
                handle=load_iris_data,
            )
            df = training_data.load()  # Validates return type at runtime
            ```

        Note:
            When using `source="function"`, the provided handle function MUST:
            1. Return a pandas DataFrame
            2. Be callable with optional keyword arguments

            A DataSourceError is raised at runtime if the function does not
            return a DataFrame.
        """

        name: str
        """The name of the data source."""

        source: SourceType
        """The type of the data source."""

        query: Optional[Query]
        """The query object (for database sources)."""

        path: Optional[str]
        """The file path (for file sources)."""

        handle: Optional[Callable[..., pd.DataFrame]]
        """The function handle (for function sources)."""

        connection_params: dict[str, Any]
        """Connection parameters."""

        _custom_connection: Optional[DatabaseConnection]
        """Internal custom connection instance."""

        join_spec: Optional["JoinSpec"]
        """Specification for joining to this source."""

        def __init__(
            self,
            name: str,
            source: SourceType | str,
            query: Optional[Query] = None,
            path: Optional[str] = None,
            handle: Optional[Callable[..., pd.DataFrame]] = None,
            connection_params: Optional[dict[str, Any]] = None,
            connection: Optional[DatabaseConnection] = None,
            join_spec: Optional["JoinSpec"] = None,
        ):
            """Initialize data source.

            Args:
                name: Descriptive name for the data source.
                source: Source type (snowflake, postgres, sqlserver, file, function).
                query: Query object for database sources.
                path: File path for file-based sources.
                handle: Callable that returns a DataFrame (for function sources).
                    Must return pd.DataFrame - validated at runtime.
                connection_params: Optional connection parameters (overrides env vars).
                connection: Optional custom DatabaseConnection implementation.

            Raises:
                ValueError: If required arguments are missing for the source type.
            """
            self.name = name
            self.source = SourceType(source) if isinstance(source, str) else source
            self.query = query
            self.path = path
            self.handle = handle
            self.connection_params = connection_params or {}
            self._custom_connection = connection
            self.join_spec = join_spec

            # Validate required arguments based on source type
            if self.source == SourceType.FUNC:
                if not handle:
                    raise ValueError("Function sources require a handle")
                if not callable(handle):
                    raise ValueError("handle must be callable")
            elif self.source == SourceType.FILE:
                if not path:
                    raise ValueError("File sources require a path")
            else:
                # Database sources
                if not query:
                    raise ValueError("Database sources require a query")

        def load(self, **params) -> pd.DataFrame:
            """Load data from source.

            Args:
                **params: Parameters passed to the data loading function.
                    For database sources, these are query parameters.
                    For function sources, these are passed to the handle.

            Returns:
                DataFrame with loaded data.

            Raises:
                DataSourceError: If function source doesn't return a DataFrame.
            """
            if self.source == SourceType.FILE:
                return self._load_file()
            elif self.source == SourceType.FUNC:
                return self._load_function(**params)
            else:
                return self._load_database(**params)

        def _load_function(self, **params) -> pd.DataFrame:
            """Load data by calling the handle function.

            Validates at runtime that the function returns a DataFrame.

            Args:
                **params: Keyword arguments passed to the handle function.

            Returns:
                DataFrame returned by the handle function.

            Raises:
                DataSourceError: If handle doesn't return a DataFrame or raises an exception.
            """
            try:
                result = self.handle(**params)
            except Exception as e:
                raise DataSourceError(
                    f"DataSource '{self.name}' handle function raised an exception: {e}"
                ) from e

            # Runtime validation: ensure result is a DataFrame
            if not isinstance(result, pd.DataFrame):
                actual_type = type(result).__name__
                raise DataSourceError(
                    f"DataSource '{self.name}' handle function must return a pandas DataFrame, "
                    f"but returned {actual_type}. "
                    f"Ensure your function returns pd.DataFrame."
                )

            return result

        def _load_file(self) -> pd.DataFrame:
            """Load data from file."""
            from pathlib import Path

            path = Path(self.path)
            if path.suffix == ".csv":
                return pd.read_csv(path)
            elif path.suffix in [".parquet", ".pq"]:
                return pd.read_parquet(path)
            elif path.suffix == ".json":
                return pd.read_json(path)
            else:
                raise ValueError(f"Unsupported file format: {path.suffix}")

        def _load_database(self, **params) -> pd.DataFrame:
            """Load data from database using connection interface."""
            sql = self.query.render(**params)

            # Use custom connection if provided, otherwise create from factory
            if self._custom_connection is not None:
                connection = self._custom_connection
            else:
                connection = get_connection(self.source.value, self.connection_params)

            # Use context manager for automatic connection cleanup
            with connection:
                return connection.execute(sql)

        def __repr__(self) -> str:
            return f"DataSource({self.name}, source={self.source.value})"
Abstraction for loading data from various backends.
Provides a unified interface for querying data from databases, loading from files, or calling custom functions.
Example (database):
    from geronimo.data_sources import DataSource, Query

    training_data = DataSource(
        name="customer_features",
        source="snowflake",
        query=Query.from_file("queries/training_data.sql"),
    )
    df = training_data.load(start_date="2024-01-01")
Example (function):
    from geronimo.data_sources import DataSource
    from sklearn.datasets import load_iris
    import pandas as pd

    def load_iris_data() -> pd.DataFrame:
        iris = load_iris()
        return pd.DataFrame(iris.data, columns=iris.feature_names)

    training_data = DataSource(
        name="iris",
        source="function",
        handle=load_iris_data,
    )
    df = training_data.load()  # Validates return type at runtime
Note:
When using source="function", the provided handle function MUST:
- Return a pandas DataFrame
- Be callable with optional keyword arguments
A DataSourceError is raised at runtime if the function does not return a DataFrame.
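The runtime check described in this note can be tried out without geronimo or pandas installed. The block below is a simplified stand-in rather than the library's code: `call_and_validate`, `expected_type`, and the locally defined `DataSourceError` are illustrative names, and `dict` takes the place of `pd.DataFrame`.

```python
from typing import Any, Callable


class DataSourceError(Exception):
    """Local stand-in for the library's error type (assumed to be an Exception subclass)."""


def call_and_validate(
    name: str, handle: Callable[..., Any], expected_type: type, **params: Any
) -> Any:
    """Call handle(**params) and reject any result that is not an expected_type."""
    try:
        result = handle(**params)
    except Exception as exc:
        # Chain the original exception so the traceback keeps the root cause.
        raise DataSourceError(f"DataSource '{name}' handle raised: {exc}") from exc
    if not isinstance(result, expected_type):
        raise DataSourceError(
            f"DataSource '{name}' handle must return {expected_type.__name__}, "
            f"but returned {type(result).__name__}"
        )
    return result


def good_handle(limit: int = 2) -> dict:
    return {"rows": list(range(limit))}


def bad_handle() -> list:
    return [1, 2, 3]


# Keyword arguments are forwarded to the handle unchanged.
loaded = call_and_validate("demo", good_handle, dict, limit=3)

try:
    call_and_validate("demo", bad_handle, dict)
    message = ""
except DataSourceError as err:
    message = str(err)
```

Because keyword arguments are forwarded as given, a handle that accepts no parameters fails with a `TypeError` if any are passed, and in this sketch that error is wrapped like any other exception.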
    def __init__(
        self,
        name: str,
        source: SourceType | str,
        query: Optional[Query] = None,
        path: Optional[str] = None,
        handle: Optional[Callable[..., pd.DataFrame]] = None,
        connection_params: Optional[dict[str, Any]] = None,
        connection: Optional[DatabaseConnection] = None,
        join_spec: Optional["JoinSpec"] = None,
    )
Initialize data source.
Arguments:
- name: Descriptive name for the data source.
- source: Source type (snowflake, postgres, sqlserver, file, function).
- query: Query object for database sources.
- path: File path for file-based sources.
- handle: Callable that returns a DataFrame (for function sources). Must return pd.DataFrame - validated at runtime.
- connection_params: Optional connection parameters (overrides env vars).
- connection: Optional custom DatabaseConnection implementation.
- join_spec: Optional JoinSpec describing how this source is joined to the primary source.
Raises:
- ValueError: If required arguments are missing for the source type.
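The constructor's checks can be exercised in isolation. The sketch below reproduces the validation branches as a free function; `validate_source_args` and `error_for` are hypothetical helpers, and the `SourceType` enum is an assumed definition (the member names `FUNC` and `FILE` appear in the source listing, while the string values are inferred from the documented source names).

```python
from enum import Enum
from typing import Any, Callable, Optional, Union


class SourceType(Enum):
    # Assumed definition; the real enum is not shown on this page.
    SNOWFLAKE = "snowflake"
    POSTGRES = "postgres"
    SQLSERVER = "sqlserver"
    FILE = "file"
    FUNC = "function"


def validate_source_args(
    source: Union[SourceType, str],
    query: Optional[Any] = None,
    path: Optional[str] = None,
    handle: Optional[Callable[..., Any]] = None,
) -> SourceType:
    """Coerce the source type, then require the argument that type needs."""
    source_type = SourceType(source) if isinstance(source, str) else source
    if source_type == SourceType.FUNC:
        if not handle:
            raise ValueError("Function sources require a handle")
        if not callable(handle):
            raise ValueError("handle must be callable")
    elif source_type == SourceType.FILE:
        if not path:
            raise ValueError("File sources require a path")
    elif not query:
        raise ValueError("Database sources require a query")
    return source_type


def error_for(**kwargs: Any) -> str:
    """Return the ValueError message for the given arguments, or '' if they are valid."""
    try:
        validate_source_args(**kwargs)
    except ValueError as exc:
        return str(exc)
    return ""


missing_query = error_for(source="postgres")
missing_path = error_for(source="file")
unknown_source = error_for(source="bigquery")
valid = error_for(source="file", path="data/customers.csv")
```

Under the assumed enum, an unrecognized string such as `"bigquery"` fails during enum coercion, which also raises `ValueError`, before any of the argument checks run.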
handle: The function handle (for function sources).
    def load(self, **params) -> pd.DataFrame
Load data from source.
Arguments:
- **params: Keyword arguments forwarded to the loader. For database sources, they are substituted into the query by Query.render. For function sources, they are passed to the handle. File sources ignore them.
Returns:
DataFrame with loaded data.
Raises:
- DataSourceError: If a function source's handle raises an exception or returns something other than a DataFrame.
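For file sources, the source listing picks a pandas reader from the path suffix. The sketch below mirrors only that suffix dispatch, returning the reader's name instead of calling pandas; `READERS`, `reader_for`, and `is_supported` are illustrative names, not part of the library.

```python
from pathlib import Path

# Suffix-to-reader mapping taken from the _load_file branches in the source listing.
READERS = {
    ".csv": "read_csv",
    ".parquet": "read_parquet",
    ".pq": "read_parquet",
    ".json": "read_json",
}


def reader_for(path: str) -> str:
    """Return the name of the pandas reader that the suffix check would select."""
    suffix = Path(path).suffix
    if suffix not in READERS:
        raise ValueError(f"Unsupported file format: {suffix}")
    return READERS[suffix]


def is_supported(path: str) -> bool:
    """True when reader_for accepts the path's suffix."""
    try:
        reader_for(path)
    except ValueError:
        return False
    return True


csv_reader = reader_for("data/customers.csv")
parquet_reader = reader_for("data/events.pq")
uppercase_ok = is_supported("data/CUSTOMERS.CSV")      # suffix is ".CSV"
compressed_ok = is_supported("data/customers.csv.gz")  # suffix is ".gz"
```

The comparison is case-sensitive and looks only at the final suffix, so an upper-case extension or a compressed file such as `customers.csv.gz` is rejected in this sketch.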
    @dataclass
    class JoinSpec:
        """Specification for joining a DataSource to the primary source.

        Used when combining multiple DataSources that share a common key.
        The first DataSource in a list is treated as the primary; subsequent
        sources are joined to it using their JoinSpec.

        Example:
            ```python
            from geronimo.data_sources import DataSource, JoinSpec

            # Primary training source
            training_customers = DataSource(
                name="customers",
                source="file",
                path="data/customers.csv",
            )

            # Secondary source to join
            training_transactions = DataSource(
                name="transactions",
                source="file",
                path="data/transactions.csv",
                join_spec=JoinSpec(
                    left_on="customer_id",
                    right_on="customer_id",
                    how="left",
                ),
            )
            ```

        Attributes:
            left_on: Column name in the primary (left) source.
            right_on: Column name in this (right) source.
            how: Join type - 'left', 'right', 'inner', or 'outer'.
        """
        left_on: str
        right_on: str
        how: str = "left"
Specification for joining a DataSource to the primary source.
Used when combining multiple DataSources that share a common key. The first DataSource in a list is treated as the primary; subsequent sources are joined to it using their JoinSpec.
Example:
    from geronimo.data_sources import DataSource, JoinSpec

    # Primary training source
    training_customers = DataSource(
        name="customers",
        source="file",
        path="data/customers.csv",
    )

    # Secondary source to join
    training_transactions = DataSource(
        name="transactions",
        source="file",
        path="data/transactions.csv",
        join_spec=JoinSpec(
            left_on="customer_id",
            right_on="customer_id",
            how="left",
        ),
    )
Attributes:
- left_on: Column name in the primary (left) source.
- right_on: Column name in this (right) source.
- how: Join type - 'left', 'right', 'inner', or 'outer'.
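In the listing, `how` is a plain `str` and no validation is visible, so a typo such as `"full"` would presumably surface only when the join is performed. One possible guard, shown here as a hypothetical `ValidatedJoinSpec` that is not part of the library, is to check the value in `__post_init__`:

```python
from dataclasses import dataclass

# The four join types named in the JoinSpec attribute documentation.
ALLOWED_JOIN_TYPES = ("left", "right", "inner", "outer")


@dataclass(frozen=True)
class ValidatedJoinSpec:
    """Hypothetical variant of JoinSpec that rejects unknown join types on construction."""

    left_on: str
    right_on: str
    how: str = "left"

    def __post_init__(self) -> None:
        if self.how not in ALLOWED_JOIN_TYPES:
            raise ValueError(
                f"how must be one of {ALLOWED_JOIN_TYPES}, got {self.how!r}"
            )


spec = ValidatedJoinSpec(left_on="customer_id", right_on="customer_id")

try:
    ValidatedJoinSpec(left_on="customer_id", right_on="customer_id", how="full")
    rejected = False
except ValueError:
    rejected = True
```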
    def collect_data_sources(module, prefix: str) -> list[DataSource]:
        """Collect all DataSource objects whose variable names start with prefix.

        Useful for dynamically collecting training_* or production_* DataSources.

        Example:
            ```python
            # In data_sources.py
            from geronimo.data_sources import DataSource, collect_data_sources
            import sys

            training_customers = DataSource(...)
            training_transactions = DataSource(...)
            production_customers = DataSource(...)

            # Auto-collect by prefix
            training_sources = collect_data_sources(sys.modules[__name__], "training_")
            production_sources = collect_data_sources(sys.modules[__name__], "production_")
            ```

        Args:
            module: The module to search (typically sys.modules[__name__]).
            prefix: Variable name prefix to match (e.g., "training_").

        Returns:
            List of DataSource objects whose variable names start with prefix.
        """
        sources = []
        for name in dir(module):
            if name.startswith(prefix):
                obj = getattr(module, name)
                if isinstance(obj, DataSource):
                    sources.append(obj)
        return sources
Collect all DataSource objects whose variable names start with prefix.
Useful for dynamically collecting training_* or production_* DataSources.
Example:
    # In data_sources.py
    from geronimo.data_sources import DataSource, collect_data_sources
    import sys

    training_customers = DataSource(...)
    training_transactions = DataSource(...)
    production_customers = DataSource(...)

    # Auto-collect by prefix
    training_sources = collect_data_sources(sys.modules[__name__], "training_")
    production_sources = collect_data_sources(sys.modules[__name__], "production_")
Arguments:
- module: The module to search (typically sys.modules[__name__]).
- prefix: Variable name prefix to match (e.g., "training_").
Returns:
List of DataSource objects whose variable names start with prefix.
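The ordering of the returned list is worth seeing in action. The sketch below runs the same loop against a throwaway module built with `types.ModuleType`; the `DataSource` class here is a minimal stand-in and the module contents are invented for the demonstration.

```python
import types


class DataSource:
    """Minimal stand-in for geronimo's DataSource; only the name is kept."""

    def __init__(self, name: str) -> None:
        self.name = name


def collect_data_sources(module: types.ModuleType, prefix: str) -> list:
    """Same loop as the source listing: scan dir(module) for prefixed DataSource objects."""
    sources = []
    for attr in dir(module):
        if attr.startswith(prefix):
            obj = getattr(module, attr)
            if isinstance(obj, DataSource):
                sources.append(obj)
    return sources


# Note the definition order: transactions is assigned before customers.
demo = types.ModuleType("demo_sources")
demo.training_transactions = DataSource("transactions")
demo.training_customers = DataSource("customers")
demo.training_batch_size = 128  # matches the prefix but is not a DataSource
demo.production_customers = DataSource("prod_customers")

training_names = [s.name for s in collect_data_sources(demo, "training_")]
production_names = [s.name for s in collect_data_sources(demo, "production_")]
```

`dir()` returns names in alphabetical order, so the result is sorted by variable name rather than by definition order. If the first element is treated as the primary source for joins, the variable names determine which source that is.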
    class Query:
        """SQL query wrapper with parameter substitution.

        Supports loading from files and inline SQL definitions.

        Example:
            ```python
            # From file
            query = Query.from_file("queries/training_data.sql")

            # Inline
            query = Query("SELECT * FROM features WHERE date >= :start_date")

            # With parameters
            sql = query.render(start_date="2024-01-01")
            ```
        """

        def __init__(self, sql: str, name: Optional[str] = None):
            """Initialize query.

            Args:
                sql: SQL query string with optional :param placeholders.
                name: Optional query name for tracking.
            """
            self.sql = sql
            self.name = name

        @classmethod
        def from_file(cls, path: str | Path) -> "Query":
            """Load query from SQL file.

            Args:
                path: Path to .sql file.

            Returns:
                Query instance.
            """
            path = Path(path)
            sql = path.read_text()
            return cls(sql=sql, name=path.stem)

        def render(self, **params) -> str:
            """Render query with parameter substitution.

            Args:
                **params: Named parameters to substitute.

            Returns:
                Rendered SQL string.
            """
            sql = self.sql
            for key, value in params.items():
                placeholder = f":{key}"
                if isinstance(value, str):
                    sql = sql.replace(placeholder, f"'{value}'")
                else:
                    sql = sql.replace(placeholder, str(value))
            return sql

        def __repr__(self) -> str:
            name = self.name or "unnamed"
            preview = self.sql[:50] + "..." if len(self.sql) > 50 else self.sql
            return f"Query({name}: {preview})"
SQL query wrapper with parameter substitution.
Supports loading from files and inline SQL definitions.
Example:
    # From file
    query = Query.from_file("queries/training_data.sql")

    # Inline
    query = Query("SELECT * FROM features WHERE date >= :start_date")

    # With parameters
    sql = query.render(start_date="2024-01-01")
    def __init__(self, sql: str, name: Optional[str] = None)
Initialize query.
Arguments:
- sql: SQL query string with optional :param placeholders.
- name: Optional query name for tracking.
    @classmethod
    def from_file(cls, path: str | Path) -> "Query"
Load query from SQL file.
Arguments:
- path: Path to .sql file.
Returns:
Query instance.
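According to the source listing, `from_file` does two things: it reads the file's text and uses the file stem as the query name. The sketch below reproduces those two steps with `pathlib` on a temporary file; the file name and SQL text are invented for the demonstration.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    sql_path = Path(tmp) / "training_data.sql"
    sql_path.write_text("SELECT * FROM features WHERE date >= :start_date\n")

    # Step 1: read the whole file as text (no explicit encoding, as in the listing).
    sql = sql_path.read_text()
    # Step 2: the query name is the file name without its extension.
    name = sql_path.stem
```

Since `read_text()` is called without an `encoding` argument, the platform's default encoding applies, which may matter for SQL files containing non-ASCII characters.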
    def render(self, **params) -> str
Render query with parameter substitution.
Arguments:
- **params: Named parameters to substitute.
Returns:
Rendered SQL string.
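The substitution is plain text replacement, which has a consequence when one parameter name is a prefix of another. The sketch below copies the loop from the source listing into `render_naive` and compares it with `render_whole_names`, a hypothetical regex-based variant that is not part of the library and only replaces complete placeholder names.

```python
import re


def render_naive(sql: str, **params) -> str:
    """Standalone copy of the substitution loop shown in Query.render."""
    for key, value in params.items():
        placeholder = f":{key}"
        if isinstance(value, str):
            sql = sql.replace(placeholder, f"'{value}'")
        else:
            sql = sql.replace(placeholder, str(value))
    return sql


def render_whole_names(sql: str, **params) -> str:
    """Hypothetical variant: substitute a placeholder only when the full name matches."""

    def substitute(match: re.Match) -> str:
        key = match.group(1)
        if key not in params:
            return match.group(0)  # leave unknown placeholders untouched
        value = params[key]
        return f"'{value}'" if isinstance(value, str) else str(value)

    # (?<!:) skips "::" casts; the name must start with a letter or underscore.
    return re.sub(r"(?<!:):([A-Za-z_]\w*)", substitute, sql)


template = "SELECT * FROM t WHERE d >= :start AND d < :start_date"
naive = render_naive(template, start="2024-01-01", start_date="2024-02-01")
whole = render_whole_names(template, start="2024-01-01", start_date="2024-02-01")
```

In the naive version, `:start` is replaced first and also consumes the beginning of `:start_date`, leaving a stray `_date` in the SQL. Neither version escapes quotes inside string values, so both are unsuitable for untrusted input; driver-level bind parameters are the usual way to handle that.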
    @runtime_checkable
    class DatabaseConnection(Protocol):
        """Protocol for database connections.

        Implement this protocol to add support for new database types.

        Example:
            ```python
            class MyDatabaseConnection:
                def __init__(self, connection_string: str):
                    self.connection_string = connection_string
                    self._conn = None

                def connect(self) -> None:
                    self._conn = my_db_driver.connect(self.connection_string)

                def execute(self, sql: str) -> pd.DataFrame:
                    return pd.read_sql(sql, self._conn)

                def close(self) -> None:
                    if self._conn:
                        self._conn.close()
            ```
        """

        def connect(self) -> None:
            """Establish connection to the database."""
            ...

        def execute(self, sql: str) -> pd.DataFrame:
            """Execute a SQL query and return results as DataFrame.

            Args:
                sql: SQL query to execute.

            Returns:
                DataFrame containing query results.
            """
            ...

        def close(self) -> None:
            """Close the database connection."""
            ...
Protocol for database connections.
Implement this protocol to add support for new database types.
Example:
    class MyDatabaseConnection:
        def __init__(self, connection_string: str):
            self.connection_string = connection_string
            self._conn = None

        def connect(self) -> None:
            self._conn = my_db_driver.connect(self.connection_string)

        def execute(self, sql: str) -> pd.DataFrame:
            return pd.read_sql(sql, self._conn)

        def close(self) -> None:
            if self._conn:
                self._conn.close()
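The protocol declares only `connect`, `execute`, and `close`, while the `DataSource` listing uses the connection in a `with` statement. The sketch below suggests that a class can pass the structural `isinstance` check and still lack `__enter__`/`__exit__`. The protocol is re-declared locally with `Any` in place of `pd.DataFrame`, and `InMemoryConnection` and `ContextManagedConnection` are hypothetical classes.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DatabaseConnection(Protocol):
    """Local copy of the protocol shape; Any replaces pd.DataFrame."""

    def connect(self) -> None: ...
    def execute(self, sql: str) -> Any: ...
    def close(self) -> None: ...


class InMemoryConnection:
    """Hypothetical adapter that satisfies the protocol without inheriting from it."""

    def __init__(self) -> None:
        self.is_open = False

    def connect(self) -> None:
        self.is_open = True

    def execute(self, sql: str) -> Any:
        return [{"sql": sql}]

    def close(self) -> None:
        self.is_open = False


class ContextManagedConnection(InMemoryConnection):
    """Hypothetical subclass that adds the with-statement hooks."""

    def __enter__(self) -> "ContextManagedConnection":
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()


plain = InMemoryConnection()
satisfies_protocol = isinstance(plain, DatabaseConnection)  # checks method names only
has_with_support = hasattr(plain, "__enter__")

managed = ContextManagedConnection()
with managed:
    rows = managed.execute("SELECT 1")
    open_inside = managed.is_open
open_after = managed.is_open
```

A custom connection passed to `DataSource(connection=...)` therefore likely needs the context-manager methods as well, either by defining them as above or by subclassing `BaseDatabaseConnection`, which provides them.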
    def execute(self, sql: str) -> pd.DataFrame
Execute a SQL query and return results as DataFrame.
Arguments:
- sql: SQL query to execute.
Returns:
DataFrame containing query results.
    class BaseDatabaseConnection(ABC):
        """Abstract base class for database connections.

        Provides common functionality like context manager support
        and shared execute/close implementations.
        """

        connection_params: dict[str, Any]
        """Connection parameters."""

        _connection: Any
        """The low-level database connection object."""

        def __init__(self, connection_params: Optional[dict[str, Any]] = None):
            """Initialize connection with parameters.

            Args:
                connection_params: Optional connection parameters (overrides env vars).
            """
            self.connection_params = connection_params or {}
            self._connection: Any = None

        @abstractmethod
        def connect(self) -> None:
            """Establish connection to the database."""
            pass

        def execute(self, sql: str) -> pd.DataFrame:
            """Execute a SQL query and return results as DataFrame.

            Args:
                sql: SQL query to execute.

            Returns:
                DataFrame containing query results.

            Raises:
                RuntimeError: If not connected.
            """
            if self._connection is None:
                raise RuntimeError("Not connected. Call connect() first.")
            return pd.read_sql(sql, self._connection)

        def close(self) -> None:
            """Close the database connection."""
            if self._connection:
                self._connection.close()
                self._connection = None

        def __enter__(self) -> "BaseDatabaseConnection":
            """Context manager entry."""
            self.connect()
            return self

        def __exit__(self, exc_type, exc_val, exc_tb) -> None:
            """Context manager exit."""
            self.close()

        @contextmanager
        def session(self) -> Generator["BaseDatabaseConnection", None, None]:
            """Convenience context manager for connection lifecycle.

            Example:
                ```python
                conn = SnowflakeConnection(params)
                with conn.session():
                    df = conn.execute("SELECT * FROM table")
                ```
            """
            try:
                self.connect()
                yield self
            finally:
                self.close()
Abstract base class for database connections.
Provides common functionality like context manager support and shared execute/close implementations.
    def __init__(self, connection_params: Optional[dict[str, Any]] = None)
Initialize connection with parameters.
Arguments:
- connection_params: Optional connection parameters (overrides env vars).
    @abstractmethod
    def connect(self) -> None
Establish connection to the database.
    def execute(self, sql: str) -> pd.DataFrame
Execute a SQL query and return results as DataFrame.
Arguments:
- sql: SQL query to execute.
Returns:
DataFrame containing query results.
Raises:
- RuntimeError: If not connected.
    def close(self) -> None
Close the database connection.
    @contextmanager
    def session(self) -> Generator["BaseDatabaseConnection", None, None]
Convenience context manager for connection lifecycle.
Example:
    conn = SnowflakeConnection(params)
    with conn.session():
        df = conn.execute("SELECT * FROM table")
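The connect/execute/close lifecycle can be tried against SQLite from the standard library. The sketch below is a trimmed copy of the base-class pattern, not the library class itself: `BaseConnection` returns a list of rows instead of a DataFrame so that pandas is not needed, and `SQLiteConnection` with its `database` parameter is a hypothetical adapter.

```python
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, Optional


class BaseConnection(ABC):
    """Trimmed copy of the base-class lifecycle; execute returns rows, not a DataFrame."""

    def __init__(self, connection_params: Optional[dict] = None) -> None:
        self.connection_params = connection_params or {}
        self._connection: Any = None

    @abstractmethod
    def connect(self) -> None:
        """Subclasses assign self._connection here."""

    def execute(self, sql: str) -> list:
        if self._connection is None:
            raise RuntimeError("Not connected. Call connect() first.")
        return self._connection.execute(sql).fetchall()

    def close(self) -> None:
        if self._connection:
            self._connection.close()
            self._connection = None

    def __enter__(self) -> "BaseConnection":
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()


class SQLiteConnection(BaseConnection):
    """Hypothetical adapter: only connect() has to be written."""

    def connect(self) -> None:
        database = self.connection_params.get("database", ":memory:")
        self._connection = sqlite3.connect(database)


conn = SQLiteConnection()

with conn:
    rows = conn.execute("SELECT 1 + 1")

# After the with block the connection is closed, so execute refuses to run.
try:
    conn.execute("SELECT 1")
    error = ""
except RuntimeError as exc:
    error = str(exc)
```

A new adapter only has to supply `connect()`; the closing logic and the "not connected" guard come from the base class.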
    class SnowflakeConnection(BaseDatabaseConnection):
        """Snowflake database connection."""

        def __init__(self, connection_params: Optional[dict[str, Any]] = None):
            """Initialize Snowflake connection.

            Args:
                connection_params: Optional dict with keys:
                    - user: Snowflake username (or SNOWFLAKE_USER env var)
                    - password: Snowflake password (or SNOWFLAKE_PASSWORD env var)
                    - account: Snowflake account (or SNOWFLAKE_ACCOUNT env var)
                    - warehouse: Snowflake warehouse (or SNOWFLAKE_WAREHOUSE env var)
                    - database: Snowflake database (or SNOWFLAKE_DATABASE env var)
                    - schema: Snowflake schema (or SNOWFLAKE_SCHEMA env var)
            """
            super().__init__(connection_params)

        def connect(self) -> None:
            """Establish connection to Snowflake."""
            import snowflake.connector

            conn_args = {
                "user": self.connection_params.get("user", os.getenv("SNOWFLAKE_USER")),
                "password": self.connection_params.get("password", os.getenv("SNOWFLAKE_PASSWORD")),
                "account": self.connection_params.get("account", os.getenv("SNOWFLAKE_ACCOUNT")),
                "warehouse": self.connection_params.get("warehouse", os.getenv("SNOWFLAKE_WAREHOUSE")),
                "database": self.connection_params.get("database", os.getenv("SNOWFLAKE_DATABASE")),
                "schema": self.connection_params.get("schema", os.getenv("SNOWFLAKE_SCHEMA")),
            }
            self._connection = snowflake.connector.connect(**conn_args)
Snowflake database connection.
    def __init__(self, connection_params: Optional[dict[str, Any]] = None)
Initialize Snowflake connection.
Arguments:
- connection_params: Optional dict with keys:
  - user: Snowflake username (or SNOWFLAKE_USER env var)
  - password: Snowflake password (or SNOWFLAKE_PASSWORD env var)
  - account: Snowflake account (or SNOWFLAKE_ACCOUNT env var)
  - warehouse: Snowflake warehouse (or SNOWFLAKE_WAREHOUSE env var)
  - database: Snowflake database (or SNOWFLAKE_DATABASE env var)
  - schema: Snowflake schema (or SNOWFLAKE_SCHEMA env var)
    def connect(self) -> None
Establish connection to Snowflake.
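The connection listings resolve each setting with `connection_params.get(key, os.getenv(...))`. The sketch below isolates that lookup in a hypothetical `resolve` helper to show the precedence; the environment values are set only for the demonstration.

```python
import os
from typing import Optional


def resolve(params: dict, key: str, env_var: str) -> Optional[str]:
    """Same lookup shape as the connect() listing: explicit parameter, else env var."""
    return params.get(key, os.getenv(env_var))


os.environ["SNOWFLAKE_USER"] = "env_user"  # demo value only
os.environ.pop("SNOWFLAKE_SCHEMA", None)   # make sure this one is unset

from_env = resolve({}, "user", "SNOWFLAKE_USER")
from_params = resolve({"user": "param_user"}, "user", "SNOWFLAKE_USER")
explicit_none = resolve({"user": None}, "user", "SNOWFLAKE_USER")
unset = resolve({}, "schema", "SNOWFLAKE_SCHEMA")
```

Two details follow from `dict.get`: a key that is present with the value `None` wins over the environment variable, and a setting that is missing from both places resolves to `None`, which is then passed on to the driver.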
    class PostgresConnection(BaseDatabaseConnection):
        """PostgreSQL database connection."""

        def __init__(self, connection_params: Optional[dict[str, Any]] = None):
            """Initialize PostgreSQL connection.

            Args:
                connection_params: Optional dict with keys:
                    - connection_string: Full connection string
                      (or POSTGRES_CONNECTION_STRING env var)
            """
            super().__init__(connection_params)

        def connect(self) -> None:
            """Establish connection to PostgreSQL."""
            import psycopg2

            conn_str = self.connection_params.get(
                "connection_string", os.getenv("POSTGRES_CONNECTION_STRING")
            )
            self._connection = psycopg2.connect(conn_str)
PostgreSQL database connection.
    def __init__(self, connection_params: Optional[dict[str, Any]] = None)
Initialize PostgreSQL connection.
Arguments:
- connection_params: Optional dict with keys:
  - connection_string: Full connection string (or POSTGRES_CONNECTION_STRING env var)
    def connect(self) -> None
Establish connection to PostgreSQL.
    class SQLServerConnection(BaseDatabaseConnection):
        """SQL Server database connection."""

        def __init__(self, connection_params: Optional[dict[str, Any]] = None):
            """Initialize SQL Server connection.

            Args:
                connection_params: Optional dict with keys:
                    - connection_string: ODBC connection string
                      (or SQLSERVER_CONNECTION_STRING env var)
            """
            super().__init__(connection_params)

        def connect(self) -> None:
            """Establish connection to SQL Server."""
            import pyodbc

            conn_str = self.connection_params.get(
                "connection_string", os.getenv("SQLSERVER_CONNECTION_STRING")
            )
            self._connection = pyodbc.connect(conn_str)
SQL Server database connection.
    def __init__(self, connection_params: Optional[dict[str, Any]] = None)
Initialize SQL Server connection.
Arguments:
- connection_params: Optional dict with keys:
  - connection_string: ODBC connection string (or SQLSERVER_CONNECTION_STRING env var)
    def connect(self) -> None
Establish connection to SQL Server.
    def get_connection(
        source_type: str,
        connection_params: Optional[dict[str, Any]] = None
    ) -> BaseDatabaseConnection:
        """Factory function to get appropriate connection for source type.

        Args:
            source_type: Database type (snowflake, postgres, sqlserver).
            connection_params: Optional connection parameters.

        Returns:
            Appropriate connection instance.

        Raises:
            ValueError: If source type is not supported.
        """
        connections = {
            "snowflake": SnowflakeConnection,
            "postgres": PostgresConnection,
            "sqlserver": SQLServerConnection,
        }

        if source_type not in connections:
            raise ValueError(
                f"Unsupported database type: {source_type}. "
                f"Supported types: {list(connections.keys())}"
            )

        return connections[source_type](connection_params)
Factory function to get appropriate connection for source type.
Arguments:
- source_type: Database type (snowflake, postgres, sqlserver).
- connection_params: Optional connection parameters.
Returns:
Appropriate connection instance.
Raises:
- ValueError: If source type is not supported.
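The factory is a dictionary lookup from source name to connection class. The sketch below reproduces that dispatch with stub classes in place of the real connections, so no database driver is required; `_StubConnection`, `CONNECTIONS`, and the connection string are invented for the demonstration.

```python
from typing import Optional


class _StubConnection:
    """Stand-in base: only the constructor signature matters for this sketch."""

    def __init__(self, connection_params: Optional[dict] = None) -> None:
        self.connection_params = connection_params or {}


class SnowflakeConnection(_StubConnection):
    pass


class PostgresConnection(_StubConnection):
    pass


class SQLServerConnection(_StubConnection):
    pass


# Same three keys as the registry in the source listing.
CONNECTIONS = {
    "snowflake": SnowflakeConnection,
    "postgres": PostgresConnection,
    "sqlserver": SQLServerConnection,
}


def get_connection(source_type: str, connection_params: Optional[dict] = None):
    """Look up the class for source_type and instantiate it with the parameters."""
    if source_type not in CONNECTIONS:
        raise ValueError(
            f"Unsupported database type: {source_type}. "
            f"Supported types: {list(CONNECTIONS.keys())}"
        )
    return CONNECTIONS[source_type](connection_params)


pg = get_connection("postgres", {"connection_string": "postgresql://localhost/demo"})

try:
    get_connection("bigquery")
    message = ""
except ValueError as exc:
    message = str(exc)
```

The lookup is an exact, case-sensitive string match, so `"Postgres"` or any name outside the three registered keys raises `ValueError`. Databases without a registered class would have to be supplied through the `connection` argument of `DataSource` instead.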