Metadata-Version: 2.4
Name: lca-data-provider
Version: 0.1.0
Summary: Python library and CLI for loading and serving LCA data sources
Requires-Python: >=3.12
Requires-Dist: alembic>=1.18.4
Requires-Dist: asyncpg>=0.31.0
Requires-Dist: openpyxl>=3.1.5
Requires-Dist: pandas>=3.0.3
Requires-Dist: pydantic-settings>=2.14.1
Requires-Dist: pydantic>=2.13.4
Requires-Dist: redis>=8.0.0
Requires-Dist: sqlalchemy[asyncio]>=2.0.51
Requires-Dist: typer>=0.26.7
Description-Content-Type: text/markdown

# LCA Data Provider - Database & Infrastructure

This module manages the storage, persistence, and lifecycle of the databases used by the centralized Life Cycle Assessment (LCA) Emission Factors library.

The architecture implements a **dynamic multi-tenant pattern**: each data provider has its own isolated database, and the library connects to the one it needs on demand, without modifying or rebuilding the core source code.

---

## ⚠️ CRITICAL WARNING: PRODUCTION ENVIRONMENT ONLY ⚠️

Currently, the **ONLY** deployed environment is **Production**. 

Therefore, the database and Redis credentials available in **IdeCloud** belong strictly to the production environment. **Any action** executed locally using these credentials (such as running Alembic migrations or executing data ingestions) will interact directly with live data and **will immediately affect all applications** and microservices consuming this library.

If you need a safe testing, staging, or local development environment to run tests or new ingestions, **do not use the IdeCloud credentials**. Instead, please contact the **DevOps or IT team** to request a dedicated sandbox environment.

If you have any questions about the library, please feel free to contact **sergio.perezmartin@idener.ai**.

---

## Supported Databases & Versions

The following table tracks the Life Cycle Assessment (LCA) databases currently supported by the ingestion pipeline and their actively managed versions.

| Provider / Database | Supported Versions | Description |
| :--- | :--- | :--- |
| **BAFU** | `2025` | Swiss Federal Office for the Environment (FOEN) emission factors. |

---

## Querying Emission Factors (Service Layer)

External microservices consume this library by importing the `EmissionFactorService` singleton, exposed as `emission_factor_service`. Category and `all` queries follow a **cache-first strategy**: Redis is always tried first. On a cache miss (e.g. after a Redis container restart), all records for that provider and version are fetched from PostgreSQL in a single query, the full cache is re-warmed automatically, and the requested data is returned transparently. Lookups by external ID are the exception: `get_by_external_id` always reads from PostgreSQL.

### Installation

Add the library as a dependency of your microservice, then import the service and its types directly:

```python
from lca_provider import emission_factor_service, EmissionFactorCategory, ProviderEmissionFactorDTO
```

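### Available Methods

All methods are coroutines, so they must be awaited from async code (for example, inside an `async def` function).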

#### `get_by_category` — Query a single category

```python
dtos: list[ProviderEmissionFactorDTO] = await emission_factor_service.get_by_category(
    provider="bafu",
    version="2025",
    category=EmissionFactorCategory.material,
    name="concrete",       # optional: case-insensitive substring filter
    geography="CH",        # optional: exact match filter
)
```

#### `get_all` — Query all categories at once

```python
dtos: list[ProviderEmissionFactorDTO] = await emission_factor_service.get_all(
    provider="bafu",
    version="2025",
    name="concrete",       # optional
    geography="CH",        # optional
)
```

#### `get_by_external_id` — Lookup a single record by UUID

Always queries PostgreSQL directly — does not use the Redis cache.

```python
import uuid

# Returns None when no record matches the given external ID.
dto: ProviderEmissionFactorDTO | None = await emission_factor_service.get_by_external_id(
    provider="bafu",
    version="2025",
    external_id=uuid.UUID("..."),
)
```

### Standard Categories (`EmissionFactorCategory`)

| Value | Description |
| :--- | :--- |
| `material` | Raw and processed materials |
| `auxiliar_material` | Auxiliary/secondary materials |
| `energy_consumption` | Energy sources and consumption |
| `water_consumption` | Water usage |
| `air_emissions` | Direct air emissions |
| `direct_emissions` | Other direct emissions |
| `transport` | Transportation and logistics |
| `consumable` | Consumable items |
| `waste_treatment` | Waste processing and disposal |
| `manufacturing` | Manufacturing processes |
| `other` | Uncategorised factors |

### Redis Cache Architecture (Pre-warming)

To keep read latency low for consuming applications, the ETL pipeline pre-warms the cache: upon successful ingestion into PostgreSQL, the data is automatically mapped to standard DTOs and pushed to Redis under two key types:

**Key Nomenclature:**

`ef:<provider_name>:<version>:category:<standard_category>` — per-category key

`ef:<provider_name>:<version>:all` — flat list of all factors across all categories

**Examples:**
* `ef:bafu:2025:category:material`
* `ef:bafu:2025:category:energy_consumption`
* `ef:bafu:2025:all`

**Payload Structure:**
The value stored under each key is a stringified JSON array of `ProviderEmissionFactorDTO` objects.

**Fallback behaviour:**
If Redis is unavailable or empty (e.g. after a container restart), the service fetches all records for the requested provider and version from PostgreSQL in a single query, re-warms all category keys and the `all` key, and returns the data to the caller. This is transparent to the consuming microservice.
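The cache-first read and re-warm flow described above can be sketched with the standard library alone. This is a simplified illustration, not the service's code: a plain `dict` stands in for Redis, records are plain dicts with a hypothetical `category` field instead of `ProviderEmissionFactorDTO` objects, and `load_from_db` is a hypothetical callable standing in for the single PostgreSQL query. The key formats follow the nomenclature documented above.

```python
import json
from collections.abc import Callable

# Hypothetical stand-ins: a plain dict plays the role of Redis (string keys and
# values), and `load_from_db` plays the role of the single PostgreSQL query.
Record = dict[str, str]


def category_key(provider: str, version: str, category: str) -> str:
    return f"ef:{provider}:{version}:category:{category}"


def all_key(provider: str, version: str) -> str:
    return f"ef:{provider}:{version}:all"


def warm_cache(cache: dict[str, str], provider: str, version: str, records: list[Record]) -> None:
    """Write one JSON array per category plus the flat `all` key."""
    by_category: dict[str, list[Record]] = {}
    for rec in records:
        by_category.setdefault(rec["category"], []).append(rec)
    for category, items in by_category.items():
        cache[category_key(provider, version, category)] = json.dumps(items)
    cache[all_key(provider, version)] = json.dumps(records)


def get_by_category(
    cache: dict[str, str],
    load_from_db: Callable[[str, str], list[Record]],
    provider: str,
    version: str,
    category: str,
) -> list[Record]:
    cached = cache.get(category_key(provider, version, category))
    if cached is not None:
        return json.loads(cached)  # cache hit: no database access
    # Cache miss: one query for the whole provider/version, then re-warm every key.
    records = load_from_db(provider, version)
    warm_cache(cache, provider, version, records)
    return [rec for rec in records if rec["category"] == category]
```

A dict cannot be "unavailable", so this sketch only models the empty-cache case. It also never writes a key for a category that has no records, so queries for such a category miss every time; a real implementation would need to handle both situations.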

---

## Data Ingestion Command (Admin Only)

The ingestion CLI is an **administrative tool** intended exclusively for data engineers working directly in this repository. It is **not exposed as an installable script** in the distributed package, so there is no `lca-cli` command available in consuming microservices. However, the `lca_provider.cli.main` module is still part of the distributed package and can technically be invoked via `python -m lca_provider.cli.main` by anyone with access to the package and the required credentials. There is no code-level restriction — the protection is purely operational.

To run an ingestion, execute from the repository root with the virtual environment active:

```bash
python -m lca_provider.cli.main ingest <source> <filename> --version <supported_version> [--mode <clean|upsert>] [--log-level <DEBUG|INFO|WARNING|ERROR>]
```

### Parameters

- `source`: target source/database identifier (example: `bafu`).
- `filename`: input file name located in `lca_provider/ingestors/resources`.
- `--version` (required): dataset version to ingest (must match a supported version for the provider, example: `2025`).
- `--mode` (optional, default: `clean`):
  - `clean`: deletes existing rows for that version, then inserts all rows from the file.
  - `upsert`: updates existing rows matched by `provider_code` and inserts the missing ones.
- `--log-level` (optional, default: `INFO`): controls runtime verbosity.
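The difference between the two modes can be illustrated with a minimal in-memory sketch. It assumes, for illustration only, that the table is a dict keyed by `provider_code` and that each row carries its `version`; the real ingestion writes to PostgreSQL.

```python
# Hypothetical in-memory model: the "table" is a dict keyed by provider_code,
# and every row records the dataset version it belongs to.
Row = dict[str, object]


def ingest_clean(table: dict[str, Row], version: str, new_rows: list[Row]) -> None:
    """clean: delete every existing row of `version`, then insert all rows from the file."""
    stale = [code for code, row in table.items() if row["version"] == version]
    for code in stale:
        del table[code]
    for row in new_rows:
        table[str(row["provider_code"])] = dict(row)


def ingest_upsert(table: dict[str, Row], new_rows: list[Row]) -> None:
    """upsert: update rows whose provider_code already exists, insert the others."""
    for row in new_rows:
        code = str(row["provider_code"])
        if code in table:
            table[code].update(row)  # existing row: overwrite its fields
        else:
            table[code] = dict(row)  # missing row: insert it
```

In this sketch, `clean` drops rows of that version that are no longer in the file, while `upsert` leaves them untouched.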

### File Location

Place ingestion files in:

```text
lca_provider/ingestors/resources/
```

BAFU example:

```bash
python -m lca_provider.cli.main ingest bafu BAFU_2025.xlsx --version 2025 --mode clean
```

### Failed Rows Report

If invalid rows are detected, they are skipped and listed in a report written to the same resources folder. For BAFU, the report file name follows this pattern:

```text
bafu_ingestion_failures_YYYYMMDD_HHMMSS.txt
```

For each skipped row, the report includes the Excel row number, the source product, and the failure reason.

---

## Database Migration Workflow (Alembic)

This project uses Alembic with dynamic database selection. The target database is selected at runtime with the flag `-x db=<database_name>`.

### Prerequisites

Ensure the following environment variables are defined before running Alembic:

```dotenv
LCA_DATA_PROVIDER_DB_HOST=<host>
LCA_DATA_PROVIDER_DB_USER=<user>
LCA_DATA_PROVIDER_DB_PASSWORD=<password>
LCA_DATA_PROVIDER_DB_PORT=<port>
LCA_DATA_PROVIDER_REDIS_HOST=<host>
LCA_DATA_PROVIDER_REDIS_PORT=<port>
LCA_DATA_PROVIDER_REDIS_USER=<user>
LCA_DATA_PROVIDER_REDIS_PASSWORD=<password>
LCA_DATA_PROVIDER_REDIS_DB=<db>
```

Even though Redis is not directly used by Alembic migrations, Redis settings are still required because they are part of the validated application configuration schema.

### 1. Configure allowed sources in `.env`

Before generating or applying migrations, the target database must be included in `LCA_DATA_PROVIDER_ALLOWED_SOURCES`.

- Single source example:

```dotenv
LCA_DATA_PROVIDER_ALLOWED_SOURCES=bafu
```

- Multiple sources example (comma-separated):

```dotenv
LCA_DATA_PROVIDER_ALLOWED_SOURCES=bafu,ecoinvent,my_other_source
```

Notes:

- Values are split by commas and trimmed.
- Source names are normalized to lowercase.
- The `db` value passed through `-x db=...` is not normalized, so use lowercase names to avoid mismatches.
- If `all` is present in the allowed sources list, any target database is accepted.
- If the selected `db` is not in the allowed sources list (and `all` is not present), Alembic execution fails.

### 2. Create a new migration (autogenerate)

Run:

```bash
alembic -x db=bafu revision --autogenerate -m "your_migration_name"
```

Where:

- `-x db=bafu` indicates which database/source you are targeting.
- `--autogenerate` compares SQLAlchemy models vs current schema to create migration operations.
- `-m "your_migration_name"` sets the migration message.

### 3. Apply the migration

Run:

```bash
alembic -x db=bafu upgrade head
```

This upgrades the selected database to the latest migration (`head`).

### 4. Recommended execution sequence

```bash
alembic -x db=<source_name> revision --autogenerate -m "<migration_name>"
alembic -x db=<source_name> upgrade head
```

Example:

```bash
alembic -x db=bafu revision --autogenerate -m "add_new_column_to_emission_factors"
alembic -x db=bafu upgrade head
```

### 5. Common validation checks

- Confirm `.env` contains the target source in `LCA_DATA_PROVIDER_ALLOWED_SOURCES`.
- Confirm the `db` value passed in `-x db=<...>` matches the intended database name.
- Review generated migration scripts before running `upgrade head`.
- Do not omit `-x db=<source_name>`; migrations fail immediately if no target database is provided.

---

## Extending the Ingestion Pipeline (Adding a New Provider)

The ingestion pipeline is designed to be extended with new data sources without modifying any existing code. It uses two patterns: a **Template Method** (`BaseIngester`) that enforces a fixed ETL sequence, and a **Factory** (`IngesterFactory`) that maps provider names to their ingester classes via a decorator.
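The two patterns fit together as follows. This self-contained sketch uses hypothetical stand-ins (`Ingester`, `Factory`, `DemoIngester`) rather than the real `BaseIngester` and `IngesterFactory`, whose internals also cover loading into PostgreSQL, provider codes, and the `_after_execute` hook; here the load step just collects entities in memory.

```python
import asyncio
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator, Callable


class Ingester(ABC):
    """Template Method: `execute` fixes the ETL order; subclasses supply the steps."""

    def __init__(self, version: str) -> None:
        self.version = version
        self.loaded: list[dict] = []

    async def execute(self) -> int:
        async for raw_batch in self._extract_data():
            entities = self._transform_batch(raw_batch)
            self.loaded.extend(entities)  # stand-in for the database write
        return len(self.loaded)

    @abstractmethod
    def _extract_data(self) -> AsyncGenerator[list[dict], None]:
        """Yield raw rows in batches."""

    @abstractmethod
    def _transform_batch(self, raw_batch: list[dict]) -> list[dict]:
        """Turn one raw batch into entities."""


class Factory:
    """Factory: a class decorator maps provider names to ingester classes."""

    _registry: dict[str, type[Ingester]] = {}

    @classmethod
    def register(cls, name: str) -> Callable[[type[Ingester]], type[Ingester]]:
        def decorator(ingester_cls: type[Ingester]) -> type[Ingester]:
            cls._registry[name] = ingester_cls
            return ingester_cls

        return decorator

    @classmethod
    def create(cls, name: str, version: str) -> Ingester:
        ingester_cls = cls._registry.get(name)
        if ingester_cls is None:
            raise ValueError(f"Unknown provider: {name!r}")
        return ingester_cls(version)


@Factory.register("demo")
class DemoIngester(Ingester):
    async def _extract_data(self) -> AsyncGenerator[list[dict], None]:
        rows = [{"name": f"item {i}"} for i in range(5)]
        for start in range(0, len(rows), 2):  # batches of 2 for the demo
            yield rows[start : start + 2]

    def _transform_batch(self, raw_batch: list[dict]) -> list[dict]:
        return [{**row, "version": self.version} for row in raw_batch]


if __name__ == "__main__":
    print(asyncio.run(Factory.create("demo", "2025").execute()))
```

Adding a provider then means defining one decorated subclass that the factory resolves by name at runtime, which is consistent with the requirement that the registered name match the `source` argument passed to the CLI.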

The following steps describe how to add a new provider from scratch.

### Step 1 — Create the provider directory

```text
lca_provider/ingestors/providers/<provider_name>/
    <provider_name>_ingester.py
    <provider_name>_mapping.py
```

Replace `<provider_name>` with a lowercase identifier (e.g. `ecoinvent`). This name must exactly match the value used in the CLI and in `LCA_DATA_PROVIDER_ALLOWED_SOURCES`.

### Step 2 — Define the category mapping

Create `<provider_name>_mapping.py` with a `CATEGORY_MAPPING` dictionary that translates the provider's raw category strings to the 11 standard `EmissionFactorCategory` values. Keys must be lowercase.

```python
from lca_provider.dtos.emission_factor_dto import EmissionFactorCategory

CATEGORY_MAPPING: dict[str, EmissionFactorCategory] = {
    "electricity": EmissionFactorCategory.energy_consumption,
    "transport, freight": EmissionFactorCategory.transport,
    "municipal waste": EmissionFactorCategory.waste_treatment,
    # ... map every raw category from the source
}
```

Any raw category not present in the mapping will automatically fall back to `EmissionFactorCategory.other`.

### Step 3 — Implement the ingester

Create `<provider_name>_ingester.py` extending `BaseIngester`. Register it with the factory using the `@IngesterFactory.register` decorator. Only two methods are mandatory:

- `_extract_data()` — async generator that yields raw data in batches (list of dicts). Batch size of 1000 rows is recommended.
- `_transform_batch()` — receives one batch and returns a list of `EmissionFactorEntity` objects.

```python
from collections.abc import AsyncGenerator

from lca_provider.db.models import EmissionFactorEntity
from lca_provider.ingestors.base_ingester import BaseIngester
from lca_provider.ingestors.ingester_factory import IngesterFactory

BATCH_SIZE = 1000  # recommended batch size


@IngesterFactory.register("ecoinvent")
class EcoinventIngester(BaseIngester):

    async def _extract_data(self) -> AsyncGenerator[list[dict], None]:
        # Read the source file in chunks and yield each chunk as a list of dicts
        # (CSV, Excel, JSON, XML, etc.). `_iter_source_rows` is a hypothetical
        # helper you write for your source format.
        batch: list[dict] = []
        for row in self._iter_source_rows():
            batch.append(row)
            if len(batch) == BATCH_SIZE:
                yield batch
                batch = []
        if batch:  # flush the final, partial batch
            yield batch

    def _transform_batch(self, raw_batch: list[dict]) -> list[EmissionFactorEntity]:
        entities = []
        for row in raw_batch:
            # Replace each `...` with the corresponding value read from `row`.
            entity = EmissionFactorEntity(
                name=...,
                version=self.version,
                raw_category=...,
                subcategories=[],
                unit=...,
                gwp_value=...,
                additional_impacts={},
                extra_data={},
            )
            entities.append(entity)
        return entities
```

**Optional overrides:**

- `_generate_provider_code(entity)` — override if the default SHA-256 hash of `name|raw_category|unit|version` is not unique enough for the source (e.g. BAFU adds `geography` to the hash).
- `_after_execute(imported_rows, elapsed_seconds)` — post-ingestion hook for cache lifecycle, failure reports, or any other side effects. See `BafuIngester._after_execute()` for the reference implementation including Redis invalidation and pre-warming.
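A sketch of the default provider code described above. Beyond SHA-256 and the `name|raw_category|unit|version` layout, the details are assumptions: UTF-8 encoding and a hex digest. The optional `extra` fields model the kind of extension BAFU makes with `geography`, though where BAFU places that field in the key is not documented here. `provider_code` is a hypothetical helper name.

```python
import hashlib


def provider_code(name: str, raw_category: str, unit: str, version: str, *extra: str) -> str:
    """SHA-256 hex digest of the pipe-joined fields.

    `extra` lets a provider append further discriminating fields (e.g. geography);
    encoding and digest format are assumptions of this sketch.
    """
    key = "|".join([name, raw_category, unit, version, *extra])
    return hashlib.sha256(key.encode("utf-8")).hexdigest()
```

Joining with a separator is simple but not collision-free: if field values can themselves contain `|`, two different field combinations can produce the same key, which is another case where overriding `_generate_provider_code` is worth considering.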

### Step 4 — Add the provider to allowed sources

Add the new provider name to `LCA_DATA_PROVIDER_ALLOWED_SOURCES` in `.env`:

```dotenv
LCA_DATA_PROVIDER_ALLOWED_SOURCES=bafu,ecoinvent
```

### Step 5 — Create and apply the database migration

Each provider has its own isolated PostgreSQL database. Run Alembic targeting the new provider name to create its schema:

```bash
alembic -x db=ecoinvent revision --autogenerate -m "initial_emission_factors_table"
alembic -x db=ecoinvent upgrade head
```

### Step 6 — Place the source file and run the ingestion

Place the source data file in `lca_provider/ingestors/resources/` and run:

```bash
python -m lca_provider.cli.main ingest ecoinvent <filename> --version <version> --mode clean
```

### Summary checklist

| Step | Action |
| :--- | :--- |
| 1 | Create `lca_provider/ingestors/providers/<name>/` directory |
| 2 | Define `CATEGORY_MAPPING` in `<name>_mapping.py` |
| 3 | Implement `<name>_ingester.py` extending `BaseIngester`, register with `@IngesterFactory.register("<name>")` |
| 4 | Add `<name>` to `LCA_DATA_PROVIDER_ALLOWED_SOURCES` in `.env` |
| 5 | Run Alembic migrations targeting the new database |
| 6 | Place source file in `resources/` and execute the ingestion CLI |
