Metadata-Version: 2.4
Name: nexus-core
Version: 0.1.0
Summary: A Python library for code-first data ingestion, transformation, and loading
Author: Avaneesh Devkota
License: MIT License
        
        Copyright (c) 2026 Avaneesh Devkota
        
        Permission is hereby granted, free of charge, to any person obtaining a copy
        of this software and associated documentation files (the "Software"), to deal
        in the Software without restriction, including without limitation the rights
        to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
        copies of the Software, and to permit persons to whom the Software is
        furnished to do so, subject to the following conditions:
        
        The above copyright notice and this permission notice shall be included in all
        copies or substantial portions of the Software.
        
        THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
        IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
        FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
        AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
        LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
        OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
        SOFTWARE.
License-File: LICENSE
Requires-Python: >=3.12
Requires-Dist: boto3>=1.42.91
Requires-Dist: gspread>=6.2.1
Requires-Dist: httpx>=0.28.1
Requires-Dist: psycopg[binary]>=3.3.3
Requires-Dist: pymysql>=1.1.2
Requires-Dist: stripe>=15.0.1
Description-Content-Type: text/markdown

# Nexus

A Python library for code-first data ingestion, transformation, and loading.

```python
Pipeline(source, steps, sink).run()
```

## Installation

Nexus requires Python 3.12+. Clone the repo and install dependencies with [uv](https://github.com/astral-sh/uv):

```bash
git clone https://github.com/avaneeshdevkota/nexus
cd nexus
uv sync
```

> **Note:** Nexus uses a flat layout. Set `PYTHONPATH=.` when running scripts outside of `pytest`:
> ```bash
> PYTHONPATH=. uv run python examples/stripe_customers_to_sqlite.py
> ```
> Tests set this automatically via `pyproject.toml`.

## Quick start

Fetch a CSV over HTTP and load it into SQLite:

```python
from pipelines.pipeline import Pipeline
from sources.http.http_config import HTTPSourceConfig
from sources.http.http_source import HTTPSource
from steps.bytes_to_csv import BytesToCSV
from sinks.sqlite.sqlite_config import SQLiteSinkConfig
from sinks.sqlite.sqlite_sink import SQLiteSink

Pipeline(
    source=HTTPSource(HTTPSourceConfig(url="https://example.com/data.csv")),
    steps=[BytesToCSV()],
    sink=SQLiteSink(SQLiteSinkConfig(
        db_path="output.db",
        table_name="data",
        schema={"name": "TEXT", "age": "TEXT"},
    )),
).run()
```
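To confirm that a run wrote what you expected, you can read the table back with the standard-library `sqlite3` module. The sketch below is independent of Nexus: `preview_table` is a hypothetical helper, and the temporary database only stands in for the `output.db` file produced by the pipeline above.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def preview_table(db_path: str, table_name: str, limit: int = 5) -> list[tuple]:
    """Return up to `limit` rows from a SQLite table."""
    # Table names cannot be bound as parameters, so quote the identifier.
    quoted = '"' + table_name.replace('"', '""') + '"'
    with closing(sqlite3.connect(db_path)) as conn:
        return conn.execute(f"SELECT * FROM {quoted} LIMIT ?", (limit,)).fetchall()


# Build a throwaway database with the same schema as the quick start example.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "output.db")
    with closing(sqlite3.connect(demo_path)) as conn:
        conn.execute("CREATE TABLE data (name TEXT, age TEXT)")
        conn.execute("INSERT INTO data VALUES ('Ada', '36')")
        conn.commit()
    preview = preview_table(demo_path, "data")
```

Against a real run, you would call `preview_table("output.db", "data")` directly.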

## Sources

| Source | Description |
|---|---|
| `HTTPSource` | Fetch from any HTTP endpoint, with optional pagination |
| `FileSource` | Read a single local file |
| `GlobSource` | Read multiple local files matching a glob pattern |
| `S3Source` | Fetch a single object from S3 (authenticated or anonymous) |
| `S3GlobSource` | Fetch multiple S3 objects matching a prefix and pattern |
| `GoogleSheetsSource` | Read a Google Sheet (service account or public) |
| `StripeSource` | List any Stripe resource (customers, payouts, etc.) |
| `StripeSearchSource` | Search Stripe resources using Stripe's search API |
| `PostgresSource` | Query a Postgres database with raw SQL |
| `MySQLSource` | Query a MySQL database with raw SQL |

### HTTPSource with pagination

```python
from sources.http.http_config import HTTPSourceConfig, PaginationConfig
from sources.http.http_source import HTTPSource

HTTPSource(HTTPSourceConfig(
    url="https://api.example.com/items",
    pagination=PaginationConfig(
        get_params=lambda resp: {"page": resp.json()["next_page"]} if resp else {"page": 1},
        has_more=lambda resp: resp.json()["next_page"] is not None,
    ),
))
```
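To make the callback contract concrete, here is a minimal sketch of how a paginating loop could drive `get_params` and `has_more`. It assumes the source calls `get_params(None)` before the first request (which the `if resp else` branch in the example suggests) and stops once `has_more` returns false. `FakeResponse`, `fake_fetch`, `paginate`, and the `PAGES` data are hypothetical stand-ins, not part of Nexus or httpx.

```python
from typing import Any, Callable, Iterator, Optional

# Hypothetical three-page API used only for this illustration.
PAGES = {
    1: {"items": ["a", "b"], "next_page": 2},
    2: {"items": ["c"], "next_page": 3},
    3: {"items": ["d"], "next_page": None},
}


class FakeResponse:
    """Stand-in for an HTTP response exposing only .json()."""

    def __init__(self, payload: dict[str, Any]) -> None:
        self._payload = payload

    def json(self) -> dict[str, Any]:
        return self._payload


def fake_fetch(params: dict[str, Any]) -> FakeResponse:
    """Pretend to issue a GET request with the given query parameters."""
    return FakeResponse(PAGES[params["page"]])


def paginate(
    fetch: Callable[[dict[str, Any]], FakeResponse],
    get_params: Callable[[Optional[FakeResponse]], dict[str, Any]],
    has_more: Callable[[FakeResponse], bool],
) -> Iterator[FakeResponse]:
    """Assumed driver loop: the first call sees no previous response."""
    resp: Optional[FakeResponse] = None
    while True:
        resp = fetch(get_params(resp))
        yield resp
        if not has_more(resp):
            break


# Same callbacks as in the PaginationConfig example.
get_params = lambda resp: {"page": resp.json()["next_page"]} if resp else {"page": 1}
has_more = lambda resp: resp.json()["next_page"] is not None

items = [
    item
    for resp in paginate(fake_fetch, get_params, has_more)
    for item in resp.json()["items"]
]
```

Under these assumptions the loop requests pages 1, 2, and 3 in order and stops when `next_page` is `None`.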

### S3Source (anonymous public bucket)

```python
from sources.s3.s3_config import S3SourceConfig
from sources.s3.s3_source import S3Source

S3Source(S3SourceConfig(
    bucket="noaa-ghcn-pds",
    key="csv/by_station/ACW00011604.csv",
    region_name="us-east-1",
))
```

### StripeSource

```python
import os

from sources.stripe.stripe_config import StripeSourceConfig
from sources.stripe.stripe_source import StripeSource

StripeSource(StripeSourceConfig(
    api_key=os.environ["STRIPE_API_KEY"],
    resource="Customer",
))
```

### StripeSearchSource

```python
import os

from sources.stripe.stripe_search_config import StripeSearchSourceConfig
from sources.stripe.stripe_search_source import StripeSearchSource

StripeSearchSource(StripeSearchSourceConfig(
    api_key=os.environ["STRIPE_API_KEY"],
    resource="Customer",
    query=["email:'alice@example.com'", "name:'Alice'"],
    operator="AND",
))
```
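Stripe's search query language lets you combine clauses with `AND` or `OR`, but not both in the same query. A plausible reading of the `query` and `operator` fields is that the clauses are joined with that one operator. The sketch below shows that reading only. `build_search_query` is a hypothetical helper, and how Nexus actually assembles the query string is an assumption here.

```python
def build_search_query(clauses: list[str], operator: str = "AND") -> str:
    """Join Stripe search clauses with a single boolean operator."""
    normalized = operator.upper()
    if normalized not in {"AND", "OR"}:
        raise ValueError(f"unsupported operator: {operator!r}")
    return f" {normalized} ".join(clauses)


query = build_search_query(
    ["email:'alice@example.com'", "name:'Alice'"],
    operator="AND",
)
```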

### PostgresSource

```python
import os

from sources.postgres.postgres_config import PostgresSourceConfig
from sources.postgres.postgres_source import PostgresSource

PostgresSource(PostgresSourceConfig(
    connection_string=os.environ["POSTGRES_CONNECTION_STRING"],
    query="SELECT * FROM transactions WHERE status = 'completed'",
))
```

### MySQLSource

```python
import os

from sources.mysql.mysql_config import MySQLSourceConfig
from sources.mysql.mysql_source import MySQLSource

MySQLSource(MySQLSourceConfig(
    connection_string=os.environ["MYSQL_CONNECTION_STRING"],
    query="SELECT * FROM orders WHERE created_at > '2024-01-01'",
))
```

## Steps

| Step | Description |
|---|---|
| `BytesToCSV` | Parse raw bytes into `list[dict[str, str]]` |
| `BytesToJSON` | Parse raw bytes into a JSON value |
| `JSONToRows` | Store a JSON payload as a single row under a named column |
| `CastTypes` | Coerce column values to Python types (`int`, `float`, `str`, …) |
| `FilterRows` | Drop rows that don't match a predicate |
| `SelectColumns` | Keep only specified columns |
| `RenameColumns` | Rename columns via a mapping |
| `AddColumn` | Derive a new column from existing ones via a callable |

### Example: filter + select + cast + rename

```python
steps = [
    BytesToCSV(),
    FilterRows(lambda row: row["status"] == "completed"),
    SelectColumns(["id", "amount", "created_at"]),
    CastTypes({"amount": float}),
    RenameColumns({"created_at": "date"}),
]
```
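To show what this chain does to the data, here is a plain-Python equivalent of each step applied to a small hypothetical CSV payload. It uses only the standard library and follows the step descriptions in the table above. It is not how Nexus implements the steps.

```python
import csv
import io

# Hypothetical input payload, as a source would hand it to the first step.
raw = (
    b"id,amount,status,created_at\n"
    b"1,9.50,completed,2024-01-02\n"
    b"2,3.00,pending,2024-01-03\n"
    b"3,12.25,completed,2024-01-04\n"
)

# BytesToCSV: decode bytes and parse into a list of string-valued dicts.
rows = list(csv.DictReader(io.StringIO(raw.decode("utf-8"))))

# FilterRows: keep rows matching the predicate.
rows = [row for row in rows if row["status"] == "completed"]

# SelectColumns: keep only the named columns.
keep = ["id", "amount", "created_at"]
rows = [{col: row[col] for col in keep} for row in rows]

# CastTypes: coerce "amount" from str to float.
rows = [{**row, "amount": float(row["amount"])} for row in rows]

# RenameColumns: rename "created_at" to "date".
renames = {"created_at": "date"}
rows = [{renames.get(col, col): value for col, value in row.items()} for row in rows]
```

Order matters: the filter runs before the select because `SelectColumns` drops `status`, which the predicate needs.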

### AddColumn

```python
from steps.add_column import AddColumn

AddColumn("total", lambda row: row["price"] * row["quantity"])
```
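Because `BytesToCSV` yields string values, a callable like the one above only works once the inputs are numeric, for example after `CastTypes({"price": float, "quantity": int})`. The sketch below illustrates this in plain Python. `add_column` is a hypothetical stand-in for the step, not the Nexus implementation.

```python
from typing import Any, Callable

Row = dict[str, Any]


def add_column(rows: list[Row], name: str, fn: Callable[[Row], Any]) -> list[Row]:
    """Hypothetical plain-Python stand-in for an AddColumn step."""
    return [{**row, name: fn(row)} for row in rows]


# Rows as they would look straight out of a CSV parse: every value is a str.
csv_rows: list[Row] = [{"price": "2.50", "quantity": "4"}]

try:
    add_column(csv_rows, "total", lambda row: row["price"] * row["quantity"])
    multiplied_strings = True
except TypeError:
    # str * str is not defined, so the derived column cannot be computed.
    multiplied_strings = False

# Cast first, then derive the new column from numeric values.
typed_rows = [
    {"price": float(row["price"]), "quantity": int(row["quantity"])}
    for row in csv_rows
]
with_total = add_column(typed_rows, "total", lambda row: row["price"] * row["quantity"])
```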

## Sinks

| Sink | Description |
|---|---|
| `SQLiteSink` | Write rows into a SQLite table |
| `FileSink` | Write rows to a local CSV or JSON file |

## Incremental loading

`StatefulPipeline` runs a pipeline incrementally — it remembers where it left off between runs and only fetches new data.

```python
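import os

from pipelines.pipeline import Pipeline
from pipelines.stateful_pipeline import StatefulPipeline
from sources.stripe.stripe_config import StripeSourceConfig
from sources.stripe.stripe_source import StripeSource

# `SelectColumns`, `sink`, `store`, and `max_created_from_db` are assumed to be
# imported or defined elsewhere in your script.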

StatefulPipeline(
    build_pipeline=lambda since: Pipeline(
        source=StripeSource(StripeSourceConfig(
            api_key=os.environ["STRIPE_API_KEY"],
            resource="Customer",
            params={"created[gt]": since} if since else {},
        )),
        steps=[SelectColumns(["id", "email", "name", "created"])],
        sink=sink,
    ),
    get_state=lambda: store.get("stripe_customers"),
    save_state=lambda value: store.set("stripe_customers", value),
    advance_state=lambda: max_created_from_db(),
).run()
```

- `build_pipeline` — builds the pipeline given the current checkpoint
- `get_state` — returns the last saved checkpoint
- `save_state` — persists the new checkpoint after a run
- `advance_state` — derives the new checkpoint (called after the pipeline runs)
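
The four callbacks can be sketched as a simple loop body. This is a toy model under stated assumptions, not the `StatefulPipeline` implementation: it assumes the call order is read checkpoint, run, derive, persist, and that nothing is saved when no new checkpoint can be derived. `JSONStateStore`, `run_incremental`, and the in-memory `remote` and `loaded` lists are hypothetical, and `run_pipeline` folds `build_pipeline(since).run()` into a single call.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Callable, Optional


class JSONStateStore:
    """Hypothetical checkpoint store backed by a single JSON file."""

    def __init__(self, path: Path) -> None:
        self.path = path

    def get(self, key: str) -> Optional[Any]:
        if not self.path.exists():
            return None
        return json.loads(self.path.read_text()).get(key)

    def set(self, key: str, value: Any) -> None:
        data = json.loads(self.path.read_text()) if self.path.exists() else {}
        data[key] = value
        self.path.write_text(json.dumps(data))


def run_incremental(
    run_pipeline: Callable[[Optional[int]], None],
    get_state: Callable[[], Optional[int]],
    save_state: Callable[[int], None],
    advance_state: Callable[[], Optional[int]],
) -> None:
    """Assumed call order: read checkpoint, run, derive, persist."""
    since = get_state()
    run_pipeline(since)
    new_state = advance_state()
    if new_state is not None:
        save_state(new_state)


# Toy data standing in for a remote API and the sink's table.
remote = [{"id": "a", "created": 100}, {"id": "b", "created": 200}]
loaded: list[dict[str, Any]] = []


def run_pipeline(since: Optional[int]) -> None:
    """Load only records created after the checkpoint (all on the first run)."""
    loaded.extend(r for r in remote if since is None or r["created"] > since)


with tempfile.TemporaryDirectory() as tmp:
    store = JSONStateStore(Path(tmp) / "state.json")
    callbacks = dict(
        run_pipeline=run_pipeline,
        get_state=lambda: store.get("stripe_customers"),
        save_state=lambda value: store.set("stripe_customers", value),
        advance_state=lambda: max((r["created"] for r in loaded), default=None),
    )
    run_incremental(**callbacks)  # first run loads both existing records
    remote.append({"id": "c", "created": 300})
    run_incremental(**callbacks)  # second run loads only the new record
    final_checkpoint = store.get("stripe_customers")
```

Deriving the checkpoint from what was actually loaded, rather than from the wall clock, means a failed run leaves the old checkpoint in place and the next run retries the same window.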

## Running tests

```bash
# Unit tests only
uv run pytest

# Integration tests only (these hit real external endpoints)
uv run pytest -m integration
```

## Examples

See the [`examples/`](examples/) directory for end-to-end pipelines:

- `http_csv_to_file.py` — HTTP → CSV → local file
- `file_csv_to_sqlite.py` — local file → SQLite
- `glob_csv_to_sqlite.py` — multiple local files → SQLite
- `s3_csv_to_sqlite.py` — S3 object → SQLite
- `s3_glob_csv_to_sqlite.py` — multiple S3 objects → SQLite
- `s3_filter_select_to_sqlite.py` — S3 → filter → select → SQLite
- `google_sheets_to_sqlite.py` — public Google Sheet → SQLite
- `github_paginated.py` — paginated GitHub API → SQLite
- `stripe_customers_to_sqlite.py` — Stripe customers → SQLite
- `stripe_payouts_to_sqlite.py` — Stripe payouts → SQLite
- `stripe_search_to_sqlite.py` — Stripe search → SQLite
- `stripe_customers_incremental.py` — incremental Stripe sync with StatefulPipeline
- `postgres_transactions_to_sqlite.py` — Postgres → SQLite
