Metadata-Version: 2.4
Name: cuneiform-sdk
Version: 0.1.3
Summary: Enterprise-grade Python SDK for data processing workflows with AWS Lambda-style interfaces.
Author: PeerNova Inc.
License: MIT
Project-URL: Homepage, https://peernova.com
Project-URL: Repository, https://peernova.com
Keywords: sdk,workflow,duckdb,sql
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: duckdb
Requires-Dist: pandas
Requires-Dist: click
Requires-Dist: requests
Requires-Dist: PyYAML
Requires-Dist: sqlglot
Requires-Dist: kestra

# Cuneiform SDK

**Enterprise-grade Python SDK for data processing workflows with AWS Lambda-style interfaces.**

A comprehensive toolkit for building, testing, and deploying data transformation workflows with DuckDB-powered SQL execution, automatic schema validation, and intelligent dataset discovery.

---

## Table of Contents

- [Features](#features)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Core Concepts](#core-concepts)
- [Usage Guide](#usage-guide)
  - [Defining Workflow Functions](#defining-workflow-functions)
  - [Working with Datasets](#working-with-datasets)
  - [Executing SQL](#executing-sql)
  - [Schema Management](#schema-management)
- [CLI Commands](#cli-commands)
- [API Reference](#api-reference)
- [Examples](#examples)
- [Advanced Features](#advanced-features)
- [Troubleshooting](#troubleshooting)

---

## Features

✨ **Workflow Functions**
- Decorator-based workflow function definition with metadata
- Automatic function discovery and registration
- Lambda-style execution model

📊 **Data Processing**
- DuckDB-based in-memory SQL execution engine
- Support for CSV and Parquet files
- Automatic dataset loading and validation
- Seamless DataFrame/table conversion

✅ **Schema Management**
- YAML-based schema definitions
- Automatic schema validation
- Column-level type checking
- Schema discovery and generation

🔍 **Code Analysis**
- AST-based function analysis
- SQL dependency detection via SQLGlot
- Dataset dependency discovery
- Code quality insights

🎯 **Developer Experience**
- Comprehensive CLI for dataset management
- Rich error messages and validation feedback
- Structured logging and debug support
- Local testing capabilities

---

## Installation

### Requirements
- Python 3.10+
- DuckDB
- Pandas
- SQLGlot (for SQL analysis)

### From Source

```bash
# Clone the repository
git clone <repository-url>
cd lib

# Install in development mode
pip install -e cuneiform_sdk
```

### Basic Install

```bash
pip install cuneiform-sdk
```

---

## Quick Start

### 1. Define a Workflow Function

```python
from cuneiform_sdk import workflow_function, WorkflowRunContext

@workflow_function(
    name="process_customers",
    description="Process customer data and generate insights",
    version="1.0.0",
    tags=["customers", "etl"]
)
def process_customers(context: WorkflowRunContext, config: dict) -> dict:
    """
    Load customer data, transform it, and save results.
    """
    # Load input dataset
    context.load_dataset("customers", "data/customers.csv")
    
    # Execute SQL transformation into a new table
    context.sql("""
        CREATE TABLE adjusted_customers AS
        SELECT 
            id,
            name,
            email,
            UPPER(city) as city,
            age * 1.1 as adjusted_age
        FROM customers
        WHERE age > 18
    """)
    result = context.get_dataframe("adjusted_customers")
    
    # Save the transformed output
    output_info = context.save_dataset("adjusted_customers", format="parquet")
    
    return {
        "status": "success",
        "output_path": output_info["output_path"],
        "row_count": len(result)
    }
```

### 2. Execute the Workflow

```python
from cuneiform_sdk import WorkflowRunContext

# Create execution context
context = WorkflowRunContext(
    data_dir="./data",
    output_dir="./output",
    schemas_dir="./schemas"
)

# Run the function
result = process_customers(context, config={"filter": "active"})
print(result)
```

### 3. Use the CLI

```bash
# Download a dataset
cuneiform dataset download customers --format csv

# Validate dataset against schema
cuneiform dataset validate customers.csv

# Test your workflow
cuneiform workflow test process_customers.py --data-dir ./data
```

---

## Core Concepts

### WorkflowContext

The execution context that provides:
- **SQL Execution**: Run queries on loaded datasets
- **Dataset Management**: Load and save datasets
- **Schema Validation**: Ensure data quality
- **I/O Operations**: Read/write in multiple formats

```python
context = WorkflowRunContext(
    data_dir="./data",           # Input datasets location
    output_dir="./output",       # Output datasets location
    schemas_dir="./schemas",     # Schema definitions location
    log_level="INFO"
)
```

### Workflow Functions

Functions decorated with `@workflow_function` that:
- Accept a `WorkflowContext` as first parameter
- Receive configuration as additional parameters
- Return a dictionary with execution results
- Have automatic metadata and discovery support

```python
@workflow_function(
    name="my_workflow",
    description="Description",
    version="1.0.0",
    tags=["tag1", "tag2"]
)
def my_workflow(context: WorkflowContext, config: dict) -> dict:
    # Implementation
    return {"status": "success"}
```

### DatasetSchema

YAML-based schema definitions for datasets:

```yaml
# schemas/customers.yaml
name: customers
version: "1.0.0"
description: "Customer master data"
columns:
  - name: id
    type: int64
    nullable: false
    description: "Unique customer ID"
  - name: name
    type: string
    nullable: false
  - name: email
    type: string
    nullable: true
  - name: created_at
    type: timestamp
    nullable: false
```

---

## Usage Guide

### Defining Workflow Functions

#### Basic Definition

```python
from cuneiform_sdk import workflow_function, WorkflowRunContext

@workflow_function(
    name="transform_data",
    description="Transform raw data",
    version="1.0.0"
)
def transform_data(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("raw_data")
    
    context.sql("""
        CREATE TABLE processed_data AS
        SELECT * FROM raw_data WHERE status = 'active'
    """)
    
    return context.save_dataset("processed_data")
```

#### With Multiple Datasets

```python
@workflow_function(name="merge_datasets", tags=["merge", "etl"])
def merge_datasets(context: WorkflowRunContext, config: dict) -> dict:
    # Load multiple datasets
    context.load_dataset("customers", "data/customers.csv")
    context.load_dataset("orders", "data/orders.parquet")
    
    # Transform into a new table so it can be saved
    context.sql("""
        CREATE TABLE customer_orders AS
        SELECT 
            c.id as customer_id,
            c.name,
            COUNT(o.customer_id) as order_count,
            SUM(o.amount) as total_spent
        FROM customers c
        LEFT JOIN orders o ON c.id = o.customer_id
        GROUP BY c.id, c.name
        ORDER BY total_spent DESC
    """)
    
    return context.save_dataset("customer_orders")
```

### Working with Datasets

#### Loading Datasets

```python
# Load from default location (data_dir/table_name.csv)
context.load_dataset("customers")

# Load from custom location
context.load_dataset("customers", "data/source/customers.parquet")

# Load multiple datasets
for table_name in ["customers", "orders", "products"]:
    context.load_dataset(table_name)
```

#### Saving Datasets

```python
# Save single dataset
output = context.save_dataset("results", format="parquet")
print(output["output_path"])  # Path to saved file

# Save multiple datasets
outputs = context.save_datasets(["table1", "table2", "table3"])
for table_name, path in outputs.items():
    print(f"{table_name} -> {path}")
```

#### Working with DataFrames

```python
# Get a pandas DataFrame
df = context.get_dataframe("customers")
print(df.head())

# Convert DataFrame to table
context.save_dataframe(df, "my_table")
```

### Executing SQL

#### Simple Queries

```python
# Execute SELECT query
result = context.sql("""
    SELECT id, name, email FROM customers LIMIT 10
""")

# Execute CREATE TABLE AS SELECT
context.sql("""
    CREATE TABLE high_value_customers AS
    SELECT * FROM customers WHERE lifetime_value > 10000
""")

# Execute UPDATE
context.sql("""
    UPDATE customers SET last_updated = CURRENT_TIMESTAMP
    WHERE status = 'active'
""")
```

#### Multi-Statement Execution

```python
context.sql("""
    CREATE TEMP TABLE temp_data AS SELECT * FROM raw_data;
    
    CREATE TABLE processed AS
    SELECT *, ROW_NUMBER() OVER (ORDER BY id) as rn
    FROM temp_data;
    
    DROP TABLE temp_data;
""")
```

#### Complex Transformations

```python
# Window functions
context.sql("""
    CREATE TABLE ranked_customers AS
    SELECT 
        *,
        ROW_NUMBER() OVER (ORDER BY spending DESC) as rank,
        ROUND(spending / SUM(spending) OVER () * 100, 2) as pct_of_total
    FROM customers
""")

# Aggregations with HAVING
context.sql("""
    CREATE TABLE customer_stats AS
    SELECT 
        region,
        COUNT(*) as customer_count,
        AVG(age) as avg_age,
        SUM(lifetime_value) as total_value
    FROM customers
    GROUP BY region
    HAVING COUNT(*) > 100
""")
```

### Schema Management

#### Define Schemas

Create YAML files in your `schemas/` directory:

```yaml
# schemas/transactions.yaml
name: transactions
version: "1.0.0"
description: "Financial transaction records"
columns:
  - name: transaction_id
    type: string
    nullable: false
  - name: customer_id
    type: int64
    nullable: false
  - name: amount
    type: float64
    nullable: false
  - name: currency
    type: string
    nullable: false
  - name: transaction_date
    type: timestamp
    nullable: false
  - name: status
    type: string
    nullable: false
```

#### Validate Data Against Schema

```python
from cuneiform_sdk import SchemaManager, ValidationError

schema_manager = SchemaManager("schemas/")
schema = schema_manager.load_schema("transactions")

# Validate DataFrame
try:
    is_valid = schema.validate_dataframe(df)
    print("✓ Data is valid")
except ValidationError as e:
    print(f"✗ Validation error: {e}")
```

#### Generate Schemas

```python
from cuneiform_sdk import SchemaManager

schema_manager = SchemaManager("schemas/")

# Generate schema from DataFrame
df = context.get_dataframe("customers")
schema = schema_manager.generate_schema_from_dataframe(df, name="customers")

# Save schema
schema_manager.save_schema(schema, "customers.yaml")
```

---

## CLI Commands

### Dataset Management

```bash
# Download dataset from remote storage
cuneiform dataset download <dataset-name> --format csv

# Validate dataset against schema
cuneiform dataset validate <dataset-file>

# Generate schema from dataset
cuneiform dataset generate-schema <dataset-file> --output schema.yaml

# List available datasets
cuneiform dataset list
```

### Workflow Management

```bash
# Scan directory for workflow functions
cuneiform workflow scan <directory>

# Test workflow locally
cuneiform workflow test <workflow-file> --data-dir ./data

# Show workflow metadata
cuneiform workflow info <workflow-function>
```

### Configuration

```bash
# Use verbose logging
cuneiform --verbose <command>

# Specify custom data directory
cuneiform --data-dir ./custom/data <command>
```

---

## API Reference

### WorkflowContext

**Abstract interface for workflow execution.**

#### Methods

| Method | Description |
|--------|-------------|
| `sql(query: str) -> Any` | Execute SQL query |
| `table(table_name: str) -> Any` | Get table reference |
| `list_tables() -> List[str]` | List all tables |
| `load_dataset(name: str, path: str = None)` | Load dataset |
| `save_dataset(name: str, format: str = "parquet") -> dict` | Save dataset |
| `save_datasets(names: List[str], format: str) -> dict` | Save multiple |
| `get_dataframe(name: str, path: str = None) -> pd.DataFrame` | Get DataFrame |
| `save_dataframe(df: pd.DataFrame, name: str)` | Save DataFrame |
| `log(msg: str, level: str = "INFO")` | Log message |

### WorkflowRunContext

**DuckDB-based implementation of WorkflowContext.**

```python
context = WorkflowRunContext(
    data_dir: str = "data",
    output_dir: str = "output", 
    schemas_dir: str = "datasets",
    log_level: str = "INFO",
    connection: Optional[object] = None
)
```

### DatasetSchema

**Schema definition for a dataset.**

```python
schema = DatasetSchema(
    name: str,                          # Dataset name
    columns: List[ColumnSchema],        # Column definitions
    description: Optional[str] = None,  # Description
    version: Optional[str] = None       # Version
)

# Load from YAML
schema = DatasetSchema.from_yaml_file("schemas/customers.yaml")

# Load from dictionary
schema = DatasetSchema.from_dict({
    "name": "customers",
    "columns": [{"name": "id", "type": "int64", "nullable": False}]
})

# Validate DataFrame
schema.validate_dataframe(df)
```

### SchemaManager

**Manage schema files and operations.**

```python
manager = SchemaManager("schemas/")

# Load schema
schema = manager.load_schema("customers")

# Save schema
manager.save_schema(schema, "customers.yaml")

# Generate from DataFrame
schema = manager.generate_schema_from_dataframe(df, "customers")

# List schemas
schemas = manager.list_schemas()
```

### workflow_function Decorator

**Mark functions as workflow functions.**

```python
@workflow_function(
    name: Optional[str] = None,           # Function name
    description: Optional[str] = None,    # Description
    version: Optional[str] = None,        # Version
    tags: Optional[List[str]] = None      # Tags for categorization
)
def my_function(context: WorkflowContext, config: dict) -> dict:
    pass
```

---

## Examples

### Example 1: Customer Segmentation

```python
@workflow_function(
    name="segment_customers",
    description="Segment customers by spending patterns",
    tags=["customers", "segmentation"]
)
def segment_customers(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("customers", "data/customers.csv")
    context.load_dataset("orders", "data/orders.parquet")
    
    # Create segments based on spending
    context.sql("""
        CREATE TABLE customer_segments AS
        WITH spending_stats AS (
            SELECT 
                o.customer_id,
                COUNT(*) as order_count,
                SUM(o.total_amount) as total_spent,
                AVG(o.total_amount) as avg_order_value,
                MAX(o.order_date) as last_order_date
            FROM orders o
            GROUP BY o.customer_id
        )
        SELECT 
            c.id,
            c.name,
            c.email,
            ss.order_count,
            ss.total_spent,
            CASE 
                WHEN ss.total_spent > 10000 THEN 'VIP'
                WHEN ss.total_spent > 5000 THEN 'Premium'
                WHEN ss.total_spent > 1000 THEN 'Regular'
                ELSE 'Low Value'
            END as segment,
            DATE_DIFF('day', ss.last_order_date, CURRENT_DATE) as days_since_purchase
        FROM customers c
        LEFT JOIN spending_stats ss ON c.id = ss.customer_id
        ORDER BY ss.total_spent DESC
    """)
    
    return context.save_dataset("customer_segments", format="parquet")
```

### Example 2: Data Quality Checks

```python
@workflow_function(
    name="validate_data_quality",
    description="Run data quality checks",
    tags=["quality", "validation"]
)
def validate_data_quality(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("transactions")
    
    # Run quality checks and persist them as a table
    context.sql("""
        CREATE TABLE quality_report AS
        SELECT 
            'null_customer_ids' as check_name,
            COUNT(*) as issue_count
        FROM transactions
        WHERE customer_id IS NULL
        
        UNION ALL
        
        SELECT 
            'negative_amounts' as check_name,
            COUNT(*) as issue_count
        FROM transactions
        WHERE amount < 0
        
        UNION ALL
        
        SELECT 
            'future_dates' as check_name,
            COUNT(*) as issue_count
        FROM transactions
        WHERE transaction_date > CURRENT_DATE
    """)
    
    # Save report
    context.save_dataset("quality_report")
    
    return {"checks_completed": True, "report_saved": True}
```

### Example 3: Time Series Analysis

```python
@workflow_function(
    name="analyze_trends",
    description="Analyze sales trends over time",
    tags=["analysis", "timeseries"]
)
def analyze_trends(context: WorkflowRunContext, config: dict) -> dict:
    context.load_dataset("sales", "data/sales.parquet")
    
    context.sql("""
        CREATE TABLE sales_trends AS
        WITH daily_sales AS (
            SELECT 
                DATE(order_date) as sale_date,
                SUM(amount) as daily_total,
                COUNT(*) as transaction_count
            FROM sales
            GROUP BY DATE(order_date)
        )
        SELECT 
            sale_date,
            daily_total,
            transaction_count,
            AVG(daily_total) OVER (
                ORDER BY sale_date 
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ) as moving_avg_7day,
            LAG(daily_total, 1) OVER (ORDER BY sale_date) as prev_day_total,
            ROUND(
                (daily_total - LAG(daily_total, 1) OVER (ORDER BY sale_date)) / 
                LAG(daily_total, 1) OVER (ORDER BY sale_date) * 100, 
                2
            ) as pct_change
        FROM daily_sales
        ORDER BY sale_date
    """)
    
    return context.save_dataset("sales_trends")
```

---

## Advanced Features

### Workflow Function Discovery

Automatically discover and register workflow functions:

```python
from cuneiform_sdk import WorkflowRegistry, get_global_registry

# Discover functions in a module
registry = WorkflowRegistry()
registry.discover_functions_in_module("my_workflows")

# List registered functions
for func_name, metadata in registry.functions.items():
    print(f"Function: {func_name}")
    print(f"  Description: {metadata.description}")
    print(f"  Version: {metadata.version}")
    print(f"  Tags: {metadata.tags}")

# Get function by name
func_metadata = registry.get_function("segment_customers")
```

### Code Analysis and Dependency Discovery

Analyze workflow functions for dependencies:

```python
from cuneiform_sdk import WorkflowFunctionScanner

scanner = WorkflowFunctionScanner(".")

# Scan directory for workflow functions
analyses = scanner.scan_directory("workflows/")

for analysis in analyses:
    print(f"Function: {analysis.name}")
    print(f"Input datasets: {[d.name for d in analysis.dependencies if d.type == 'input']}")
    print(f"Output datasets: {[d.name for d in analysis.dependencies if d.type == 'output']}")
    print(f"SQL operations: {len(analysis.sql_operations)}")
    if analysis.issues:
        print(f"Issues: {analysis.issues}")
```

### SQL Analysis

Extract table dependencies from SQL:

```python
from cuneiform_sdk import WorkflowFunctionScanner

scanner = WorkflowFunctionScanner(".")
analyzer = scanner.sql_analyzer

# Analyze SQL query
operation = analyzer.analyze_sql_query("""
    SELECT c.id, c.name, COUNT(*) as order_count
    FROM customers c
    JOIN orders o ON c.id = o.customer_id
    WHERE c.status = 'active'
    GROUP BY c.id, c.name
""")

print(f"Input tables: {operation.inputs}")
print(f"Output tables: {operation.outputs}")
print(f"Operation type: {operation.operation_type}")
```

### Custom Logging

```python
context.log("Processing customer data", "INFO")
context.log("Starting data validation", "DEBUG")
context.log("Warning: Large dataset detected", "WARNING")
context.log("Critical error in transformation", "ERROR")
```

### Error Handling

```python
from cuneiform_sdk import (
    CuneiformError,
    ValidationError,
    DatasetError,
    ContextError
)

@workflow_function(name="robust_workflow")
def robust_workflow(context: WorkflowRunContext, config: dict) -> dict:
    try:
        context.load_dataset("customers")
    except DatasetError as e:
        return {
            "status": "error",
            "error_type": "dataset_error",
            "message": str(e),
            "error_code": e.error_code
        }
    except ValidationError as e:
        return {
            "status": "error",
            "error_type": "validation_error",
            "message": str(e)
        }
    except CuneiformError as e:
        return {
            "status": "error",
            "message": str(e),
            "context": e.context
        }
```

---

## Troubleshooting

### Dataset Not Found

**Problem**: `DatasetError: Dataset 'customers' not found`

**Solution**:
1. Verify file exists in data directory
2. Check file naming: should be `customers.csv`, `customers.parquet`, or `customers.pq`
3. Specify explicit path: `context.load_dataset("customers", "path/to/file.csv")`
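
A quick SDK-independent way to see which candidate files exist is to probe the expected filenames with `pathlib` (the `./data` directory and dataset name here are assumptions for illustration):

```python
from pathlib import Path

data_dir = Path("./data")   # assumed input directory
name = "customers"

# The loader looks for these filenames (per the naming note above)
candidates = [data_dir / f"{name}{ext}" for ext in (".csv", ".parquet", ".pq")]
found = [p for p in candidates if p.exists()]
print(found if found else f"no file for '{name}' under {data_dir}/")
```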

### Schema Validation Fails

**Problem**: `ValidationError: Column 'age' expected int64, got float64`

**Solution**:
1. Update schema to match data types
2. Add type conversion in SQL: `CAST(age AS INT64)`
3. Update DataFrame before saving
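
For option 3, a minimal pandas sketch (the frame here is made up for illustration):

```python
import pandas as pd

# Hypothetical frame whose 'age' column arrived as float64
df = pd.DataFrame({"id": [1, 2], "age": [34.0, 27.0]})

# Coerce to the schema's expected int64 before validating or saving
df["age"] = df["age"].astype("int64")
```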

### SQL Syntax Errors

**Problem**: `Error: SQL parse error`

**Solution**:
1. Check DuckDB SQL documentation for correct syntax
2. Test queries separately in DuckDB
3. Use multi-line strings for clarity
4. Enable verbose logging: `context = WorkflowRunContext(log_level="DEBUG")`

### Memory Issues with Large Datasets

**Problem**: `MemoryError: Unable to allocate memory`

**Solution**:
1. Process data in chunks using `LIMIT` and pagination
2. Use SQL to filter data early
3. Delete intermediate tables: `DROP TABLE temp_table`
4. Increase available memory to DuckDB process
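
Solutions 1 and 2 can also be applied before data ever reaches the SQL engine; a pandas sketch (with a simulated file for illustration) that filters each chunk before it accumulates:

```python
import io
import pandas as pd

# Simulated large CSV; in practice pass a file path instead
raw = io.StringIO("id,status\n1,active\n2,inactive\n3,active\n4,inactive\n")

# Filter early, chunk by chunk, so only matching rows stay in memory
kept = [chunk[chunk["status"] == "active"]
        for chunk in pd.read_csv(raw, chunksize=2)]
result = pd.concat(kept, ignore_index=True)
```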

### Import Errors

**Problem**: `ModuleNotFoundError: No module named 'cuneiform_sdk'`

**Solution**:
1. Ensure package is installed: `pip install cuneiform-sdk`
2. Check Python path includes package location
3. Use relative imports within package
4. Install in development mode: `pip install -e .`
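
To see where (or whether) the current interpreter resolves the package, a stdlib check:

```python
import importlib.util

spec = importlib.util.find_spec("cuneiform_sdk")
if spec is None:
    print("cuneiform_sdk is not importable from this interpreter")
else:
    print(f"resolved from: {spec.origin}")
```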

---

## Support

For issues, questions, or contributions:
- 📧 Email: support@peernova.com
- 🐛 Report bugs: [GitHub Issues](https://github.com/peernova/titanium/issues)
- 📚 Documentation: [Full Docs](https://docs.peernova.com/cuneiform)

---

## License

Cuneiform SDK is released under the MIT License. See the LICENSE file for details.

---

## Changelog

### v0.1.0 (2024)
- Initial release
- WorkflowContext interface and DuckDB implementation
- Schema validation system
- CLI tools for dataset management
- Workflow function discovery and analysis
- SQL dependency detection


