Metadata-Version: 2.4
Name: snowforge-package
Version: 0.4.3
Summary: A Python package supporting data platform migration from on-prem to cloud
Home-page: https://github.com/Norsk-Tipping/snowforge-package
Author: Andreas Heggelund, Christophe Lebegue
Author-email: andreasheggelund@gmail.com
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.12
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: boto3
Requires-Dist: snowflake-connector-python
Requires-Dist: coloredlogs
Requires-Dist: colored
Requires-Dist: tqdm
Requires-Dist: toml
Requires-Dist: argparse
Requires-Dist: requests
Requires-Dist: jinja2
Requires-Dist: snowpipe-streaming
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: license-file
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# 🚀 Snowforge - Enterprise Data Integration Platform

**Snowforge** is a comprehensive Python package designed to streamline data integration and orchestration across cloud and on-premise systems. It provides a unified interface for managing data pipelines between **AWS**, **Snowflake**, **Microsoft Fabric**, **Power BI**, and various **on-premise database systems**.

## 📋 Table of Contents

- [Overview](#-overview)
- [Key Features](#-key-features)
- [Installation](#-installation)
- [Configuration](#-configuration)
- [Quick Start](#-quick-start)
- [Core Modules](#-core-modules)
  - [AWSIntegration](#awsintegration)
  - [SnowflakeIntegration](#snowflakeintegration)
  - [SnowflakeLogging](#snowflakelogging)
  - [Config](#config)
  - [Logging](#logging)
  - [DataMover](#datamover)
  - [Broadcaster](#broadcaster)
  - [FabricIntegration](#fabricintegration)
- [Advanced Usage](#-advanced-usage)
- [Extending Snowforge](#-extending-snowforge)
- [License](#-license)
- [Contributing](#-contributing)
- [Acknowledgments](#-acknowledgments)

---

## 🎯 Overview

Snowforge addresses the complexity of modern data engineering by providing:

- **Unified Configuration Management**: Single TOML-based configuration for all integrations
- **Multi-Cloud Support**: Seamless integration with AWS S3, Secrets Manager, and Snowflake
- **BI Report Distribution**: Automated export and delivery of Power BI and Fabric reports
- **Extensible Architecture**: Strategy pattern-based extractors for any database system
- **Production-Ready Logging**: Console and Snowflake-backed execution tracking
- **Parallel Processing**: Built-in support for concurrent data operations

### 🏗️ Architecture

Snowforge is organized into specialized modules:

```
Snowforge/
├── AWSIntegration.py          # AWS S3 and Secrets Manager operations
├── SnowflakeIntegration.py    # Snowflake connections and data loading
├── SnowflakeLogging.py        # Task execution logging to Snowflake
├── Config.py                  # Configuration management
├── Logging.py                 # Centralized console logging
├── DataMover/                 # Data extraction and movement engine
│   ├── DataMover.py
│   └── Extractors/            # Database-specific extractors
├── Broadcaster/               # BI report export and distribution
│   ├── SnowforgeBroadcaster.py
│   ├── BroadcastConfig.py
│   ├── Email.py
│   └── adapters/              # Platform-specific exporters
└── FabricIntegration/         # Microsoft Fabric and Power BI API
    ├── FabricIntegration.py
    └── PowerAPI.py
```
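
These modules are imported from the `Snowforge` package. Based on the examples throughout this README, the import mapping looks like this:

```python
from Snowforge import (
    AWSIntegration,        # AWSIntegration.py
    SnowflakeIntegration,  # SnowflakeIntegration.py
    SnowflakeLogging,      # SnowflakeLogging.py
    Config,                # Config.py
    Debug,                 # Logging.py (console logging is exposed as Debug)
    FabricIntegration,     # FabricIntegration/
)
from Snowforge.DataMover import Engine
from Snowforge.Broadcaster import SnowforgeBroadcaster
```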

---

## ✨ Key Features

### Data Integration
- **Multi-Source Extraction**: Extract data from Netezza, Oracle, PostgreSQL, and more
- **Parallel Processing**: Multi-threaded and multi-process data operations
- **Smart Chunking**: Automatic file segmentation for large datasets
- **S3 Integration**: Efficient uploads with progress tracking and multipart transfer

### Snowflake Operations
- **Flexible Authentication**: Key-pair or browser-based authentication
- **Connection Pooling**: Automatic connection reuse and management
- **Snowpipe Streaming**: Real-time data ingestion support
- **Data Loading**: Direct COPY INTO operations from S3 stages

### BI and Reporting
- **Report Broadcasting**: Automated export and delivery of paginated reports
- **Multi-Channel Delivery**: Email and Teams (extensible)
- **Template-Based Configuration**: Jinja2 templating for dynamic parameters (see the sketch after this list)
- **Export Polling**: Automatic status monitoring and download
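
As a minimal sketch of the templating idea (using Jinja2 directly; the variable names here are illustrative, not a fixed Snowforge contract):

```python
from jinja2 import Template

# Render a report subject from broadcast parameters (illustrative values).
subject = Template("Monthly Sales Report - {{ report_month }} ({{ region }})").render(
    report_month="2024-04",
    region="North",
)
print(subject)  # Monthly Sales Report - 2024-04 (North)
```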

### Configuration & Security
- **Centralized Configuration**: Single TOML file for all credentials and profiles
- **AWS Secrets Manager**: Secure token and credential storage
- **Multiple Profiles**: Support for dev, staging, and production environments
- **Environment Variable Override**: Flexible configuration paths

### Logging & Monitoring
- **Colored Console Output**: Easy-to-read log levels with color coding
- **Snowflake Task Tracking**: Persistent execution history in Snowflake
- **Verbose Mode**: Detailed debugging information on demand
- **Execution Metadata**: Start time, end time, status, and next execution tracking

---

## 📥 Installation

### From PyPI

```bash
pip install snowforge-package
```

### From Source

```bash
git clone https://github.com/Norsk-Tipping/snowforge-package.git
cd snowforge-package
pip install -e .
```

### Requirements

- Python >= 3.12
- Dependencies (automatically installed):
  - boto3
  - snowflake-connector-python
  - snowpipe-streaming
  - colored, coloredlogs
  - tqdm
  - toml
  - requests
  - jinja2

---

## ⚙️ Configuration

Snowforge uses a `snowforge_config.toml` file for managing credentials and profiles. The file is searched in the following order (sketched in code below):

1. Path specified in the `SNOWFORGE_CONFIG_PATH` environment variable
2. Current working directory: `./snowforge_config.toml`
3. User config directory: `~/.config/snowforge_config.toml`
4. Package installation directory
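
The lookup can be sketched roughly as follows (illustrative only; `Config.find_config_file` performs the actual resolution, and the package-directory fallback is omitted here):

```python
import os
from pathlib import Path

# Illustrative sketch of the documented search order (not Snowforge's exact code).
def resolve_config_path() -> Path | None:
    candidates = [
        os.environ.get("SNOWFORGE_CONFIG_PATH"),            # 1. explicit override
        Path.cwd() / "snowforge_config.toml",               # 2. current working directory
        Path.home() / ".config" / "snowforge_config.toml",  # 3. user config directory
    ]
    for candidate in candidates:
        if candidate and Path(candidate).is_file():
            return Path(candidate)
    return None
```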

### Configuration File Structure

```toml
# AWS Configuration
[AWS.default]
AWS_ACCESS_KEY = "your-access-key-id"
AWS_SECRET_KEY = "your-secret-access-key"
REGION = "us-east-1"

[AWS.production]
AWS_ACCESS_KEY = "prod-access-key"
AWS_SECRET_KEY = "prod-secret-key"
REGION = "eu-west-1"

# Snowflake Configuration
[SNOWFLAKE.default]
USERNAME = "your-username"
ACCOUNT = "your-account-identifier"
ROLE = "SYSADMIN"
SNOWFLAKE_WAREHOUSE = "COMPUTE_WH"

[SNOWFLAKE.svc_key_based]
USERNAME = "service_account"
ACCOUNT = "xy12345.us-east-1"
KEY_FILE_PATH = "/path/to/private_key.p8"
KEY_FILE_PASSWORD = "key_password"
SNOWFLAKE_DATABASE = "ANALYTICS"
SNOWFLAKE_SCHEMA = "PUBLIC"
SNOWFLAKE_WAREHOUSE = "ETL_WH"
ROLE = "ETL_ROLE"

# Required for SnowflakeLogging
[SNOWFLAKE.snowforge]
USERNAME = "logging_user"
ACCOUNT = "xy12345.us-east-1"
KEY_FILE_PATH = "/path/to/logging_key.p8"
KEY_FILE_PASSWORD = "key_password"
SNOWFLAKE_DATABASE = "LOGGING_DB"
SNOWFLAKE_SCHEMA = "TASK_LOGS"
```

### Environment Variables

```bash
# Override config file location
export SNOWFORGE_CONFIG_PATH="/custom/path/snowforge_config.toml"
```

---

## 🚀 Quick Start

### Basic Workflow Example

```python
from Snowforge import (
    AWSIntegration,
    SnowflakeIntegration,
    SnowflakeLogging,
    Debug
)
from datetime import datetime

# Initialize AWS
AWSIntegration.initialize(profile="default", verbose=True)

# Connect to Snowflake
conn = SnowflakeIntegration.connect(profile="svc_key_based", verbose=True)

# Log task execution
execution_id = SnowflakeLogging.log_start(
    task_id=1,
    process_id=12345,
    starttime=datetime.now()
)

# Your data processing logic here
Debug.log("Processing data...", level='INFO')

# Complete task logging
SnowflakeLogging.log_end(
    execution_id=execution_id,
    status="SUCCESS",
    log_path="/logs/task_1.log",
    endtime=datetime.now(),
    next_execution_time=None
)

# Clean up
SnowflakeIntegration.close_connection()
```

---

## 📚 Core Modules

### AWSIntegration

Manages AWS S3 and Secrets Manager operations with automatic authentication and error handling.

#### Key Methods

##### `initialize(profile: str = "default", verbose: bool = False)`
Initialize AWS clients for S3 and Secrets Manager.

**Example:**
```python
from Snowforge import AWSIntegration

# Initialize with default profile
AWSIntegration.initialize(profile="default")

# Initialize with production profile and verbose output
AWSIntegration.initialize(profile="production", verbose=True)
```

##### `get_secret(secret_name: str, verbose: bool = False) -> dict | str | None`
Retrieve secrets from AWS Secrets Manager.

**Example:**
```python
# Get database credentials
db_creds = AWSIntegration.get_secret("production/database/credentials")

# Get API token
api_token = AWSIntegration.get_secret("powerbi/refresh_token")

if isinstance(db_creds, dict):
    username = db_creds.get("username")
    password = db_creds.get("password")
```

##### `push_file_to_s3(bucket_name: str, file_to_upload: str, key: str, config: TransferConfig = None, verbose: bool = False)`
Upload files to S3 with progress tracking and multipart support.

**Example:**
```python
from boto3.s3.transfer import TransferConfig

# Configure for large files
transfer_config = AWSIntegration.define_s3_transfer_config(
    size_threshold=0.5,  # 500 MB threshold
    threads=10           # 10 concurrent threads
)

# Upload file
AWSIntegration.push_file_to_s3(
    bucket_name="my-data-bucket",
    file_to_upload="/tmp/large_dataset.csv",
    key="raw/2024/dataset.csv",
    config=transfer_config,
    verbose=True
)
```

##### `get_bucket_contents(bucket_name: str, verbose: bool = False) -> list[str]`
List all files in an S3 bucket.

**Example:**
```python
# List all files in bucket
files = AWSIntegration.get_bucket_contents("my-data-bucket")

for file_key in files:
    print(f"Found: {file_key}")
```

---

### SnowflakeIntegration

Establishes and manages connections to Snowflake with support for multiple authentication methods.

#### Key Methods

##### `connect(user_name: str = None, account: str = None, profile: str = "default", verbose: bool = False)`
Connect to Snowflake using various authentication methods.

**Example:**
```python
from Snowforge import SnowflakeIntegration

# Method 1: Using profile with key-pair authentication
conn = SnowflakeIntegration.connect(profile="svc_key_based", verbose=True)

# Method 2: Using external browser authentication
conn = SnowflakeIntegration.connect(
    user_name="john.doe@company.com",
    account="xy12345.us-east-1"
)

# Method 3: Using default profile
conn = SnowflakeIntegration.connect()

# Execute queries
cursor = conn.cursor()
cursor.execute("SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()")
print(cursor.fetchone())
cursor.close()
```

##### `load_to_snowflake(stage: str, stage_key: str, database: str, schema: str, table: str, profile: str = "default", verbose: bool = False)`
Load data from a Snowflake stage into a table using COPY INTO.

**Example:**
```python
# Load CSV file from internal stage
SnowflakeIntegration.load_to_snowflake(
    stage="MY_STAGE",
    stage_key="/data/2024/sales.csv",
    database="ANALYTICS",
    schema="RAW",
    table="SALES_DATA",
    profile="svc_key_based",
    verbose=True
)
```

##### `truncate_table(database: str, schema: str, table: str, profile: str = "default", verbose: bool = False)`
Truncate a Snowflake table.

**Example:**
```python
# Truncate staging table before load
SnowflakeIntegration.truncate_table(
    database="ANALYTICS",
    schema="STAGING",
    table="TEMP_SALES",
    profile="svc_key_based"
)
```

##### `connect_to_pipe(profile: str = "default", client_name: str = None, db_name: str = None, schema_name: str = None, pipe_name: str = None, verbose: bool = False)`
Connect to Snowpipe Streaming for real-time data ingestion.

**Example:**
```python
from Snowforge import SnowflakeIntegration
from datetime import datetime

# Connect to Snowpipe
client = SnowflakeIntegration.connect_to_pipe(
    profile="svc_key_based",
    client_name="LOG_STREAM",
    db_name="LOGGING_DB",
    schema_name="LOGS",
    pipe_name="LOG_PIPE"
)

# Stream data
data = {
    "SOURCE": "APPLICATION",
    "TIMESTAMP": datetime.now().isoformat(),
    "CRITICALITY": "INFO",
    "MESSAGE": "User login successful",
    "WORKFLOW_NAME": "Authentication",
    "WORKFLOW_INSTANCE": "12345"
}

SnowflakeIntegration.stream_to_pipe(client, data, channel_name="LOG_CHANNEL")

# Close connection
SnowflakeIntegration.close_pipe_connection(client)
```

##### `close_connection()`
Close the active Snowflake connection.

**Example:**
```python
# Always close connections when done
SnowflakeIntegration.close_connection()
```

---

### SnowflakeLogging

Provides structured task execution logging directly to Snowflake for audit trails and monitoring.

#### Prerequisites

Before using SnowflakeLogging, you must:

1. Create a profile named `snowforge` in your config file
2. Execute the SQL setup scripts in your Snowflake account

**View setup requirements:**
```python
from Snowforge import SnowflakeLogging

# Print SQL scripts to console
sql_files = SnowflakeLogging.show_requirements(print_to_console=True)
```

#### Key Methods

##### `log_start(task_id: int, process_id: int, starttime: datetime, verbose: bool = False) -> int`
Log the start of a task execution and return an execution ID.

**Example:**
```python
from Snowforge import SnowflakeLogging
from datetime import datetime
import hashlib

# Generate unique process ID
process_id = int(hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8], 16)

# Log task start
execution_id = SnowflakeLogging.log_start(
    task_id=42,
    process_id=process_id,
    starttime=datetime.now(),
    verbose=True
)

print(f"Task execution started: {execution_id}")
```

##### `log_end(execution_id: int, status: str, log_path: str, endtime: datetime, next_execution_time: datetime = None, verbose: bool = False)`
Log the completion of a task execution.

**Example:**
```python
from datetime import datetime, timedelta

# Log successful completion
SnowflakeLogging.log_end(
    execution_id=execution_id,
    status="SUCCESS",
    log_path="/logs/2024/04/task_42.log",
    endtime=datetime.now(),
    next_execution_time=datetime.now() + timedelta(hours=24),
    verbose=True
)

# Log failure
SnowflakeLogging.log_end(
    execution_id=execution_id,
    status="FAILED",
    log_path="/logs/2024/04/task_42_error.log",
    endtime=datetime.now(),
    next_execution_time=None  # No next execution on failure
)
```

#### Complete Logging Example

```python
from Snowforge import SnowflakeLogging, Debug
from datetime import datetime
import hashlib

def run_data_pipeline():
    """Example data pipeline with Snowflake logging."""

    # Generate process ID
    process_id = int(hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8], 16)

    # Start logging
    execution_id = SnowflakeLogging.log_start(
        task_id=100,
        process_id=process_id,
        starttime=datetime.now()
    )

    try:
        # Your pipeline logic here
        Debug.log("Extracting data...", 'INFO')
        Debug.log("Transforming data...", 'INFO')
        Debug.log("Loading to target...", 'INFO')

        # Log success
        SnowflakeLogging.log_end(
            execution_id=execution_id,
            status="SUCCESS",
            log_path=f"/logs/pipeline_{execution_id}.log",
            endtime=datetime.now(),
            next_execution_time=None
        )

    except Exception as e:
        Debug.log(f"Pipeline failed: {e}", 'ERROR')

        # Log failure
        SnowflakeLogging.log_end(
            execution_id=execution_id,
            status="FAILED",
            log_path=f"/logs/pipeline_{execution_id}_error.log",
            endtime=datetime.now(),
            next_execution_time=None
        )
        raise

if __name__ == "__main__":
    run_data_pipeline()
```

---

### Config

Centralized configuration management with automatic file discovery and profile selection.

#### Key Methods

##### `get_snowflake_credentials(profile: str = "default", verbose: bool = False) -> dict`
Retrieve Snowflake credentials for a specific profile.

**Example:**
```python
from Snowforge import Config

# Get default profile
creds = Config.get_snowflake_credentials()

# Get specific profile
prod_creds = Config.get_snowflake_credentials(profile="production", verbose=True)

print(f"Username: {prod_creds['USERNAME']}")
print(f"Account: {prod_creds['ACCOUNT']}")
```

##### `get_aws_credentials(profile: str = "default", verbose: bool = False) -> dict`
Retrieve AWS credentials for a specific profile.

**Example:**
```python
# Get AWS credentials
aws_creds = Config.get_aws_credentials(profile="production")

print(f"Region: {aws_creds['REGION']}")
print(f"Access Key: {aws_creds['AWS_ACCESS_KEY'][:8]}...")
```

##### `find_config_file(config_paths: list = None, verbose: bool = False) -> str | None`
Locate the configuration file.

**Example:**
```python
# Find config file
config_path = Config.find_config_file(verbose=True)

if config_path:
    print(f"Using config: {config_path}")
else:
    print("No config file found")
```

---

### Logging

Centralized console logging with colored output for better visibility.

#### Key Methods

##### `log(message: str, level: str = 'INFO', verbose_logging: bool = False)`
Log messages with colored output based on severity level.

**Example:**
```python
from Snowforge import Debug

# Basic logging
Debug.log("Application started", 'INFO')
Debug.log("Configuration loaded", 'SUCCESS')
Debug.log("Missing parameter", 'WARNING')
Debug.log("Connection failed", 'ERROR')
Debug.log("System crash", 'CRITICAL')

# Debug logging (only shown when verbose=True)
Debug.log("Variable value: 42", 'DEBUG', verbose_logging=True)

# Custom formatting
Debug.log(
    f"Processing file: large_dataset.csv\n"
    f"Size: 1.5 GB\n"
    f"Rows: 10,000,000",
    'INFO'
)
```

#### Log Levels

| Level | Color | Usage |
|-------|-------|-------|
| `INFO` | White | General information |
| `SUCCESS` | Green | Successful operations |
| `WARNING` | Yellow | Warning messages |
| `ERROR` | Red | Error messages |
| `CRITICAL` | Bright Red | Critical failures |
| `DEBUG` | Blue | Debug information (verbose mode only) |

---

### DataMover

Extensible data extraction and movement engine with parallel processing support.

#### Key Classes

##### `Engine`
Orchestrates data extraction and file operations.

**Key Methods:**

###### `export_to_file(extractor: ExtractorStrategy, output_path: str, fully_qualified_table_name: str, filter_statement: str = None, verbose: bool = False) -> tuple`

**Example:**
```python
from Snowforge.DataMover import Engine
from Snowforge.DataMover.Extractors.NetezzaExtractor import NetezzaExtractor

# Initialize extractor
extractor = NetezzaExtractor()

# Export table to file
header, csv_file = Engine.export_to_file(
    extractor=extractor,
    output_path="/tmp/exports",
    fully_qualified_table_name="PROD_DB.SALES.TRANSACTIONS",
    filter_statement="WHERE transaction_date >= '2024-01-01'",
    verbose=True
)

print(f"Header: {header}")
print(f"Output file: {csv_file}")
```

###### `parallel_process(worker_func: object, args_list: list[tuple], num_workers: int = None) -> list`

Execute functions in parallel processes.

**Example:**
```python
def process_chunk(chunk_id, data_file, start_offset, end_offset):
    """Process a chunk of data."""
    print(f"Processing chunk {chunk_id}: {start_offset} to {end_offset}")
    # Processing logic here

# Prepare work items
args_list = [
    (1, "/tmp/data.csv", 0, 1000000),
    (2, "/tmp/data.csv", 1000000, 2000000),
    (3, "/tmp/data.csv", 2000000, 3000000),
]

# Execute in parallel
processes = Engine.parallel_process(
    worker_func=process_chunk,
    args_list=args_list,
    num_workers=3
)

# Wait for completion
for process in processes:
    process.join()
```

###### `calculate_chunks(external_table: str, compression: int = 4) -> int`

Calculate optimal number of chunks for a file.

**Example:**
```python
# Calculate chunks for large file
num_chunks = Engine.calculate_chunks(
    external_table="/tmp/large_export.csv",
    compression=4
)

print(f"File will be split into {num_chunks} chunks")
```

##### `ExtractorStrategy`
Abstract base class for database-specific extractors.

**Implementing a Custom Extractor:**

```python
from Snowforge.DataMover.Extractors.ExtractorStrategy import ExtractorStrategy

class PostgreSQLExtractor(ExtractorStrategy):
    """PostgreSQL data extractor implementation."""

    def __init__(self, connection_string):
        self.connection_string = connection_string

    def extract_table_query(self, fully_qualified_table_name: str,
                           filter_statement: str = None,
                           verbose: bool = False):
        """Build extraction query for PostgreSQL."""
        query = f"SELECT * FROM {fully_qualified_table_name}"
        if filter_statement:
            query += f" {filter_statement}"
        return query

    def list_all_tables(self, database_name: str, verbose: bool = False):
        """List all tables in PostgreSQL database."""
        query = """
            SELECT table_schema, table_name
            FROM information_schema.tables
            WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
        """
        # Execute query and return results
        pass

    def export_external_table(self, output_path: str,
                            table_name: str,
                            filter_statement: str = None,
                            verbose: bool = False):
        """Export PostgreSQL table to CSV file."""
        # Implementation here
        pass

# Use custom extractor
extractor = PostgreSQLExtractor("postgresql://user:pass@host:5432/db")
header, file = Engine.export_to_file(
    extractor=extractor,
    output_path="/tmp/exports",
    fully_qualified_table_name="public.customers"
)
```

---

### Broadcaster

Automated BI report export and multi-channel delivery system.

#### Key Classes

##### `SnowforgeBroadcaster`
Main orchestrator for report broadcasting.

**Example:**
```python
from Snowforge.Broadcaster import (
    SnowforgeBroadcaster,
    BroadcastConfig,
    SmtpConfig
)
from Snowforge.Broadcaster.adapters import FabricExporter
from Snowforge import FabricIntegration

# Initialize Fabric integration
fabric = FabricIntegration(
    aws_secret_name="powerbi_token",
    aws_profile="production"
)

# Create exporter
exporter = FabricExporter(fabric)

# Configure SMTP
smtp = SmtpConfig(
    host="smtp.company.com",
    port=587,
    username="reports@company.com",
    password="smtp_password",
    use_tls=True
)

# Create broadcaster
broadcaster = SnowforgeBroadcaster(
    exporter=exporter,
    smtp=smtp,
    poll_interval=5.0,
    max_poll_attempts=240
)

# Configure broadcast
config = BroadcastConfig(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    report_id="87654321-4321-4321-4321-cba987654321",
    channel="email",
    sender="reports@company.com",
    recipients=["user1@company.com", "user2@company.com"],
    subject="Monthly Sales Report",
    body="Please find attached the monthly sales report.",
    parameters={"ReportMonth": "2024-04", "Region": "North"}
)

# Execute broadcast
result = broadcaster.broadcast(config)
print(f"Report delivered: {result.filename}")
```

##### `BroadcastConfig`
Configuration for report broadcasts.

**Example:**
```python
from Snowforge.Broadcaster import BroadcastConfig

# Email broadcast
email_config = BroadcastConfig(
    workspace_id="workspace-guid",
    report_id="report-guid",
    channel="email",
    sender="noreply@company.com",
    recipients=["team@company.com"],
    subject="Daily Dashboard - {date}",
    body="<h1>Daily Dashboard</h1><p>See attached report.</p>",
    parameters={"date": "2024-04-15"}
)

# Teams broadcast (when implemented)
teams_config = BroadcastConfig(
    workspace_id="workspace-guid",
    report_id="report-guid",
    channel="teams",
    teams_webhook_url="https://outlook.office.com/webhook/...",
    parameters={"quarter": "Q1-2024"}
)
```

#### Configuration Providers

Load broadcast configurations from multiple sources.

##### JSON File Provider

**Example:**
```python
from Snowforge.Broadcaster import JsonFileConfigProvider

# Load from JSON file
provider = JsonFileConfigProvider("/config/broadcasts.json")
configs = provider.get_all_configs()

for config in configs:
    broadcaster.broadcast(config)
```

**broadcasts.json:**
```json
[
  {
    "workspace_id": "12345678-1234-1234-1234-123456789abc",
    "report_id": "87654321-4321-4321-4321-cba987654321",
    "channel": "email",
    "sender": "reports@company.com",
    "recipients": ["user@company.com"],
    "subject": "Report",
    "body": "See attached",
    "parameters": {}
  }
]
```

##### Snowflake Config Provider

**Example:**
```python
from Snowforge.Broadcaster import SnowflakeConfigProvider
from Snowforge import SnowflakeIntegration

# Connect to Snowflake
conn = SnowflakeIntegration.connect(profile="svc_key_based")

# Load configs from table
provider = SnowflakeConfigProvider(
    connection=conn,
    database="CONFIG_DB",
    schema="BROADCAST",
    table="REPORT_CONFIGS"
)

configs = provider.get_all_configs()
```

---

### FabricIntegration

Microsoft Fabric and Power BI API integration for pipeline management, semantic model refresh, and report export.

#### Key Methods

##### `test_authentication() -> bool`
Verify API token validity.

**Example:**
```python
from Snowforge import FabricIntegration

fabric = FabricIntegration(
    aws_secret_name="powerbi_token",
    aws_profile="default"
)

if fabric.test_authentication():
    print("Authentication successful")
else:
    print("Authentication failed")
```

##### `start_pipeline(workspace_id: str, item_id: str) -> str`
Start a Fabric data pipeline.

**Example:**
```python
# Start pipeline
execution_id = fabric.start_pipeline(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    item_id="pipeline-item-id"
)

if execution_id != "Error":
    print(f"Pipeline started: {execution_id}")
```

##### `check_pipeline_last_status(workspace_id: str, item_id: str) -> tuple[str, str]`
Check pipeline execution status.

**Example:**
```python
run_id, status = fabric.check_pipeline_last_status(
    workspace_id="workspace-guid",
    item_id="pipeline-item-id"
)

print(f"Pipeline {run_id}: {status}")
```

##### `semantic_model_reload(workspace_id: str, item_id: str) -> str`
Trigger semantic model refresh.

**Example:**
```python
result = fabric.semantic_model_reload(
    workspace_id="workspace-guid",
    item_id="model-item-id"
)

if result == "Completed":
    print("Refresh initiated successfully")
```

##### `export_paginated_report(access_token: str, workspace_id: str, report_id: str, input_data: dict = None, file_format: str = "PDF")`
Export a paginated report.

**Example:**
```python
# Export with parameters
response = fabric.export_paginated_report(
    access_token=fabric.access_token,
    workspace_id="workspace-guid",
    report_id="report-guid",
    input_data={
        "StartDate": "2024-01-01",
        "EndDate": "2024-03-31",
        "Department": "Sales"
    },
    file_format="XLSX"
)

if response and response.status_code == 202:
    export_id = response.json().get("id")
    print(f"Export initiated: {export_id}")
```

##### `download_report(workspace_id: str, report_id: str, export_id: str) -> tuple[bytes, str]`
Download exported report.

**Example:**
```python
# Download report
content, filename = fabric.download_report(
    workspace_id="workspace-guid",
    report_id="report-guid",
    export_id="export-guid"
)

# Save to file
with open(f"/reports/{filename}", "wb") as f:
    f.write(content)

print(f"Downloaded: {filename} ({len(content)} bytes)")
```

#### Complete Fabric Workflow

```python
from Snowforge import FabricIntegration, Debug
import time

# Initialize
fabric = FabricIntegration(aws_secret_name="powerbi_token")

# 1. Refresh data pipeline
Debug.log("Starting data pipeline...", 'INFO')
pipeline_id = fabric.start_pipeline(
    workspace_id="workspace-guid",
    item_id="pipeline-item-id"
)

# Wait for pipeline completion
while True:
    run_id, status = fabric.check_pipeline_last_status(
        workspace_id="workspace-guid",
        item_id="pipeline-item-id"
    )

    if status == "Completed":
        Debug.log("Pipeline completed successfully", 'SUCCESS')
        break
    elif status == "Failed":
        Debug.log("Pipeline failed", 'ERROR')
        break

    time.sleep(10)

# 2. Refresh semantic model
Debug.log("Refreshing semantic model...", 'INFO')
fabric.semantic_model_reload(
    workspace_id="workspace-guid",
    item_id="model-item-id"
)

# 3. Export report
Debug.log("Exporting report...", 'INFO')
response = fabric.export_paginated_report(
    access_token=fabric.access_token,
    workspace_id="workspace-guid",
    report_id="report-guid",
    file_format="PDF"
)

export_id = response.json().get("id")

# 4. Wait for export and download
while True:
    status_code = fabric.check_report_status(
        workspace_id="workspace-guid",
        report_id="report-guid",
        export_id=export_id
    )

    if status_code == 200:
        content, filename = fabric.download_report(
            workspace_id="workspace-guid",
            report_id="report-guid",
            export_id=export_id
        )

        with open(f"/reports/{filename}", "wb") as f:
            f.write(content)

        Debug.log(f"Report saved: {filename}", 'SUCCESS')
        break

    time.sleep(5)
```

---

## 🎓 Advanced Usage

### End-to-End Data Pipeline

```python
from Snowforge import (
    AWSIntegration,
    SnowflakeIntegration,
    SnowflakeLogging,
    Debug
)
from Snowforge.DataMover import Engine
from Snowforge.DataMover.Extractors.NetezzaExtractor import NetezzaExtractor
from datetime import datetime
import hashlib

def etl_pipeline():
    """Complete ETL pipeline example."""

    # 1. Initialize connections
    Debug.log("Initializing connections...", 'INFO')
    AWSIntegration.initialize(profile="production")
    SnowflakeIntegration.connect(profile="svc_key_based")

    # 2. Start logging
    process_id = int(hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8], 16)
    execution_id = SnowflakeLogging.log_start(
        task_id=200,
        process_id=process_id,
        starttime=datetime.now()
    )

    try:
        # 3. Extract data from source
        Debug.log("Extracting data from Netezza...", 'INFO')
        extractor = NetezzaExtractor()
        header, csv_file = Engine.export_to_file(
            extractor=extractor,
            output_path="/tmp/extracts",
            fully_qualified_table_name="PROD.SALES.ORDERS",
            filter_statement="WHERE order_date >= CURRENT_DATE - 7",
            verbose=True
        )

        # 4. Upload to S3
        Debug.log("Uploading to S3...", 'INFO')
        s3_key = f"raw/orders/{datetime.now().strftime('%Y/%m/%d')}/orders.csv"
        AWSIntegration.push_file_to_s3(
            bucket_name="data-lake",
            file_to_upload=csv_file,
            key=s3_key,
            verbose=True
        )

        # 5. Truncate target table
        Debug.log("Truncating target table...", 'INFO')
        SnowflakeIntegration.truncate_table(
            database="ANALYTICS",
            schema="STAGING",
            table="ORDERS_STAGING"
        )

        # 6. Load to Snowflake
        Debug.log("Loading to Snowflake...", 'INFO')
        SnowflakeIntegration.load_to_snowflake(
            stage="S3_STAGE",
            stage_key=f"/{s3_key}",
            database="ANALYTICS",
            schema="STAGING",
            table="ORDERS_STAGING",
            verbose=True
        )

        # 7. Log success
        SnowflakeLogging.log_end(
            execution_id=execution_id,
            status="SUCCESS",
            log_path=f"/logs/etl_{execution_id}.log",
            endtime=datetime.now(),
            next_execution_time=None
        )

        Debug.log("Pipeline completed successfully!", 'SUCCESS')

    except Exception as e:
        Debug.log(f"Pipeline failed: {e}", 'ERROR')

        SnowflakeLogging.log_end(
            execution_id=execution_id,
            status="FAILED",
            log_path=f"/logs/etl_{execution_id}_error.log",
            endtime=datetime.now(),
            next_execution_time=None
        )
        raise

    finally:
        SnowflakeIntegration.close_connection()

if __name__ == "__main__":
    etl_pipeline()
```

### Multi-Report Broadcasting

```python
from Snowforge.Broadcaster import (
    SnowforgeBroadcaster,
    SnowflakeConfigProvider,
    SmtpConfig
)
from Snowforge.Broadcaster.adapters import FabricExporter
from Snowforge import FabricIntegration, SnowflakeIntegration, Debug

def broadcast_reports():
    """Broadcast multiple reports from Snowflake configuration."""

    # Initialize connections
    fabric = FabricIntegration(aws_secret_name="powerbi_token")
    sf_conn = SnowflakeIntegration.connect(profile="svc_key_based")

    # Configure broadcaster
    broadcaster = SnowforgeBroadcaster(
        exporter=FabricExporter(fabric),
        smtp=SmtpConfig(
            host="smtp.company.com",
            port=587,
            username="reports@company.com",
            password="password",
            use_tls=True
        )
    )

    # Load configurations from Snowflake
    provider = SnowflakeConfigProvider(
        connection=sf_conn,
        database="CONFIG",
        schema="REPORTS",
        table="BROADCAST_CONFIGS"
    )

    configs = provider.get_all_configs()
    Debug.log(f"Found {len(configs)} report configurations", 'INFO')

    # Broadcast each report
    for i, config in enumerate(configs, 1):
        Debug.log(f"Broadcasting report {i}/{len(configs)}: {config.subject}", 'INFO')

        try:
            result = broadcaster.broadcast(config)
            Debug.log(f"Report {i} delivered: {result.filename}", 'SUCCESS')
        except Exception as e:
            Debug.log(f"Failed to broadcast report {i}: {e}", 'ERROR')
            continue

    # Cleanup
    SnowflakeIntegration.close_connection()

if __name__ == "__main__":
    broadcast_reports()
```

---

## 🔧 Extending Snowforge

### Creating a Custom Database Extractor

```python
from Snowforge.DataMover.Extractors.ExtractorStrategy import ExtractorStrategy
import pyodbc
import csv

class OracleExtractor(ExtractorStrategy):
    """Oracle database extractor."""

    def __init__(self, dsn: str, username: str, password: str):
        self.dsn = dsn
        self.username = username
        self.password = password
        self.connection = None

    def connect(self):
        """Establish Oracle connection."""
        if not self.connection:
            self.connection = pyodbc.connect(
                f"DSN={self.dsn};UID={self.username};PWD={self.password}"
            )

    def extract_table_query(self, fully_qualified_table_name: str,
                           filter_statement: str = None,
                           verbose: bool = False) -> str:
        """Build Oracle extraction query."""
        parts = fully_qualified_table_name.split('.')

        if len(parts) == 3:
            schema, table = parts[1], parts[2]
        elif len(parts) == 2:
            schema, table = parts
        else:
            table = parts[0]
            schema = self.username.upper()

        query = f"SELECT * FROM {schema}.{table}"

        if filter_statement:
            query += f" {filter_statement}"

        return query

    def list_all_tables(self, database_name: str, verbose: bool = False) -> list:
        """List all accessible tables in Oracle."""
        self.connect()
        cursor = self.connection.cursor()

        query = """
            SELECT owner, table_name
            FROM all_tables
            WHERE owner NOT IN ('SYS', 'SYSTEM')
            ORDER BY owner, table_name
        """

        cursor.execute(query)
        # Use positional access; ODBC drivers typically return Oracle column names in uppercase
        tables = [(row[0], row[1]) for row in cursor.fetchall()]
        cursor.close()

        return tables

    def export_external_table(self, output_path: str,
                             table_name: str,
                             filter_statement: str = None,
                             verbose: bool = False) -> tuple:
        """Export Oracle table to CSV."""
        from Snowforge import Debug

        self.connect()
        cursor = self.connection.cursor()

        # Build and execute query
        query = self.extract_table_query(table_name, filter_statement, verbose)
        Debug.log(f"Executing: {query}", 'DEBUG', verbose)

        cursor.execute(query)

        # Get column headers
        headers = [desc[0] for desc in cursor.description]

        # Export to CSV
        import os
        os.makedirs(output_path, exist_ok=True)

        table_short = table_name.split('.')[-1]
        csv_file = os.path.join(output_path, f"{table_short}.csv")

        with open(csv_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f, delimiter='|')
            writer.writerow(headers)

            row_count = 0
            while True:
                rows = cursor.fetchmany(10000)
                if not rows:
                    break

                writer.writerows(rows)
                row_count += len(rows)

        cursor.close()
        Debug.log(f"Exported {row_count} rows to {csv_file}", 'SUCCESS', verbose)

        return headers, csv_file

    def close(self):
        """Close Oracle connection."""
        if self.connection:
            self.connection.close()
            self.connection = None

# Usage
extractor = OracleExtractor(
    dsn="ORACLE_PROD",
    username="etl_user",
    password="password"
)

from Snowforge.DataMover import Engine

header, csv_file = Engine.export_to_file(
    extractor=extractor,
    output_path="/tmp/oracle_exports",
    fully_qualified_table_name="HR.EMPLOYEES",
    filter_statement="WHERE hire_date >= TO_DATE('2024-01-01', 'YYYY-MM-DD')"
)

extractor.close()
```

### Creating a Custom Report Exporter

```python
from Snowforge.Broadcaster.ReportExporter import ReportExporter, ExportResult
from typing import Any

class QlikSenseExporter(ReportExporter):
    """Qlik Sense report exporter."""

    def __init__(self, qlik_api_client):
        self.client = qlik_api_client

    def start_export(self, workspace_id: str, report_id: str,
                    parameters: dict[str, Any] | None = None) -> str:
        """Initiate Qlik report export."""

        # Build export request
        export_config = {
            "appId": report_id,
            "format": "pdf",
            "selections": parameters or {}
        }

        # Submit to Qlik API
        response = self.client.start_export(export_config)
        export_id = response.get("exportId")

        return export_id

    def poll_status(self, workspace_id: str, report_id: str,
                   export_id: str) -> str:
        """Check export status."""

        status_response = self.client.get_export_status(export_id)

        # Map Qlik status to standard status
        qlik_status = status_response.get("status")

        if qlik_status == "FINISHED":
            return "completed"
        elif qlik_status == "FAILED":
            return "failed"
        else:
            return "running"

    def download(self, workspace_id: str, report_id: str,
                export_id: str) -> ExportResult:
        """Download exported report."""

        # Download from Qlik
        file_bytes = self.client.download_export(export_id)
        filename = f"qlik_report_{export_id}.pdf"

        return ExportResult(content=file_bytes, filename=filename)

# Usage with Broadcaster
from Snowforge.Broadcaster import SnowforgeBroadcaster, BroadcastConfig, SmtpConfig

qlik_client = ...  # Initialize Qlik client
exporter = QlikSenseExporter(qlik_client)

broadcaster = SnowforgeBroadcaster(
    exporter=exporter,
    smtp=SmtpConfig(host="smtp.company.com", port=25)
)

config = BroadcastConfig(
    workspace_id="qlik-workspace",
    report_id="app-guid",
    channel="email",
    sender="reports@company.com",
    recipients=["user@company.com"],
    subject="Qlik Report",
    body="See attached",
    parameters={"Year": "2024"}
)

broadcaster.broadcast(config)
```

---

## 📜 License

This project is licensed under the **MIT License**. See the [LICENSE](LICENSE) file for details.

---

## 🤝 Contributing

We welcome contributions, suggestions, and collaboration!

### How to Contribute

1. **Fork the repository**
2. **Create a feature branch**: `git checkout -b feature/my-new-feature`
3. **Commit your changes**: `git commit -am 'Add new feature'`
4. **Push to the branch**: `git push origin feature/my-new-feature`
5. **Submit a pull request**

### Reporting Issues

If you encounter bugs or have feature requests, please open an issue on our [GitHub repository](https://github.com/Norsk-Tipping/snowforge-package/issues).

### Contact

For questions about using Snowforge or collaboration opportunities:

- **Authors**: Andreas Heggelund, Christophe Lebegue
- **Email**: andreasheggelund@gmail.com

We encourage you to get in touch if you have suggestions for improvements, questions about using Snowforge, or would like to collaborate. Your contribution is always welcome!

---

## 🙏 Acknowledgments

Snowforge is maintained by **Norsk Tipping** and built on top of excellent open-source libraries:

- [boto3](https://github.com/boto/boto3) - AWS SDK for Python
- [snowflake-connector-python](https://github.com/snowflakedb/snowflake-connector-python) - Snowflake connector
- [Jinja2](https://github.com/pallets/jinja/) - Template engine
- [requests](https://github.com/psf/requests) - HTTP library

---

**Happy Data Engineering! 🚀**
