geronimo.data_sources

Geronimo Data Layer.

The data_sources module provides a unified abstraction for connecting to, querying, and ingesting data from various upstream sources. It decouples the ML modeling logic from the underlying data infrastructure.

Key components:

  • DataSource: Represents a table or view in a database.
  • Query: A composable query object for retrieving data.
  • DatabaseConnection: Protocol for database adapters.

Supported connections:

  • Snowflake
  • PostgreSQL
  • SQL Server
  • Google BigQuery (via generic interface)

This layer handles connection pooling, query generation, and data type mapping.

 1"""Geronimo Data Layer.
 2
 3The data_sources module provides a unified abstraction for connecting to, querying,
 4and ingesting data from various upstream sources. It decouples the ML modeling logic
 5from the underlying data infrastructure.
 6
 7Key components:
 8- DataSource: Represents a table or view in a database.
 9- Query: A composable query object for retrieving data.
10- DatabaseConnection: Protocol for database adapters.
11
12Supported connections:
13- Snowflake
14- PostgreSQL
15- SQL Server
16- Google BigQuery (via generic interface)
17
18This layer handles connection pooling, query generation, and data type mapping.
19"""
20
21from geronimo.data_sources.source import DataSource, JoinSpec, collect_data_sources
22from geronimo.data_sources.query import Query
23from geronimo.data_sources.connection import (
24    DatabaseConnection,
25    BaseDatabaseConnection,
26    SnowflakeConnection,
27    PostgresConnection,
28    SQLServerConnection,
29    get_connection,
30)
31
32__all__ = [
33    "DataSource",
34    "JoinSpec",
35    "collect_data_sources",
36    "Query",
37    "DatabaseConnection",
38    "BaseDatabaseConnection",
39    "SnowflakeConnection",
40    "PostgresConnection",
41    "SQLServerConnection",
42    "get_connection",
43]
44
45__docformat__ = "google"
class DataSource:
 35class DataSource:
 36    """Abstraction for loading data from various backends.
 37
 38    Provides a unified interface for querying data from databases,
 39    loading from files, or calling custom functions.
 40
 41    Example (database):
 42        ```python
 43        from geronimo.data_sources import DataSource, Query
 44
 45        training_data = DataSource(
 46            name="customer_features",
 47            source="snowflake",
 48            query=Query.from_file("queries/training_data.sql"),
 49        )
 50        df = training_data.load(start_date="2024-01-01")
 51        ```
 52
 53    Example (function):
 54        ```python
 55        from geronimo.data_sources import DataSource
 56        from sklearn.datasets import load_iris
 57        import pandas as pd
 58        
 59        def load_iris_data() -> pd.DataFrame:
 60            iris = load_iris()
 61            return pd.DataFrame(iris.data, columns=iris.feature_names)
 62        
 63        training_data = DataSource(
 64            name="iris",
 65            source="function",
 66            handle=load_iris_data,
 67        )
 68        df = training_data.load()  # Validates return type at runtime
 69        ```
 70    
 71    Note:
 72        When using `source="function"`, the provided handle function MUST:
 73        1. Return a pandas DataFrame
 74        2. Be callable with optional keyword arguments
 75        
 76        A DataSourceError is raised at runtime if the function does not
 77        return a DataFrame.
 78    """
 79
 80    name: str
 81    """The name of the data source."""
 82
 83    source: SourceType
 84    """The type of the data source."""
 85
 86    query: Optional[Query]
 87    """The query object (for database sources)."""
 88
 89    path: Optional[str]
 90    """The file path (for file sources)."""
 91
 92    handle: Optional[Callable[..., pd.DataFrame]]
 93    """The function handle (for function sources)."""
 94
 95    connection_params: dict[str, Any]
 96    """Connection parameters."""
 97
 98    _custom_connection: Optional[DatabaseConnection]
 99    """Internal custom connection instance."""
100
101    join_spec: Optional["JoinSpec"]
102    """Specification for joining to this source."""
103
104    def __init__(
105        self,
106        name: str,
107        source: SourceType | str,
108        query: Optional[Query] = None,
109        path: Optional[str] = None,
110        handle: Optional[Callable[..., pd.DataFrame]] = None,
111        connection_params: Optional[dict[str, Any]] = None,
112        connection: Optional[DatabaseConnection] = None,
113        join_spec: Optional["JoinSpec"] = None,
114    ):
115        """Initialize data source.
116
117        Args:
118            name: Descriptive name for the data source.
119            source: Source type (snowflake, postgres, sqlserver, file, function).
120            query: Query object for database sources.
121            path: File path for file-based sources.
122            handle: Callable that returns a DataFrame (for function sources).
123                    Must return pd.DataFrame - validated at runtime.
124            connection_params: Optional connection parameters (overrides env vars).
125            connection: Optional custom DatabaseConnection implementation.
126        
127        Raises:
128            ValueError: If required arguments are missing for the source type.
129        """
130        self.name = name
131        self.source = SourceType(source) if isinstance(source, str) else source
132        self.query = query
133        self.path = path
134        self.handle = handle
135        self.connection_params = connection_params or {}
136        self._custom_connection = connection
137        self.join_spec = join_spec
138
139        # Validate required arguments based on source type
140        if self.source == SourceType.FUNC:
141            if not handle:
142                raise ValueError("Function sources require a handle")
143            if not callable(handle):
144                raise ValueError("handle must be callable")
145        elif self.source == SourceType.FILE:
146            if not path:
147                raise ValueError("File sources require a path")
148        else:
149            # Database sources
150            if not query:
151                raise ValueError("Database sources require a query")
152
153    def load(self, **params) -> pd.DataFrame:
154        """Load data from source.
155
156        Args:
157            **params: Parameters passed to the data loading function.
158                      For database sources, these are query parameters.
159                      For function sources, these are passed to the handle.
160
161        Returns:
162            DataFrame with loaded data.
163        
164        Raises:
165            DataSourceError: If function source doesn't return a DataFrame.
166        """
167        if self.source == SourceType.FILE:
168            return self._load_file()
169        elif self.source == SourceType.FUNC:
170            return self._load_function(**params)
171        else:
172            return self._load_database(**params)
173    
174    def _load_function(self, **params) -> pd.DataFrame:
175        """Load data by calling the handle function.
176        
177        Validates at runtime that the function returns a DataFrame.
178        
179        Args:
180            **params: Keyword arguments passed to the handle function.
181        
182        Returns:
183            DataFrame returned by the handle function.
184        
185        Raises:
186            DataSourceError: If handle doesn't return a DataFrame or raises an exception.
187        """
188        try:
189            result = self.handle(**params)
190        except Exception as e:
191            raise DataSourceError(
192                f"DataSource '{self.name}' handle function raised an exception: {e}"
193            ) from e
194        
195        # Runtime validation: ensure result is a DataFrame
196        if not isinstance(result, pd.DataFrame):
197            actual_type = type(result).__name__
198            raise DataSourceError(
199                f"DataSource '{self.name}' handle function must return a pandas DataFrame, "
200                f"but returned {actual_type}. "
201                f"Ensure your function returns pd.DataFrame."
202            )
203        
204        return result
205
206    def _load_file(self) -> pd.DataFrame:
207        """Load data from file."""
208        from pathlib import Path
209
210        path = Path(self.path)
211        if path.suffix == ".csv":
212            return pd.read_csv(path)
213        elif path.suffix in [".parquet", ".pq"]:
214            return pd.read_parquet(path)
215        elif path.suffix == ".json":
216            return pd.read_json(path)
217        else:
218            raise ValueError(f"Unsupported file format: {path.suffix}")
219
220    def _load_database(self, **params) -> pd.DataFrame:
221        """Load data from database using connection interface."""
222        sql = self.query.render(**params)
223        
224        # Use custom connection if provided, otherwise create from factory
225        if self._custom_connection is not None:
226            connection = self._custom_connection
227        else:
228            connection = get_connection(self.source.value, self.connection_params)
229        
230        # Use context manager for automatic connection cleanup
231        with connection:
232            return connection.execute(sql)
233
234    def __repr__(self) -> str:
235        return f"DataSource({self.name}, source={self.source.value})"

Abstraction for loading data from various backends.

Provides a unified interface for querying data from databases, loading from files, or calling custom functions.

Example (database):

from geronimo.data_sources import DataSource, Query

training_data = DataSource(
    name="customer_features",
    source="snowflake",
    query=Query.from_file("queries/training_data.sql"),
)
df = training_data.load(start_date="2024-01-01")

Example (function):

from geronimo.data_sources import DataSource
from sklearn.datasets import load_iris
import pandas as pd

def load_iris_data() -> pd.DataFrame:
    iris = load_iris()
    return pd.DataFrame(iris.data, columns=iris.feature_names)

training_data = DataSource(
    name="iris",
    source="function",
    handle=load_iris_data,
)
df = training_data.load()  # Validates return type at runtime
Note:

When using source="function", the provided handle function MUST:

  1. Return a pandas DataFrame
  2. Be callable with optional keyword arguments

A DataSourceError is raised at runtime if the function does not return a DataFrame.

DataSource( name: str, source: geronimo.data_sources.source.SourceType | str, query: Optional[Query] = None, path: Optional[str] = None, handle: Optional[Callable[..., pandas.core.frame.DataFrame]] = None, connection_params: Optional[dict[str, Any]] = None, connection: Optional[DatabaseConnection] = None, join_spec: Optional[JoinSpec] = None)
104    def __init__(
105        self,
106        name: str,
107        source: SourceType | str,
108        query: Optional[Query] = None,
109        path: Optional[str] = None,
110        handle: Optional[Callable[..., pd.DataFrame]] = None,
111        connection_params: Optional[dict[str, Any]] = None,
112        connection: Optional[DatabaseConnection] = None,
113        join_spec: Optional["JoinSpec"] = None,
114    ):
115        """Initialize data source.
116
117        Args:
118            name: Descriptive name for the data source.
119            source: Source type (snowflake, postgres, sqlserver, file, function).
120            query: Query object for database sources.
121            path: File path for file-based sources.
122            handle: Callable that returns a DataFrame (for function sources).
123                    Must return pd.DataFrame - validated at runtime.
124            connection_params: Optional connection parameters (overrides env vars).
125            connection: Optional custom DatabaseConnection implementation.
126        
127        Raises:
128            ValueError: If required arguments are missing for the source type.
129        """
130        self.name = name
131        self.source = SourceType(source) if isinstance(source, str) else source
132        self.query = query
133        self.path = path
134        self.handle = handle
135        self.connection_params = connection_params or {}
136        self._custom_connection = connection
137        self.join_spec = join_spec
138
139        # Validate required arguments based on source type
140        if self.source == SourceType.FUNC:
141            if not handle:
142                raise ValueError("Function sources require a handle")
143            if not callable(handle):
144                raise ValueError("handle must be callable")
145        elif self.source == SourceType.FILE:
146            if not path:
147                raise ValueError("File sources require a path")
148        else:
149            # Database sources
150            if not query:
151                raise ValueError("Database sources require a query")

Initialize data source.

Arguments:
  • name: Descriptive name for the data source.
  • source: Source type (snowflake, postgres, sqlserver, file, function).
  • query: Query object for database sources.
  • path: File path for file-based sources.
  • handle: Callable that returns a DataFrame (for function sources). Must return pd.DataFrame - validated at runtime.
  • connection_params: Optional connection parameters (overrides env vars).
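  • connection: Optional custom DatabaseConnection implementation.
  • join_spec: Optional JoinSpec describing how this source is joined to the primary source when several DataSources are combined.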
Raises:
  • ValueError: If required arguments are missing for the source type.
name: str

The name of the data source.

source: geronimo.data_sources.source.SourceType

The type of the data source.

query: Optional[Query]

The query object (for database sources).

path: Optional[str]

The file path (for file sources).

handle: Optional[Callable[..., pandas.core.frame.DataFrame]]

The function handle (for function sources).

connection_params: dict[str, typing.Any]

Connection parameters.

join_spec: Optional[JoinSpec]

Specification for joining to this source.

def load(self, **params) -> pandas.core.frame.DataFrame:
153    def load(self, **params) -> pd.DataFrame:
154        """Load data from source.
155
156        Args:
157            **params: Parameters passed to the data loading function.
158                      For database sources, these are query parameters.
159                      For function sources, these are passed to the handle.
160
161        Returns:
162            DataFrame with loaded data.
163        
164        Raises:
165            DataSourceError: If function source doesn't return a DataFrame.
166        """
167        if self.source == SourceType.FILE:
168            return self._load_file()
169        elif self.source == SourceType.FUNC:
170            return self._load_function(**params)
171        else:
172            return self._load_database(**params)

Load data from source.

Arguments:
  • **params: Parameters passed to the data loading function. For database sources, these are query parameters. For function sources, these are passed to the handle.
Returns:

DataFrame with loaded data.

Raises:
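  • DataSourceError: If a function source's handle raises an exception, or does not return a DataFrame.
  • ValueError: If a file source's path has an unsupported extension (supported: .csv, .parquet, .pq, .json).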
@dataclass
class JoinSpec:
238@dataclass
239class JoinSpec:
240    """Specification for joining a DataSource to the primary source.
241    
242    Used when combining multiple DataSources that share a common key.
243    The first DataSource in a list is treated as the primary; subsequent
244    sources are joined to it using their JoinSpec.
245    
246    Example:
247        ```python
248        from geronimo.data_sources import DataSource, JoinSpec
249        
250        # Primary training source
251        training_customers = DataSource(
252            name="customers",
253            source="file",
254            path="data/customers.csv",
255        )
256        
257        # Secondary source to join
258        training_transactions = DataSource(
259            name="transactions",
260            source="file",
261            path="data/transactions.csv",
262            join_spec=JoinSpec(
263                left_on="customer_id",
264                right_on="customer_id",
265                how="left",
266            ),
267        )
268        ```
269    
270    Attributes:
271        left_on: Column name in the primary (left) source.
272        right_on: Column name in this (right) source.
273        how: Join type - 'left', 'right', 'inner', or 'outer'.
274    """
275    left_on: str
276    right_on: str
277    how: str = "left"

Specification for joining a DataSource to the primary source.

Used when combining multiple DataSources that share a common key. The first DataSource in a list is treated as the primary; subsequent sources are joined to it using their JoinSpec.

Example:
from geronimo.data_sources import DataSource, JoinSpec

# Primary training source
training_customers = DataSource(
    name="customers",
    source="file",
    path="data/customers.csv",
)

# Secondary source to join
training_transactions = DataSource(
    name="transactions",
    source="file",
    path="data/transactions.csv",
    join_spec=JoinSpec(
        left_on="customer_id",
        right_on="customer_id",
        how="left",
    ),
)
Attributes:
  • left_on: Column name in the primary (left) source.
  • right_on: Column name in this (right) source.
  • how: Join type - 'left', 'right', 'inner', or 'outer'.
JoinSpec(left_on: str, right_on: str, how: str = 'left')
left_on: str
right_on: str
how: str = 'left'
def collect_data_sources(module, prefix: str) -> list[DataSource]:
280def collect_data_sources(module, prefix: str) -> list[DataSource]:
281    """Collect all DataSource objects whose variable names start with prefix.
282    
283    Useful for dynamically collecting training_* or production_* DataSources.
284    
285    Example:
286        ```python
287        # In data_sources.py
288        from geronimo.data_sources import DataSource, collect_data_sources
289        import sys
290        
291        training_customers = DataSource(...)
292        training_transactions = DataSource(...)
293        production_customers = DataSource(...)
294        
295        # Auto-collect by prefix
296        training_sources = collect_data_sources(sys.modules[__name__], "training_")
297        production_sources = collect_data_sources(sys.modules[__name__], "production_")
298        ```
299    
300    Args:
301        module: The module to search (typically sys.modules[__name__]).
302        prefix: Variable name prefix to match (e.g., "training_").
303    
304    Returns:
305        List of DataSource objects whose variable names start with prefix.
306    """
307    sources = []
308    for name in dir(module):
309        if name.startswith(prefix):
310            obj = getattr(module, name)
311            if isinstance(obj, DataSource):
312                sources.append(obj)
313    return sources

Collect all DataSource objects whose variable names start with prefix.

Useful for dynamically collecting training_* or production_* DataSources.

Example:
# In data_sources.py
from geronimo.data_sources import DataSource, collect_data_sources
import sys

training_customers = DataSource(...)
training_transactions = DataSource(...)
production_customers = DataSource(...)

# Auto-collect by prefix
training_sources = collect_data_sources(sys.modules[__name__], "training_")
production_sources = collect_data_sources(sys.modules[__name__], "production_")
Arguments:
  • module: The module to search (typically sys.modules[__name__]).
  • prefix: Variable name prefix to match (e.g., "training_").
Returns:

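List of DataSource objects whose variable names start with prefix, ordered alphabetically by variable name (the order in which dir() lists module attributes). When the list is used for joining, the first source is treated as the primary. Name the primary source so that it sorts first, e.g. training_customers before training_transactions.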

class Query:
 8class Query:
 9    """SQL query wrapper with parameter substitution.
10
11    Supports loading from files and inline SQL definitions.
12
13    Example:
14        ```python
15        # From file
16        query = Query.from_file("queries/training_data.sql")
17
18        # Inline
19        query = Query("SELECT * FROM features WHERE date >= :start_date")
20
21        # With parameters
22        sql = query.render(start_date="2024-01-01")
23        ```
24    """
25
26    def __init__(self, sql: str, name: Optional[str] = None):
27        """Initialize query.
28
29        Args:
30            sql: SQL query string with optional :param placeholders.
31            name: Optional query name for tracking.
32        """
33        self.sql = sql
34        self.name = name
35
36    @classmethod
37    def from_file(cls, path: str | Path) -> "Query":
38        """Load query from SQL file.
39
40        Args:
41            path: Path to .sql file.
42
43        Returns:
44            Query instance.
45        """
46        path = Path(path)
47        sql = path.read_text()
48        return cls(sql=sql, name=path.stem)
49
50    def render(self, **params) -> str:
51        """Render query with parameter substitution.
52
53        Args:
54            **params: Named parameters to substitute.
55
56        Returns:
57            Rendered SQL string.
58        """
59        sql = self.sql
60        for key, value in params.items():
61            placeholder = f":{key}"
62            if isinstance(value, str):
63                sql = sql.replace(placeholder, f"'{value}'")
64            else:
65                sql = sql.replace(placeholder, str(value))
66        return sql
67
68    def __repr__(self) -> str:
69        name = self.name or "unnamed"
70        preview = self.sql[:50] + "..." if len(self.sql) > 50 else self.sql
71        return f"Query({name}: {preview})"

SQL query wrapper with parameter substitution.

Supports loading from files and inline SQL definitions.

Example:
# From file
query = Query.from_file("queries/training_data.sql")

# Inline
query = Query("SELECT * FROM features WHERE date >= :start_date")

# With parameters
sql = query.render(start_date="2024-01-01")
Query(sql: str, name: Optional[str] = None)
26    def __init__(self, sql: str, name: Optional[str] = None):
27        """Initialize query.
28
29        Args:
30            sql: SQL query string with optional :param placeholders.
31            name: Optional query name for tracking.
32        """
33        self.sql = sql
34        self.name = name

Initialize query.

Arguments:
  • sql: SQL query string with optional :param placeholders.
  • name: Optional query name for tracking.
sql
name
@classmethod
def from_file(cls, path: str | pathlib.Path) -> Query:
36    @classmethod
37    def from_file(cls, path: str | Path) -> "Query":
38        """Load query from SQL file.
39
40        Args:
41            path: Path to .sql file.
42
43        Returns:
44            Query instance.
45        """
46        path = Path(path)
47        sql = path.read_text()
48        return cls(sql=sql, name=path.stem)

Load query from SQL file.

Arguments:
  • path: Path to .sql file.
Returns:

Query instance.

def render(self, **params) -> str:
50    def render(self, **params) -> str:
51        """Render query with parameter substitution.
52
53        Args:
54            **params: Named parameters to substitute.
55
56        Returns:
57            Rendered SQL string.
58        """
59        sql = self.sql
60        for key, value in params.items():
61            placeholder = f":{key}"
62            if isinstance(value, str):
63                sql = sql.replace(placeholder, f"'{value}'")
64            else:
65                sql = sql.replace(placeholder, str(value))
66        return sql

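Render query with parameter substitution.

Each :name placeholder is replaced by plain substring substitution:
  • String values are wrapped in single quotes without any escaping.
  • Other values are inserted via str().

Because values are spliced directly into the SQL text, never pass untrusted input as a parameter. Because matching is a simple substring replace, a parameter whose name is a prefix of another (e.g. start and start_date) can overwrite part of the longer placeholder.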

Arguments:
  • **params: Named parameters to substitute.
Returns:

Rendered SQL string.

@runtime_checkable
class DatabaseConnection(typing.Protocol):
15@runtime_checkable
16class DatabaseConnection(Protocol):
17    """Protocol for database connections.
18    
19    Implement this protocol to add support for new database types.
20    
21    Example:
22        ```python
23        class MyDatabaseConnection:
24            def __init__(self, connection_string: str):
25                self.connection_string = connection_string
26                self._conn = None
27            
28            def connect(self) -> None:
29                self._conn = my_db_driver.connect(self.connection_string)
30            
31            def execute(self, sql: str) -> pd.DataFrame:
32                return pd.read_sql(sql, self._conn)
33            
34            def close(self) -> None:
35                if self._conn:
36                    self._conn.close()
37        ```
38    """
39    
40    def connect(self) -> None:
41        """Establish connection to the database."""
42        ...
43    
44    def execute(self, sql: str) -> pd.DataFrame:
45        """Execute a SQL query and return results as DataFrame.
46        
47        Args:
48            sql: SQL query to execute.
49            
50        Returns:
51            DataFrame containing query results.
52        """
53        ...
54    
55    def close(self) -> None:
56        """Close the database connection."""
57        ...

Protocol for database connections.

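Implement this protocol to add support for new database types.

Note that DataSource enters a custom connection with a with statement and does not call connect() itself. A connection passed as DataSource(connection=...) must therefore also implement:
  • __enter__, which should call connect() and return the connection.
  • __exit__, which should call close().

Subclassing BaseDatabaseConnection provides both. The minimal example below implements only connect/execute/close, so it would need these additions before being passed to DataSource.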

Example:
class MyDatabaseConnection:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self._conn = None

    def connect(self) -> None:
        self._conn = my_db_driver.connect(self.connection_string)

    def execute(self, sql: str) -> pd.DataFrame:
        return pd.read_sql(sql, self._conn)

    def close(self) -> None:
        if self._conn:
            self._conn.close()
DatabaseConnection(*args, **kwargs)
1953def _no_init_or_replace_init(self, *args, **kwargs):
1954    cls = type(self)
1955
1956    if cls._is_protocol:
1957        raise TypeError('Protocols cannot be instantiated')
1958
1959    # Already using a custom `__init__`. No need to calculate correct
1960    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1961    if cls.__init__ is not _no_init_or_replace_init:
1962        return
1963
1964    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1965    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1966    # searches for a proper new `__init__` in the MRO. The new `__init__`
1967    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1968    # instantiation of the protocol subclass will thus use the new
1969    # `__init__` and no longer call `_no_init_or_replace_init`.
1970    for base in cls.__mro__:
1971        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1972        if init is not _no_init_or_replace_init:
1973            cls.__init__ = init
1974            break
1975    else:
1976        # should not happen
1977        cls.__init__ = object.__init__
1978
1979    cls.__init__(self, *args, **kwargs)
def connect(self) -> None:
40    def connect(self) -> None:
41        """Establish connection to the database."""
42        ...

Establish connection to the database.

def execute(self, sql: str) -> pandas.core.frame.DataFrame:
44    def execute(self, sql: str) -> pd.DataFrame:
45        """Execute a SQL query and return results as DataFrame.
46        
47        Args:
48            sql: SQL query to execute.
49            
50        Returns:
51            DataFrame containing query results.
52        """
53        ...

Execute a SQL query and return results as DataFrame.

Arguments:
  • sql: SQL query to execute.
Returns:

DataFrame containing query results.

def close(self) -> None:
55    def close(self) -> None:
56        """Close the database connection."""
57        ...

Close the database connection.

class BaseDatabaseConnection(abc.ABC):
 60class BaseDatabaseConnection(ABC):
 61    """Abstract base class for database connections.
 62    
 63    Provides common functionality like context manager support
 64    and shared execute/close implementations.
 65    """
 66
 67    connection_params: dict[str, Any]
 68    """Connection parameters."""
 69
 70    _connection: Any
 71    """The low-level database connection object."""
 72
 73    def __init__(self, connection_params: Optional[dict[str, Any]] = None):
 74        """Initialize connection with parameters.
 75        
 76        Args:
 77            connection_params: Optional connection parameters (overrides env vars).
 78        """
 79        self.connection_params = connection_params or {}
 80        self._connection: Any = None
 81    
 82    @abstractmethod
 83    def connect(self) -> None:
 84        """Establish connection to the database."""
 85        pass
 86    
 87    def execute(self, sql: str) -> pd.DataFrame:
 88        """Execute a SQL query and return results as DataFrame.
 89        
 90        Args:
 91            sql: SQL query to execute.
 92            
 93        Returns:
 94            DataFrame containing query results.
 95            
 96        Raises:
 97            RuntimeError: If not connected.
 98        """
 99        if self._connection is None:
100            raise RuntimeError("Not connected. Call connect() first.")
101        return pd.read_sql(sql, self._connection)
102    
103    def close(self) -> None:
104        """Close the database connection."""
105        if self._connection:
106            self._connection.close()
107            self._connection = None
108    
109    def __enter__(self) -> "BaseDatabaseConnection":
110        """Context manager entry."""
111        self.connect()
112        return self
113    
114    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
115        """Context manager exit."""
116        self.close()
117    
118    @contextmanager
119    def session(self) -> Generator["BaseDatabaseConnection", None, None]:
120        """Convenience context manager for connection lifecycle.
121        
122        Example:
123            ```python
124            conn = SnowflakeConnection(params)
125            with conn.session():
126                df = conn.execute("SELECT * FROM table")
127            ```
128        """
129        try:
130            self.connect()
131            yield self
132        finally:
133            self.close()

Abstract base class for database connections.

Provides common functionality like context manager support and shared execute/close implementations.

BaseDatabaseConnection(connection_params: Optional[dict[str, Any]] = None)
73    def __init__(self, connection_params: Optional[dict[str, Any]] = None):
74        """Initialize connection with parameters.
75        
76        Args:
77            connection_params: Optional connection parameters (overrides env vars).
78        """
79        self.connection_params = connection_params or {}
80        self._connection: Any = None

Initialize connection with parameters.

Arguments:
  • connection_params: Optional connection parameters (overrides env vars).
connection_params: dict[str, typing.Any]

Connection parameters.

@abstractmethod
def connect(self) -> None:
82    @abstractmethod
83    def connect(self) -> None:
84        """Establish connection to the database."""
85        pass

Establish connection to the database.

def execute(self, sql: str) -> pandas.core.frame.DataFrame:
 87    def execute(self, sql: str) -> pd.DataFrame:
 88        """Execute a SQL query and return results as DataFrame.
 89        
 90        Args:
 91            sql: SQL query to execute.
 92            
 93        Returns:
 94            DataFrame containing query results.
 95            
 96        Raises:
 97            RuntimeError: If not connected.
 98        """
 99        if self._connection is None:
100            raise RuntimeError("Not connected. Call connect() first.")
101        return pd.read_sql(sql, self._connection)

Execute a SQL query and return results as DataFrame.

Arguments:
  • sql: SQL query to execute.
Returns:

DataFrame containing query results.

Raises:
  • RuntimeError: If not connected.
def close(self) -> None:
103    def close(self) -> None:
104        """Close the database connection."""
105        if self._connection:
106            self._connection.close()
107            self._connection = None

Close the database connection.

@contextmanager
def session(self) -> Generator[BaseDatabaseConnection, None, None]:
118    @contextmanager
119    def session(self) -> Generator["BaseDatabaseConnection", None, None]:
120        """Convenience context manager for connection lifecycle.
121        
122        Example:
123            ```python
124            conn = SnowflakeConnection(params)
125            with conn.session():
126                df = conn.execute("SELECT * FROM table")
127            ```
128        """
129        try:
130            self.connect()
131            yield self
132        finally:
133            self.close()

Convenience context manager for connection lifecycle.

Example:
conn = SnowflakeConnection(params)
with conn.session():
    df = conn.execute("SELECT * FROM table")
class SnowflakeConnection(geronimo.data_sources.BaseDatabaseConnection):
136class SnowflakeConnection(BaseDatabaseConnection):
137    """Snowflake database connection."""
138    
139    def __init__(self, connection_params: Optional[dict[str, Any]] = None):
140        """Initialize Snowflake connection.
141        
142        Args:
143            connection_params: Optional dict with keys:
144                - user: Snowflake username (or SNOWFLAKE_USER env var)
145                - password: Snowflake password (or SNOWFLAKE_PASSWORD env var)
146                - account: Snowflake account (or SNOWFLAKE_ACCOUNT env var)
147                - warehouse: Snowflake warehouse (or SNOWFLAKE_WAREHOUSE env var)
148                - database: Snowflake database (or SNOWFLAKE_DATABASE env var)
149                - schema: Snowflake schema (or SNOWFLAKE_SCHEMA env var)
150        """
151        super().__init__(connection_params)
152    
153    def connect(self) -> None:
154        """Establish connection to Snowflake."""
155        import snowflake.connector
156        
157        conn_args = {
158            "user": self.connection_params.get("user", os.getenv("SNOWFLAKE_USER")),
159            "password": self.connection_params.get("password", os.getenv("SNOWFLAKE_PASSWORD")),
160            "account": self.connection_params.get("account", os.getenv("SNOWFLAKE_ACCOUNT")),
161            "warehouse": self.connection_params.get("warehouse", os.getenv("SNOWFLAKE_WAREHOUSE")),
162            "database": self.connection_params.get("database", os.getenv("SNOWFLAKE_DATABASE")),
163            "schema": self.connection_params.get("schema", os.getenv("SNOWFLAKE_SCHEMA")),
164        }
165        self._connection = snowflake.connector.connect(**conn_args)

Snowflake database connection.

SnowflakeConnection(connection_params: Optional[dict[str, Any]] = None)
139    def __init__(self, connection_params: Optional[dict[str, Any]] = None):
140        """Initialize Snowflake connection.
141        
142        Args:
143            connection_params: Optional dict with keys:
144                - user: Snowflake username (or SNOWFLAKE_USER env var)
145                - password: Snowflake password (or SNOWFLAKE_PASSWORD env var)
146                - account: Snowflake account (or SNOWFLAKE_ACCOUNT env var)
147                - warehouse: Snowflake warehouse (or SNOWFLAKE_WAREHOUSE env var)
148                - database: Snowflake database (or SNOWFLAKE_DATABASE env var)
149                - schema: Snowflake schema (or SNOWFLAKE_SCHEMA env var)
150        """
151        super().__init__(connection_params)

Initialize Snowflake connection.

Arguments:
  • connection_params: Optional dict with keys:
    • user: Snowflake username (or SNOWFLAKE_USER env var)
    • password: Snowflake password (or SNOWFLAKE_PASSWORD env var)
    • account: Snowflake account (or SNOWFLAKE_ACCOUNT env var)
    • warehouse: Snowflake warehouse (or SNOWFLAKE_WAREHOUSE env var)
    • database: Snowflake database (or SNOWFLAKE_DATABASE env var)
    • schema: Snowflake schema (or SNOWFLAKE_SCHEMA env var)
def connect(self) -> None:
153    def connect(self) -> None:
154        """Establish connection to Snowflake."""
155        import snowflake.connector
156        
157        conn_args = {
158            "user": self.connection_params.get("user", os.getenv("SNOWFLAKE_USER")),
159            "password": self.connection_params.get("password", os.getenv("SNOWFLAKE_PASSWORD")),
160            "account": self.connection_params.get("account", os.getenv("SNOWFLAKE_ACCOUNT")),
161            "warehouse": self.connection_params.get("warehouse", os.getenv("SNOWFLAKE_WAREHOUSE")),
162            "database": self.connection_params.get("database", os.getenv("SNOWFLAKE_DATABASE")),
163            "schema": self.connection_params.get("schema", os.getenv("SNOWFLAKE_SCHEMA")),
164        }
165        self._connection = snowflake.connector.connect(**conn_args)

Establish connection to Snowflake.

class PostgresConnection(geronimo.data_sources.BaseDatabaseConnection):
168class PostgresConnection(BaseDatabaseConnection):
169    """PostgreSQL database connection."""
170    
171    def __init__(self, connection_params: Optional[dict[str, Any]] = None):
172        """Initialize PostgreSQL connection.
173        
174        Args:
175            connection_params: Optional dict with keys:
176                - connection_string: Full connection string 
177                  (or POSTGRES_CONNECTION_STRING env var)
178        """
179        super().__init__(connection_params)
180    
181    def connect(self) -> None:
182        """Establish connection to PostgreSQL."""
183        import psycopg2
184        
185        conn_str = self.connection_params.get(
186            "connection_string", os.getenv("POSTGRES_CONNECTION_STRING")
187        )
188        self._connection = psycopg2.connect(conn_str)

PostgreSQL database connection.

PostgresConnection(connection_params: Optional[dict[str, Any]] = None)
171    def __init__(self, connection_params: Optional[dict[str, Any]] = None):
172        """Initialize PostgreSQL connection.
173        
174        Args:
175            connection_params: Optional dict with keys:
176                - connection_string: Full connection string 
177                  (or POSTGRES_CONNECTION_STRING env var)
178        """
179        super().__init__(connection_params)

Initialize PostgreSQL connection.

Arguments:
  • connection_params: Optional dict with keys:
    • connection_string: Full connection string (or POSTGRES_CONNECTION_STRING env var)
def connect(self) -> None:
181    def connect(self) -> None:
182        """Establish connection to PostgreSQL."""
183        import psycopg2
184        
185        conn_str = self.connection_params.get(
186            "connection_string", os.getenv("POSTGRES_CONNECTION_STRING")
187        )
188        self._connection = psycopg2.connect(conn_str)

Establish connection to PostgreSQL.

class SQLServerConnection(geronimo.data_sources.BaseDatabaseConnection):
191class SQLServerConnection(BaseDatabaseConnection):
192    """SQL Server database connection."""
193    
194    def __init__(self, connection_params: Optional[dict[str, Any]] = None):
195        """Initialize SQL Server connection.
196        
197        Args:
198            connection_params: Optional dict with keys:
199                - connection_string: ODBC connection string
200                  (or SQLSERVER_CONNECTION_STRING env var)
201        """
202        super().__init__(connection_params)
203    
204    def connect(self) -> None:
205        """Establish connection to SQL Server."""
206        import pyodbc
207        
208        conn_str = self.connection_params.get(
209            "connection_string", os.getenv("SQLSERVER_CONNECTION_STRING")
210        )
211        self._connection = pyodbc.connect(conn_str)

SQL Server database connection.

SQLServerConnection(connection_params: Optional[dict[str, Any]] = None)
194    def __init__(self, connection_params: Optional[dict[str, Any]] = None):
195        """Initialize SQL Server connection.
196        
197        Args:
198            connection_params: Optional dict with keys:
199                - connection_string: ODBC connection string
200                  (or SQLSERVER_CONNECTION_STRING env var)
201        """
202        super().__init__(connection_params)

Initialize SQL Server connection.

Arguments:
  • connection_params: Optional dict with keys:
    • connection_string: ODBC connection string (or SQLSERVER_CONNECTION_STRING env var)
def connect(self) -> None:
204    def connect(self) -> None:
205        """Establish connection to SQL Server."""
206        import pyodbc
207        
208        conn_str = self.connection_params.get(
209            "connection_string", os.getenv("SQLSERVER_CONNECTION_STRING")
210        )
211        self._connection = pyodbc.connect(conn_str)

Establish connection to SQL Server.

def get_connection(source_type: str, connection_params: Optional[dict[str, Any]] = None) -> BaseDatabaseConnection:
214def get_connection(
215    source_type: str, 
216    connection_params: Optional[dict[str, Any]] = None
217) -> BaseDatabaseConnection:
218    """Factory function to get appropriate connection for source type.
219    
220    Args:
221        source_type: Database type (snowflake, postgres, sqlserver).
222        connection_params: Optional connection parameters.
223        
224    Returns:
225        Appropriate connection instance.
226        
227    Raises:
228        ValueError: If source type is not supported.
229    """
230    connections = {
231        "snowflake": SnowflakeConnection,
232        "postgres": PostgresConnection,
233        "sqlserver": SQLServerConnection,
234    }
235    
236    if source_type not in connections:
237        raise ValueError(
238            f"Unsupported database type: {source_type}. "
239            f"Supported types: {list(connections.keys())}"
240        )
241    
242    return connections[source_type](connection_params)

Factory function to get appropriate connection for source type.

Arguments:
  • source_type: Database type (snowflake, postgres, sqlserver).
  • connection_params: Optional connection parameters.
Returns:

Appropriate connection instance.

Raises:
  • ValueError: If source type is not supported.