Metadata-Version: 2.4
Name: rowsmyth
Version: 1.1.0
Summary: Declarative relational test data as Spark DataFrames with referential integrity
Project-URL: Homepage, https://github.com/LaurenceRawlings/rowsmyth
Project-URL: Repository, https://github.com/LaurenceRawlings/rowsmyth
Project-URL: Documentation, https://laurencerawlings.github.io/rowsmyth
Project-URL: Issues, https://github.com/LaurenceRawlings/rowsmyth/issues
Project-URL: Changelog, https://github.com/LaurenceRawlings/rowsmyth/blob/main/CHANGELOG.md
Author-email: Laurence Rawlings <contact@laurencerawlings.com>
License-Expression: MIT
License-File: LICENSE
Keywords: databricks,fixtures,spark,test-data,testing
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Database
Classifier: Topic :: Software Development :: Testing
Classifier: Typing :: Typed
Requires-Python: >=3.12
Requires-Dist: faker>=40.20.0
Provides-Extra: spark
Requires-Dist: pyspark>=4.0.0; extra == 'spark'
Description-Content-Type: text/markdown

# rowsmyth

A blacksmith forges metal. A rowsmyth forges rows - mythical ones that exist only in your tests. `rowsmyth` is declarative relational test and seed data for Apache Spark: generate rows **one at a time** with real foreign-key integrity, then materialise ordinary `DataFrame`s and temp views.

## Install

```bash
uv add "rowsmyth[spark]"
# or
pip install "rowsmyth[spark]"
```

Requires Python 3.12+ and PySpark 4.0+. Running Spark locally also requires Java 17+.
The `[spark]` extra installs `pyspark`; omit it on Databricks, or anywhere a
compatible PySpark is already provided, to avoid version clashes:

```bash
uv add rowsmyth
# or
pip install rowsmyth
```

When running Spark locally, Java must be on your `PATH` or discoverable via `JAVA_HOME`.
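
A quick pre-flight check can confirm that Java is discoverable before a local Spark session is started. The sketch below uses only the standard library; the helper name `find_java` is hypothetical and is not part of `rowsmyth`.

```python
import os
import shutil
from pathlib import Path


def find_java(env=None):
    """Return the path to a `java` executable, or None if none is found.

    Hypothetical helper, not part of rowsmyth. It looks in JAVA_HOME/bin
    first, then falls back to searching PATH.
    """
    env = os.environ if env is None else env
    java_home = env.get("JAVA_HOME")
    if java_home:
        # Restrict the search to JAVA_HOME/bin.
        found = shutil.which("java", path=str(Path(java_home) / "bin"))
        if found:
            return found
    # Fall back to the PATH value from the same environment mapping.
    return shutil.which("java", path=env.get("PATH"))


if __name__ == "__main__":
    print(find_java() or "Java not found: set JAVA_HOME or add java to PATH")
```

This only checks that an executable exists; it does not verify the Java version.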

## Quick start

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType

from rowsmyth import declarative_base, variant

spark = SparkSession.builder.master("local[*]").getOrCreate()
Base = declarative_base()


class Role(Base):
    __table_name__ = "roles"
    __primary_key__ = ("id",)
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField("name", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "name": ctx.random.choice(["admin", "user", "guest"]),
        }


class User(Base):
    __table_name__ = "users"
    __primary_key__ = ("id",)
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField("role_id", LongType(), False),
        StructField("email", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "role_id": ctx.pool("roles", "id").choice(),
            "email": ctx.faker.unique.ascii_email(),
        }

    @variant
    def inactive(self, ctx):
        return {"email": "inactive@example.com"}


with Base.dataset(spark, seed=42) as dataset:
    admin = Role.create(name="admin")
    user = Role.create(name="user")
    users = User.factory().count(10).variant("inactive").create()

    role_ids = {admin.id, user.id}
    assert all(created_user.role_id in role_ids for created_user in users)
    users_df = dataset.dataframe("users")
    # users_df is a DataFrame; temp view "users" is registered
```

## Databricks Lakeflow

A `Model` subclass carries all the metadata your Lakeflow pipeline and Unity Catalog need - schema, comment, tags and data quality expectations - in one place.

### Define a table

```python
from pyspark.sql.types import LongType, StringType, StructField, StructType

from rowsmyth import declarative_base, variant

Base = declarative_base()


class Customer(Base):
    __table_name__ = "customers"
    __catalog__ = "main"
    __schema__ = "commerce"
    __comment__ = "One row per customer account"
    __primary_key__ = ("id",)
    __table_tags__ = {"layer": "silver", "pii": "true"}
    __expectations__ = {
        "id_not_null": "id IS NOT NULL",
        "email_not_null": "email IS NOT NULL",
        "valid_tier": "tier IN ('standard', 'premium')",
    }
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField(
            "email",
            StringType(),
            False,
            metadata={
                "comment": "Customer email, PII",
                "tags": {"pii": "true", "classification": "restricted"},
            },
        ),
        StructField("tier", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "email": ctx.faker.unique.ascii_email(),
            "tier": ctx.random.choices(["standard", "premium"], weights=[7, 3])[0],
        }

    @variant
    def premium(self, ctx):
        return {"tier": "premium"}
```
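
The `tier` line in the generator relies on weighted sampling. Assuming `ctx.random` behaves like a seeded standard-library `random.Random` (an assumption; this section does not state it), the same call can be tried in isolation. `pick_tier` and `rng` are illustrative names only.

```python
import random
from collections import Counter


def pick_tier(rng):
    # choices() always returns a list, so [0] unwraps the single sampled value.
    # weights=[7, 3] gives "standard" a 7-in-10 chance and "premium" 3-in-10.
    return rng.choices(["standard", "premium"], weights=[7, 3])[0]


# Stand-in for ctx.random; in rowsmyth the object is supplied by the context.
rng = random.Random(42)
counts = Counter(pick_tier(rng) for _ in range(1_000))
print(counts)

# Two generators with the same seed produce the same sequence of tiers.
a, b = random.Random(7), random.Random(7)
print([pick_tier(a) for _ in range(5)] == [pick_tier(b) for _ in range(5)])
```

The `premium` variant sidesteps the sampling entirely by overriding `tier` with a fixed value.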

### Lakeflow pipeline

Use the class attributes directly in your pipeline declaration:

```python
from pyspark import pipelines as dp

from tables.customer import Customer


@dp.table(
    name=Customer.__table_name__,
    comment=Customer.__comment__,
    schema=Customer.__definition__,
)
@dp.expect_all_or_fail(Customer.__expectations__)
def customers():
    # `spark` is the session provided by the pipeline runtime
    return spark.read.table("main.bronze.raw_customers")
```

### Apply Unity Catalog metadata

After the pipeline materialises the table, apply comments and tags from the same class:

```python
for statement in Customer.uc_tag_sql():
    spark.sql(statement)
```

`uc_tag_sql()` emits table comments, table tags, column comments and column tags.
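
For orientation, the four kinds of statement look roughly like the Databricks SQL built below. This is a hand-written sketch, not the output of `uc_tag_sql()`: the helpers `sketch_uc_sql` and `_tags` are hypothetical, the fully qualified name `main.commerce.customers` is assumed from `__catalog__`, `__schema__` and `__table_name__`, and the exact statements, ordering and quoting that `rowsmyth` emits may differ.

```python
def _tags(tags):
    """Render a dict as the 'key' = 'value' list used by SET TAGS."""
    return ", ".join(f"'{key}' = '{value}'" for key, value in tags.items())


def sketch_uc_sql(table, comment, table_tags, columns):
    """Hypothetical illustration; not rowsmyth's implementation.

    `columns` maps a column name to {"comment": str, "tags": dict}.
    Values are interpolated without escaping, so do not pass untrusted input.
    """
    statements = [
        f"COMMENT ON TABLE {table} IS '{comment}'",
        f"ALTER TABLE {table} SET TAGS ({_tags(table_tags)})",
    ]
    for name, meta in columns.items():
        if "comment" in meta:
            statements.append(
                f"ALTER TABLE {table} ALTER COLUMN {name} COMMENT '{meta['comment']}'"
            )
        if meta.get("tags"):
            statements.append(
                f"ALTER TABLE {table} ALTER COLUMN {name} SET TAGS ({_tags(meta['tags'])})"
            )
    return statements


statements = sketch_uc_sql(
    "main.commerce.customers",  # assumed catalog.schema.table name
    "One row per customer account",
    {"layer": "silver", "pii": "true"},
    {
        "email": {
            "comment": "Customer email, PII",
            "tags": {"pii": "true", "classification": "restricted"},
        }
    },
)
for statement in statements:
    print(statement)
```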

### Generate test fixtures

Write fixtures to the source your pipeline reads - either a Unity Catalog volume or a persistent bronze table:

```python
from pyspark.sql import SparkSession

from tables.base import Base
from tables.customer import Customer

spark = SparkSession.builder.getOrCreate()

with Base.dataset(spark, seed=42) as dataset:
    customers = Customer.factory().count(100).create()
    customers_df = dataset.dataframe("customers")

# Option A - ingest volume (pipeline reads parquet from path)
customers_df.write.mode("overwrite").parquet(
    "/Volumes/main/bronze/ingest/raw_customers/"
)

# Option B - persistent bronze table
customers_df.write.mode("overwrite").saveAsTable("main.bronze.raw_customers")
```

See [docs/usage.md](docs/usage.md) for the complete API reference.

## Development

```bash
make install
make test        # requires JAVA_HOME / java on PATH
make lint
make typecheck
make security
make pre-commit
make ci          # local equivalent of CI checks
```

See [CONTRIBUTING.md](CONTRIBUTING.md) for details.
