Metadata-Version: 2.3
Name: dramatiq-eager-broker
Version: 0.3.0
Summary: Eager broker for dramatiq
License: MIT
Requires-Dist: dramatiq>=1.15.0
Requires-Python: >=3.10
Project-URL: repository, https://codeberg.org/yaal/dramatiq-eager-broker
Description-Content-Type: text/markdown

# dramatiq-eager-broker

An eager broker for [Dramatiq](https://dramatiq.io) that executes tasks synchronously and immediately, without queuing. Perfect for testing and development environments.

## Features

- Synchronous task execution
- No message broker required (Redis, RabbitMQ, etc.)
- Pipeline support
- Middleware support
- Drop-in replacement for testing

## Installation

```bash
pip install dramatiq-eager-broker
```

Or with [uv](https://github.com/astral-sh/uv):

```bash
uv add dramatiq-eager-broker
```

## Usage

```python
import dramatiq
from dramatiq_eager_broker import EagerBroker

broker = EagerBroker(middleware=[])
dramatiq.set_broker(broker)

@dramatiq.actor
def send_email(email, message):
    print(f"Sending email to {email}: {message}")

# Tasks are executed immediately and synchronously
send_email.send("user@example.com", "Hello!")
```

## Testing Example

```python
import dramatiq
import pytest
from dramatiq_eager_broker import EagerBroker


@pytest.fixture
def eager_broker():
    broker = EagerBroker(middleware=[])
    dramatiq.set_broker(broker)
    yield broker
    dramatiq.set_broker(None)


def test_my_actor(eager_broker):
    results = []

    @dramatiq.actor
    def my_task(value):
        results.append(value)

    my_task.send("test")
    assert results == ["test"]
```

## Pipeline Support

The eager broker supports Dramatiq pipelines:

```python
from dramatiq.middleware import Pipelines

@pytest.fixture
def eager_broker():
    broker = EagerBroker(middleware=[Pipelines()])
    dramatiq.set_broker(broker)
    yield broker
    dramatiq.set_broker(None)

@dramatiq.actor
def add(x, y):
    return x + y

@dramatiq.actor
def multiply(result, factor):
    return result * factor

# Create and execute a pipeline
pipeline = add.message(2, 3) | multiply.message(factor=10)
broker.enqueue(pipeline.messages[0])
```


## Results Support

The eager broker supports Dramatiq `get_result()` if the Results middleware is loaded:

```python
from dramatiq.middleware import Pipelines
from dramatiq.results import Results
from dramatiq.results.backends import StubBackend

@pytest.fixture
def eager_broker():
    broker = EagerBroker(middleware=[Pipelines(), Results(backend=StubBackend())])
    dramatiq.set_broker(broker)
    yield broker
    dramatiq.set_broker(None)

@dramatiq.actor(store_result=True)
def add(x, y):
    return x + y

@dramatiq.actor
def multiply(result, factor):
    return result * factor


# Create and execute a pipeline
pipeline = add.message(2, 3) | multiply.message(factor=10)
broker.enqueue(pipeline.messages[0])

assert pipeline.get_result() == 60
```


### Results with exceptions

If the task should be executed synchronously but the actor should not raise an exception, use `fail_fast=False`.
This mimics the conventional Dramatiq behavior which stuffs the exception into the message.

```python
from dramatiq.middleware import Pipelines, Retries
from dramatiq.results import Results, ResultFailure
from dramatiq.results.backends import StubBackend

@pytest.fixture
def eager_broker():
    broker = EagerBroker(middleware=[Pipelines(), Retries(), Results(backend=StubBackend())], fail_fast=False)
    dramatiq.set_broker(broker)
    yield broker
    dramatiq.set_broker(None)

@dramatiq.actor(store_result=True, throws=ValueError)
def add(x, y):
    if x < y:
        raise ValueError("x < y")

message = add.send(2, 3)
with pytest.raises(ResultFailure):
    message.get_result()
```
