Metadata-Version: 2.1
Name: outbox-streaming
Version: 0.1.0
Summary: 
Author: Yevhenii Hyzyla
Author-email: hyzyla@gmail.com
Requires-Python: >=3.10,<4.0
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Provides-Extra: test
Requires-Dist: SQLAlchemy[asyncio] (>=1.4,<2.0); extra == "test"
Requires-Dist: aiokafka (>=0.7.2,<0.8.0); extra == "test"
Requires-Dist: celery (>=5.2.7,<6.0.0); extra == "test"
Requires-Dist: kafka-python (>=2.0.2,<3.0.0); extra == "test"
Requires-Dist: psycopg2-binary (>=2.9.3,<3.0.0); extra == "test"
Description-Content-Type: text/markdown

# outbox-streaming

Reliably send messages to message/task brokers, like Kafka or Celery

## Roadmap
### Done
 - ✅ Kafka + SQLAlchemy
 - ✅ Kafka + SQLAlchemy + asyncio
### In progress
 - ⏹ Celery + SQLAlchemy
 - ⏹ Celery + SQLAlchemy + asyncio
### Planned
 - 🆕 Kafka + Django ORM
 - 🆕 Celery + Django ORM
 - 🆕 Dramatiq + SQLAlchemy
 - 🆕 Dramatiq + SQLAlchemy + asyncio
 - 🆕 Dramatiq + Django
 - 🆕 RabbitMQ + SQLAlchemy
 - 🆕 RabbitMQ + SQLAlchemy + asyncio
 - 🆕 RabbitMQ + Djagno


# Example FastAPI + Kafka + SQLAlchemy
```python
from fastapi import FastAPI
from outbox_streaming.kafka.sqlalchemy import SQLAlchemyKafkaOutbox
from sqlalchemy import orm

from app import models, db
from app.config import config
from app.schemas import TodoCreate

app = FastAPI()

# create instance of SQLAlchemyKafkaOutbox
outbox = SQLAlchemyKafkaOutbox(
    engine=db.engine,
    kafka_servers=config.KAFKA_SERVERS,
)

# Run separate tread that monitor outbox table and publish messages to Kafka. It's not recommended for production,
# but it's handy for development
outbox.publisher.run_daemon()

Session = orm.sessionmaker(bind=db.engine)

# create outbox tables 
outbox.storage.create_tables(engine=db.engine)


@app.post('/todos')
def create_todo(create: TodoCreate) -> str:

    with Session() as session:

        # create new object
        todo = models.Todo(text=create.text)

        session.add(todo)

        # create kafka event in outbox table
        outbox.save(
            session=session,
            topic='todo_created',
            value=todo.to_dict(),
        )

        # commit changes in database
        session.commit()
        
    # publisher will pick up kafka message from outbox table and will send it kafka topic

    return "OK"

```

