Metadata-Version: 2.4
Name: nsq2mariadb
Version: 0.1.3
Summary: generic NSQ → MariaDB transporter with per-topic Python mapper classes
Home-page: https://github.com/larsborn/nsq2mariadb
Author: Lars Wallenborn
Project-URL: Bug Tracker, https://github.com/larsborn/nsq2mariadb/issues
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pynsq
Requires-Dist: pymysql
Dynamic: author
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: license-file
Dynamic: project-url
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# nsq2mariadb

Generic transporter for moving JSON messages from [NSQ](https://nsq.io/) topics into
[MariaDB](https://mariadb.org/) tables. You write one Python `Mapper` class per
topic; the framework handles the NSQ subscription, schema bootstrap, and
transactional inserts.

The shape mirrors [`nsq2arangodb`](https://github.com/larsborn/nsq2arangodb),
but because MariaDB is schema-full, the per-topic schema and the JSON-to-row
translation have to live in code rather than configuration.

## Installation

### From PyPI (normal use)

```bash
pip install nsq2mariadb
```

Pin a specific version in production:

```bash
pip install nsq2mariadb==0.1.3
```

Or in a `requirements.txt`:

```
nsq2mariadb==0.1.3
```

Requires Python 3.9+. Dependencies: `pynsq`, `pymysql` (both pulled in
automatically).

### Dev install (working on `nsq2mariadb` itself)

```bash
git clone https://github.com/larsborn/nsq2mariadb.git
cd nsq2mariadb
python3 -m venv .venv
source .venv/bin/activate     # Windows bash: source .venv/Scripts/activate
pip install -U pip
pip install -e .              # install the package in editable mode
pip install build twine       # only needed if you'll cut releases locally
```

Verify the install:

```bash
make test       # runs the unittest suite
make version    # prints the current version from setup.py
```

See [`CLAUDE.md`](CLAUDE.md) for the full release flow and packaging notes.

## Usage

For each NSQ topic you want to consume, subclass `Mapper`:

```python
from nsq2mariadb import Mapper

class OrderMapper(Mapper):
    topic = "orders"
    schema_sql = """
        CREATE TABLE IF NOT EXISTS `order` (
            id          INT PRIMARY KEY,
            customer    VARCHAR(255) NOT NULL,
            inserted_at DATETIME     NOT NULL
        );
        CREATE TABLE IF NOT EXISTS order_item (
            order_id INT          NOT NULL,
            position SMALLINT     NOT NULL,
            sku      VARCHAR(32)  NOT NULL,
            PRIMARY KEY (order_id, position),
            FOREIGN KEY (order_id) REFERENCES `order`(id) ON DELETE CASCADE
        );
    """

    def transform(self, doc):
        yield "order", {
            "id":          doc["id"],
            "customer":    doc["customer"],
            "inserted_at": doc["inserted_at"],
        }
        for i, sku in enumerate(doc.get("items", [])):
            yield "order_item", {"order_id": doc["id"], "position": i, "sku": sku}
```
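Because `transform()` is a plain generator over the decoded message, its output can be checked without NSQ or MariaDB. The sketch below copies the body of `OrderMapper.transform` into a standalone function so it runs without the package installed; the sample document is made up for illustration.

```python
def transform(doc):
    # Same logic as OrderMapper.transform above, minus `self`.
    yield "order", {
        "id":          doc["id"],
        "customer":    doc["customer"],
        "inserted_at": doc["inserted_at"],
    }
    for i, sku in enumerate(doc.get("items", [])):
        yield "order_item", {"order_id": doc["id"], "position": i, "sku": sku}


# Hypothetical decoded NSQ message body.
doc = {
    "id": 7,
    "customer": "ACME",
    "inserted_at": "2024-01-01 12:00:00",
    "items": ["A-1", "B-2"],
}

# One (table, row) pair per row to insert: one order, two order items.
rows = list(transform(doc))
for table, row in rows:
    print(table, row)
```

Yielding the parent row first matters here: `order_item` has a foreign key on `order`, so the `order` row should be inserted before its items.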

Then wire it into the runner:

```python
import logging
from nsq2mariadb import MariaDBConfig, Nsq2MariaDB, NsqConfig

logging.basicConfig(level=logging.INFO)

runner = Nsq2MariaDB(
    logger=logging.getLogger("nsq2mariadb"),
    mariadb_config=MariaDBConfig(
        host="mariadb", port=3306,
        user="orders", password="secret", database="orders",
    ),
    nsq_config=NsqConfig(
        address="nsq-nsqd-1", port=4150,
        channel="nsq2mariadb",
    ),
    mappers=[OrderMapper()],
)
runner.run()
```

On startup the framework opens one pymysql connection, executes every mapper's
`schema_sql` with `CREATE TABLE IF NOT EXISTS` semantics, and subscribes an
`nsq.Reader` per mapper. Each NSQ message is decoded, fanned out through the
mapper's `transform()`, and inserted in a single transaction with parameterized
`INSERT IGNORE` statements (so re-published messages are idempotent as long as
your primary key reflects content identity).
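
To make the insert step concrete, here is a minimal sketch of how one `(table, row)` pair from `transform()` could become a parameterized `INSERT IGNORE`. The helper name `build_insert_ignore` is hypothetical and this is not necessarily how the framework builds its statements; it only assumes pymysql's `%s` placeholder style and that table and column names come from trusted mapper code.

```python
def build_insert_ignore(table, row):
    """Return (sql, params) for one parameterized INSERT IGNORE statement."""
    columns = list(row)
    # Identifiers are backtick-quoted; values are never interpolated into the SQL.
    column_list = ", ".join(f"`{c}`" for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    sql = f"INSERT IGNORE INTO `{table}` ({column_list}) VALUES ({placeholders})"
    params = tuple(row[c] for c in columns)
    return sql, params


sql, params = build_insert_ignore(
    "order_item", {"order_id": 7, "position": 0, "sku": "A-1"}
)
print(sql)
print(params)
```

The resulting pair would be passed to `cursor.execute(sql, params)`. A second delivery of the same message produces the same primary key, so `INSERT IGNORE` skips the row instead of raising a duplicate-key error.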

## Error handling

| Failure mode                       | Behavior                                                                |
|------------------------------------|-------------------------------------------------------------------------|
| JSON decode error                  | Logged with traceback, message FIN'd (dropped; retrying cannot help).   |
| `pymysql.MySQLError` during insert | Transaction rolled back, message FIN'd, traceback logged.               |
| Connection drop                    | pymysql raises, propagates, process exits; relies on container restart. |

Schema mismatches (unknown column, missing table, FK violation) are programmer
or schema bugs that would fail again on every redelivery if requeued, so we
drop them loudly. Make sure your log shipping captures these tracebacks,
because a dropped message is not retried.
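
The first two failure modes can be sketched as a message handler. This is an illustration under stated assumptions, not the framework's code: `DbError` stands in for `pymysql.MySQLError`, `FakeConnection` stands in for a real connection, and returning `True` stands in for FIN'ing the message.

```python
import json
import logging

logger = logging.getLogger("sketch")


class DbError(Exception):
    """Stand-in for pymysql.MySQLError so the sketch runs without a database."""


class FakeConnection:
    """Hypothetical connection that records what the handler did to it."""

    def __init__(self, fail=False):
        self.fail = fail
        self.events = []

    def insert(self, doc):
        if self.fail:
            raise DbError("unknown column")
        self.events.append("insert")

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")


def handle(body, conn):
    """Process one message body; True means the message is finished (FIN)."""
    try:
        doc = json.loads(body)
    except ValueError:  # covers json.JSONDecodeError and invalid UTF-8
        logger.exception("undecodable message, dropping")
        return True
    try:
        conn.insert(doc)
        conn.commit()
    except DbError:
        conn.rollback()
        logger.exception("insert failed, dropping")
    return True


ok, failing = FakeConnection(), FakeConnection(fail=True)
print(handle(b'{"id": 1}', ok), ok.events)
print(handle(b'{"id": 1}', failing), failing.events)
print(handle(b"not json", ok), ok.events)
```

Any exception other than `DbError` is deliberately left uncaught, which matches the third row of the table: the error propagates and the process exits.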

## Multiple topics per process

Pass several mappers to one `Nsq2MariaDB` instance — pynsq supports multiple
`Reader`s in one IOLoop. They share the database connection but each runs an
independent NSQ subscription. Useful when a single project produces several
related topics (e.g. `entries` + `runs`) that you want to land in the same DB.

## Releasing

Releases are auto-published to [PyPI](https://pypi.org/project/nsq2mariadb/)
by `.github/workflows/publish.yml` on every `v*` tag, via PyPI's Trusted
Publishers (OIDC — no API token stored in the repo).

To cut a release, from a clean `main`:

```bash
make release-patch   # X.Y.Z -> X.Y.Z+1   (bug fixes)
make release-minor   # X.Y.Z -> X.Y+1.0   (new features, backwards compatible)
make release-major   # X.Y.Z -> X+1.0.0   (breaking changes)
```

Each target bumps `setup.py`, commits with `chore: bump <version>`, creates
a `v<version>` tag, and pushes both. The GitHub Actions workflow picks up the
tag and uploads to PyPI.
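
The bump rules for the three targets can be written out as a short function. This is only a sketch of the arithmetic; the function name `bump` is hypothetical and it is not the Makefile's actual implementation.

```python
def bump(version, part):
    """Bump one component of an X.Y.Z version string and reset the lower ones."""
    major, minor, patch = (int(p) for p in version.split("."))
    if part == "major":
        return f"{major + 1}.0.0"
    if part == "minor":
        return f"{major}.{minor + 1}.0"
    if part == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown part: {part!r}")


for part in ("patch", "minor", "major"):
    print(part, bump("0.1.3", part))
```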

Use `make dry-release-patch` (etc.) to preview without modifying anything.

See [`CLAUDE.md`](CLAUDE.md) for safety checks, prerequisites, and the
manual-fallback flow.

## License

MIT.
