Metadata-Version: 2.4
Name: fastrecon
Version: 0.3.0
Summary: High-performance reconciliation engine for SQL tables, queries, CSV, and Parquet using DuckDB, Polars, and Arrow.
Project-URL: Homepage, https://github.com/yourname/fastrecon
Project-URL: Issues, https://github.com/yourname/fastrecon/issues
Author: fastrecon contributors
License: MIT
License-File: LICENSE
Keywords: arrow,data-quality,diff,duckdb,etl,polars,reconciliation
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Database
Classifier: Topic :: Software Development :: Quality Assurance
Requires-Python: >=3.9
Requires-Dist: duckdb>=1.0.0
Requires-Dist: orjson>=3.0.0
Requires-Dist: polars>=1.0.0
Requires-Dist: pyarrow>=16.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: rich>=13.0.0
Requires-Dist: sqlalchemy>=2.0.0
Requires-Dist: tenacity>=8.0.0
Provides-Extra: mssql
Requires-Dist: pyodbc>=5.0; extra == 'mssql'
Provides-Extra: mysql
Requires-Dist: pymysql>=1.1; extra == 'mysql'
Provides-Extra: postgres
Requires-Dist: psycopg[binary]>=3.1; extra == 'postgres'
Provides-Extra: test
Requires-Dist: pytest>=8.0.0; extra == 'test'
Description-Content-Type: text/markdown

# fastrecon

**A focused, high-performance reconciliation engine** for comparing SQL tables, SQL queries, CSV files, and Parquet files at scale. Built on **DuckDB**, **Polars**, and **Apache Arrow**.

> fastrecon is not a pandas replacement. It is a *reconciliation engine* — built specifically for proving that two datasets are (or aren't) the same.

## Why fastrecon

Most data teams hand-roll reconciliation with pandas, ad-hoc SQL, or shell scripts, none of which scale. fastrecon gives you one consistent API across every common combination:

| Left           | Right         |
| -------------- | ------------- |
| SQL table      | SQL table     |
| SQL table      | SQL query     |
| SQL query      | SQL query     |
| SQL table/query| CSV / Parquet |
| CSV / Parquet  | CSV / Parquet |

Everything is normalized into a single internal **relation** (a DuckDB view), then compared with pushdown-friendly SQL — no whole-dataset materialization in Python.

## Install

```bash
pip install fastrecon                 # core
pip install "fastrecon[postgres]"     # + psycopg
pip install "fastrecon[mysql]"        # + pymysql
```

Requires Python 3.9+.

## Quick start

```python
from fastrecon import compare, SqlTable, ParquetFile

result = compare(
    left=SqlTable(conn="postgresql://user:pw@host/db", table="public.orders"),
    right=ParquetFile(path="orders.parquet"),
    keys=["order_id"],
    compare_mode="keyed",
    exclude_columns=["load_ts"],
    tolerances={"amount": 0.01},
)

print(result.summary())
print(result.to_json(indent=True))
```

Sample output:

```
status               : MISMATCH
compare_mode         : keyed
row_count_left       : 1,000,001
row_count_right      : 1,000,000
schema_match         : True
data_match           : False
missing_in_left      : 0
missing_in_right     : 1
changed_rows         : 4
duplicate_keys_left  : 0
duplicate_keys_right : 0
elapsed_sec          : 1.842
engine               : duckdb+polars
```

## Compare modes

| Mode       | What it does                                                    |
| ---------- | --------------------------------------------------------------- |
| `schema`   | Column names, types, missing/extra columns                      |
| `rowcount` | Schema + row counts on both sides                               |
| `keyed`    | Schema + counts + key-based diff (missing / changed / dup keys) |
| `profile`  | Schema + counts + per-column null/distinct/min/max              |

`keyed` mode is the default and supports **partition-wise execution** for big-data workloads — see below.
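
Switching modes is just the `compare_mode` argument; a minimal sketch reusing the quick-start sources (the paths and connection string are placeholders):

```python
from fastrecon import compare, SqlTable, ParquetFile

left = SqlTable(conn="postgresql://user:pw@host/db", table="public.orders")
right = ParquetFile(path="orders.parquet")

# Cheap checks first: schema only, then schema + row counts on both sides.
schema_only = compare(left, right, keys=["order_id"], compare_mode="schema")
counts = compare(left, right, keys=["order_id"], compare_mode="rowcount")

# Full column profile: per-column null / distinct / min / max on each side.
profile = compare(left, right, keys=["order_id"], compare_mode="profile")
print(profile.column_stats)
```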

## Partition-wise compare (big data)

Joining 100M+ rows in one shot can easily exhaust memory. fastrecon can split a keyed compare into independent partitions and aggregate the results. Each partition runs as its own filtered SQL job inside DuckDB, so memory stays bounded by the *partition* size, not the dataset size.

```python
from fastrecon import compare, SqlTable, ParquetFile, PartitionSpec

# Partition by a low-cardinality column (e.g. country, status, load_date)
result = compare(
    left=SqlTable(conn=SRC, table="orders"),
    right=ParquetFile("orders/*.parquet"),
    keys=["order_id"],
    partition=PartitionSpec(column="region", strategy="value"),
)

# Or hash-bucket any column (works for high-cardinality keys too)
result = compare(
    left=..., right=..., keys=["order_id"],
    partition=PartitionSpec(column="order_id", strategy="hash", buckets=64),
)

# Or explicit ranges (great for dates / sequential ids)
result = compare(
    left=..., right=..., keys=["order_id"],
    partition=PartitionSpec(
        column="order_dt", strategy="range",
        boundaries=[("2026-01-01", "2026-02-01"),
                    ("2026-02-01", "2026-03-01"),
                    ("2026-03-01", "2026-04-01")],
    ),
)

print(result.summary())
for p in result.column_stats["partitions"]:
    print(p)   # per-partition counts + match flag
```

### Strategies

| Strategy | Best for | Notes |
| -------- | -------- | ----- |
| `value`  | Low-cardinality partition keys (region, status, load_date) | Auto-discovers distinct values from both sides; capped by `max_partitions` (default 1000) |
| `hash`   | Any column, especially high-cardinality keys | `buckets=N` controls partition count and memory footprint |
| `range`  | Ordered columns (dates, sequential ids) | Half-open `[lo, hi)` boundaries; you supply them |

### What you get back

When you pass `partition=...`, the result includes a per-partition breakdown under `column_stats`:

```python
result.column_stats["partitioned_by"]
# {"column": "region", "strategy": "value", "n_partitions": 5}

result.column_stats["partitions"]
# [
#   {"partition": "EU", "row_count_left": 312_054, "row_count_right": 312_054,
#    "missing_in_left": 0, "missing_in_right": 0, "changed_rows": 2,
#    "duplicate_keys_left": 0, "duplicate_keys_right": 0, "match": False},
#   ...
# ]
```

Top-level counts (`missing_in_left`, `changed_rows`, etc.) are aggregated across partitions; `sample_mismatches` is a globally capped sample drawn from any partition.
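
If you want to cross-check that aggregation, something like this works (assuming, as the breakdown above implies, that the counts aggregate by summation):

```python
# The aggregated total should equal the sum of the per-partition counts.
total_changed = sum(p["changed_rows"] for p in result.column_stats["partitions"])
assert total_changed == result.changed_rows
```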

### Choosing a strategy

- **You know the data has natural partitions** (`load_date`, `region`, `tenant_id`) → use `value`.
- **You don't, and just want bounded memory** → use `hash` with `buckets` ≈ `dataset_rows / 5_000_000` (see the sketch after this list).
- **The data is time-series and you want to reconcile per window** → use `range` with date boundaries.
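
The `hash` heuristic from the list above, spelled out; the ~5M-rows-per-bucket constant is just that rule of thumb, not a tuned value:

```python
from fastrecon import PartitionSpec

dataset_rows = 400_000_000                       # rough size of the larger side
buckets = max(1, dataset_rows // 5_000_000)      # ~5M rows per partition -> 80 buckets

spec = PartitionSpec(column="order_id", strategy="hash", buckets=buckets)
```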

## Configuration & normalization

Reconciliation is mostly about handling the messy reality of "the same" data:

```python
from fastrecon import ReconConfig, compare

cfg = ReconConfig(
    trim_strings=True,
    case_sensitive=False,
    null_equals_empty=True,
    decimal_scale=2,
    timestamp_tz="UTC",
    column_mapping={"orderId": "order_id"},   # left -> right rename
    exclude_columns=["load_ts", "etl_batch"],
    tolerances={"amount": 0.01, "tax": 0.01},
    sample_limit=200,
)

result = compare(left, right, keys=["order_id"], config=cfg)
```

## Result object

`compare()` returns a `ReconResult` with:

- `status` — `MATCH` / `MISMATCH` / `ERROR`
- `row_count_left`, `row_count_right`
- `schema_match`, `data_match`, `schema_diff`
- `missing_in_left`, `missing_in_right`, `changed_rows`
- `duplicate_keys_left`, `duplicate_keys_right`
- `sample_mismatches` — sample rows for each mismatch class
- `column_stats` — populated in `profile` mode
- `execution_metrics` — `elapsed_sec`, `engine`

Use `result.summary()` for a printable report or `result.to_json()` / `result.to_dict()` to ship it to a logger, dashboard, or CI gate.
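
A typical CI gate built from those fields; a sketch only, with placeholder file names:

```python
import sys

from fastrecon import compare, CsvFile, ParquetFile

left = CsvFile("orders_export.csv")
right = ParquetFile("orders.parquet")

result = compare(left, right, keys=["order_id"])
print(result.summary())                          # human-readable block for the CI log

if result.status != "MATCH":
    # Persist the full detail for debugging, then fail the build.
    with open("recon_result.json", "w") as f:
        f.write(result.to_json(indent=True))
    sys.exit(result.exit_code)                   # 1 = MISMATCH, 2 = ERROR
```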

## Sources

```python
from fastrecon import SqlTable, SqlQuery, CsvFile, ParquetFile

SqlTable(conn="postgresql://...", table="schema.orders")
SqlQuery(conn="postgresql://...", query="SELECT * FROM orders WHERE dt >= '2026-01-01'")
CsvFile("/path/to/orders.csv", options={"delim": ","})
ParquetFile("/path/to/orders.parquet")        # also supports DuckDB globs: 'data/*.parquet'
```

## Architecture

```
fastrecon/
├── api.py                  # public compare()
├── config.py               # ReconConfig
├── sources/                # SqlTable / SqlQuery / CsvFile / ParquetFile
├── engines/                # DuckDB execution engine
├── compare/                # schema / rowcount / keyed / profile
├── output/                 # ReconResult (summary, to_dict, to_json)
└── utils/                  # normalization, logging
```

Internally:

1. Each source is registered into an in-memory DuckDB connection as a view (zero-copy from Arrow when possible).
2. Schema is read with `DESCRIBE`.
3. Row counts, anti-joins, and inner joins run in DuckDB — no full Python materialization.
4. Mismatch samples are pulled lazily, capped by `sample_limit`.
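
The pattern behind steps 1-3 is plain DuckDB; a stripped-down sketch of the idea (not fastrecon's actual module code, and the file names and key are placeholders):

```python
import duckdb
import pyarrow.parquet as pq

con = duckdb.connect()                            # in-memory database

# Step 1: register each side as a view (zero-copy for Arrow tables).
con.register("left_rel", pq.read_table("left.parquet"))
con.register("right_rel", pq.read_table("right.parquet"))

# Step 2: read schemas.
print(con.execute("DESCRIBE left_rel").fetchall())

# Step 3: counts and key-based diffs run entirely inside DuckDB.
missing_in_right = con.execute("""
    SELECT count(*)
    FROM left_rel l
    LEFT JOIN right_rel r ON l.order_id = r.order_id
    WHERE r.order_id IS NULL
""").fetchone()[0]
```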

## CLI

`fastrecon` ships with a first-class command-line interface — drop it into any CI pipeline:

```bash
fastrecon \
  --left-type csv     --left-path  orders_today.csv \
  --right-type sqltable --right-conn postgresql://u:p@h/db --right-table orders \
  --keys order_id \
  --tolerance amount=0.01 \
  --partition region:value \
  --html report.html --junit report.xml \
  --fail-on mismatch
```

Source types: `csv`, `parquet`, `sqltable`, `sqlquery`, `postgres` (native scanner).

Exit codes: `0` MATCH, `1` MISMATCH (controlled by `--fail-on`), `2` ERROR.

## Reports

Self-contained HTML and JUnit XML reports — no template engine, no external assets, perfect for emailing or attaching to a CI build:

```python
res = compare(left, right, keys=["id"])
res.to_html("report.html")           # standalone HTML, embeddable in CI artifacts
res.to_junit("report.xml")           # JUnit XML — Jenkins / GitLab / Buildkite read this natively
res.exit_code                        # 0 / 1 / 2 for shell scripts
```

The HTML report includes the summary, schema diff, per-partition heatmap (when partitioned), and tables of mismatch samples. The JUnit report emits one `<testcase>` per partition so dashboards pinpoint *which* slice failed.

## Streaming SQL loader & native Postgres scanner

Both SQL sources stream batches via a server-side cursor (Arrow `RecordBatchReader` → DuckDB), so you don't `fetchall()` 100M rows into Python before doing anything useful.

```python
SqlTable(conn=URL, table="orders", chunk_size=200_000)   # batch size
SqlQuery(conn=URL, query="SELECT ...", chunk_size=200_000)
```
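
A rough sketch of that streaming pattern in plain SQLAlchemy + PyArrow + DuckDB (not fastrecon's loader; the helper name and batching details are illustrative):

```python
import itertools

import duckdb
import pyarrow as pa
import sqlalchemy as sa

def register_streaming_query(con, url, query, name, chunk_size=200_000):
    """Stream a SQL query into DuckDB through an Arrow RecordBatchReader."""
    engine = sa.create_engine(url)

    def batches():
        with engine.connect() as db:
            result = db.execution_options(stream_results=True).execute(sa.text(query))
            cols = list(result.keys())
            while rows := result.fetchmany(chunk_size):
                yield pa.RecordBatch.from_pydict(
                    {c: [row[i] for row in rows] for i, c in enumerate(cols)}
                )

    gen = batches()
    first = next(gen)                             # peek one batch to learn the schema
    reader = pa.RecordBatchReader.from_batches(
        first.schema, itertools.chain([first], gen)
    )
    con.register(name, reader)                    # DuckDB consumes the stream lazily

con = duckdb.connect()
register_streaming_query(con, "postgresql://u:p@h/db", "SELECT * FROM orders", "orders_src")
```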

For Postgres specifically, use `PostgresSource` to bypass SQLAlchemy entirely. DuckDB's native `postgres_scanner` extension talks libpq directly, pushes filters down, and zero-copies result batches into the engine:

```python
from fastrecon import PostgresSource, ParquetFile, compare

result = compare(
    left=PostgresSource(conn="postgresql://u:p@h/db", table="public.orders"),
    right=ParquetFile("orders.parquet"),
    keys=["order_id"],
)
```

Use `PostgresSource` whenever both speed and memory matter — it's the recommended path for production warehouses.
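
Under the hood this rides on DuckDB's own Postgres extension; roughly the equivalent raw SQL, shown here to illustrate the mechanism rather than as a dump of fastrecon's internals:

```python
import duckdb

con = duckdb.connect()
con.execute("INSTALL postgres")
con.execute("LOAD postgres")

# Attach the remote database; DuckDB talks to it directly over libpq.
con.execute("ATTACH 'postgresql://u:p@h/db' AS pg (TYPE postgres, READ_ONLY)")

# Queries against pg.* push filters down to Postgres and stream batches back.
print(con.execute("SELECT count(*) FROM pg.public.orders").fetchone())
```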

## Benchmarks

Benchmarked on a 4-vCPU Linux container, comparing two Parquet files with ~0.5% mutated rows. Run yourself with `python benchmarks/bench.py --rows N`.

| Rows | Engine                    | Elapsed | Peak Python memory |
| ---- | ------------------------- | ------- | ------------------ |
| 100K | datacompy (pandas)        | 0.15 s  | 17 MB              |
| 100K | **fastrecon (DuckDB)**    | 0.20 s  | **0.1 MB**         |
| 1M   | datacompy (pandas)        | 0.68 s  | 164 MB             |
| 1M   | **fastrecon (DuckDB)**    | 0.97 s  | **0.2 MB**         |

datacompy is competitive on small data, but its memory grows linearly with row count because pandas materializes both frames; on 100M+ row workloads it runs out of memory. fastrecon's DuckDB-backed engine keeps Python memory near-flat regardless of dataset size, and SQL ↔ SQL recon via `PostgresSource` keeps the data out of Python entirely.

> Caveat: peak memory is measured with `tracemalloc`, which only sees Python allocations. DuckDB allocates outside Python; total RSS for fastrecon will be larger than the table shows but still bounded by working-set, not dataset size. The relative comparison stands.
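
For reference, the kind of measurement that caveat refers to; a minimal pattern with the standard library (the actual harness in `benchmarks/bench.py` may differ):

```python
import tracemalloc

from fastrecon import compare, ParquetFile

tracemalloc.start()
result = compare(ParquetFile("a.parquet"), ParquetFile("b.parquet"), keys=["order_id"])
_, peak = tracemalloc.get_traced_memory()          # peak *Python* allocations only
tracemalloc.stop()

print(f"peak Python memory: {peak / 1e6:.1f} MB")  # DuckDB's own allocations are invisible here
```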

`data-diff` is also widely used; it requires a live DB connection on both sides and is now in maintenance mode upstream. Run `pip install data-diff && data-diff db1 table1 db2 table2` to compare it against fastrecon on your own SQL ↔ SQL workloads.

## Roadmap

- ✅ MVP: package, sources, schema/rowcount/keyed/profile compare, JSON result, tests
- ✅ Partition-wise compare (value / hash / range strategies)
- ✅ Streaming SQL loader (Arrow `RecordBatchReader`)
- ✅ Native Postgres scanner via DuckDB `postgres` extension
- ✅ HTML + JUnit XML reports + CLI with exit codes
- ✅ Benchmark suite vs `datacompy`
- ⏳ Parallel partition execution (thread pool)
- ⏳ Snowflake / BigQuery / Delta / Iceberg sources
- ⏳ Rust extension (PyO3) for hashing / normalization hot path

## License

MIT
