Metadata-Version: 2.4
Name: dishka-faststream
Version: 0.0.2
Summary: Integration package for Dishka DI and FastStream framework
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: dishka
Requires-Dist: faststream

# FastStream

Though it is not required, you can use *dishka-faststream* integration. It features:

* automatic *REQUEST* scope management using middleware
* passing `StreamMessage` and `ContextRepo` object as a context data to providers
* automatic injection of dependencies into message handler.

You can use auto-injection for `FastStream` 0.5.0 and higher. For older version you need to specify `@inject` manually.

> **Note**
> 
> If you are using **FastAPI plugin** of **FastStream** you need to use both dishka integrations, but you can share the same container.
> 
> * Call `dishka.integrations.faststream.setup_dishka` on faststream broker or router.
> * Call `dishka.integrations.fastapi.setup_dishka` on fastapi app.

## How to use

1. Import

```python
from dishka.integrations.faststream import (
    FromDishka,
    inject,
    setup_dishka,
    FastStreamProvider,
)
from dishka import make_async_container, Provider, provide, Scope
```

2. Create provider. You can use `faststream.types.StreamMessage` and `faststream.ContextRepo` as a factory parameter to access on *REQUEST*-scope

```python
class YourProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def create_x(self, event: StreamMessage) -> X:
         ...
```

3. Mark those of your handlers parameters which are to be injected with `FromDishka[]`

```python
@broker.subscriber("test")
async def start(
    gateway: FromDishka[Gateway],
):
    ...
```

3a. *(optional)* decorate them using `@inject` if you are not using auto-injection

```python
@broker.subscriber("test")
@inject
async def start(
    gateway: FromDishka[Gateway],
):
    ...
```

4. *(optional)* Use `FastStreamProvider()` when creating container if you are going to use  `faststream.types.StreamMessage` or `faststream.ContextRepo`  in providers

```python
container = make_async_container(YourProvider(), FastStreamProvider())
```

5. Setup `dishka` integration.  `auto_inject=True` is required unless you explicitly use `@inject` decorator

```python
setup_dishka(container=container, app=app, auto_inject=True)
```

Or pass your own inject decorator

```python
setup_dishka(container=container, broker=broker, auto_inject=my_inject)
```

## FastStream - Litestar/FastAPI - dishka integration

1. Running RabbitMQ

```shell
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=guest \
  -e RABBITMQ_DEFAULT_PASS=guest \
  rabbitmq:management
```

2. Example of usage FastStream + Litestar

```python
import uvicorn
from dishka import Provider, Scope, provide
from dishka import make_async_container
from dishka.integrations import faststream as faststream_integration
from dishka.integrations import litestar as litestar_integration
from dishka.integrations.base import FromDishka
from dishka.integrations.faststream import inject as faststream_inject
from dishka.integrations.litestar import inject as litestar_inject
from faststream.rabbit import RabbitBroker, RabbitRouter
from litestar import Litestar, route, HttpMethod


class SomeDependency:
    async def do_something(self) -> int:
        print("Hello world")
        return 42


class SomeProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def some_dependency(self) -> SomeDependency:
        return SomeDependency()


@route(http_method=HttpMethod.GET, path="/", status_code=200)
@litestar_inject
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


def create_app() -> Litestar:
    container = make_async_container(SomeProvider())

    broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
    broker.include_router(amqp_router)
    faststream_integration.setup_dishka(container, broker=broker)

    http = Litestar(
        route_handlers=[http_handler],
        on_startup=[broker.start],
        on_shutdown=[broker.stop],
    )
    litestar_integration.setup_dishka(container, http)
    return http


if __name__ == "__main__":
    uvicorn.run(create_app(), host="0.0.0.0", port=8000)
```

### Example of usage FastStream + FastAPI

```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import uvicorn
from fastapi import APIRouter, FastAPI
from faststream.rabbit import RabbitBroker, RabbitRouter
from dishka import Provider, Scope, make_async_container, provide
from dishka.integrations import fastapi as fastapi_integration
from dishka.integrations import faststream as faststream_integration
from dishka.integrations.base import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from dishka.integrations.faststream import inject as faststream_inject


class SomeDependency:
    async def do_something(self) -> int:
        print("Hello world")
        return 42


class SomeProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def some_dependency(self) -> SomeDependency:
        return SomeDependency()


router = APIRouter(route_class=DishkaRoute)


@router.get("/")
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


def create_app() -> FastAPI:
    container = make_async_container(SomeProvider())

    broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
    broker.include_router(amqp_router)
    faststream_integration.setup_dishka(container, broker=broker)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncIterator[None]:
        async with broker:
            await broker.start()
            yield

    http = FastAPI(lifespan=lifespan)
    http.include_router(router)
    fastapi_integration.setup_dishka(container, http)
    return http


if __name__ == "__main__":
    uvicorn.run(create_app(), host="0.0.0.0", port=8000)
```

## Testing FastStream with dishka

Simple example:

```python
from collections.abc import AsyncIterator

import pytest
from dishka import AsyncContainer, make_async_container
from dishka import Provider, Scope, provide
from dishka.integrations import faststream as faststream_integration
from dishka.integrations.base import FromDishka as Depends
from faststream import FastStream, TestApp
from faststream.rabbit import RabbitBroker, TestRabbitBroker, RabbitRouter

router = RabbitRouter()


@router.subscriber("test-queue")
async def handler(msg: str, some_dependency: Depends[int]) -> int:
    print(f"{msg=}")
    return some_dependency


@pytest.fixture
async def broker() -> RabbitBroker:
    broker = RabbitBroker()
    broker.include_router(router)
    return broker


@pytest.fixture
def mock_provider() -> Provider:
    class MockProvider(Provider):
        @provide(scope=Scope.REQUEST)
        async def get_some_dependency(self) -> int:
            return 42

    return MockProvider()


@pytest.fixture
def container(mock_provider: Provider) -> AsyncContainer:
    return make_async_container(mock_provider)


@pytest.fixture
async def app(broker: RabbitBroker, container: AsyncContainer) -> FastStream:
    app = FastStream(broker)
    faststream_integration.setup_dishka(container, app, auto_inject=True)
    return FastStream(broker)


@pytest.fixture
async def client(app: FastStream) -> AsyncIterator[RabbitBroker]:
    async with TestRabbitBroker(app.broker) as br, TestApp(app):
        yield br


@pytest.mark.asyncio
async def test_handler(client: RabbitBroker) -> None:
    result = await client.request("hello", "test-queue")
    assert await result.decode() == 42
```
