Metadata-Version: 2.3
Name: eventable
Version: 0.3.0
Summary: Domain Event Message Infrastructure
Author: Ioannis-Andreas Philippas
Author-email: Ioannis-Andreas Philippas <ioannis.philippas@entaxilabs.gr>
Requires-Dist: fastapi>=0.135.3
Requires-Dist: pika>=1.3.2
Requires-Dist: pydantic>=2.12.5
Requires-Dist: ruff>=0.15.9
Requires-Dist: starlette>=1.0.0
Requires-Python: >=3.12
Description-Content-Type: text/markdown

# eventable

Domain event infrastructure for Python microservices.

Provides the building blocks to implement **domain events** following Domain-Driven Design: aggregate roots that collect events, an in-process dispatcher, and a RabbitMQ transport — with an optional plug-and-play FastAPI integration.

---

## Features

- Frozen, serializable `DomainEvent` dataclasses
- `AggregateRoot` (Pydantic) that accumulates and releases domain events
- `EventDispatcher` — runs in-process handlers first, then publishes to the broker
- RabbitMQ publisher & subscriber backed by a durable topic exchange
- **FastAPI plugin**: one-line lifespan setup + pure-ASGI middleware that dispatches events after every successful request without swallowing exceptions

---

## Installation

```bash
pip install eventable
# or with uv
uv add eventable
```

**Requirements:** Python 3.12+ and a running RabbitMQ broker (for the infrastructure layer).

---

## Core concepts

```
Request
  └─ AggregateRoot.add_domain_event(event)   # automatically pushed to context collector
       └─ EventCollector (ContextVar)         # active for the lifetime of the request
            └─ dispatch loop (middleware)     # runs until no new events remain
                 └─ EventDispatcher.dispatch(event)
                      ├─ in-process handlers  # same bounded context (may fire more events)
                      └─ RabbitMQPublisher    # cross-context via broker
```

The collector is a **context variable** — it is implicitly active for any code that runs inside the request, including in-process handlers. When a handler modifies an aggregate that fires its own events, those events are picked up in the next pass of the dispatch loop automatically.

---

## Quick start

### 1. Define a domain event

```python
from dataclasses import dataclass
from uuid import UUID
from eventable.domain_event import DomainEvent

@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: UUID
    total: str
```

### 2. Define an aggregate root

```python
from datetime import datetime
from uuid import UUID

from eventable.aggregate_root import AggregateRoot
from eventable.domain_event import generate_event_id
from myapp.events import OrderPlaced  # the event defined in step 1

class Order(AggregateRoot):
    order_id: UUID
    total: str

    def place(self) -> None:
        self.add_domain_event(
            OrderPlaced(
                event_id=generate_event_id(),
                occurred_at=datetime.now(),
                order_id=self.order_id,
                total=self.total,
            )
        )
```

### 3. Register in-process handlers

Edit `event_handlers.py` (generated alongside your app):

```python
from eventable.event_managment.event_dispatcher import EventDispatcher
from myapp.events import OrderPlaced

def handle_order_placed(event: OrderPlaced) -> None:
    print(f"Order {event.order_id} placed — sending confirmation email")

def register_handlers(dispatcher: EventDispatcher) -> None:
    dispatcher.register(OrderPlaced, handle_order_placed)
```
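To make the "handlers first, then broker" ordering concrete, here is a rough sketch of what a dispatcher with that contract could look like. `SketchDispatcher` and the `Publisher` protocol are assumptions made for this example; the real `EventDispatcher` may differ in its signatures and error handling.

```python
from collections import defaultdict
from typing import Callable, Protocol


class Publisher(Protocol):
    """Hypothetical publisher interface, assumed for this sketch."""

    def publish(self, event: object) -> None: ...


class SketchDispatcher:
    """Illustrative only: runs handlers for the event's type, then publishes."""

    def __init__(self, publisher: Publisher) -> None:
        self._publisher = publisher
        self._handlers: dict[type, list[Callable[[object], None]]] = defaultdict(list)

    def register(self, event_type: type, handler: Callable[[object], None]) -> None:
        # Several handlers may be registered for the same event type.
        self._handlers[event_type].append(handler)

    def dispatch(self, event: object) -> None:
        for handler in self._handlers.get(type(event), []):
            handler(event)              # in-process handlers run first
        self._publisher.publish(event)  # then the event goes to the broker
```

In this sketch an event with no registered handler is still published, so other bounded contexts can react to it.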

---

## FastAPI integration

### Lifespan setup

`Eventable` connects to RabbitMQ on startup, registers your in-process handlers, and disconnects cleanly on shutdown. Wire it into FastAPI's async lifespan:

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from eventable.infrastructure.fastapi.eventable import Eventable, EventableSettings
from eventable.infrastructure.fastapi.middleware import DomainEventMiddleware

settings = EventableSettings(
    rabbit_url="amqp://guest:guest@localhost/",
    exchange_name="domain_events",   # default
)
eventable = Eventable(settings)

@asynccontextmanager
async def lifespan(app: FastAPI):
    await eventable.startup(app)   # connects publisher, wires dispatcher
    yield
    await eventable.shutdown()     # closes RabbitMQ connection

app = FastAPI(lifespan=lifespan)
app.add_middleware(DomainEventMiddleware)
```

After startup, `app.state.publisher` and `app.state.dispatcher` are available throughout the application.

### Middleware

`DomainEventMiddleware` is a **pure ASGI middleware** (not `BaseHTTPMiddleware`) so exceptions from your route handlers propagate cleanly to FastAPI's exception handlers — nothing is swallowed.

Per-request lifecycle:

1. Creates a fresh `EventCollector` and sets it as the active context collector.
2. Runs the route handler normally — any `add_domain_event` call anywhere in the call stack automatically registers the event.
3. After the response is sent, if the status code is **< 400**, dispatches all collected events. If a handler fires new events (by modifying aggregates), those are dispatched in the next pass of the loop.
4. On 4xx/5xx the collector is discarded — no events are dispatched for failed requests.
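The lifecycle above follows the usual pure-ASGI pattern of wrapping `send` to observe the status code. The sketch below shows that pattern only; `AfterResponseMiddleware` and its `on_success` hook are hypothetical stand-ins for the dispatch loop and are not the code of `DomainEventMiddleware`.

```python
from typing import Any, Awaitable, Callable

Scope = dict[str, Any]
Receive = Callable[[], Awaitable[dict[str, Any]]]
Send = Callable[[dict[str, Any]], Awaitable[None]]


class AfterResponseMiddleware:
    """Illustrative pure-ASGI middleware; not eventable's DomainEventMiddleware."""

    def __init__(
        self,
        app: Callable[..., Awaitable[None]],
        on_success: Callable[[], Awaitable[None]],
    ) -> None:
        self.app = app
        self.on_success = on_success  # hypothetical hook standing in for the dispatch loop

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)  # pass through non-HTTP traffic
            return

        status: int | None = None

        async def send_wrapper(message: dict[str, Any]) -> None:
            nonlocal status
            if message["type"] == "http.response.start":
                status = message["status"]  # remember the status code
            await send(message)

        # Exceptions raised by the wrapped app propagate; nothing is caught here.
        await self.app(scope, receive, send_wrapper)

        if status is not None and status < 400:
            await self.on_success()  # runs only after a successful response
```

Because the wrapped app is awaited without a `try`/`except`, an exception skips the hook and reaches the outer exception handlers unchanged.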

### Collecting events in a route

No collector wiring needed in your routes. Just call your domain methods — events are captured implicitly:

```python
from uuid import uuid4

# Assumes `app` from the lifespan setup and the `Order` aggregate from the quick start.
@app.post("/orders")
def place_order():
    order = Order(order_id=uuid4(), total="99.99")
    order.place()   # add_domain_event() inside here is captured automatically

    return {"order_id": str(order.order_id)}
```

Handlers registered via `register_handlers` can also fire new events the same way — just call `add_domain_event` on any aggregate and the middleware loop will pick them up:

```python
def handle_order_placed(event: OrderPlaced) -> None:
    invoice = Invoice.create(order_id=event.order_id)
    invoice.generate()   # fires InvoiceGenerated — dispatched in the next loop pass
```

---

## RabbitMQ subscriber (cross-context consumer)

Run a long-lived consumer process in a separate service to handle events published by another bounded context:

```python
from eventable import rabbit_subscription
from myapp.events import OrderPlaced

def handle_order_placed(event: OrderPlaced) -> None:
    # react to the event in this bounded context
    ...

rabbit_subscription(
    consumer_name="inventory-service",
    rabbit_url="amqp://guest:guest@localhost/",
    queue_name="inventory.order_placed",
    handlers_map={
        OrderPlaced: (handle_order_placed, {}),
    },
)
```

`rabbit_subscription` blocks, handles `SIGTERM`/`SIGINT` for graceful shutdown, and nacks messages that fail processing (sending them to the dead-letter queue if configured).

---

## Event serialization

Events are serialized to JSON automatically. The following field types are supported out of the box:

| Python type | JSON representation |
|---|---|
| `UUID` | `string` |
| `datetime` | ISO 8601 string |
| `Decimal` | `string` |
| `ValueObject` | `get_value()` result |
| `dict`, `list` | passed through recursively |

### ValueObject

Implement the `ValueObject` protocol (a `get_value()` method) so that your value objects serialize transparently:

```python
from eventable.value_object import ValueObject

class Money:
    def __init__(self, amount: str):
        self.value = amount

    def get_value(self) -> str:
        return self.value
```
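As a rough picture of how the type mapping in the table could be applied, the sketch below converts field values recursively before handing them to `json.dumps`. The helper `to_jsonable` is hypothetical and detects value objects by duck typing; the library's own serializer may work differently.

```python
import json
from datetime import datetime
from decimal import Decimal
from typing import Any
from uuid import UUID


def to_jsonable(value: Any) -> Any:
    """Convert a field value following the table (illustrative helper)."""
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, datetime):
        return value.isoformat()  # ISO 8601 string
    if isinstance(value, Decimal):
        return str(value)         # a string keeps the exact decimal digits
    if isinstance(value, dict):
        return {key: to_jsonable(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_jsonable(item) for item in value]
    if callable(getattr(value, "get_value", None)):
        return to_jsonable(value.get_value())  # duck-typed ValueObject
    return value


class Money:
    def __init__(self, amount: str) -> None:
        self.value = amount

    def get_value(self) -> str:
        return self.value


body = json.dumps(to_jsonable({"total": Decimal("9.90"), "price": Money("5.00")}))
```

Serializing `Decimal` as a string rather than a JSON number avoids floating-point rounding when the consumer parses the message.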

---

## Architecture overview

```
eventable/
├── domain_event.py          # DomainEvent base dataclass + serialize/deserialize
├── aggregate_root.py        # AggregateRoot (Pydantic BaseModel)
├── value_object.py          # ValueObject protocol
├── event_handlers.py        # register_handlers() — your in-process handler hook
│
├── event_managment/
│   ├── event_collector.py   # ContextVar-based collector, auto-captures events from aggregates
│   ├── event_dispatcher.py  # Runs handlers then publishes to broker
│   ├── event_publisher.py   # Abstract EventPublisher
│   └── event_subscriber.py  # Abstract EventSubscriber
│
└── infrastructure/
    ├── rabbitmq/
    │   ├── event_publisher.py   # RabbitMQPublisher (thread-local channels)
    │   └── event_subscriber.py  # RabbitMQSubscriber (blocking consumer)
    └── fastapi/
        ├── eventable.py         # Eventable + EventableSettings (lifespan plugin)
        └── middleware.py        # DomainEventMiddleware (pure ASGI)
```

**RabbitMQ topology:** a single durable **topic exchange** (`domain_events` by default). Each event type is routed by its class name as the routing key. Consumers bind a durable queue to the exchange for the routing keys they care about.
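To illustrate which bindings would receive a given event, the sketch below derives a routing key from the class name and applies the AMQP topic-matching rules (`*` matches exactly one word, `#` matches zero or more words). Both functions are hypothetical helpers for reasoning about bindings; the broker performs the real matching.

```python
def routing_key_for(event: object) -> str:
    """Routing key as described above: the event's class name."""
    return type(event).__name__


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """AMQP topic matching: '*' is exactly one word, '#' is zero or more words."""

    def match(pattern: list[str], words: list[str]) -> bool:
        if not pattern:
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # '#' may absorb zero or more words
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        return (head == "*" or head == words[0]) and match(rest, words[1:])

    return match(binding_key.split("."), routing_key.split("."))
```

Since a class name contains no dots, each routing key is a single word: a queue bound with `OrderPlaced` receives only that event type, while a queue bound with `#` receives every event on the exchange.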

---

## Configuration reference

### `EventableSettings`

| Field | Type | Default | Description |
|---|---|---|---|
| `rabbit_url` | `str` | — | AMQP connection URL |
| `exchange_name` | `str` | `"domain_events"` | Topic exchange name |

---

## Running the tests

```bash
uv run pytest tests/unit/          # pure unit tests, no broker needed
uv run pytest tests/infrastructure/ -k "not rabbitmq"  # FastAPI plugin tests only

# Full suite including RabbitMQ integration tests:
RABBITMQ_URL=amqp://admin:admin@localhost:5672/ uv run pytest
```

---

## License

MIT
