# epftoolbox2

> A Python library for electricity price forecasting with modular data pipelines and model evaluation.

## Overview

epftoolbox2 provides a complete toolkit for downloading electricity market data, processing it with transformers, validating quality, training forecasting models, and evaluating results. It uses a fluent builder pattern for pipeline construction. Both `DataPipeline` and `ModelPipeline` support YAML serialization. A `Workflow` class ties everything together for cluster/supercomputer usage.

## Installation

```bash
pip install epftoolbox2
# or
uv add epftoolbox2
```

## Architecture

```
┌─────────────────┐     ┌─────────────────┐
│  DataPipeline   │     │  ModelPipeline  │
├─────────────────┤     ├─────────────────┤
│ • Sources       │────►│ • Models        │
│ • Transformers  │     │ • Evaluators    │
│ • Validators    │     │ • Exporters     │
└─────────────────┘     └─────────────────┘
        ▼                       ▼
   pd.DataFrame          EvaluationReport

          ┌──────────────────────┐
          │       Workflow       │
          ├──────────────────────┤
          │ data_pipeline.yaml   │
          │ model_pipeline.yaml  │
          │ run params + env     │
          └──────────────────────┘
```

## Core Concepts

### DataPipeline
Downloads and processes data from multiple sources. Supports caching, transformations, and validation. Returns a pandas DataFrame with DatetimeIndex (UTC). Can be saved/loaded as YAML.

### ModelPipeline
Trains and evaluates forecasting models. Generates rolling-window forecasts for multiple horizons (1-7 days ahead) and all 24 hours. Returns an EvaluationReport. Can be saved/loaded as YAML.

### Workflow
Orchestrates DataPipeline + ModelPipeline from a single YAML file. Sets environment variables (`OMP_NUM_THREADS`, `PYTHON_GIL`, `MAX_PROCESSES`, `THREADS_PER_PROCESS`). Supports inline component overrides on top of referenced pipeline files.

### Fluent API
All pipelines use method chaining:
- `.add_source()` - Add data source
- `.add_transformer()` - Add data transformer
- `.add_validator()` - Add data validator
- `.add_model()` - Add forecasting model
- `.add_evaluator()` - Add evaluation metric
- `.add_exporter()` - Add results exporter
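
A minimal sketch of the chaining style (the CSV path, predictor column, and dates are illustrative; see COMPLETE EXAMPLES for realistic configurations):

```python
from epftoolbox2.pipelines import DataPipeline, ModelPipeline
from epftoolbox2.data.sources import CsvSource
from epftoolbox2.models import OLSModel
from epftoolbox2.evaluators import MAEEvaluator

# Each .add_*() call returns the pipeline itself, so calls chain.
df = (
    DataPipeline()
    .add_source(CsvSource(file_path="prices.csv"))
    .run(start="2023-01-01", end="2024-01-01")
)

report = (
    ModelPipeline()
    .add_model(OLSModel(predictors=["price_d-1"]))
    .add_evaluator(MAEEvaluator())
    .run(data=df, test_start="2023-11-01", test_end="2023-12-01", target="price")
)
```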

---

# DATA SOURCES

All sources implement the `DataSource` base class with:
- `fetch(start: pd.Timestamp, end: pd.Timestamp) -> pd.DataFrame`
- `get_cache_config() -> Optional[Dict]` (returns None if not cacheable)

## EntsoeSource

Downloads electricity data from ENTSOE Transparency Platform.

```python
from epftoolbox2.data.sources import EntsoeSource

source = EntsoeSource(
    country_code="PL",           # Required. Country code (see SUPPORTED_COUNTRIES)
    api_key="YOUR_API_KEY",      # Required. ENTSOE API key
    type=["load", "price"],      # Required. Options: "load", "price", "generation"
)
```

**Parameters:**
- `country_code` (str): ISO country code. See SUPPORTED_COUNTRIES section.
- `api_key` (str): ENTSOE API key from https://transparency.entsoe.eu/
- `type` (List[str]): Data types to fetch. Options: `"load"`, `"price"`, `"generation"`

**Output columns by type:**
- `type=["load"]`: `load_actual`, `load_forecast`, `load_forecast_daily_min`, `load_forecast_daily_max`
- `type=["price"]`: `price` (EUR/MWh)
- `type=["generation"]`: `generation_biomass`, `generation_fossil_gas`, `generation_nuclear`, `generation_solar`, `generation_wind_onshore`, etc.

**Caching:** Returns a cache config based on the resolved area code and requested types. Cached to `.cache/sources/`.

---

## OpenMeteoSource

Downloads weather forecast data from Open-Meteo API. No API key required.

```python
from epftoolbox2.data.sources import OpenMeteoSource

source = OpenMeteoSource(
    latitude=52.2297,            # Required. Location latitude (-90 to 90)
    longitude=21.0122,           # Required. Location longitude (-180 to 180)
    horizon=7,                   # Optional. Days ahead for forecasts (default: 7)
    model="jma_seamless",        # Optional. Weather model (default: "jma_seamless")
    columns=None,                # Optional. Weather variables (default: DEFAULT_COLUMNS)
    prefix="warsaw",             # Optional. Prefix for column names (default: "")
)
```

**Default columns:** `temperature_2m`, `rain`, `showers`, `snowfall`, `relative_humidity_2m`, `dew_point_2m`, `apparent_temperature`, `precipitation`, `weather_code`, `surface_pressure`, `pressure_msl`, `cloud_cover`, `wind_speed_10m`, `wind_direction_10m`

**Output column naming:** `{prefix}_{variable}_d+{horizon}`
- Example: `warsaw_temperature_2m_d+1`, `warsaw_temperature_2m_d+2`, ..., `warsaw_temperature_2m_d+7`

**Caching:** Returns a cache config based on location, horizon, model, and columns.

---

## CalendarSource

Generates calendar features (holidays, weekday, hour, month, daylight).

```python
from epftoolbox2.data.sources import CalendarSource

source = CalendarSource(
    country="PL",                # Required. Country code for holidays
    timezone=None,               # Optional. Override default timezone
    holidays="binary",           # Optional. False | "binary" | "onehot" | "name"
    weekday="number",            # Optional. False | "number" | "onehot" | "name"
    hour=False,                  # Optional. False | "number" | "onehot"
    month=False,                 # Optional. False | "number" | "onehot" | "name"
    daylight=False,              # Optional. Add sunrise/sunset/daylight_hours
    prefix="",                   # Optional. Prefix for column names
)
```

**Output columns by parameter:**
| Parameter | Value | Output Columns |
|-----------|-------|----------------|
| `holidays` | `"binary"` | `is_holiday` (0/1) |
| `holidays` | `"name"` | `holiday_name` (string or None) |
| `holidays` | `"onehot"` | `is_holiday` + `holiday_{name}` columns |
| `weekday` | `"number"` | `weekday` (0=Monday, 6=Sunday) |
| `weekday` | `"name"` | `weekday_name` ("monday", "tuesday", ...) |
| `weekday` | `"onehot"` | `is_monday`, `is_tuesday`, ... |
| `hour` | `"number"` | `hour` (0-23) |
| `hour` | `"onehot"` | `is_0`, `is_1`, ..., `is_23` |
| `month` | `"number"` | `month` (1-12) |
| `month` | `"name"` | `month_name` ("january", ...) |
| `daylight` | `True` | `sunrise`, `sunset`, `daylight_hours` |

**Caching:** Returns None (not cached; generation is deterministic).
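
A quick way to see exactly which columns a configuration yields is to call `fetch()` directly (a sketch; the printed list follows the table above):

```python
import pandas as pd
from epftoolbox2.data.sources import CalendarSource

source = CalendarSource(country="PL", holidays="binary", weekday="onehot")
df = source.fetch(pd.Timestamp("2024-01-01", tz="UTC"),
                  pd.Timestamp("2024-01-07", tz="UTC"))
print(df.columns.tolist())  # ['is_holiday', 'is_monday', ..., 'is_sunday']
```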

---

## CsvSource

Loads time-series data from CSV files.

```python
from epftoolbox2.data.sources import CsvSource

source = CsvSource(
    file_path="data/prices.csv",     # Required. Path to CSV file
    datetime_column="datetime",       # Optional. Name of datetime column (default: "datetime")
    columns=None,                     # Optional. Columns to include (None = all)
    prefix="",                        # Optional. Prefix for column names
    datetime_format=None,             # Optional. strftime format (None = auto-detect)
    separator=",",                    # Optional. CSV separator (default: ",")
)
```

**Requirements:**
- File must have `.csv` extension
- Must contain the specified datetime column
- Datetime column is parsed and set as index
- All timestamps are converted to UTC

**Caching:** Returns None (not cached).
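
A minimal sketch of a compatible file and its loading (values illustrative):

```python
import pandas as pd
from epftoolbox2.data.sources import CsvSource

# Write a file in the default expected shape: a "datetime" column plus data columns.
pd.DataFrame({
    "datetime": ["2024-01-01 00:00", "2024-01-01 01:00"],
    "price": [85.5, 82.3],
}).to_csv("prices.csv", index=False)

source = CsvSource(file_path="prices.csv")
df = source.fetch(pd.Timestamp("2024-01-01", tz="UTC"),
                  pd.Timestamp("2024-01-02", tz="UTC"))
print(df.index.tz)  # UTC: timestamps are converted on load
```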

---

# TRANSFORMERS

All transformers implement the `Transformer` base class with:
- `transform(df: pd.DataFrame) -> pd.DataFrame`

## ResampleTransformer

Resamples data to specified frequency with interpolation.

```python
from epftoolbox2.data.transformers import ResampleTransformer

transformer = ResampleTransformer(
    freq="1h",          # Required. Pandas frequency string ("1h", "15min", "1D")
    method="linear",    # Optional. Interpolation: "linear" | "ffill" | "bfill"
)
```

**Behavior:**
- Resamples to regular frequency using `asfreq()`
- Fills gaps using specified method
- Rounds values to 3 decimal places
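
A minimal sketch of the gap filling (values chosen so the interpolation is easy to verify):

```python
import pandas as pd
from epftoolbox2.data.transformers import ResampleTransformer

idx = pd.to_datetime(["2024-01-01 00:00", "2024-01-01 03:00"], utc=True)
df = pd.DataFrame({"price": [100.0, 106.0]}, index=idx)

out = ResampleTransformer(freq="1h", method="linear").transform(df)
print(out["price"].tolist())  # [100.0, 102.0, 104.0, 106.0]
```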

---

## LagTransformer

Creates lagged features for time series. Positive lags look back (past), negative lags look forward (future forecasts).

```python
from epftoolbox2.data.transformers import LagTransformer

transformer = LagTransformer(
    columns=["load_actual", "price"],  # Required. Column(s) to lag
    lags=[1, 2, 7],                    # Required. Lag periods. Accepts list, range, or single int
    freq="day",                        # Required. Unit for lags
)
```

**Frequency options:**
- `"day"`, `"days"`, `"d"`, `"1D"` → 1 day
- `"hour"`, `"hours"`, `"h"`, `"1h"` → 1 hour
- `"minute"`, `"minutes"`, `"min"`, `"1min"` → 1 minute
- `"second"`, `"seconds"`, `"s"`, `"1s"` → 1 second

**Output column naming:** `{column}_{unit}-{lag}`
- Example with `freq="day"`, lag=1: `load_actual_d-1`
- Example with `freq="day"`, lag=-7: `load_actual_d+7` (forward look)
- Example with `freq="hour"`: `price_h-24`, `price_h-168`

**Note:** `range(-7, 1)` is accepted and serialized correctly to YAML (converted to list internally).
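
A minimal sketch, assuming hourly data and that lag columns are appended alongside the originals; a one-day lag aligns each row with the value 24 hours earlier:

```python
import pandas as pd
from epftoolbox2.data.transformers import LagTransformer

idx = pd.date_range("2024-01-01", periods=48, freq="1h", tz="UTC")
df = pd.DataFrame({"price": [float(i) for i in range(48)]}, index=idx)

out = LagTransformer(columns=["price"], lags=[1], freq="day").transform(df)
print(out.loc[idx[24], ["price", "price_d-1"]])  # price=24.0, price_d-1=0.0
```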

---

## TimezoneTransformer

Converts DataFrame index timezone.

```python
from epftoolbox2.data.transformers import TimezoneTransformer

transformer = TimezoneTransformer(
    target_tz="Europe/Warsaw",    # Required. Target timezone
)
```

**Behavior:**
- If index is timezone-naive, localizes to UTC first, then converts
- If index has timezone, converts directly
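
A minimal sketch of the naive-index path (localize to UTC, then convert):

```python
import pandas as pd
from epftoolbox2.data.transformers import TimezoneTransformer

df = pd.DataFrame({"x": [1]}, index=pd.DatetimeIndex(["2024-06-01 12:00"]))
out = TimezoneTransformer(target_tz="Europe/Warsaw").transform(df)
print(out.index[0])  # 2024-06-01 14:00:00+02:00 (12:00 UTC during CEST)
```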

---

# VALIDATORS

All validators implement the `Validator` base class with:
- `validate(df: pd.DataFrame) -> ValidationResult`

## ValidationResult

Dataclass returned by all validators:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

import pandas as pd


@dataclass
class ValidationResult:
    is_valid: bool = True                               # Overall validity
    errors: List[str] = field(default_factory=list)     # Critical errors
    warnings: List[str] = field(default_factory=list)   # Non-critical warnings
    info: Dict[str, Any] = field(default_factory=dict)  # Additional metadata
    stats: Optional[pd.DataFrame] = None                # Statistics (EdaValidator only)
```

## NullCheckValidator

Checks for null values in specified columns.

```python
from epftoolbox2.data.validators import NullCheckValidator

validator = NullCheckValidator(
    columns=["load_actual", "price"],  # Optional. Columns to check (None = all)
    allow_nulls=False,                 # Optional. If True, only checks column existence
)
```

**Errors:**
- Missing required columns
- Columns containing null values (if `allow_nulls=False`)
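
A minimal sketch of a failing check:

```python
import pandas as pd
from epftoolbox2.data.validators import NullCheckValidator

df = pd.DataFrame({"load_actual": [1.0, None], "price": [85.5, 82.3]})
result = NullCheckValidator(columns=["load_actual", "price"]).validate(df)
print(result.is_valid)  # False: "load_actual" contains a null
print(result.errors)
```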

---

## ContinuityValidator

Checks for gaps in datetime index.

```python
from epftoolbox2.data.validators import ContinuityValidator

validator = ContinuityValidator(
    freq="1h",    # Optional. Expected frequency (default: "1h")
)
```

**Errors:**
- Non-DatetimeIndex
- Gaps larger than expected frequency

**Info:** `gaps` (list of gap details), `gap_count`, `expected_freq`
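
A minimal sketch with one missing hourly stamp:

```python
import pandas as pd
from epftoolbox2.data.validators import ContinuityValidator

idx = pd.date_range("2024-01-01", periods=6, freq="1h", tz="UTC").delete(3)
result = ContinuityValidator(freq="1h").validate(pd.DataFrame({"price": 1.0}, index=idx))
print(result.is_valid)           # False
print(result.info["gap_count"])  # 1
```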

---

## EdaValidator

Generates exploratory data analysis statistics.

```python
from epftoolbox2.data.validators import EdaValidator

validator = EdaValidator(
    columns=None,    # Optional. Columns to analyze (None = all numeric)
)
```

**Prints a rich table with:** column, dtype, count, null_count, null%, min, max, mean, std

**Stats DataFrame:** Contains all statistics for programmatic access.

---

# MODELS

All models extend the `BaseModel` base class.

## Predictor Specification

Predictors can be specified in four ways:

```python
predictors = [
    # 1. Static column name
    "load_actual",

    # 2. Template with {horizon} placeholder (replaced with 1, 2, ..., horizon)
    "warsaw_temperature_2m_d+{horizon}",
    "is_monday_d+{horizon}",

    # 3. Callable receiving horizon (int 1-7)
    lambda h: f"weather_d+{h}",

    # 4. Callable receiving (horizon, hour) — different feature per hour of day
    lambda h, hour: f"load_h{hour:02d}_d+{h}",
]
```

**YAML serialization constraint:** Callables cannot be serialized to YAML. Use string templates (`"col_d+{horizon}"`) for pipelines intended to be saved with `ModelPipeline.save()` or `Workflow`.
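
For templates, the expansion is plain string substitution: for each horizon `h` in `1..horizon`, `{horizon}` is replaced with `h`. An illustrative sketch in plain Python, mirroring what the library does internally:

```python
template = "warsaw_temperature_2m_d+{horizon}"
cols = [template.format(horizon=h) for h in range(1, 8)]
print(cols)
# ['warsaw_temperature_2m_d+1', 'warsaw_temperature_2m_d+2', ...,
#  'warsaw_temperature_2m_d+7']
```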

## OLSModel

Ordinary Least Squares linear regression (sklearn LinearRegression).

```python
from epftoolbox2.models import OLSModel

model = OLSModel(
    predictors=predictors,       # Required. List of predictor specifications
    training_window=365,         # Optional. Days of training data (default: 365)
    name="OLS Baseline",         # Optional. Model display name
)
```

---

## LassoCVModel

Lasso regression with cross-validated regularization (sklearn LassoCV).

```python
from epftoolbox2.models import LassoCVModel

model = LassoCVModel(
    predictors=predictors,       # Required. List of predictor specifications
    training_window=365,         # Optional. Days of training data (default: 365)
    cv=5,                        # Optional. Cross-validation folds (default: 5)
    max_iter=10000,              # Optional. Max iterations (default: 10000)
    name="Lasso CV",             # Optional. Model display name
)
```

---

## Model Training Process

Training uses a `ProcessPoolExecutor` (one process per forecast day) with an inner `ThreadPoolExecutor` in each worker process. Worker state is module-level in `_worker.py`, populated once via the pool initializer.

For each (hour, horizon, day) combination:
1. Extract training data (rolling window)
2. Scale features with StandardScaler (auto-skips binary features)
3. Fit model on scaled data
4. Predict and inverse-scale prediction
5. Store result with metadata
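
A conceptual sketch of a single fit, with illustrative names rather than the library's internal API (the target is assumed to be scaled too, consistent with "inverse-scale prediction" above):

```python
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
train_x = rng.normal(size=(365, 5))   # step 1: rolling training window
train_y = rng.normal(size=(365, 1))
test_x = rng.normal(size=(1, 5))

x_scaler = StandardScaler().fit(train_x)   # step 2: scale features
y_scaler = StandardScaler().fit(train_y)
model = LinearRegression().fit(x_scaler.transform(train_x),
                               y_scaler.transform(train_y))  # step 3: fit
pred_scaled = model.predict(x_scaler.transform(test_x))      # step 4: predict,
prediction = y_scaler.inverse_transform(pred_scaled)[0, 0]   # then inverse-scale
```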

**Result record structure:**
```python
{
    "run_date": "2024-01-01",      # Date when prediction was made
    "target_date": "2024-01-08",   # Date being predicted
    "hour": 14,                     # Hour of day (0-23)
    "horizon": 7,                   # Days ahead (1-7)
    "day_in_test": 0,              # Day index in test period
    "prediction": 85.5,            # Predicted value
    "actual": 82.3,                # Actual value
    "coefficients": [...],         # Model coefficients
}
```

---

# EVALUATORS

All evaluators implement the `Evaluator` base class with:
- `name: str` - Display name for reports
- `compute(df: pd.DataFrame) -> float` - Compute metric from prediction/actual columns

## MAEEvaluator

Mean Absolute Error.

```python
from epftoolbox2.evaluators import MAEEvaluator

evaluator = MAEEvaluator()
# evaluator.name = "MAE"
# evaluator.compute(df) = (df["prediction"] - df["actual"]).abs().mean()
```

---

# EXPORTERS

All exporters implement the `Exporter` base class with:
- `export(report: EvaluationReport) -> None`

## TerminalExporter

Prints results to console with rich formatting.

```python
from epftoolbox2.exporters import TerminalExporter

exporter = TerminalExporter(
    show=["summary", "horizon"],  # Optional. Tables to display
)
```

**Show options:** `"summary"`, `"hour"`, `"horizon"`, `"hour_horizon"`, `"year"`, `"year_horizon"`

---

## ExcelExporter

Exports results to Excel with conditional formatting (green-yellow-red color scale).

```python
from epftoolbox2.exporters import ExcelExporter

exporter = ExcelExporter(
    path="results.xlsx",                                    # Required. Output path
    sheets=["summary", "hour", "horizon", "hour_horizon"],  # Optional. Sheets to include
)
```

**Sheet options:** `"summary"`, `"hour"`, `"horizon"`, `"hour_horizon"`, `"year"`, `"year_horizon"`

---

# EVALUATION REPORT

Returned by `ModelPipeline.run()`.

```python
report = pipeline.run(...)

# Available methods:
report.summary()           # DataFrame: model × metrics
report.by_hour()           # DataFrame: model × hour × metrics
report.by_horizon()        # DataFrame: model × horizon × metrics
report.by_hour_horizon()   # DataFrame: model × hour × horizon × metrics
report.by_year()           # DataFrame: model × year × metrics
report.by_year_horizon()   # DataFrame: model × year × horizon × metrics
```

---

# YAML SERIALIZATION

Both pipelines support `save(path)` / `load(path)`. Components are registered in `COMPONENT_REGISTRY` dicts.

## DataPipeline YAML

```python
pipeline.save("data_pipeline.yaml")
pipeline = DataPipeline.load("data_pipeline.yaml")
```

```yaml
sources:
- class: EntsoeSource
  params:
    country_code: PL
    api_key: YOUR_KEY
    type: [load, price]
- class: CalendarSource
  params:
    country: PL
    holidays: binary
    weekday: onehot
    daylight: true
transformers:
- class: LagTransformer
  params:
    columns: [load_actual, price]
    lags: [1, 2, 7]
    freq: day
validators:
- class: NullCheckValidator
  params:
    columns: [load_actual, price]
```

## ModelPipeline YAML

```python
pipeline.save("model_pipeline.yaml")
pipeline = ModelPipeline.load("model_pipeline.yaml")
```

```yaml
models:
- class: OLSModel
  params:
    predictors: &id001 [load_actual, "is_monday_d+{horizon}", "price_d-1"]
    training_window: 365
    name: OLS
- class: LassoCVModel
  params:
    predictors: *id001   # YAML alias to the &id001 anchor above; loads correctly
    training_window: 365
    cv: 5
    name: LassoCV
evaluators:
- class: MAEEvaluator
  params: {}
exporters:
- class: TerminalExporter
  params:
    show: [summary, horizon]
- class: ExcelExporter
  params:
    path: results.xlsx
```

**Serialization rules:**
- Only primitive attributes are serialized: `str`, `int`, `float`, `bool`, `list`, `dict`, `None`, `Path` (→ str)
- Non-serializable public attributes excluded: `session`, `console`, `logger`, `lat`, `lon`
- Private attributes (prefixed `_`) are always excluded
- `range` lags are converted to `list` at construction time — serialize correctly
- Callable predictors raise `ValueError` at `save()` time — use `"{horizon}"` string templates instead

---

# WORKFLOW

`Workflow` orchestrates both pipelines from a single YAML file. Designed for cluster/supercomputer environments where experiment config, environment setup, and pipeline paths all live in one file.

```python
from epftoolbox2.pipelines import Workflow

# Build and save (once, on your laptop)
Workflow(
    data_pipeline="data_pipeline.yaml",
    model_pipeline="model_pipeline.yaml",
    data_start="2023-05-01",
    data_end="2024-08-01",
    data_cache=True,
    model_test_start="2024-06-01",
    model_test_end="2024-07-01",
    model_target="price",
    model_horizon=7,
    max_processes=32,
    threads_per_process=8,
    cache_path="/tmp/scratch/$SLURM_JOB_ID/cache",
).save("experiment.yaml")

# Load and run (cluster job)
report = Workflow.load("experiment.yaml").run()
```

## Workflow YAML Format

```yaml
environment:
  max_processes: 32
  threads_per_process: 8
  cache_path: /tmp/scratch/$SLURM_JOB_ID/cache   # $VAR expanded at runtime
  model_index: $SLURM_ARRAY_TASK_ID              # optional; runs all models if omitted

data_pipeline:
  path: data_pipeline.yaml          # relative to this file's directory
  start: "2023-05-01"
  end: "2024-08-01"
  cache: true

model_pipeline:
  path: model_pipeline.yaml
  test_start: "2024-06-01"
  test_end: "2024-07-01"
  target: price
  horizon: 7
  forecast_only: false              # optional; default false
```

**Forecast-only workflow (runs daily without changing dates):**
```yaml
data_pipeline:
  start: now_d-730      # 2 years of history; adjust to match largest training_window
  end: today_d+7        # fetch weather/calendar forecast window too
  cache: true

model_pipeline:
  test_start: today
  test_end: today
  target: price
  horizon: 7
  forecast_only: true
```

## Inline Component Overrides

Pipeline sections (`sources`, `transformers`, `validators`, `models`, `evaluators`, `exporters`) can be specified inline in the workflow YAML, overriding the corresponding section from the referenced `path`. If no `path` is given, the pipeline is built entirely from inline sections.

```yaml
data_pipeline:
  path: data_pipeline.yaml
  start: "2023-05-01"
  end: "2024-08-01"
  cache: true
  transformers:                    # overrides transformers from data_pipeline.yaml
    - class: LagTransformer
      params:
        columns: [load_actual, price]
        lags: [-7, -6, -5, -4, -3, -2, -1, 0]
        freq: day

model_pipeline:
  path: model_pipeline.yaml
  test_start: "2024-06-01"
  test_end: "2024-07-01"
  models:                          # overrides models from model_pipeline.yaml
    - class: OLSModel
      params:
        predictors: [load_actual, price_d-1]
        training_window: 180
        name: OLS_short
```

## Environment Behavior

`Workflow.run()` always sets the following before spawning any workers:
- `OMP_NUM_THREADS=1` (prevents BLAS oversubscription)
- `PYTHON_GIL=0` (enables free-threading on Python 3.13t+)
- `MAX_PROCESSES` and `THREADS_PER_PROCESS` from config (if provided)

Pipeline paths in the workflow YAML are resolved relative to the workflow YAML's directory. `$VAR` in paths is expanded at runtime via `os.path.expandvars()`.

---

# CACHING

## Source Caching (DataPipeline)

```python
# Enable automatic caching (stored in .cache/sources/)
df = pipeline.run(start, end, cache=True)

# Custom cache file (single CSV file)
df = pipeline.run(start, end, cache="custom_cache.csv")

# No caching
df = pipeline.run(start, end, cache=False)
```

**Cache behavior:**
- Each source gets a unique cache key based on its configuration
- Only fetches missing date ranges from API
- Merges cached and fresh data automatically

## Result Caching (ModelPipeline)

```python
# Enable incremental training (resumable)
report = pipeline.run(..., save_dir="results")

# Results stored as JSONL files: results/{model_name}.jsonl
# On re-run, only missing (hour, horizon, day) combinations are computed
```

---

# COMPLETE EXAMPLES

## Example 1: Data Pipeline Only

```python
import os
from epftoolbox2.pipelines import DataPipeline
from epftoolbox2.data.sources import EntsoeSource, OpenMeteoSource, CalendarSource
from epftoolbox2.data.transformers import ResampleTransformer, LagTransformer
from epftoolbox2.data.validators import NullCheckValidator

ENTSOE_API_KEY = os.environ.get("ENTSOE_API_KEY")

pipeline = (
    DataPipeline()
    .add_source(EntsoeSource(country_code="PL", api_key=ENTSOE_API_KEY, type=["load", "price"]))
    .add_source(OpenMeteoSource(latitude=52.2297, longitude=21.0122, horizon=7, prefix="warsaw"))
    .add_source(CalendarSource(country="PL", holidays="binary", weekday="onehot", daylight=True))
    .add_transformer(ResampleTransformer(freq="1h"))
    .add_transformer(LagTransformer(columns=["load_actual"], lags=[1, 2, 7], freq="day"))
    .add_validator(NullCheckValidator(columns=["load_actual", "price"]))
)

df = pipeline.run(start="2023-01-01", end="2024-04-01", cache=True)
df.to_csv("data.csv")
```

---

## Example 2: Model Pipeline Only

```python
import os
os.environ["THREADS_PER_PROCESS"] = "16"
os.environ["MAX_PROCESSES"] = "4"
os.environ["OMP_NUM_THREADS"] = "1"

import pandas as pd
from epftoolbox2.pipelines import ModelPipeline
from epftoolbox2.models import OLSModel, LassoCVModel
from epftoolbox2.evaluators import MAEEvaluator
from epftoolbox2.exporters import ExcelExporter, TerminalExporter

df = pd.read_csv("data.csv", index_col=0, parse_dates=True)

seasonal_indicators = [
    "is_monday_d+{horizon}",
    "is_tuesday_d+{horizon}",
    "is_wednesday_d+{horizon}",
    "is_thursday_d+{horizon}",
    "is_friday_d+{horizon}",
    "is_saturday_d+{horizon}",
    "is_sunday_d+{horizon}",
    "is_holiday_d+{horizon}",
    "daylight_hours_d+{horizon}",
]

predictors = [
    "load_actual",
    *seasonal_indicators,
    "load_actual_d-1",
    "load_actual_d-7",
    "warsaw_temperature_2m_d+{horizon}",
]

pipeline = (
    ModelPipeline()
    .add_model(OLSModel(predictors=predictors, training_window=365, name="OLS Baseline"))
    .add_model(LassoCVModel(predictors=predictors, training_window=365, cv=5, name="Lasso CV"))
    .add_evaluator(MAEEvaluator())
    .add_exporter(TerminalExporter())
    .add_exporter(ExcelExporter("results.xlsx"))
)

report = pipeline.run(
    data=df,
    test_start="2024-02-01",
    test_end="2024-03-01",
    target="price",
    horizon=7,
    save_dir="results",
)

print(report.summary())
print(report.by_horizon())
```

---

## Example 3: Full Workflow (Code)

```python
import os
os.environ["PYTHON_GIL"] = "0"
os.environ["THREADS_PER_PROCESS"] = "16"
os.environ["MAX_PROCESSES"] = "4"
os.environ["OMP_NUM_THREADS"] = "1"

from epftoolbox2.pipelines import DataPipeline, ModelPipeline
from epftoolbox2.data.sources import EntsoeSource, OpenMeteoSource, CalendarSource
from epftoolbox2.data.transformers import ResampleTransformer, LagTransformer
from epftoolbox2.data.validators import NullCheckValidator
from epftoolbox2.models import OLSModel, LassoCVModel
from epftoolbox2.evaluators import MAEEvaluator
from epftoolbox2.exporters import ExcelExporter, TerminalExporter

ENTSOE_API_KEY = os.environ.get("ENTSOE_API_KEY")

data_pipeline = (
    DataPipeline()
    .add_source(EntsoeSource(country_code="PL", api_key=ENTSOE_API_KEY, type=["load", "price"]))
    .add_source(OpenMeteoSource(latitude=52.2297, longitude=21.0122, horizon=7, prefix="warsaw"))
    .add_source(CalendarSource(country="PL", holidays="binary", weekday="onehot", daylight=True))
    .add_transformer(ResampleTransformer(freq="1h"))
    .add_transformer(LagTransformer(columns=["load_actual", "price"], lags=[1, 2, 7], freq="day"))
    .add_validator(NullCheckValidator(columns=["load_actual", "price"]))
)

df = data_pipeline.run(start="2023-05-01", end="2024-07-01", cache=True)

seasonal_indicators = [
    "is_monday_d+{horizon}",
    "is_tuesday_d+{horizon}",
    "is_wednesday_d+{horizon}",
    "is_thursday_d+{horizon}",
    "is_friday_d+{horizon}",
    "is_saturday_d+{horizon}",
    "is_sunday_d+{horizon}",
    "is_holiday_d+{horizon}",
    "daylight_hours_d+{horizon}",
]

predictors = [
    "load_actual",
    *seasonal_indicators,
    "load_actual_d-1",
    "load_actual_d-2",
    "price_d-1",
    "price_d-2",
    lambda h: f"warsaw_temperature_2m_d+{h}",
]

model_pipeline = (
    ModelPipeline()
    .add_model(OLSModel(predictors=predictors, training_window=365, name="OLS"))
    .add_model(LassoCVModel(predictors=predictors, training_window=365, cv=5, name="LassoCV"))
    .add_evaluator(MAEEvaluator())
    .add_exporter(TerminalExporter())
    .add_exporter(ExcelExporter("results.xlsx"))
)

report = model_pipeline.run(
    data=df,
    test_start="2024-06-01",
    test_end="2024-06-30",
    target="price",
    horizon=7,
    save_dir="results",
)

print(report.summary())
print(report.by_horizon())
```

---

## Example 4: Workflow (YAML-based)

```python
import os
os.environ["PYTHON_GIL"] = "0"

from epftoolbox2.pipelines import DataPipeline, ModelPipeline, Workflow
from epftoolbox2.data.sources import EntsoeSource, OpenMeteoSource, CalendarSource
from epftoolbox2.data.transformers import ResampleTransformer, LagTransformer
from epftoolbox2.data.validators import NullCheckValidator
from epftoolbox2.models import OLSModel, LassoCVModel
from epftoolbox2.evaluators import MAEEvaluator
from epftoolbox2.exporters import ExcelExporter, TerminalExporter

ENTSOE_API_KEY = os.environ.get("ENTSOE_API_KEY")

# Phase 1: build and save config (run once)
predictors = [
    "load_actual",
    "is_monday_d+{horizon}", "is_tuesday_d+{horizon}", "is_wednesday_d+{horizon}",
    "is_thursday_d+{horizon}", "is_friday_d+{horizon}", "is_saturday_d+{horizon}",
    "is_sunday_d+{horizon}", "is_holiday_d+{horizon}", "daylight_hours_d+{horizon}",
    "load_actual_d-1", "load_actual_d-2", "price_d-1", "price_d-2",
    "warsaw_temperature_2m_d+{horizon}",
]

(
    DataPipeline()
    .add_source(EntsoeSource(country_code="PL", api_key=ENTSOE_API_KEY, type=["load", "price"]))
    .add_source(OpenMeteoSource(latitude=52.2297, longitude=21.0122, horizon=7, prefix="warsaw"))
    .add_source(CalendarSource(country="PL", holidays="binary", daylight=True, weekday="onehot"))
    .add_transformer(ResampleTransformer(freq="1h"))
    .add_transformer(LagTransformer(columns=["load_actual", "price"], lags=[1, 2, 7], freq="day"))
    .add_transformer(LagTransformer(lags=range(-7, 1), freq="day", columns=["is_monday", "daylight_hours", "load_forecast", "is_holiday"]))
    .add_validator(NullCheckValidator(columns=["load_actual", "price"]))
    .save("data_pipeline.yaml")
)

(
    ModelPipeline()
    .add_model(OLSModel(predictors=predictors, training_window=365, name="OLS"))
    .add_model(LassoCVModel(predictors=predictors, training_window=365, cv=5, name="LassoCV"))
    .add_evaluator(MAEEvaluator())
    .add_exporter(TerminalExporter())
    .add_exporter(ExcelExporter("workflow_results.xlsx"))
    .save("model_pipeline.yaml")
)

Workflow(
    data_pipeline="data_pipeline.yaml",
    model_pipeline="model_pipeline.yaml",
    data_start="2023-05-01",
    data_end="2024-08-01",
    data_cache=True,
    model_test_start="2024-06-01",
    model_test_end="2024-07-01",
    model_target="price",
    model_horizon=7,
    max_processes=4,
    threads_per_process=8,
).save("experiment.yaml")

# Phase 2: load and run (cluster job — only this line needed)
if __name__ == "__main__":
    report = Workflow.load("experiment.yaml").run()
    print(report.summary())
    print(report.by_horizon())
```

---

## Example 5: Forecast-Only (Operational Daily Forecast)

```python
import os
os.environ["THREADS_PER_PROCESS"] = "8"
os.environ["MAX_PROCESSES"] = "2"

from epftoolbox2.pipelines import DataPipeline, ModelPipeline
from epftoolbox2.data.sources import EntsoeSource, OpenMeteoSource, CalendarSource
from epftoolbox2.data.transformers import TimezoneTransformer, ResampleTransformer, LagTransformer
from epftoolbox2.models import OLSModel, LassoCVModel

ENTSOE_API_KEY = os.environ.get("ENTSOE_API_KEY")

if __name__ == "__main__":
    # Date keywords resolve at runtime — run this script every day without changing anything.
    # "now_d-730" → 2 years ago; "today_d+7" → 7 days from now.
    # Adjust the -N offset to cover your largest model's training_window.
    df = (
        DataPipeline()
        .add_source(EntsoeSource(country_code="PL", api_key=ENTSOE_API_KEY, type=["load", "price"]))
        .add_source(OpenMeteoSource(latitude=52.2297, longitude=21.0122, horizon=7, prefix="warsaw"))
        .add_source(CalendarSource(country="PL", holidays="binary", weekday="onehot", daylight=True))
        .add_transformer(TimezoneTransformer(target_tz="Europe/Warsaw"))
        .add_transformer(ResampleTransformer(freq="1h"))
        .add_transformer(LagTransformer(columns=["load_actual", "price"], lags=[1, 2, 7], freq="day"))
        .add_transformer(LagTransformer(
            lags=range(-7, 1), freq="day",
            columns=["is_monday", "is_tuesday", "is_wednesday", "is_thursday",
                     "is_friday", "is_saturday", "is_sunday", "is_holiday",
                     "daylight_hours", "load_forecast"],
        ))
        .run(start="now_d-730", end="today_d+7", cache=True)
    )

    predictors = [
        "load_actual",
        "is_monday_d+{horizon}", "is_tuesday_d+{horizon}", "is_wednesday_d+{horizon}",
        "is_thursday_d+{horizon}", "is_friday_d+{horizon}", "is_saturday_d+{horizon}",
        "is_sunday_d+{horizon}", "is_holiday_d+{horizon}", "daylight_hours_d+{horizon}",
        "load_actual_d-1", "load_actual_d-2", "price_d-1", "price_d-2",
        "warsaw_temperature_2m_d+{horizon}",
    ]

    report = (
        ModelPipeline()
        .add_model(OLSModel(predictors=predictors, training_window=365, name="OLS"))
        .add_model(LassoCVModel(predictors=predictors, training_window=365, cv=7, name="LassoCV"))
        .run(
            data=df,
            test_start="today",   # resolve_date("today") → today's ISO date at runtime
            test_end="today",
            target="price",
            horizon=7,
            forecast_only=True,   # skips evaluators and exporters
        )
    )

    # 168 rows per model (24h × 7 horizons), sorted by model → horizon → hour
    forecast = report.predictions()
    print(forecast)
```

---

# API REFERENCE

## DataPipeline

```python
class DataPipeline:
    def __init__(
        self,
        sources: Optional[List[DataSource]] = None,
        transformers: Optional[List[Transformer]] = None,
        validators: Optional[List[Validator]] = None,
    ): ...

    def add_source(self, source: DataSource) -> "DataPipeline": ...
    def add_transformer(self, transformer: Transformer) -> "DataPipeline": ...
    def add_validator(self, validator: Validator) -> "DataPipeline": ...

    def run(
        self,
        start: Union[str, pd.Timestamp],  # "YYYY-MM-DD", Timestamp, or keyword: "today", "now_d-730"
        end: Union[str, pd.Timestamp],    # Same. Use "today_d+7" to include weather forecast window.
        cache: Union[bool, str] = False,  # True, False, or path string
    ) -> pd.DataFrame: ...

    def save(self, path: Union[str, Path]) -> None: ...   # Save to YAML

    @classmethod
    def load(cls, path: Union[str, Path]) -> "DataPipeline": ...  # Load from YAML
```

## ModelPipeline

```python
class ModelPipeline:
    def __init__(self): ...

    def add_model(self, model: BaseModel) -> "ModelPipeline": ...
    def add_evaluator(self, evaluator: Evaluator) -> "ModelPipeline": ...
    def add_exporter(self, exporter: Exporter) -> "ModelPipeline": ...

    def run(
        self,
        data: pd.DataFrame,              # DataFrame with DatetimeIndex
        test_start: str,                  # Test period start ("YYYY-MM-DD" or "today", "now_d-7")
        test_end: str,                    # Test period end
        target: str = "price",            # Target column name
        horizon: int = 7,                 # Max forecast horizon (days)
        save_dir: Optional[str] = None,   # Directory for incremental results
        forecast_only: bool = False,      # Skip evaluators and exporters; use report.predictions()
    ) -> EvaluationReport: ...

    def save(self, path: Union[str, Path]) -> None: ...   # Save to YAML

    @classmethod
    def load(cls, path: Union[str, Path]) -> "ModelPipeline": ...  # Load from YAML
```

## Workflow

```python
class Workflow:
    def __init__(
        self,
        data_start: str,
        data_end: str,
        model_test_start: str,
        model_test_end: str,
        data_pipeline: Optional[Union[str, Path]] = None,   # Path to data_pipeline.yaml
        model_pipeline: Optional[Union[str, Path]] = None,  # Path to model_pipeline.yaml
        data_cache: Union[bool, str] = False,
        model_target: str = "price",
        model_horizon: int = 7,
        model_forecast_only: bool = False,                  # Skip evaluators/exporters; use report.predictions()
        max_processes: Optional[int] = None,
        threads_per_process: Optional[int] = None,
        cache_path: Optional[str] = None,                   # Overrides data_cache if set
        model_index: Optional[int] = None,                  # Run only models[model_index]; all models if None
    ): ...

    def run(self) -> EvaluationReport: ...

    def save(self, path: Union[str, Path]) -> None: ...

    @classmethod
    def load(cls, path: Union[str, Path]) -> "Workflow": ...
```

---

## EvaluationReport

```python
class EvaluationReport:
    def summary(self) -> pd.DataFrame: ...
    def by_hour(self) -> pd.DataFrame: ...
    def by_horizon(self) -> pd.DataFrame: ...
    def by_hour_horizon(self) -> pd.DataFrame: ...
    def by_year(self) -> pd.DataFrame: ...
    def by_year_horizon(self) -> pd.DataFrame: ...
    def predictions(self) -> pd.DataFrame: ...
    # Returns tidy DataFrame sorted by model→horizon→hour.
    # Columns: run_date, target_date, hour, horizon, prediction, model
    # No "actual" column — use for forecast_only=True runs.
    def iter_details(self) -> Iterator[Tuple[str, pd.DataFrame]]: ...
```

---

# ENVIRONMENT VARIABLES

Must be set **before** any library imports (especially before numpy/sklearn are imported).

```python
import os
os.environ["PYTHON_GIL"] = "0"            # Enable free-threading (Python 3.13t+, or -Xgil=0)
os.environ["MAX_PROCESSES"] = "4"         # Worker processes (default: cpu_count // THREADS_PER_PROCESS)
os.environ["THREADS_PER_PROCESS"] = "16"  # Threads per worker process (alias: MAX_THREADS)
os.environ["OMP_NUM_THREADS"] = "1"       # Prevent BLAS oversubscription (always recommended)
```

`Workflow.run()` always sets `OMP_NUM_THREADS=1`, `MKL_NUM_THREADS=1`, `OPENBLAS_NUM_THREADS=1`, `BLAS_NUM_THREADS=1`, and `PYTHON_GIL=0` automatically, and applies `MAX_PROCESSES`/`THREADS_PER_PROCESS` from its config. `model_index` is expanded from env vars at `load()` time (e.g. `$SLURM_ARRAY_TASK_ID`) and selects a single model by index — all models run if omitted. For code-only usage (no Workflow), set thread vars manually before imports.

---

# SUPPORTED COUNTRIES

## ENTSOE Countries
AL, AT, BA, BE, BG, BY, CH, CZ, DE, DE_50HZ, DE_AMPRION, DE_AT_LU, DE_LU, DE_TENNET, DE_TRANSNET, DK, DK_1, DK_2, EE, ES, FI, FR, GB, GB_ELECLINK, GB_IFA, GB_IFA2, GB_NIR, GE, GR, HR, HU, IE, IE_SEM, IS, IT, IT_* (many regions), LT, LU, LV, MD, ME, MK, MT, NL, NO, NO_1-5, PL, PT, RO, RS, RU, SE, SE_1-4, SI, SK, TR, UA, UK, XK

## CalendarSource Countries (Holidays)
AT, BE, BG, CH, CZ, DE, DK, ES, FI, FR, GB, GR, HU, IE, IT, NL, NO, PL, PT, RO, SE, SK

---

# COMMON PITFALLS

## NaN Values in Predictors
Models will fail with `ValueError: Input X contains NaN` if predictors have null values. Solutions:
1. Ensure sufficient lag data exists (check training window)
2. Choose predictors that exist for all rows
3. Use `NullCheckValidator` to catch issues early
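
A quick pre-flight check with plain pandas, given your feature DataFrame `df` (column names illustrative), before calling `ModelPipeline.run()`:

```python
cols = ["load_actual_d-1", "load_actual_d-7", "price_d-1"]  # your predictors
missing = df[cols].isna().sum()
print(missing[missing > 0])  # any column listed here would make sklearn raise
```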

## Callable Predictors and YAML
Callable predictors cannot be serialized to YAML. Use `"{horizon}"` string templates instead:
```python
# Instead of: lambda h: f"warsaw_temperature_2m_d+{h}"
# Use:        "warsaw_temperature_2m_d+{horizon}"
```

## Timezone Handling
- All internal processing uses UTC
- Sources convert to UTC automatically
- Use TimezoneTransformer if you need local time features

## Column Naming
- Lag columns: `{column}_{unit}-{lag}` (e.g., `load_actual_d-1`)
- Forward lags (negative): `{column}_{unit}+{abs(lag)}` (e.g., `load_actual_d+7`)
- Weather forecasts: `{prefix}_{variable}_d+{horizon}` (e.g., `warsaw_temperature_2m_d+1`)
- Predictor templates use `{horizon}` placeholder, replaced with 1, 2, ..., horizon

## Date Keywords
`DataPipeline.run()`, `ModelPipeline.run()`, `BaseModel.run()`, and `Workflow` all accept date keywords:
- `"today"` / `"now"` → today's local date (`datetime.date.today()`)
- `"today_d+7"` / `"now_d+7"` → today + 7 days
- `"now_d-730"` → today − 730 days
- The offset `N` in `_d±N` can be any integer; pick a value large enough to cover your training window.
- Normal ISO date strings (`"2024-01-01"`) pass through unchanged.

Implemented in `epftoolbox2/_date_utils.py:resolve_date()`.
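
A hedged sketch of the resolution (the function and module are referenced above; the exact return format is assumed to be an ISO date string):

```python
from epftoolbox2._date_utils import resolve_date

print(resolve_date("today"))       # e.g. "2025-06-15"
print(resolve_date("now_d-730"))   # the date 730 days earlier
print(resolve_date("2024-01-01"))  # ISO dates pass through unchanged
```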

## Windows Multiprocessing
Windows uses `spawn` context (not `fork`). Scripts must use `if __name__ == '__main__':` guard.

## Memory with Large Datasets
- Use `cache=True` to avoid re-downloading
- Use `save_dir` for incremental model training
- Set `THREADS_PER_PROCESS` and `MAX_PROCESSES` appropriately for your system

---

# EXTENDING THE LIBRARY

## Custom DataSource

```python
from typing import Optional

import pandas as pd

from epftoolbox2.data.sources.base import DataSource

class MySource(DataSource):
    def __init__(self, my_param: str):
        self.my_param = my_param  # Store constructor params as-is (no derived public attrs)
        self._validate_config()

    def _validate_config(self) -> bool:
        return True

    def fetch(self, start: pd.Timestamp, end: pd.Timestamp) -> pd.DataFrame:
        # Return DataFrame with DatetimeIndex (UTC)
        ...

    def get_cache_config(self) -> Optional[dict]:
        return {"source_type": "my_source", "param": self.my_param}
```

Register in `COMPONENT_REGISTRY["sources"]` in `data_pipeline.py` for YAML support.

## Custom Transformer

```python
import pandas as pd

from epftoolbox2.data.transformers.base import Transformer

class MyTransformer(Transformer):
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        ...
```

Register in `COMPONENT_REGISTRY["transformers"]` in `data_pipeline.py`.

## Custom Validator

```python
import pandas as pd

from epftoolbox2.data.validators.base import Validator
from epftoolbox2.data.validators.result import ValidationResult

class MyValidator(Validator):
    def validate(self, df: pd.DataFrame) -> ValidationResult:
        result = ValidationResult()
        return result
```

Register in `COMPONENT_REGISTRY["validators"]` in `data_pipeline.py`.

## Custom Model

```python
from typing import Tuple

from epftoolbox2.models.base import BaseModel

class MyModel(BaseModel):
    def _fit_predict(self, train_x, train_y, test_x) -> Tuple[float, list]:
        # Fit model, return (prediction, coefficients)
        ...
```

Register in `COMPONENT_REGISTRY["models"]` in `model_pipeline.py`.

## Custom Evaluator

```python
import pandas as pd

from epftoolbox2.evaluators.base import Evaluator

class MyEvaluator(Evaluator):
    name = "MyMetric"

    def compute(self, df: pd.DataFrame) -> float:
        return ...
```

Register in `COMPONENT_REGISTRY["evaluators"]` in `model_pipeline.py`.

## Custom Exporter

```python
from epftoolbox2.exporters.base import Exporter

class MyExporter(Exporter):
    def export(self, report: "EvaluationReport") -> None:  # string annotation; the type lives in the library
        ...
```

Register in `COMPONENT_REGISTRY["exporters"]` in `model_pipeline.py`.

---

# CLI

Installed automatically with the package. Available as `epf` on PATH after `pip install epftoolbox2`.

## Commands

### `epf` (no arguments)
Launches an interactive wizard. Arrow-key menus + text prompts — no YAML required.

**Main menu options:**
- Run experiment / data pipeline / model pipeline from an existing YAML
- Build & run data pipeline interactively (sources → transformers → validators → dates → output CSV → save YAML → run)
- Build & run model pipeline interactively (CSV input + column detection → models → evaluators → exporters → test period → save YAML → run)

### `epf run <experiment.yaml>`
Run a full Workflow YAML.

```bash
epf run experiment.yaml
epf run experiment.yaml --model-index $SLURM_ARRAY_TASK_ID   # SLURM array
epf run experiment.yaml --processes 4 --threads 8
epf run experiment.yaml --dry-run   # validate config only
```

Flags: `--model-index N`, `--processes N`, `--threads N`, `--dry-run`

### `epf data <data_pipeline.yaml>`
Run a DataPipeline and save result to CSV.

```bash
epf data data_pipeline.yaml --start 2022-01-01 --end 2024-01-01 --output data.csv
epf data data_pipeline.yaml --start 2022-01-01 --end 2024-01-01 --output data.csv --cache .cache/
```

Required flags: `--start DATE`, `--end DATE`, `--output FILE`
Optional flags: `--cache PATH`

### `epf model <model_pipeline.yaml>`
Run a ModelPipeline on an existing CSV.

```bash
epf model model_pipeline.yaml \
    --data data.csv \
    --test-start 2023-01-01 \
    --test-end 2024-01-01 \
    --target price \
    --horizon 7 \
    --save-dir results/ \
    --forecast-only
```

Required flags: `--data FILE`, `--test-start DATE`, `--test-end DATE`
Optional flags: `--target COL` (default: price), `--horizon N` (default: 7), `--save-dir DIR`, `--forecast-only`

### `epf validate <file.yaml>`
Auto-detect YAML type (Workflow / DataPipeline / ModelPipeline), load, and instantiate all components. Exits 1 on error.

```bash
epf validate data_pipeline.yaml
epf validate experiment.yaml
```

### `epf init`
Scaffold template `experiment.yaml`, `data_pipeline.yaml`, `model_pipeline.yaml` in the current directory.

```bash
epf init
epf init --force   # overwrite existing files
```

## Typical split workflow

```bash
# Step 1: fetch data once, cache it
epf data data_pipeline.yaml --start 2020-01-01 --end 2024-01-01 --output data.csv

# Step 2: iterate on model config
epf model model_pipeline.yaml \
    --data data.csv \
    --test-start 2023-01-01 \
    --test-end 2024-01-01
```
