Metadata-Version: 2.4
Name: dlt_utils
Version: 0.1.2
Summary: Shared utilities for dlt data pipelines with multi-company support
Author: Jay van den Bos
License: MIT
Project-URL: Homepage, https://pypi.org/project/dlt_utils/
Project-URL: Repository, https://dev.azure.com/your-org/your-project/_git/dlt_utils
Keywords: dlt,data,pipeline,etl,incremental
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Database
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.13
Description-Content-Type: text/markdown
Requires-Dist: dlt>=1.19.1
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-cov; extra == "dev"
Requires-Dist: ruff; extra == "dev"

# dlt_utils

[![PyPI version](https://badge.fury.io/py/dlt_utils.svg)](https://badge.fury.io/py/dlt_utils)
[![Python 3.13+](https://img.shields.io/badge/python-3.13+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

Shared utilities for [dlt](https://dlthub.com/) data pipelines with multi-company support.

## Features

- **PartitionedIncremental**: Incremental state tracking per partition key (e.g., company_id)
- **Date utilities**: Generate (year, week) and (year, month) tuples for time-based partitioning
- **Schema utilities**: Ensure tables exist in destination database
- **CLI arguments**: Reusable argument parser with common dlt pipeline options
- **Logging**: Consistent logging configuration across pipelines
- **Pipeline utilities**: Resource filtering, validation, and helper functions

## Installation

```bash
# From PyPI
pip install dlt_utils

# For development
pip install -e ".[dev]"
```

## Usage

### PartitionedIncremental

Track incremental state per company (or any partition key):

```python
import dlt
from dlt_utils import PartitionedIncremental

@dlt.resource
def sync_resource():
    state = dlt.current.resource_state()
    inc = PartitionedIncremental(
        state=state,
        state_key="sequences",
        cursor_path="sequenceNumber",
        initial_value=0,
    )

    for company_id in ["company_a", "company_b"]:
        start_seq = inc.get_last_value(company_id)
        for record in fetch_data(company_id, since=start_seq):
            inc.track(company_id, record["sequenceNumber"])
            yield record
```

### Date utilities

Generate time periods for partitioned data extraction:

```python
from dlt_utils import generate_year_weeks, generate_year_months

# Generate weeks from 2024 to now + 52 weeks
weeks = generate_year_weeks(start_year=2024)
# [(2024, 1), (2024, 2), ..., (2025, 52)]

# Generate months from October 2024 to February 2025
months = generate_year_months(2024, 10, 2025, 2)
# [(2024, 10), (2024, 11), (2024, 12), (2025, 1), (2025, 2)]
```

### Schema utilities

Ensure tables exist before running pipeline:

```python
from dlt_utils import ensure_all_tables_exist, ensure_tables_for_resources

# Create all tables from schema
ensure_all_tables_exist(pipeline)

# Create only specific resource tables (including child tables)
ensure_tables_for_resources(pipeline, ["trade_items", "organizations"])
```

### CLI Arguments

Reusable argument parser with common dlt pipeline options:

```python
from dlt_utils import create_base_parser, add_common_args, get_common_args

# Option 1: Use pre-configured base parser
parser = create_base_parser(description="My pipeline")
parser.add_argument("--my-custom-arg", type=int, default=10)
args = parser.parse_args()

# Option 2: Add common args to existing parser
import argparse
parser = argparse.ArgumentParser(description="My pipeline")
add_common_args(parser, default_destination="duckdb")
args = parser.parse_args()

# Extract common args into a typed dataclass
common = get_common_args(args)
print(common.debug)        # bool
print(common.destination)  # "mssql" or "duckdb"
print(common.resources)    # list[str] | None
```

Available arguments:
- `--debug`: Enable debug logging
- `--destination`: Target database (duckdb, mssql)
- `--dev-mode`: Reset schema between runs
- `--resources`: List of resources to load
- `--refresh`: Refresh mode (drop_sources, drop_resources)
- `--list-resources`: Show available resources and exit

### Logging

Consistent logging configuration across pipelines:

```python
from dlt_utils import configure_logging, get_logger

# Configure logging based on debug flag
configure_logging(debug=args.debug)

# Get a logger for your module
logger = get_logger(__name__)
logger.info("Pipeline started")
```

### Pipeline Utilities

Helper functions for running dlt pipelines:

```python
from dlt_utils import (
    validate_refresh_args,
    filter_resources,
    print_resources,
    get_resource_names,
    mask_sensitive_value,
)

# Validate refresh args (prevents drop_sources with resource filter)
validate_refresh_args(refresh=args.refresh, resources_filter=args.resources)

# Filter resources based on user selection
available = ["orders", "products", "customers"]
to_load = filter_resources(available, args.resources)

# Print available resources from a dlt source
if args.list_resources:
    print_resources(source)
    exit(0)

# Get resource names from a source
names = get_resource_names(source)

# Mask sensitive values for logging
api_key = "sk-abc123xyz789"
print(mask_sensitive_value(api_key))  # "sk..12..***"
```

## Development

```bash
# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run linter
ruff check dlt_utils/
```

## CI/CD Pipeline

De pipeline draait automatisch bij:
- **Push naar `main`**: Voert tests uit
- **Tag met `v*` prefix**: Voert tests uit én publiceert naar PyPI

### Pipeline Workflow

```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Push/Tag      │────▶│   Test Stage    │────▶│  Publish Stage  │
│   naar repo     │     │   (altijd)      │     │  (alleen tags)  │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                              │                        │
                              ▼                        ▼
                        - Install deps           - Build package
                        - Run pytest             - Upload to PyPI
                        - Publish results
```

### Nieuwe Versie Releasen

#### Optie 1: Via Git CLI

```bash
# 1. Zorg dat alle changes gecommit zijn
git add .
git commit -m "Release v0.2.0"

# 2. Maak een tag aan
git tag v0.2.0

# 3. Push commit én tag naar remote
git push origin main
git push origin v0.2.0
```

#### Optie 2: Via Azure DevOps

1. Ga naar **Repos** → **Tags**
2. Klik op **New tag**
3. Vul in:
   - **Name**: `v0.2.0` (moet beginnen met `v`)
   - **Based on**: selecteer de commit of branch (bijv. `main`)
   - **Description**: optioneel, bijv. "Added new feature X"
4. Klik op **Create**

De pipeline wordt automatisch getriggered en publiceert naar PyPI.

### Versienummering

Gebruik [Semantic Versioning](https://semver.org/):
- `vMAJOR.MINOR.PATCH` (bijv. `v1.2.3`)
- **MAJOR**: Breaking changes
- **MINOR**: Nieuwe features (backwards compatible)
- **PATCH**: Bugfixes

> ⚠️ **Belangrijk**: Vergeet niet de versie in `pyproject.toml` bij te werken vóór het taggen!

## License

MIT
