Metadata-Version: 2.4
Name: iceberg-migration
Version: 0.1.1
Summary: Migrate Apache Iceberg V2 tables across cloud providers (AWS, MinIO; GCP stub included)
Author: dutv-mta
License: MIT
Project-URL: Homepage, https://github.com/dutv-mta/iceberg_migration
Project-URL: Source, https://github.com/dutv-mta/iceberg_migration
Project-URL: Bug Reports, https://github.com/dutv-mta/iceberg_migration/issues
Project-URL: Changelog, https://github.com/dutv-mta/iceberg_migration/releases
Keywords: iceberg,migration,aws,data-lake,glue,s3,minio,hive,parquet,data-engineering
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Database
Classifier: Topic :: Scientific/Engineering :: Information Analysis
Classifier: Environment :: Console
Requires-Python: >=3.9
Description-Content-Type: text/markdown
Requires-Dist: boto3>=1.35
Requires-Dist: fastavro>=1.9
Requires-Dist: pandas>=2.2
Requires-Dist: pyarrow>=18.0
Requires-Dist: pyiceberg[hive]>=0.8
Requires-Dist: python-dotenv>=1.0
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-cov>=5.0; extra == "dev"
Requires-Dist: ruff>=0.4; extra == "dev"
Requires-Dist: mypy>=1.10; extra == "dev"

# iceberg-migration

Migrate Apache Iceberg V2 tables across cloud accounts by **patching metadata
paths and re-registering the table catalog entry** — without re-writing or
re-compacting any data files.

Supported providers: **AWS** (S3 + Glue), **MinIO** (S3-compatible + Hive Metastore).
GCP (GCS + BigLake) is stubbed and available for contribution.

---

## Table of Contents

- [How it works](#how-it-works)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Configuration](#configuration)
- [Migration workflow](#migration-workflow)
  - [Step 1 — Compact the table](#step-1--compact-the-table)
  - [Step 2 — Remove old snapshots](#step-2--remove-old-snapshots)
  - [Step 3 — Remove orphan files](#step-3--remove-orphan-files)
  - [Step 4 — Sync data and metadata files to target S3](#step-4--sync-data-and-metadata-files-to-target-s3)
  - [Step 5 — Patch metadata and register the table](#step-5--patch-metadata-and-register-the-table)
- [tables.txt format](#tablestxt-format)
- [Providers](#providers)
  - [AWS](#aws)
  - [MinIO](#minio)
  - [GCP (stub)](#gcp-stub)
- [Performance tuning](#performance-tuning)
- [Local testing with Docker](#local-testing-with-docker)
- [Development](#development)
- [Architecture](#architecture)
- [Adding a new provider](#adding-a-new-provider)

---

## How it works

An Iceberg table consists of two distinct layers:

```
s3://bucket/
  my_db/my_table/
    data/                     ← Parquet data files  (potentially terabytes)
      part-00000.parquet
      delete-00001.parquet    ← positional-delete files  (row-level deletes)
    metadata/                 ← Iceberg metadata  (small JSON / Avro files)
      v3.metadata.json
      snap-12345.avro         ← manifest-list
      manifest-001.avro       ← manifest
```

Every metadata file **hard-codes** the source bucket name in every URI it stores.
Simply copying the files to a new bucket leaves the metadata pointing at the
old bucket, making the table unreadable from the target account.

**iceberg-migration solves this in two phases:**

| Phase | Tool | What it does |
|-------|------|--------------|
| 1 | `aws s3 sync` | Copies all Parquet **data files** from source → target bucket |
| 2 | `iceberg-migration` | Downloads metadata + delete files, rewrites every bucket reference, uploads patched copies, and re-registers the table in the target catalog |

The data files themselves are never re-encoded — the tool only touches the
small metadata layer, so migration time is proportional to metadata size, not
data size.

### What gets patched

| File type | What changes |
|-----------|-------------|
| `*.metadata.json` | All `s3://source-bucket/…` strings → `s3://target-bucket/…` |
| `snap-*.avro` (manifest-list) | `manifest_path` URIs + `manifest_length` fields |
| `manifest-*.avro` | `file_path` URIs inside every `data_file` record + `file_size_in_bytes` for delete files |
| `delete-*.parquet` | `file_path` column values (row-level delete references) |
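
To make the patching concrete, here is a minimal sketch of the kind of rewrite involved, using the example layout above (`v3.metadata.json`, `snap-12345.avro`). It is illustrative only, not the tool's actual implementation, and it glosses over details such as preserving Avro file metadata and updating `manifest_length` after manifests are rewritten.

```python
# Illustrative only — not the tool's actual code. File and bucket names are
# the placeholders from the layout diagram above.
from fastavro import reader, writer

SOURCE = "s3://source-bucket/"
TARGET = "s3://target-bucket/"

# 1. Table metadata (*.metadata.json): URIs are plain strings in the JSON text.
with open("v3.metadata.json") as f:
    patched = f.read().replace(SOURCE, TARGET)
with open("v3.metadata.json", "w") as f:
    f.write(patched)

# 2. Manifest-list (snap-*.avro): rewrite manifest_path in every entry.
with open("snap-12345.avro", "rb") as f:
    avro = reader(f)
    schema = avro.writer_schema
    entries = [
        {**e, "manifest_path": e["manifest_path"].replace(SOURCE, TARGET)}
        for e in avro
    ]
with open("snap-12345.avro", "wb") as f:
    writer(f, schema, entries)
```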

---

## Prerequisites

- Python 3.9+
- AWS CLI (for the `s3 sync` data-copy step)
- Credentials for both source and target accounts with the permissions below

### Required IAM permissions

**Source account**
```json
{
  "Effect": "Allow",
  "Action": [
    "s3:GetObject", "s3:ListBucket",
    "glue:GetTable"
  ],
  "Resource": ["arn:aws:s3:::source-bucket/*", "arn:aws:s3:::source-bucket", ...]
}
```

**Target account**
```json
{
  "Effect": "Allow",
  "Action": [
    "s3:PutObject", "s3:ListBucket",
    "glue:CreateTable"
  ],
  "Resource": ["arn:aws:s3:::target-bucket/*", "arn:aws:s3:::target-bucket", ...]
}
```

---

## Installation

```bash
# Clone the repo
git clone https://github.com/dutv-mta/iceberg_migration.git
cd iceberg_migration

# Install (editable mode + dev deps)
pip install -e ".[dev]"

# Or install for runtime only
pip install -e .
```

---

## Configuration

All credentials and endpoints are read from a `.env` file:

```bash
cp .env.example .env
# Edit .env and fill in the values for your provider
```

See [`.env.example`](.env.example) for all available variables with inline
documentation. Only the section that matches your `--provider` flag needs to
be filled in.

---

## Migration workflow

Run steps 1–3 on the **source** table before transferring any files.
These reduce the number of files that need to be copied and ensure the
migrated table is in a clean state.

### Step 1 — Compact the table

Compaction rewrites many small data and delete files into larger optimised
files, reducing the total object count and speeding up the S3 sync.

```sql
-- Spark SQL / Iceberg REST catalog
CALL catalog.system.rewrite_data_files(
  table => 'my_db.my_table',
  strategy => 'binpack',
  options => map('min-file-size-bytes', '134217728',   -- 128 MB
                 'max-file-size-bytes', '536870912')   -- 512 MB
);

-- Also compact positional-delete files produced by row-level deletes
CALL catalog.system.rewrite_position_delete_files(
  table => 'my_db.my_table'
);
```

> **AWS Glue / Athena** — use the `OPTIMIZE` statement:
> ```sql
> OPTIMIZE my_db.my_table REWRITE DATA USING BIN_PACK;
> ```

### Step 2 — Remove old snapshots

Keeping only the latest snapshot removes the metadata overhead of every
historical snapshot and the manifest files that describe them.

```sql
-- Expire all snapshots older than now, retaining only the current one
-- (any files left behind are cleaned up explicitly in Step 3)
CALL catalog.system.expire_snapshots(
  table       => 'my_db.my_table',
  older_than  => now(),
  retain_last => 1
);
```

Verify only one snapshot remains before proceeding:

```sql
SELECT snapshot_id, committed_at, operation
FROM   my_db.my_table.snapshots
ORDER  BY committed_at DESC;
```

### Step 3 — Remove orphan files

Orphan files are objects in S3 that are no longer referenced by any
snapshot — stale data files, failed-write leftovers, or files from
expired snapshots. Removing them before the sync reduces transfer size
and keeps the target bucket clean.

```sql
CALL catalog.system.remove_orphan_files(
  table          => 'my_db.my_table',
  older_than     => now(),
  dry_run        => false
);
```

> Run with `dry_run => true` first to preview which files will be deleted.

After steps 1–3 the table's `metadata/` and `data/` folders contain only
the files needed to represent the current snapshot, making the next sync
as efficient as possible.
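
To sanity-check how much Step 4 will actually transfer, Iceberg's metadata tables can report the file count and total size referenced by the current snapshot (Spark SQL, using the example table name from above):

```sql
-- Files referenced by the current snapshot (what the sync in Step 4 will copy)
SELECT count(*)                AS data_files,
       sum(file_size_in_bytes) AS total_bytes
FROM   my_db.my_table.files;
```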

---

### Step 4 — Sync data and metadata files to target S3

Copy **both** the `data/` and `metadata/` folders from the source bucket to
the target bucket. Including `metadata/` here saves bandwidth — the migration
tool only needs to download, patch, and re-upload the small JSON/Avro files
rather than re-transferring the raw data.

```bash
# Sync the entire table prefix (data + metadata)
aws s3 sync \
  s3://source-bucket/my_db/my_table/ \
  s3://target-bucket/my_db/my_table/ \
  --source-region us-east-1 \
  --region us-east-1
```

To migrate an entire database in one sync:

```bash
aws s3 sync \
  s3://source-bucket/my_db/ \
  s3://target-bucket/my_db/
```

> **Cross-account sync** — when source and target are in different AWS accounts,
> run the sync with the source account's credentials and grant `s3:PutObject`
> on the target bucket to the source account principal, or use an IAM role with
> `sts:AssumeRole`. Alternatively, use
> [AWS DataSync](https://aws.amazon.com/datasync/) for managed cross-account
> transfers with built-in retries and bandwidth throttling.
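
One way to grant that access is a bucket policy on the target bucket that lets the source account write objects. The snippet below is only a sketch: the account ID and bucket name are placeholders, and your security requirements may call for a role-based setup instead.

```json
{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "AllowSourceAccountSync",
    "Effect": "Allow",
    "Principal": { "AWS": "arn:aws:iam::111122223333:root" },
    "Action": ["s3:PutObject", "s3:ListBucket"],
    "Resource": [
      "arn:aws:s3:::target-bucket",
      "arn:aws:s3:::target-bucket/*"
    ]
  }]
}
```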

> **MinIO** — use `mc mirror` instead of `aws s3 sync`:
> ```bash
> mc mirror source-alias/iceberg-source/my_db/my_table \
>           target-alias/iceberg-target/my_db/my_table
> ```

### Step 5 — Patch metadata and register the table

Create a `tables.txt` file listing every table to migrate (see
[tables.txt format](#tablestxt-format)), then run:

```bash
# AWS (default)
python main.py tables.txt

# MinIO (local testing)
python main.py tables.txt --provider minio
```

Or if installed via `pip install .`:

```bash
iceberg-migration tables.txt
iceberg-migration tables.txt --provider minio
```

The tool will:
1. Download the Iceberg metadata files from the source bucket
2. Rewrite every bucket reference to the target bucket
3. Upload the patched files to the target bucket
4. Register the table in the target Glue catalog (or Hive Metastore for MinIO)

Progress and errors are written to both `stdout` and `logs/migration.log`.

---

## tables.txt format

One table per line, three comma-separated columns:

```
# source_db, target_db, table_name   ← lines starting with # are ignored
analytics,analytics_prod,orders
analytics,analytics_prod,customers
reporting,reporting_v2,daily_summary
```

- `source_db` — database name in the **source** catalog (Glue / Hive)
- `target_db` — database name in the **target** catalog (may differ from source)
- `table_name` — the Iceberg table name (must be the same in both catalogs)

Blank lines and lines starting with `#` are skipped. Lines with the wrong
number of columns are logged and skipped (migration continues for the rest).
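
For reference, the rules above amount to roughly the following parsing logic (a simplified sketch, not the tool's actual parser):

```python
# Simplified sketch of the tables.txt parsing rules described above.
def parse_tables_file(path: str) -> list[tuple[str, str, str]]:
    """Return (source_db, target_db, table_name) tuples."""
    entries = []
    with open(path) as f:
        for lineno, raw in enumerate(f, start=1):
            line = raw.strip()
            if not line or line.startswith("#"):
                continue  # blank lines and comments are skipped
            parts = [p.strip() for p in line.split(",")]
            if len(parts) != 3:
                print(f"skipping line {lineno}: expected 3 columns, got {len(parts)}")
                continue  # malformed lines are logged; migration continues
            entries.append((parts[0], parts[1], parts[2]))
    return entries
```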

---

## Providers

### AWS

Migrates between two AWS accounts using **S3** for storage and **Glue Data
Catalog** for the Iceberg catalog.

```bash
cp .env.example .env
# Fill in the [AWS] section
python main.py tables.txt --provider aws
```

Key env vars:

| Variable | Description |
|----------|-------------|
| `SOURCE_AWS_REGION` | Region of the source account |
| `SOURCE_AWS_ACCESS_KEY_ID` | Source access key |
| `SOURCE_AWS_SECRET_ACCESS_KEY` | Source secret key |
| `TARGET_AWS_REGION` | Region of the target account |
| `TARGET_AWS_ACCESS_KEY_ID` | Target access key |
| `TARGET_AWS_SECRET_ACCESS_KEY` | Target secret key |
| `TARGET_S3_BUCKET` | **Explicit** destination bucket name |
| `TARGET_AWS_SESSION_TOKEN` | Optional — for temporary STS credentials |
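
A filled-in `.env` for this provider might look like the following; all values are placeholders:

```bash
# AWS provider — placeholder values, adjust for your accounts
SOURCE_AWS_REGION=us-east-1
SOURCE_AWS_ACCESS_KEY_ID=AKIA...SOURCE
SOURCE_AWS_SECRET_ACCESS_KEY=...
TARGET_AWS_REGION=us-east-1
TARGET_AWS_ACCESS_KEY_ID=AKIA...TARGET
TARGET_AWS_SECRET_ACCESS_KEY=...
TARGET_S3_BUCKET=target-bucket
# TARGET_AWS_SESSION_TOKEN=...   # only needed for temporary STS credentials
```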

### MinIO

Migrates between MinIO buckets using **Hive Metastore** as the catalog.
Ideal for local testing or on-premises deployments.

```bash
cp .env.example .env
# Fill in the [MinIO] section
python main.py tables.txt --provider minio
```

Key env vars:

| Variable | Description |
|----------|-------------|
| `SOURCE_MINIO_ENDPOINT` | MinIO S3 API URL (e.g. `http://localhost:9000`) |
| `SOURCE_MINIO_ACCESS_KEY` | MinIO access key |
| `SOURCE_MINIO_SECRET_KEY` | MinIO secret key |
| `SOURCE_HIVE_URI` | Hive Metastore Thrift URI (e.g. `thrift://localhost:9083`) |
| `TARGET_MINIO_ENDPOINT` | Target MinIO endpoint |
| `TARGET_MINIO_ACCESS_KEY` | Target access key |
| `TARGET_MINIO_SECRET_KEY` | Target secret key |
| `TARGET_MINIO_BUCKET` | Destination bucket name |
| `TARGET_HIVE_URI` | Target Hive Metastore URI |
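
If you are running against the bundled Docker stack (see [Local testing with Docker](#local-testing-with-docker)), a `.env` along these lines should work. It assumes the stack's default ports and `minioadmin` credentials, and a single Hive Metastore serving both source and target:

```bash
# MinIO provider — values matching the local Docker stack defaults
SOURCE_MINIO_ENDPOINT=http://localhost:9000
SOURCE_MINIO_ACCESS_KEY=minioadmin
SOURCE_MINIO_SECRET_KEY=minioadmin
SOURCE_HIVE_URI=thrift://localhost:9083
TARGET_MINIO_ENDPOINT=http://localhost:9000
TARGET_MINIO_ACCESS_KEY=minioadmin
TARGET_MINIO_SECRET_KEY=minioadmin
TARGET_MINIO_BUCKET=iceberg-target
TARGET_HIVE_URI=thrift://localhost:9083
```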

### GCP (stub)

The GCP provider class exists but raises `NotImplementedError`. Contributions
are welcome — see [Adding a new provider](#adding-a-new-provider).

---

## Performance tuning

| Variable | Default | Description |
|----------|---------|-------------|
| `PARALLEL_TABLES` | `5` | Number of tables migrated simultaneously (multiprocessing) |
| `NUMBER_THREADS` | `10` | Threads per table for patching manifest and delete files |
| `BATCH_SIZE` | `500000` | Row-batch size when streaming large Parquet delete files |

Lower `BATCH_SIZE` if you see out-of-memory errors when patching large
positional-delete files on machines with limited RAM.
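
For example, on a memory-constrained machine you might dial everything down (values are illustrative, not recommendations):

```bash
PARALLEL_TABLES=2
NUMBER_THREADS=4
BATCH_SIZE=100000
```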

---

## Local testing with Docker

A full local stack (MinIO + Hive Metastore + Spark) is included for end-to-end
testing without any cloud credentials.

### Start the stack

```bash
cd docker

# First run: build the Spark image (~2 min, downloads JARs from Maven)
docker compose build spark

# Start all services
docker compose up -d

# Wait for Hive Metastore to initialise (~30 s on first run)
docker compose logs -f hive-metastore
```

### Seed test data

```bash
# Creates test_db.orders (1 000 rows) in the source bucket
python scripts/seed_test_data.py
```

Or create a table manually with Spark SQL:

```bash
docker compose exec spark spark-sql
```

```sql
CREATE NAMESPACE IF NOT EXISTS hive.test_db;

CREATE TABLE IF NOT EXISTS hive.test_db.orders (
    order_id     BIGINT,
    customer_id  INT,
    status       STRING,
    total_amount BIGINT,
    created_at   TIMESTAMP
)
USING iceberg
LOCATION 's3a://iceberg-source/test_db/orders';

INSERT INTO hive.test_db.orders VALUES
    (1, 100, 'pending',   5000, current_timestamp()),
    (2, 101, 'shipped',  12000, current_timestamp()),
    (3, 102, 'delivered', 8500, current_timestamp());
```

### Run the migration

The data-copy step for MinIO uses `mc mirror` (MinIO Client) instead of
`aws s3 sync`. If you don't have `mc` installed, you can skip this step for a
quick smoke-test — the table will still be re-registered with its paths
rewritten to the target bucket (mirroring the source path structure), even
though the data files have not actually been copied there.

```bash
# (Optional) copy data files
mc mirror minio-local/iceberg-source/test_db \
          minio-local/iceberg-target/test_db \
          --exclude "*/metadata/*"

# Patch metadata and register
cd ..
echo "test_db,test_db,orders" > tables.txt
python main.py tables.txt --provider minio
```

### Verify

```sql
-- Back in spark-sql — should return all 3 rows
SELECT * FROM hive.test_db.orders;
```

You can also browse MinIO Console at `http://localhost:9001`
(user: `minioadmin` / password: `minioadmin`) to inspect the source and
target buckets directly.

### Tear down

```bash
cd docker && docker compose down -v   # -v removes named volumes (all data)
```

---

## Development

```bash
# Install with dev dependencies
pip install -e ".[dev]"

# Run the full test suite
pytest

# Run a single test file
pytest tests/test_utils.py -v

# Lint
ruff check .

# Format
ruff format .

# Type-check
mypy iceberg_migration/
```

### Pre-commit hooks

```bash
pip install pre-commit
pre-commit install
```

After installation, `ruff` linting and formatting run automatically on every
`git commit`.

### CI

GitHub Actions runs lint, type-check, and tests on every push and pull request
across Python 3.9, 3.10, 3.11, and 3.12. See
[`.github/workflows/ci.yml`](.github/workflows/ci.yml).

---

## Architecture

```
main.py  →  build_provider()  →  TableMigrator  →  source.execute() + target.execute()
```

| Component | File | Responsibility |
|-----------|------|---------------|
| CLI | `main.py` | Parse `tables.txt`, spawn multiprocessing workers |
| Factory | `iceberg_migration/providers/__init__.py` | Build `(source, target)` pair for the requested provider |
| Orchestrator | `iceberg_migration/migrator.py` | Call `source.execute()` then `target.execute()` |
| Base interfaces | `iceberg_migration/base.py` | `IcebergSourceAccount`, `IcebergTargetAccount` |
| Constants | `iceberg_migration/constants.py` | All Iceberg field name strings |
| Utils | `iceberg_migration/utils.py` | Bucket-replacement helpers (scheme-agnostic) |

### AWS source pipeline (`AWSSourceAccount.execute`)

```
Step 1  Metadata JSON     download + patch all s3:// URIs
Step 2  Manifest-lists    scan each snap-*.avro → collect manifest URIs
Step 3  Manifests         scan each manifest-*.avro → collect delete-file URIs
Step 4  Delete files      download all positional-delete Parquets (parallel)
Step 5  Patch Parquet     rewrite file_path column in row-batches (memory-safe)
Step 6  Patch manifests   rewrite file_path URIs + update file_size_in_bytes
Step 7  Patch manifest-lists  rewrite manifest_path URIs + update manifest_length
```

### Provider layout

```
iceberg_migration/providers/
    __init__.py            ← build_provider() factory
    aws/
        config.py          ← AWSSourceConfig, AWSTargetConfig
        account_source.py  ← 7-step pipeline (S3 + Glue)
        account_target.py  ← S3 upload + Glue registration
    minio/
        config.py          ← MinIOSourceConfig, MinIOTargetConfig
        account_source.py  ← inherits AWS pipeline, overrides S3 endpoint + Hive catalog
        account_target.py  ← inherits AWS upload, overrides endpoint + HMS registration
    gcp/
        config.py          ← GCPSourceConfig, GCPTargetConfig
        account_source.py  ← stub (NotImplementedError)
        account_target.py  ← stub (NotImplementedError)
```

---

## Adding a new provider

1. Create `iceberg_migration/providers/<name>/config.py` with
   `<Name>SourceConfig` and `<Name>TargetConfig` dataclasses.

2. Create `account_source.py` and `account_target.py`:
   - **S3-compatible storage** — inherit `AWSSourceAccount` / `AWSTargetAccount`
     and override only `_boto3_client`, `_build_catalog`, `get_catalog_table`,
     and `register_table`.
   - **Other storage** — implement `IcebergSourceAccount` / `IcebergTargetAccount`
     from scratch.

3. Add a branch in `iceberg_migration/providers/__init__.py → build_provider()`.

4. Add the provider name to `SUPPORTED_PROVIDERS` in the same file.

5. Add env vars to `.env.example`.

No other files need to change.
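
For an S3-compatible provider, the new classes are mostly thin overrides of the AWS ones. The skeleton below is a rough sketch: the class and method names follow the steps above, but the exact signatures in the real base classes may differ, so treat it as a starting point rather than working code.

```python
# iceberg_migration/providers/myprovider/account_source.py — hypothetical skeleton.
# Method names follow steps 1-2 above; real signatures may differ.
from iceberg_migration.providers.aws.account_source import AWSSourceAccount


class MyProviderSourceAccount(AWSSourceAccount):
    """Source side for an S3-compatible store (illustrative only)."""

    def _boto3_client(self):
        ...  # point boto3 at the provider's S3-compatible endpoint

    def _build_catalog(self):
        ...  # return a pyiceberg catalog for this provider

    def get_catalog_table(self, db, table):
        ...  # look up the table's current metadata location in the source catalog

    # account_target.py mirrors this: inherit AWSTargetAccount and
    # override the endpoint plus register_table().
```

The `build_provider()` branch and `SUPPORTED_PROVIDERS` entry (steps 3–4) then wire the new classes into the CLI.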
