Metadata-Version: 2.4
Name: ds_utilities-dkirby
Version: 0.1.0
Summary: A collection of my frequently used utilities for data science projects
Author: D. Kirby
Description-Content-Type: text/markdown
Requires-Dist: pandas~=2.3
Requires-Dist: numpy~=2.3
Requires-Dist: snowflake~=1.8
Provides-Extra: dev
Requires-Dist: pytest~=8.4; extra == "dev"
Requires-Dist: pytest-cov; extra == "dev"
Provides-Extra: test
Requires-Dist: pytest~=8.4; extra == "test"

# DS-Utilities

A collection of data science utilities and helper functions for common data workflows. This package provides reusable tools to streamline data science projects, including data pipeline frameworks, data loading strategies, and transformation utilities.

## Features

- **Data Pipeline Framework**: Flexible and extensible pipeline architecture for loading and transforming data
- **Multiple Data Sources**: Support for Snowflake databases and CSV files
- **Strategy Pattern**: Extensible design that makes adding new data sources straightforward
- **Composable Operations**: Chain multiple data operations together in a declarative way
- **Type Safety**: Full type hints for better IDE support and code reliability
- **Unified Interface**: Consistent API across different data source types

## Installation

```bash
# Install the runtime dependencies
pip install pandas snowflake-connector-python

# From a local clone of the repository, install the package in editable mode
pip install -e .
```

## Requirements

- Python 3.9+ (the type hints use built-in generics such as `list[str]`)
- pandas
- numpy
- snowflake-connector-python (for Snowflake integration)

Environment variables used for the Snowflake connection:
- `SNOWFLAKE_USER`
- `SNOWFLAKE_ACCT`
- `SNOWFLAKE_ROLE`

---

## Data Pipeline Utilities

The data pipeline module provides a flexible framework for loading and transforming data from multiple sources. Data loading is organized around a strategy pattern, and transformations are expressed as composable pipelines.

### Architecture

The pipeline utilities are built around three main components:

1. **DataStrategy** (Abstract Base Class): Defines the interface for data loading strategies
2. **DataConnector**: Factory class that manages strategy selection and execution
3. **DataPipeline**: Orchestrates multiple data operations in a sequential pipeline
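
In practice the three compose in that order: a `DataConnector` selects a `DataStrategy` and loads raw data, which a `DataPipeline` then transforms. A quick sketch using only the APIs documented below:

```python
from ds_utilities import DataConnector, DataPipeline

# A connector loads raw data; a pipeline chains transformations on it.
connector = DataConnector("csv", "/path/to/data")

pipeline = DataPipeline()
pipeline.add_stage(lambda data=None: connector.load_data("sales.csv"))  # load stage
pipeline.add_stage(lambda data: data.dropna())                          # transform stage

result = pipeline.run()
```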

### Design Pattern

The package uses the **Strategy Pattern** to separate data loading logic from the client code. This allows:
- Easy addition of new data sources without modifying existing code
- Consistent interface across different data sources
- Better testability through dependency injection
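
The testability benefit follows from injection: code written against the `DataStrategy` interface can be exercised with a stub in place of a live connection. A minimal sketch; `StubStrategy` and `summarize_cost` are hypothetical names used only here:

```python
import pandas as pd
from pandas import DataFrame
from ds_utilities import DataStrategy

class StubStrategy(DataStrategy):
    """Test double: returns canned data and touches no real source."""
    def __init__(self, connection=None):
        super().__init__(connection)

    def load(self, sources) -> DataFrame:
        return pd.DataFrame({"COST": [1200.0, 800.0]})

def summarize_cost(strategy: DataStrategy) -> float:
    # Depends only on the abstract interface, so any strategy can be injected.
    return strategy.load("ignored-source")["COST"].sum()

assert summarize_cost(StubStrategy()) == 2000.0
```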

### Classes

#### DataStrategy (Abstract Base Class)

Base class for all data loading strategies.

```python
from abc import ABC, abstractmethod
from typing import Optional, Union

from pandas import DataFrame

class DataStrategy(ABC):
    @abstractmethod
    def __init__(self, connection):
        self.connection = connection
        self._source_list: Optional[list[str]] = None

    @abstractmethod
    def load(self, sources: Union[str, list[str]]) -> Union[DataFrame, list[DataFrame]]:
        ...
```

#### CsvStrategy

Loads data from CSV files.

**Parameters:**
- `root_dir` (str): Root directory containing CSV files

**Methods:**
- `load(sources)`: Load one or more CSV files

**Example:**
```python
from ds_utilities import CsvStrategy

# Initialize with root directory
strategy = CsvStrategy("/path/to/data")

# Load single file
data = strategy.load("sales.csv")

# Load multiple files
data_list = strategy.load(["sales.csv", "customers.csv"])
```

#### SnowflakeStrategy

Executes SQL queries against a Snowflake database.

**Parameters:**
- `connector`: Snowflake connection object

**Methods:**
- `load(sources)`: Execute one or more SQL queries

**Example:**
```python
import os

import snowflake.connector
from ds_utilities import SnowflakeStrategy

# Create Snowflake connection
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    account=os.environ['SNOWFLAKE_ACCT'],
    role=os.environ['SNOWFLAKE_ROLE'],
    authenticator='externalbrowser'
)

# Initialize strategy
strategy = SnowflakeStrategy(conn)

# Execute single query
query = "SELECT * FROM sales WHERE date >= '2024-01-01'"
data = strategy.load(query)

# Execute multiple queries
queries = [
    "SELECT * FROM sales",
    "SELECT * FROM customers"
]
data_list = strategy.load(queries)
```

#### DataConnector

Factory class for creating and managing data strategies.

**Parameters:**
- `type` (str): Type of data source - must be "snowflake" or "csv"
- `connection`: Connection object appropriate for the strategy type

**Methods:**
- `load_data(sources)`: Load data using the configured strategy

**Example:**
```python
from ds_utilities import DataConnector

# CSV connector
csv_connector = DataConnector("csv", "/path/to/data")
data = csv_connector.load_data("sales.csv")

# Snowflake connector
sf_connector = DataConnector("snowflake", snowflake_conn)
data = sf_connector.load_data("SELECT * FROM sales")

# Check connector type
print(sf_connector)  # Output: "snowflake"
```

#### DataPipeline

Chains multiple data operations together in a sequential pipeline.

**Methods:**
- `add_stage(stage, **kwargs)`: Add a processing stage to the pipeline
- `run()`: Execute all stages in order
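
Each stage is a callable that receives the previous stage's output through its `data` argument (the first stage receives `data=None`); keyword arguments given to `add_stage` are forwarded to the stage when it runs. The package internals aren't reproduced here, but the chaining convention the examples rely on looks roughly like this sketch:

```python
# Conceptual sketch of the chaining convention, not the package source.
class SketchPipeline:
    def __init__(self):
        self._stages = []

    def add_stage(self, stage, **kwargs):
        self._stages.append((stage, kwargs))

    def run(self):
        data = None  # the first stage starts with no input
        for stage, kwargs in self._stages:
            data = stage(data=data, **kwargs)  # each output feeds the next stage
        return data
```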

**Example:**
```python
from ds_utilities import DataPipeline
import pandas as pd

# Create pipeline
pipeline = DataPipeline()

# Define stages
def load_data(file_path, data=None):
    return pd.read_csv(file_path)

def clean_data(data):
    return data.dropna()

def transform_data(column_name, data):
    data[f'{column_name}_squared'] = data[column_name] ** 2
    return data

# Add stages to pipeline
pipeline.add_stage(load_data, file_path="data/sales.csv")
pipeline.add_stage(clean_data)
pipeline.add_stage(transform_data, column_name='revenue')

# Execute pipeline
result = pipeline.run()
```

### Usage Examples

#### Example 1: Loading CSV Data

```python
import os
from ds_utilities import DataConnector

# Set up connector
connector = DataConnector("csv", os.getcwd())

# Load single file
media_data = connector.load_data("data/media.csv")
print(media_data.head())

# Load multiple files
data_files = ["data/media.csv", "data/nrmps.csv"]
datasets = connector.load_data(data_files)
print(f"Loaded {len(datasets)} datasets")
```

#### Example 2: Querying Snowflake

```python
import os
import snowflake.connector
from ds_utilities import DataConnector

# Connect to Snowflake
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    account=os.environ['SNOWFLAKE_ACCT'],
    role=os.environ['SNOWFLAKE_ROLE'],
    authenticator='externalbrowser'
)

# Create connector
connector = DataConnector("snowflake", conn)

# Execute query
query = """
    SELECT 
        DATE_TRUNC(WEEK, DATE) AS week,
        STATE,
        SUM(COST) AS total_cost
    FROM advertising
    GROUP BY week, STATE
"""

data = connector.load_data(query)
print(data.head())
```

#### Example 3: Building a Data Pipeline

```python
from ds_utilities import DataPipeline, DataConnector
import pandas as pd

# Create pipeline
pipeline = DataPipeline()

# Stage 1: Load data
def load_data(file_path, data=None):
    return pd.read_csv(file_path)

# Stage 2: Filter data
def filter_data(data):
    return data[data['COST'] > 1000]

# Stage 3: Aggregate by state
def aggregate_data(data):
    return data.groupby('STATE').agg({
        'COST': 'sum',
        'CLICKS': 'sum',
        'IMPRESSIONS': 'sum'
    }).reset_index()

# Stage 4: Convert to dictionary
def to_dict(data):
    return data.to_dict()

# Build pipeline
pipeline.add_stage(load_data, file_path="data/media.csv")
pipeline.add_stage(filter_data)
pipeline.add_stage(aggregate_data)
pipeline.add_stage(to_dict)

# Execute
result = pipeline.run()
print(result)
```

#### Example 4: Using Lambda Functions in Pipelines

```python
from ds_utilities import DataPipeline
import pandas as pd

pipeline = DataPipeline()

def load_data(file_path, data=None):
    return pd.read_csv(file_path)

pipeline.add_stage(load_data, file_path="data/media.csv")
pipeline.add_stage(lambda data: data[['WEEK_T', 'STATE', 'COST']])
pipeline.add_stage(lambda data: data.sort_values('COST', ascending=False))

result = pipeline.run()
print(result.head())
```

#### Example 5: Integrating Connector with Pipeline

```python
from ds_utilities import DataPipeline, DataConnector
import os

pipeline = DataPipeline()

# Stage 1: Load from CSV
def load_csv(filename, data=None):
    connector = DataConnector("csv", os.getcwd())
    return connector.load_data(filename)

# Stage 2: Transform
def calculate_metrics(data):
    data['CPM'] = (data['COST'] / data['IMPRESSIONS']) * 1000
    data['CPC'] = data['COST'] / data['CLICKS']
    return data

# Stage 3: Export
def save_results(output_file, data):
    data.to_csv(output_file, index=False)
    return data

pipeline.add_stage(load_csv, filename="data/media.csv")
pipeline.add_stage(calculate_metrics)
pipeline.add_stage(save_results, output_file="data/processed_media.csv")

result = pipeline.run()
```

### Extending the Data Pipeline

#### Adding a New Data Source

To add support for a new data source, create a new strategy class:

```python
from ds_utilities import DataConnector, DataStrategy
from typing import Union
from pandas import DataFrame

class PostgresStrategy(DataStrategy):
    def __init__(self, connection):
        super().__init__(connection)
    
    def load(self, sources: Union[str, list[str]]) -> Union[DataFrame, list[DataFrame]]:
        super().load(sources)
        cursor = self.connection.cursor()
        results = []
        
        for source in self._source_list:
            cursor.execute(source)
            # Build the frame with column names taken from the cursor metadata
            df = DataFrame(cursor.fetchall(), columns=[desc[0] for desc in cursor.description])
            results.append(df)
        
        return results[0] if len(results) == 1 else results

# Register the strategy
DataConnector.STRATEGIES['postgres'] = PostgresStrategy

# Use it (postgres_connection is an existing DB-API connection, e.g. from psycopg2)
connector = DataConnector('postgres', postgres_connection)
data = connector.load_data("SELECT * FROM table")
```

### Common Pipeline Patterns

#### Pattern 1: ETL Pipeline

```python
pipeline = DataPipeline()
pipeline.add_stage(extract_from_source)  # Extract
pipeline.add_stage(clean_data)           # Transform
pipeline.add_stage(validate_schema)      # Transform
pipeline.add_stage(load_to_destination)  # Load
result = pipeline.run()
```
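
The stage names above are placeholders. Minimal hypothetical definitions that make the skeleton runnable:

```python
import pandas as pd

def extract_from_source(data=None):
    # Placeholder extract step; in practice this would call a DataConnector.
    return pd.DataFrame({"STATE": ["CA", None], "COST": [100.0, 50.0]})

def clean_data(data):
    return data.dropna()

def validate_schema(data):
    assert {"STATE", "COST"} <= set(data.columns)  # cheap structural check
    return data

def load_to_destination(data):
    data.to_csv("output.csv", index=False)
    return data
```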

#### Pattern 2: Multi-Source Aggregation

```python
import pandas as pd
from ds_utilities import DataConnector, DataPipeline

def load_and_combine(sources, data=None):
    connector = DataConnector("csv", "./data")
    datasets = connector.load_data(sources)
    return pd.concat(datasets, ignore_index=True)

pipeline = DataPipeline()
pipeline.add_stage(load_and_combine, sources=["sales_q1.csv", "sales_q2.csv"])
pipeline.add_stage(aggregate_by_region)  # user-defined aggregation stage
result = pipeline.run()
```

---

## Testing

Run the test suite:

```bash
pytest tests/ -v
```

Run a specific test class:

```bash
pytest tests/test_data_pipeline.py::TestDataPipeline -v
```

Run with coverage:

```bash
pytest tests/ --cov=ds_utilities --cov-report=html
```
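
A pipeline test can stay self-contained by building its stages inline. An illustrative sketch (not taken from the actual suite):

```python
import pandas as pd
from ds_utilities import DataPipeline

def test_pipeline_chains_stage_outputs():
    pipeline = DataPipeline()
    pipeline.add_stage(lambda data=None: pd.DataFrame({"x": [1, 2]}))
    pipeline.add_stage(lambda data: data.assign(y=data["x"] * 2))
    result = pipeline.run()
    assert list(result["y"]) == [2, 4]
```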


