Metadata-Version: 2.4
Name: orchestration_utils
Version: 0.2.0
Summary: This package contains utility functions for Prefect and Snowflake
Author-email: DWBI <dwbi@cloudfactory.com>
License-Expression: BSD-1-Clause
Project-URL: Repository, https://github.com/cloudfactory/orchestration-utilities
Classifier: Programming Language :: Python :: 3
Classifier: Operating System :: OS Independent
Requires-Python: >=3.11
Description-Content-Type: text/markdown
Requires-Dist: pandas==2.3.3
Requires-Dist: numpy==1.26.4
Requires-Dist: prefect-aws==0.5.1
Requires-Dist: prefect-snowflake==0.28.5
Requires-Dist: pyarrow==22.0.0
Requires-Dist: pydantic==2.12.5
Requires-Dist: requests==2.32.5
Requires-Dist: prefect==3.6.8
Requires-Dist: clickhouse-connect>=0.6.0
Requires-Dist: dlt<2.0.0,>=1.5.0

# orchestration-utilities

[![Run Unit Tests](https://github.com/cloudfactory/orchestration-utilities/actions/workflows/test.yml/badge.svg)](https://github.com/cloudfactory/orchestration-utilities/actions/workflows/test.yml)
[![Python 3.11+](https://img.shields.io/badge/python-3.11+-blue.svg)](https://www.python.org/downloads/)
[![PyPI version](https://badge.fury.io/py/orchestration-utils.svg)](https://badge.fury.io/py/orchestration-utils)

This repository holds the utility modules that are essential for ETL operations. It is published as a package and consumed by the ETL flows.<br> The package is used in `Prefect` flows and with `Snowflake` as part of the ETL operations.


## Installation

Install the package from PyPI:

```bash
pip install orchestration-utils
```

For development installation with testing dependencies:

```bash
git clone https://github.com/cloudfactory/orchestration-utilities.git
cd orchestration-utilities
pip install -r requirements-dev.txt
pip install -e .
```

## Inside this package

### 1. aws.py
This module contains functions for interacting with AWS services such as S3.
___

### 2. copy_into_s3
This module contains functions to copy data from a Snowflake stage (S3 bucket) into a Snowflake table.
It leverages the `etl_operations` module to perform `Schema Drift Handling` and `Query Execution`.<br>
It works best with stages that are partitioned well, for example S3 data partitioned by date, year, or month.<br>
It does not perform well if the data in the S3 bucket is poorly partitioned.
Example: if the data is dropped into a single folder without any partitioning, the copy operation takes a long time to complete, especially when the folder holds many files.

#### Class/Groups:
- `CopyIntoTable`: This class contains the functions used to copy data from the Snowflake stage (S3 bucket) into the Snowflake table.
- `copy_into_snowflake_table`: The main entry point for the copy operation. It accepts a `force` parameter (default `False`) that forces the copy to run even when the data is already present in the table. A usage sketch follows below.
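
A hedged usage sketch, assuming `copy_into_snowflake_table` is exposed as a method of `CopyIntoTable`; only the `force` flag is documented above, so the constructor arguments and import path are assumptions:

```python
from orchestration_utils.copy_into_s3 import CopyIntoTable

# Constructor arguments are omitted here as assumptions; only the `force`
# flag of the copy call is documented above.
copier = CopyIntoTable()
copier.copy_into_snowflake_table(force=False)  # force=True re-copies data already loaded
```
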
___

### 3. s3_to_clickhouse
**Production-ready module for loading data from S3 directly into ClickHouse using the `dlt` (Data Loading Tool) library.**

It provides intelligent date-based partition filtering, automatic schema management, and optimized parallel processing.

**Key Features:**
- **Date-Based Partition Discovery & Filtering**: Automatically discovers S3 folders with dates (e.g., `2024-01-15/`) and filters based on `start_date` and `end_date` parameters
- **Smart Path Filtering Logic**: 
  - Folders with dates in their names are filtered by the date range
  - Nested dates are ignored (only first-level folders are checked)
- **Automatic Table Name Generation**: Auto-generates table names following pattern `s3_{type}_{source}_{project}` when not explicitly provided
- **Dynamic Schema Management**: Powered by `dlt` for automatic schema inference, table creation, and evolution: <https://dlthub.com/docs/general-usage/schema-evolution>
- **Intelligent Worker Allocation**: Automatically calculates optimal worker count based on data size (1-16 workers)
- **Rich Metadata Tracking**: Every record enriched with `id`, `ingestion_date`, `s3_last_modified`, `s3_etag`, `folder_name`, `file_name`, `json_object`, `type`, `source`, and `project` columns
- **File Format Support**: JSON, CSV (with automatic normalization of variants like `jsoneachrow`, `ndjson`)
- **Prefect Integration**: Seamless workflow orchestration with dedicated Prefect flows
- **Two-Level Parallelism**: Partition-level parallelism via Prefect + file-level parallelism via `dlt`
- **S3 Structure Caching**: Intelligent caching to avoid redundant S3 scans

#### Classes:
- **`S3ToClickHouseDLT`**: Main class for S3 → ClickHouse data loading using `dlt`
- **`S3ToClickHouseResult`**: Result dataclass with load statistics (rows inserted, success status, message)

#### Key Methods:
- **`load_from_s3(force=False, workers=None)`**: Primary loading method with optional force reload and worker override
- **`s3_to_clickhouse_flow(start_date, end_date, config_data)`**: Prefect flow for batch processing multiple ingestion jobs (see the sketch below)
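
A hedged sketch of invoking the flow; `s3_to_clickhouse_flow` and its three parameters are documented above, but the shape of `config_data` is an assumption:

```python
from datetime import datetime
from orchestration_utils.s3_to_clickhouse import s3_to_clickhouse_flow

# Hypothetical per-job configs -- the real schema is defined by the module.
jobs = [
    {"s3_bucket": "data-lake", "s3_prefix": "events/", "file_format": "json"},
    {"s3_bucket": "data-lake", "s3_prefix": "exports/", "file_format": "csv"},
]

s3_to_clickhouse_flow(
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 1, 31),
    config_data=jobs,
)
```
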

#### Date Filtering Behavior:

The module filters first-level paths as follows (a sketch of the filter appears after the examples below):

**Example S3 Structure:**
```
s3://bucket/
  ├── foo1.csv
  ├── 2024-01-15/
  │   └── data.csv
  └── logs/
      └── app.log
```

**With `start_date=2024-01-01`, `end_date=2024-01-31`, and `discover_partitions=True`:**
- ❌ `s3://bucket/foo1.csv` - Excluded (file at root)
- ✅ `s3://bucket/2024-01-15/` - Included (folder date within range)
- ✅ `s3://bucket/2024-01-15/data.csv` - Included (file in valid folder)
- ❌ `s3://bucket/2024-12-30/` - Excluded (folder date outside range)
- ❌ `s3://bucket/logs/` - Excluded (no date in folder name)

**With `discover_partitions=False`:**
- ✅ `s3://bucket/foo1.csv` - Included (file at root)
- ✅ `s3://bucket/2024-01-15/` - Included (no date filtering applied)
- ✅ `s3://bucket/2024-01-15/data.csv` - Included (file in any folder)
- ✅ `s3://bucket/2024-12-30/` - Included (date range not applied)
- ✅ `s3://bucket/logs/` - Included (no date required)
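
A minimal sketch of the first-level folder filter under partition discovery, assuming dates are matched with a `YYYY-MM-DD` pattern (the actual implementation may differ):

```python
import re
from datetime import date

DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}")

def folder_in_range(folder_name: str, start: date, end: date) -> bool:
    """Keep a first-level folder only if its name contains a date inside
    [start, end]; folders without a date are excluded in discovery mode."""
    match = DATE_RE.search(folder_name)
    if not match:
        return False
    return start <= date.fromisoformat(match.group()) <= end

# Mirrors the behaviour table above.
assert folder_in_range("2024-01-15", date(2024, 1, 1), date(2024, 1, 31))
assert not folder_in_range("2024-12-30", date(2024, 1, 1), date(2024, 1, 31))
assert not folder_in_range("logs", date(2024, 1, 1), date(2024, 1, 31))
```
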

#### Quick Example:

```python
from orchestration_utils.s3_to_clickhouse import S3ToClickHouseDLT
from datetime import datetime

# Initialize with date filtering and auto table naming
loader = S3ToClickHouseDLT(
    # ClickHouse credentials from Prefect JSONSecret block
    clickhouse_credentials_block="my-clickhouse-creds",
    clickhouse_database="analytics",
    # S3 source configuration
    s3_bucket="data-lake",
    s3_prefix="events/",
    s3_pattern="*.json",
    file_format="json",
    # AWS credentials via Prefect block (optional - uses default credential chain if not provided)
    aws_credentials_block="my-aws-creds",
    # Auto-generate table name: s3_inference_api_chatbot
    data_type="inference",
    data_source="api",
    project="chatbot",
    # Date filtering (only process date-based folders within range)
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 1, 31),
    # Optional: explicit worker count (auto-calculated if omitted)
    workers=8
)

# Load data
result = loader.load_from_s3()
if result.success:
    print(f"✅ Loaded {result.rows_inserted} rows")
    print(f"📊 Table: {loader.table_name}")  # Output: s3_inference_api_chatbot
else:
    print(f"❌ Failed: {result.message}")

loader.close()
```

#### Worker Auto-Calculation:

If the `workers` parameter is not specified, the system automatically calculates the optimal number based on data size:
- **< 10 MB**: 1 worker
- **10 MB - 100 MB**: 2 workers
- **100 MB - 500 MB**: 4 workers
- **500 MB - 1 GB**: 8 workers
- **> 1 GB**: 16 workers

Workers are always capped between 1 and 16 for optimal performance.
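
A minimal sketch of the heuristic, assuming the thresholds map directly onto the table above (the function name and exact cut-offs are assumptions):

```python
def calculate_workers(total_size_bytes: int) -> int:
    """Map total data size to a worker count per the tiers above."""
    size_mb = total_size_bytes / (1024 ** 2)
    if size_mb < 10:
        workers = 1
    elif size_mb < 100:
        workers = 2
    elif size_mb < 500:
        workers = 4
    elif size_mb < 1024:
        workers = 8
    else:
        workers = 16
    return max(1, min(workers, 16))  # always capped between 1 and 16
```
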

___

### 4. etl_control.py
This module contains the functions that interact with Snowflake and store the states of the flows in the database.
- This module accepts a connection credentials parameter (`connection_creds`, default `snowflake-prefect-user`), a pipeline name, and an environment name.
- The pipeline name and environment name are used to record the state of each flow in the database, for example when the flow starts, completes, or fails. A hedged usage sketch follows below.
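
A hedged sketch; the module's class and method names are not documented above, so everything except the parameters and their defaults is an assumption:

```python
from orchestration_utils import etl_control

# `EtlControl` and `record_state` are hypothetical names for illustration.
control = etl_control.EtlControl(
    connection_creds="snowflake-prefect-user",  # documented default
    pipeline_name="daily_orders_load",
    environment="dev",
)
control.record_state("started")  # e.g. started / completed / failed
```
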
___

### 5. etl_operations.py
This module contains the functions used to perform ETL operations on either the destination table or the source table.<br>

#### Class/Groups:
- `CreateConnections`: This class creates the connections to the databases using the connection credentials and warehouse name.
- `SnowflakeDestination`: This class contains all the load types and the functions used to load data into the Snowflake tables.<br>It accepts the connection credentials (default `snowflake-prefect-user`), warehouse name (default `loading`), database name, and environment name (default `dev`). See the sketch below.
- `DataFrameHandler`: This class contains functions that convert dataframe columns to the relevant data types.
- `SchemaDriftHandler`: This class contains functions that handle schema drift in the destination table.
- `SnowflakeSource`: This class contains functions that extract data from the Snowflake tables.
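
A hedged construction sketch for `SnowflakeDestination`, using the documented defaults; the exact keyword names are assumptions:

```python
from orchestration_utils.etl_operations import SnowflakeDestination

# Keyword names are assumptions; defaults follow the description above.
destination = SnowflakeDestination(
    connection_creds="snowflake-prefect-user",  # default connection block
    warehouse="loading",                        # default warehouse
    database="analytics",                       # hypothetical database name
    environment="dev",                          # default environment
)
```
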
___

### 6. notifications.py
This module contains the functions used to send notifications to Slack. The webhook blocks must first be created in `Prefect`.

#### Class/Groups:
- `SlackWebhooksNotification`: This class sends notifications to Slack. It accepts the webhook name and the message to send (see the sketch below).
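
A hedged usage sketch; the class accepts a webhook name and a message per the description above, but the exact signature is an assumption:

```python
from orchestration_utils.notifications import SlackWebhooksNotification

# `send` is a hypothetical method name; only the inputs are documented above.
notifier = SlackWebhooksNotification(webhook_name="etl-alerts")
notifier.send("ETL flow completed successfully")
```
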
___
### 7. queries.py
This module contains the queries used to perform the ETL operations on the Snowflake tables. It is referenced by the `etl_control` and `etl_operations` modules.


## Development

### Running Tests

Run the test suite using pytest:

```bash
make test
```

Or directly with pytest:

```bash
python -m pytest test -v
```

For coverage report:

```bash
python -m pytest test -v --cov=orchestration_utils --cov-report=html
```

### Building the Package Locally

Install the dependencies in your virtual environment:

```bash
pip install -r requirements-dev.txt
```

Build the `dist` folder, where the `.whl` and `.tar.gz` files are created:

```bash
make build
```

This will create the `dist` folder with:
- `orchestration_utils-<version>.tar.gz`
- `orchestration_utils-<version>-py3-none-any.whl`

The `.whl` file can be installed using: `pip install dist/orchestration_utils-<version>-py3-none-any.whl`

## How to deploy

Deploy the package to PyPI using GitHub Actions. There are two workflows: one deploys to dev (TestPyPI) and the other to production (PyPI).

#### 1. Dev/Manual Release to TestPyPI
- Click on Run workflow
- Select the branch where you made the changes
- The changes will be reflected in [TestPyPI](https://test.pypi.org/project/orchestration-utils/)
#### 2. Prod Release to PyPI
- Click on Run workflow
- Select the `main` branch only
- The changes will be reflected in [PyPI](https://pypi.org/project/orchestration-utils/)
