Metadata-Version: 2.4
Name: non-sws-spark-calculations-engine
Version: 0.2.7
Summary: Configuration-driven statistical calculations and aggregations for non-SWS FAO data
Author: Daniele Mansillo
License: MIT
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering
Requires-Python: >=3.9
Description-Content-Type: text/markdown
Requires-Dist: pydantic>=2.0
Requires-Dist: pyspark>=3.4
Provides-Extra: sdmx
Requires-Dist: pysdmx; extra == "sdmx"
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-cov; extra == "dev"

# fao-analytics

Configuration-driven statistical calculations and aggregations for FAO (Food and Agriculture Organization of the United Nations) data, built on PySpark and validated with Pydantic.

## Data sources

The package processes data from **FAOSTAT** -- the FAO corporate statistical database. Data can be loaded from:

- **Local files** -- CSV, Parquet, or Delta format
- **SDMX API** -- Connects to the FAO SDMX registry to retrieve dataflows with authoritative dimension ordering and attribute mappings (requires `pysdmx`)

Each FAOSTAT domain (FDI, LC, OER, CS, BE, etc.) has its own configuration directory under `configs/domains/` defining the data mapping, aggregation rules, calculation definitions, and group overrides.
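
For example, the FDI domain uses the files referenced in the examples below (the exact file set varies by domain):

```
configs/
  groups/
    base_groups.json            # Shared group definitions
  domains/
    FDI/
      data_mapping_fdi.json     # Column-to-dimension mapping
      data_mapping_sdmx.json    # Alternative mapping for SDMX-formatted data
      aggregation.json          # Aggregation iterations and rules
      calculations_fdi.json     # Indicator calculation definitions
```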

## Features

- **fao_agg** -- Geographic and dimensional aggregation engine
- **fao_calc** -- Statistical indicator calculation engine (ratios, growth rates, transformations)
- **fao_common** -- Shared data adapters (CSV, Parquet, Delta, SDMX) and configuration schemas

## Installation

```bash
# From source (editable / development mode)
pip install -e .

# With SDMX support
pip install -e ".[sdmx]"

# With dev dependencies (pytest, coverage)
pip install -e ".[dev]"
```

## Quick start

### Configuration from file paths

```python
from fao_agg import AggregationEngine
from fao_calc import CalculationEngine

# Aggregation -- load config from JSON files, data from a CSV
result = (
    AggregationEngine(
        data_mapping="configs/domains/FDI/data_mapping_fdi.json",
        aggregation_config="configs/domains/FDI/aggregation.json",
    )
    .load_data(path="data/domains/FDI/DataFDI.csv")
    .aggregate()
    .get_results()
)

# Calculation
result = (
    CalculationEngine(
        data_mapping="configs/domains/FDI/data_mapping_fdi.json",
        calculations="configs/domains/FDI/calculations_fdi.json",
    )
    .load_data(path="data/domains/FDI/DataFDI.csv")
    .calculate()
    .get_results()
)
```
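
The package is built on PySpark, so assuming `get_results()` returns a Spark DataFrame (an assumption, not confirmed by the API docs here), the output can be inspected and persisted with the standard DataFrame API:

```python
# Assumes `result` from the snippet above is a Spark DataFrame
result.show(10, truncate=False)  # peek at the first rows

# Persist the output; the path is illustrative
result.write.mode("overwrite").parquet("output/fdi_results.parquet")
```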

### Configuration from dictionaries

```python
from fao_agg import AggregationEngine

data_mapping = {
    "data_source": {
        "type": "csv",
        "options": {"header": "true", "inferSchema": "true"},
    },
    "dimensions": [
        {"name": "area",    "column": "Var1Code", "var_position": 1},
        {"name": "item",    "column": "Var2Code", "var_position": 2},
        {"name": "element", "column": "Var3Code", "var_position": 3},
        {"name": "year",    "column": "Var4Code", "var_position": 4},
    ],
    "columns": {
        "value": "Value",
        "flag": "Flag",
        "agg_flag_int": "AggFlagInt",
        "agg_flag_ext": "AggFlagExt",
    },
}

aggregation_config = {
    "iterations": [
        {
            "iteration": 1,
            "agg_dimensions": ["area"],
        }
    ],
    "base_groups": "configs/groups/base_groups.json",
}

result = (
    AggregationEngine(
        data_mapping=data_mapping,
        aggregation_config=aggregation_config,
    )
    .load_data(path="data/domains/FDI/DataFDI.csv")
    .aggregate()
    .get_results()
)
```
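
Because configurations are validated with Pydantic, a malformed mapping or aggregation dictionary should fail with a validation error up front rather than partway through a Spark job.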

### Auto-generated configuration from SDMX

Rather than writing the data mapping by hand, the `SdmxDataAdapter` can build it automatically by querying the FAO SDMX registry for the dataflow schema:

```python
from fao_agg import AggregationEngine
from fao_common.adapters.sdmx import SdmxDataAdapter
from fao_common.config.schema import SdmxDataSource

# Build the data mapping automatically from the SDMX registry
adapter = SdmxDataAdapter(
    SdmxDataSource(
        endpoint="https://private-fmr.aws.fao.org/sdmx/v2/",
        domain_code="FDI",
    )
)
data_mapping = adapter.build_data_mapping()

# Use the auto-generated mapping with the aggregation engine
result = (
    AggregationEngine(
        data_mapping=data_mapping,
        aggregation_config="configs/domains/FDI/aggregation.json",
    )
    .load_data()  # data source is defined in the auto-generated mapping
    .aggregate()
    .get_results()
)
```

### SDMX configuration with a local SDMX CSV

If you have an SDMX-formatted CSV file and want the adapter to handle column mapping via the registry:

```python
from fao_agg import AggregationEngine

result = (
    AggregationEngine(
        data_mapping="configs/domains/FDI/data_mapping_sdmx.json",
        aggregation_config="configs/domains/FDI/aggregation.json",
    )
    .load_data()  # data path is in the mapping config
    .aggregate()
    .get_results()
)
```
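
`data_mapping_sdmx.json` itself is not shown above; as a rough sketch mirroring the dictionary form from the quick start, such a mapping might declare an SDMX data source alongside the local file path. The field names below are illustrative assumptions, not the package's actual schema:

```python
# Illustrative only -- the real schema is defined by the package's Pydantic models.
# `endpoint` and `domain_code` follow the SdmxDataSource example above; `path`
# is an assumed field for the local SDMX-formatted CSV.
data_mapping_sdmx = {
    "data_source": {
        "type": "sdmx",
        "endpoint": "https://private-fmr.aws.fao.org/sdmx/v2/",
        "domain_code": "FDI",
        "path": "data/domains/FDI/DataFDI_sdmx.csv",
    },
}
```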

## Testing

```bash
# Run all tests
pytest

# Run unit tests for a single package
pytest tests/fao_agg/

# Run only integration tests
pytest -m integration

# Run tests for a single domain
pytest tests/domains/test_fdi.py -v
```

See [README_TESTING.md](README_TESTING.md) for detailed testing documentation.
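
Note that `pytest -m integration` relies on an `integration` marker being registered. If the project does not already declare it in its pytest configuration, a minimal `conftest.py` sketch would be:

```python
# conftest.py -- register the custom marker so `pytest -m integration` works
# without an unknown-marker warning (illustrative; the project may already
# register this in its pytest config)
def pytest_configure(config):
    config.addinivalue_line(
        "markers", "integration: marks tests as integration tests"
    )
```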

## Project structure

```
src/
  fao_agg/        # Aggregation engine
  fao_calc/       # Calculation engine
  fao_common/     # Shared adapters, schemas, Spark utilities
configs/           # JSON configuration files per FAOSTAT domain
data/              # Sample/test data files (CSV)
tests/             # Unit and integration tests
```

## Publishing to PyPI

```bash
# Install build tools
pip install build twine

# Build source distribution and wheel
python -m build

# Check the package
twine check dist/*

# Upload to Test PyPI first
twine upload --repository testpypi dist/*
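
# (Optional) verify the Test PyPI upload installs cleanly; --extra-index-url
# lets dependencies resolve from regular PyPI
pip install --index-url https://test.pypi.org/simple/ \
    --extra-index-url https://pypi.org/simple/ \
    non-sws-spark-calculations-engine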

# Upload to PyPI
twine upload dist/*
```
