Metadata-Version: 2.3
Name: th2etl
Version: 0.0.2
Summary: A flexible ETL framework with a database-driven scheduler, extensible pipeline blocs, and a RESTful API.
Author: thaink2
Author-email: thaink2 <contact@thaink2.com>
Requires-Dist: psycopg[binary]>=3.1,<4.0
Requires-Dist: pydantic>=2.13.4
Requires-Dist: pydantic-settings>=2.14.1
Requires-Dist: python-dotenv>=1.0.0
Requires-Dist: requests>=2.28,<3.0
Requires-Dist: pyjwt[crypto]>=2.8,<3.0
Requires-Dist: python-jose[cryptography]>=3.3,<4.0
Requires-Dist: fastapi>=0.110,<0.111
Requires-Dist: uvicorn[standard]>=0.28,<0.29
Requires-Dist: passlib>=1.7.4
Requires-Python: >=3.10
Description-Content-Type: text/markdown

# th2etl
thaink2's in-house ETL and automation library.

## Design

A pipeline is modeled as a set of interdependent blocs. Each bloc is one of:

- `LoaderBloc` — loads or extracts raw data
- `TransformerBloc` — transforms data between stages
- `ExporterBloc` — exports or writes output

Dependencies between blocs are resolved before execution, so the pipeline runs in dependency order.
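
As an illustration of what running in dependency order means, here is a minimal sketch that uses Python's standard `graphlib` module. It is not th2etl's internal resolver; the `dependencies` mapping and the `execution_order` helper are hypothetical and only mirror the bloc names used later in this README.

```python
from graphlib import TopologicalSorter

# Hypothetical mapping: bloc name -> names of the blocs it depends on.
dependencies = {
    "example_loader": [],
    "example_transformer": ["example_loader"],
    "example_exporter": ["example_transformer"],
}


def execution_order(deps: dict[str, list[str]]) -> list[str]:
    """Return bloc names so that every bloc comes after its dependencies."""
    # TopologicalSorter takes {node: predecessors} and raises
    # graphlib.CycleError if the dependencies are circular.
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)  # ['example_loader', 'example_transformer', 'example_exporter']
```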

### Loader Blocs

The following loader blocs are available:

- `CsvLoaderBloc`: Loads data from a CSV file.
    - `bloc_type`: `csv_loader`
    - **Config**:
        - `file_path` (required): The path to the CSV file.
        - `delimiter` (optional): The delimiter character (default: `,`).
- `PostgresLoaderBloc`: Loads data from a PostgreSQL database.
    - `bloc_type`: `postgres_loader`
    - **Config**:
        - `query` (required): The SQL query to execute.
- `ApiLoaderBloc`: Loads data from a web API.
    - `bloc_type`: `api_loader`
    - **Config**:
        - `url` (required): The API endpoint URL.
        - `method` (optional): The HTTP method (default: `GET`).
        - `params` (optional): A dictionary of URL parameters.
        - `headers` (optional): A dictionary of HTTP headers.
        - `json` (optional): A dictionary for the JSON request body.
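
The required and optional keys listed above can be checked before a bloc is stored. The sketch below is a standalone helper, not part of th2etl: `LOADER_CONFIG_KEYS` and `check_loader_config` are hypothetical names, and the file path and URL are placeholder values.

```python
# Required and optional config keys per bloc_type, copied from the list above.
LOADER_CONFIG_KEYS = {
    "csv_loader": {"required": {"file_path"}, "optional": {"delimiter"}},
    "postgres_loader": {"required": {"query"}, "optional": set()},
    "api_loader": {
        "required": {"url"},
        "optional": {"method", "params", "headers", "json"},
    },
}


def check_loader_config(bloc_type: str, config: dict) -> list[str]:
    """Return a list of problems; an empty list means the config looks valid."""
    spec = LOADER_CONFIG_KEYS.get(bloc_type)
    if spec is None:
        return [f"unknown bloc_type: {bloc_type}"]
    keys = set(config)
    problems = [f"missing required key: {k}" for k in sorted(spec["required"] - keys)]
    allowed = spec["required"] | spec["optional"]
    problems += [f"unexpected key: {k}" for k in sorted(keys - allowed)]
    return problems


# Placeholder configs for illustration only.
csv_config = {"file_path": "data/orders.csv", "delimiter": ";"}
api_config = {"url": "https://api.example.com/items", "method": "GET"}

print(check_loader_config("csv_loader", csv_config))
print(check_loader_config("api_loader", api_config))
print(check_loader_config("postgres_loader", {}))
```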

### Transformer Blocs

The following transformer blocs are available:

- `RunAdkAgentsBloc`: Runs an ADK agent via an API call.
    - `bloc_type`: `run_adk_agents`
    - **Config**:
        - `url` (optional): The API endpoint for the agent. Defaults to the development server.
        - `agent_id` (required): The ID of the agent to run (e.g., `database_assistant`).
        - `user_id` (required): The user's ID (e.g., email), used for authentication.
        - `message_text` (required): The text message to send to the agent.
- `RefreshWebhooksBloc`: Refreshes webhooks via an API call.
    - `bloc_type`: `refresh_webhooks`
    - **Config**:
        - `url` (required): The API endpoint for refreshing webhooks.
        - `user_id` (required): The user's ID (e.g., email), used for authentication.

## API Service

The application includes a FastAPI-based API for managing resources.

To run the API server, use the `--serve-api` flag:

```bash
th2etl --serve-api
```

You can also specify the host and port:

```bash
th2etl --serve-api --host 0.0.0.0 --port 8080
```

While the server is running, the API documentation is available at `http://127.0.0.1:8000/docs`. If you passed a different `--host` or `--port`, adjust the URL accordingly.

### Health Check

You can monitor the status of the service, including its connection to the database, by sending a GET request to the `/health` endpoint.

```bash
curl http://127.0.0.1:8000/health
```

If the service is running and connected to the database, it will return a `200 OK` response with `{"status": "ok"}`. If the database connection fails, it will return a `503 Service Unavailable` error.
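
The same check can be scripted, for example in a deployment probe. The following is a minimal sketch using only the standard library; `is_healthy` and `check_health` are hypothetical helper names, and the base URL assumes the address shown above.

```python
import json
import urllib.error
import urllib.request


def is_healthy(status_code: int, body: str) -> bool:
    """Apply the rule described above: 200 with {"status": "ok"} means healthy."""
    if status_code != 200:
        return False
    try:
        return json.loads(body).get("status") == "ok"
    except (json.JSONDecodeError, AttributeError):
        # Body was not JSON, or not a JSON object.
        return False


def check_health(base_url: str = "http://127.0.0.1:8000", timeout: float = 5.0) -> bool:
    """Send GET /health and report whether the service looks healthy."""
    try:
        with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as response:
            return is_healthy(response.status, response.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        # urllib raises HTTPError for non-2xx responses such as 503.
        return is_healthy(exc.code, "")
    except OSError:
        # Connection refused, timeout, DNS failure, and similar.
        return False
```

Calling `check_health()` returns `True` only for a `200` response carrying `{"status": "ok"}`, and `False` for a `503` or an unreachable server.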

## Usage

Run the scheduler in the current process:

```bash
python -m th2etl.runner
```

Run the scheduler in a separate isolated session:

```bash
python -m th2etl.runner --background
```

Pass environment variables into the isolated session:

```bash
python -m th2etl.runner --background --env SOURCE=prod --env DESTINATION=warehouse
```
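
To illustrate how repeated `--env KEY=VALUE` options can be turned into an environment for a child process, here is a sketch built on `argparse`. It is not the runner's actual argument parser; `parse_env_pairs` and the parser setup are assumptions made for this example.

```python
import argparse
import os


def parse_env_pairs(pairs: list[str]) -> dict[str, str]:
    """Turn ['SOURCE=prod', ...] into {'SOURCE': 'prod', ...}."""
    env = {}
    for pair in pairs:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"expected KEY=VALUE, got {pair!r}")
        env[key] = value
    return env


# Hypothetical parser that accepts the same flags as the commands above.
parser = argparse.ArgumentParser(prog="runner-sketch")
parser.add_argument("--background", action="store_true")
parser.add_argument("--env", action="append", default=[], metavar="KEY=VALUE")

args = parser.parse_args(
    ["--background", "--env", "SOURCE=prod", "--env", "DESTINATION=warehouse"]
)
child_env = parse_env_pairs(args.env)
print(child_env)  # {'SOURCE': 'prod', 'DESTINATION': 'warehouse'}

# The extra variables would be layered over the parent environment
# before spawning the isolated session.
full_env = {**os.environ, **child_env}
```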

If the package is installed, use the CLI entry point:

```bash
th2etl
```

This will start the scheduler by default.

To start the scheduler in the background:

```bash
th2etl --background
```

To start the API service in the background:

```bash
th2etl --serve-api --background
```

## Quickstart

Create pipeline metadata from the command line and then start the ETL service.

1. Set your PostgreSQL database settings in environment variables:

```powershell
$env:DATABASE_HOST = "localhost"
$env:DATABASE_PORT = "5432"
$env:DATABASE_NAME = "th2etl"
$env:DATABASE_USER = "etl_user"
$env:DATABASE_PASSWORD = "secret"
```

2. Create blocs in the database:

```powershell
python -c "from th2etl import DatabaseStorage; from th2etl.configs.settings import get_settings; s = get_settings();
with DatabaseStorage.from_settings(s) as storage:
    storage.create_bloc('example_loader','example_loader',dependencies=[],config={'source':'csv'})
    storage.create_bloc('example_transformer','example_transformer',dependencies=['example_loader'],config={'factor':2})
    storage.create_bloc('example_exporter','example_exporter',dependencies=['example_transformer'],config={'destination':'stdout'})"
```

3. Create a trigger for your pipeline:

```powershell
python -c "from th2etl import DatabaseStorage; from th2etl.configs.settings import get_settings; s = get_settings();
with DatabaseStorage.from_settings(s) as storage:
    storage.create_trigger('every_hour','example_pipeline','0 * * * *')"
```

4. Create the pipeline and, optionally, a scheduler:

```powershell
python -c "from th2etl import DatabaseStorage; from th2etl.configs.settings import get_settings; s = get_settings();
with DatabaseStorage.from_settings(s) as storage:
    storage.create_pipeline('example_pipeline',['example_loader','example_transformer','example_exporter'])
    storage.create_scheduler('example_scheduler','example_pipeline','every_hour')"
```

5. Start the ETL service:

```bash
th2etl
```

Or in the background:

```bash
th2etl --background
```

## Persistent Storage

Use `DatabaseStorage` to persist bloc, pipeline, trigger, and scheduler definitions.

```python
from th2etl import DatabaseStorage
from th2etl.configs.settings import get_settings

settings = get_settings()
with DatabaseStorage.from_settings(settings) as storage:
    storage.create_bloc("example_loader", "example_loader", dependencies=[], config={"source": "csv"})
    storage.create_bloc("example_transformer", "example_transformer", dependencies=["example_loader"], config={"factor": 2})
    storage.create_bloc("example_exporter", "example_exporter", dependencies=["example_transformer"], config={"destination": "stdout"})
    storage.create_pipeline("example_pipeline", ["example_loader", "example_transformer", "example_exporter"])
    storage.create_trigger("every_hour", "example_pipeline", "0 * * * *")
    storage.create_scheduler("example_scheduler", "example_pipeline", "every_hour")

    print(storage.list_pipelines())
    print(storage.list_schedulers())
```

The `Settings` object reads database connection details from environment variables such as `DATABASE_HOST`, `DATABASE_PORT`, `DATABASE_NAME`, `DATABASE_USER`, `DATABASE_PASSWORD`, and optionally `DATABASE_SSL_MODE`. You can also provide `DATABASE_URL` directly.

## Scheduler

Use `CronTrigger` and `CronScheduler` to run a pipeline on a cron-like schedule:

```python
from th2etl.pipelines import build_example_pipeline
from th2etl.scheduler import CronScheduler, CronTrigger

pipeline = build_example_pipeline()
trigger = CronTrigger("*/5 * * * *")
scheduler = CronScheduler(pipeline, trigger)
scheduler.start()
```
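
For readers unfamiliar with cron syntax, the following sketch shows how a five-field expression such as `*/5 * * * *` can be tested against a point in time. It is a simplified illustration, not the logic inside `CronTrigger`: `field_matches` and `cron_matches` are hypothetical helpers that support only `*`, `*/n`, ranges, and comma-separated integers, and they require every field to match.

```python
from datetime import datetime


def field_matches(field: str, value: int, minimum: int = 0) -> bool:
    """Match one cron field: '*', '*/n', 'a-b', or comma-separated integers."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # Steps are counted from the field's smallest legal value.
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif "-" in part:
            low, high = (int(x) for x in part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if 'minute hour day month weekday' matches the moment."""
    minute, hour, day, month, weekday = expression.split()
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day, moment.day, minimum=1)
        and field_matches(month, moment.month, minimum=1)
        # cron counts Sunday as 0, while isoweekday() counts it as 7.
        and field_matches(weekday, moment.isoweekday() % 7)
    )


print(cron_matches("*/5 * * * *", datetime(2026, 5, 15, 10, 30)))  # True
print(cron_matches("*/5 * * * *", datetime(2026, 5, 15, 10, 31)))  # False
print(cron_matches("0 * * * *", datetime(2026, 5, 15, 10, 0)))  # True
```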

For multiple pipelines with independent trigger schedules, use `SchedulerManager` so each pipeline can run on its own cadence in parallel:

```python
from th2etl.pipelines import build_example_pipeline
from th2etl.scheduler import CronTrigger, CronScheduler, SchedulerManager

pipeline1 = build_example_pipeline()
pipeline2 = build_example_pipeline()

scheduler1 = CronScheduler(pipeline1, CronTrigger("0 * * * *"), name="hourly_pipeline")
scheduler2 = CronScheduler(pipeline2, CronTrigger("*/5 * * * *"), name="five_minute_pipeline")

manager = SchedulerManager([scheduler1, scheduler2])
manager.start()
```

If you persist scheduler metadata in the database, load scheduled pipelines dynamically from `DatabaseStorage`:

```python
from th2etl import DatabaseStorage
from th2etl.configs.settings import get_settings
from th2etl.scheduler import load_scheduler_manager

settings = get_settings()
with DatabaseStorage.from_settings(settings) as storage:
    manager = load_scheduler_manager(storage)
    manager.start()
```

Or create a scheduler helper directly:

```python
from th2etl.pipelines import build_example_pipeline
from th2etl.scheduler import schedule_pipeline

pipeline = build_example_pipeline()
scheduler = schedule_pipeline(pipeline, "0 * * * *")
scheduler.start()
```

## Logging

The application uses Python's standard `logging` module. You can control the log verbosity using environment variables.

- `TH2ETL_LOG_LEVEL`: Sets the global log level. Accepts `DEBUG`, `INFO`, `WARNING`, or `ERROR`, and defaults to `INFO`.
- `TH2ETL_LOG_LEVELS`: Provides fine-grained control over different parts of the application. This is a comma-separated list of `logger_name:LEVEL`.

For example, to see detailed logs from the scheduler but only warnings and errors from the pipelines and blocs, you can set:

```bash
export TH2ETL_LOG_LEVELS="th2etl.scheduler:INFO,th2etl:WARNING"
```

This sets the logger for the `th2etl.scheduler` module to `INFO`, while setting the base `th2etl` logger (which other modules inherit from) to `WARNING`. This is useful for focusing on the scheduler's activity without being overwhelmed by pipeline execution details.
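
The inheritance behaviour described above comes from the standard `logging` module and can be reproduced in isolation. The sketch below parses a `logger_name:LEVEL` list and applies it; `parse_log_levels` and `apply_log_levels` are hypothetical helpers, not th2etl's own configuration code, and `th2etl.pipelines` is used only as an example of a child logger name.

```python
import logging

# The four level names documented above.
LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
}


def parse_log_levels(spec: str) -> dict[str, int]:
    """Parse 'name:LEVEL,name:LEVEL' into {logger_name: numeric_level}."""
    levels = {}
    for item in spec.split(","):
        item = item.strip()
        if not item:
            continue
        name, sep, level_name = item.rpartition(":")
        level = LEVELS.get(level_name.strip().upper())
        if not sep or not name.strip() or level is None:
            raise ValueError(f"expected logger_name:LEVEL, got {item!r}")
        levels[name.strip()] = level
    return levels


def apply_log_levels(spec: str) -> None:
    """Set each named logger to its requested level."""
    for name, level in parse_log_levels(spec).items():
        logging.getLogger(name).setLevel(level)


apply_log_levels("th2etl.scheduler:INFO,th2etl:WARNING")

# The scheduler logger has its own level; other children inherit from 'th2etl'.
print(logging.getLogger("th2etl.scheduler").getEffectiveLevel() == logging.INFO)
print(logging.getLogger("th2etl.pipelines").getEffectiveLevel() == logging.WARNING)
```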

## Output Storage

When pipelines are run by the scheduler, their output can be stored in a directory for later review.

- `pipelines_logs_dir`: Set this environment variable to the path of a directory where you want to store the output of each pipeline run.

If this variable is set, a new subdirectory will be created for each run, named with the scheduler and a timestamp (e.g., `five_minute_scheduler/20260515_103000`). This folder is passed to the pipeline in the `RunContext`, and blocs can be designed to write their output there.
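
The directory layout described above can be sketched as follows. This is an illustration of the naming scheme only: `make_run_dir` is a hypothetical helper, the `%Y%m%d_%H%M%S` timestamp format is inferred from the example path, and a temporary directory stands in for the configured `pipelines_logs_dir`.

```python
import tempfile
from datetime import datetime
from pathlib import Path


def make_run_dir(base_dir: str, scheduler_name: str, started_at: datetime) -> Path:
    """Create <base_dir>/<scheduler_name>/<YYYYMMDD_HHMMSS> and return it."""
    run_dir = Path(base_dir) / scheduler_name / started_at.strftime("%Y%m%d_%H%M%S")
    run_dir.mkdir(parents=True, exist_ok=True)
    return run_dir


with tempfile.TemporaryDirectory() as base:
    run_dir = make_run_dir(base, "five_minute_scheduler", datetime(2026, 5, 15, 10, 30, 0))
    # A bloc could then write its output inside the run directory.
    (run_dir / "output.txt").write_text("example output\n", encoding="utf-8")
    relative = run_dir.relative_to(base).as_posix()
    created = run_dir.is_dir()

print(relative)  # five_minute_scheduler/20260515_103000
```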
