Metadata-Version: 2.3
Name: package-events-bus
Version: 0.1.25
Summary: Una librería para construir microservicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola y se despacha automáticamente mediante handlers tipados.
License: MIT
Author: Jose Luis Rosales Meza
Author-email: jose.rosales@finkargo.com
Requires-Python: >=3.10,<4.0
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Dist: boto3 (>=1.38.13,<2.0.0)
Requires-Dist: mangum (>=0.19.0,<0.20.0)
Requires-Dist: python-dotenv (>=1.1.0,<2.0.0)
Requires-Dist: redis (>=5.0,<6.0)
Description-Content-Type: text/markdown

# 📦 Package Events Bus

Una librería para construir servicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola.

---

## � Tabla de Contenidos

- [📦 Package Events Bus](#-package-events-bus)
  - [� Tabla de Contenidos](#-tabla-de-contenidos)
  - [📝 Descripción](#-descripción)
  - [⚙️ Instalación](#️-instalación)
  - [🗂️ Estructura del Proyecto](#️-estructura-del-proyecto)
  - [💡 ¿Cómo usarlo?](#-cómo-usarlo)
    - [📤 Publicar eventos](#-publicar-eventos)
    - [📥 Consumir eventos](#-consumir-eventos)
      - [Opción 1: Consumir eventos con múltiples colas por caso de uso](#opción-1-consumir-eventos-con-múltiples-colas-por-caso-de-uso)
      - [Opción 2: Consumir eventos usando una sola cola por microservicio](#opción-2-consumir-eventos-usando-una-sola-cola-por-microservicio)
      - [Opción 3: Usar Mangum para AWS Lambda (SQS/EventBridge)](#opción-3-usar-mangum-para-aws-lambda-sqseventbridge)
    - [⚡ ¿Cómo implementar el dispatcher en FastAPI?](#-cómo-implementar-el-dispatcher-en-fastapi)
      - [Opción 1: Cola de SQS por caso de uso](#opción-1-cola-de-sqs-por-caso-de-uso)
      - [Opción 2: Una cola SQS por microservicio](#opción-2-una-cola-sqs-por-microservicio)
    - [🛡️ Opcional: Configuración de Redis para deduplicación](#️-opcional-configuración-de-redis-para-deduplicación)
  - [⚙️ Variables por Defecto](#️-variables-por-defecto)
  - [🏷️ Definición de Topics](#️-definición-de-topics)
  - [🛠️ TODO](#️-todo)
  - [📝 Buenas Prácticas](#-buenas-prácticas)
  - [📄 Licencia](#-licencia)

---

## 📝 Descripción

Esta librería permite construir microservicios desacoplados en Python, facilitando la publicación y consumo de eventos a través de AWS EventBridge y SQS. Cada caso de uso maneja su propia cola, permitiendo escalabilidad y mantenibilidad.

---

## ⚙️ Instalación

Instala la librería usando **pip**:

```bash
pip install package-events-bus
```

O usando **poetry**:

```bash
poetry add package-events-bus
```

---

## 🗂️ Estructura del Proyecto

```
package_events_bus/
├── __init__.py
├── config.py
├── exceptions.py
├── .vscode/settings.json
├── aws/
│   ├── event_bridge_publisher.py
│   └── sqs_dispatcher.py
├── core/
│   ├── __init__.py
│   ├── contracts/
│   │   ├── base_event.py
│   │   ├── deduplication.py
│   │   └── handler.py
│   ├── infrastructure/
│   │   ├── event_bus_publisher.py
│   │   ├── event_serializer.py
│   │   └── redis_deduplication.py
│   └── runtime/
│       └── event_register.py
```

- **/aws:** Implementaciones específicas para servicios de AWS.
  - **event_bridge_publisher.py:** Publica eventos a AWS EventBridge.
  - **sqs_dispatcher.py:** Despacha mensajes de colas SQS y procesa eventos.

---

## 💡 ¿Cómo usarlo?

### 📤 Publicar eventos

1. Crea una instancia de `EventBridgePublisher` y llama a `publish` con el evento deseado.

```python
from events_bus.aws.event_bridge_publisher import EventBridgePublisher

event_bridge_publisher = EventBridgePublisher(
    bus_name="finkargo-events",
    source="my.service",
)
```

2. Define tu evento heredando de `BaseEvent`:
   Reemplaza `DisbursementCreatedEvent` con el nombre de tu evento, recuerda sobreescribir el método `to_dict` para serializar los atributos del evento que deseas enviar.

```python
from events_bus.core.contracts.base_event import BaseEvent

@dataclass
class DisbursementCreatedEvent(BaseEvent):
    event_name = 'finkargo.portfolio.1.event.disbursement.created'

    def __init__(self, disbursement):
        super().__init__(event_name=self.event_name)
        self.id_disbursement = disbursement.id_disbursement

    def to_dict(self) -> dict:
        return {
            'id_disbursement': self.id_disbursement
        }

    @classmethod
    def from_dict(cls, event_id: str, occurred_on, attributes: dict) -> "DisbursementCreatedEvent":
        return cls(disbursement_id=attributes.get('id_disbursement'))

event_bridge_publisher.publish(event=DisbursementCreatedEvent(instance))
```

---

### 📥 Consumir eventos

#### Opción 1: Consumir eventos con múltiples colas por caso de uso

1. Hereda de `AsyncHandler` o `SyncHandler` y sobrescribe el método `handle`.
    El método `handle` recibe el evento como parámetro, recuerda que el evento debe heredar de `BaseEvent`,
    y el método `from_dict` debe estar implementado para poder deserializar el evento.

    Usa `AsyncHandler` para manejar eventos de forma asíncrona y `SyncHandler` para eventos sincrónicos.

```python
from events_bus.core import AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, data: DisbursementCreatedEvent):
        print(f"Handling event: {data.event_name}")
        print(f"Finished handling event: {data.event_name}")
```

2. Registra el handler en el bus de eventos:
    Usa `EventHandlerRegister` para registrar el handler y la cola SQS asociada, recuerda que el target debe ser configurado previamente en AWS.
    Para mas información sobre la configuración de AWS, consulta la [documentación oficial](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html).

```python
from events_bus.core import EventHandlerRegister
from events_bus.aws.sqs_dispatcher import SQSDispatcher

dispatcher = SQSDispatcher()
EventHandlerRegister.register_by_queue(
    queue_url='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
    handler=CustomAsyncHandler()
)
```

---

#### Opción 2: Consumir eventos usando una sola cola por microservicio

Si quieres consumir solo una cola SQS por microservicio (y despachar a los handlers según el tipo de evento), registra tus handlers por nombre de evento y usa `start_from_one_queue`:

```python
from events_bus.aws.sqs_dispatcher import SQSDispatcher
from events_bus.core import EventHandlerRegister, AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, data):
        print(f"Handling event: {data.event_name}")

# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
    event_name='finkargo.portfolio.1.event.disbursement.created',
    handler=CustomAsyncHandler()
)

dispatcher = SQSDispatcher()

import asyncio

async def main():
    await dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')

asyncio.run(main())
```

- Así solo consumes una cola por microservicio, y despachas a los handlers según el tipo de evento recibido.

---

#### Opción 3: Usar Mangum para AWS Lambda (SQS/EventBridge)

Si tu microservicio se ejecuta como Lambda y recibe eventos desde SQS o EventBridge, puedes usar la integración con Mangum:

```python
from events_bus.aws.mangum import MangumExtended
from events_bus.core import EventHandlerRegister, AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, data):
        print(f"Handling event: {data.event_name}")

# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
    event_name='finkargo.portfolio.1.event.disbursement.created',
    handler=CustomAsyncHandler()
)

handler = MangumExtended()
```

- Registra tus handlers usando `EventHandlerRegister.register_handler`.
- Mangum detectará automáticamente si el evento proviene de SQS o EventBridge y despachará al handler correcto.
- AWS Lambda llamará a `handler(event, context)` y la librería despachará el evento al handler correspondiente.

---

### ⚡ ¿Cómo implementar el dispatcher en FastAPI?

#### Opción 1: Cola de SQS por caso de uso

Puedes consumir eventos en FastAPI usando múltiples colas SQS por caso de uso. Registra tus handlers y usa `start` para iniciar el dispatcher.

```python
@asynccontextmanager
async def lifespan(app: FastAPI):
    await dispatcher.start()
    yield
    dispatcher.stop()

app = FastAPI(lifespan=lifespan)

```

#### Opción 2: Una cola SQS por microservicio
Puedes consumir eventos usando una sola cola SQS por microservicio en FastAPI utilizando el método `start_from_one_queue` de `SQSDispatcher`. Así, tu microservicio solo escucha una cola y despacha a los handlers registrados según el tipo de evento recibido.

```python
import asyncio

dispatcher = SQSDispatcher()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Consumir solo una cola por microservicio
    task = asyncio.create_task(
        dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')
    )
    yield
    dispatcher.stop()
    await task

app = FastAPI(lifespan=lifespan)
```

---

### 🛡️ Opcional: Configuración de Redis para deduplicación

```python
from events_bus.core import RedisDeduplication
from events_bus.aws.sqs_dispatcher import SQSDispatcher

deduplication = RedisDeduplication(
    url='redis://localhost:6379/0',
    ttl=3600,
)

dispatcher = SQSDispatcher(deduplication=deduplication)
```

> Si no deseas usar Redis, puedes heredar de `BaseDeduplication` e implementar tu propia lógica.

---

## ⚙️ Variables por Defecto

```python
from events_bus import CONFIG
CONFIG.set_envs(os.environ, env_file='.env')
```

Puedes modificar las variables por defecto de la librería usando un archivo `.env` o configurando las variables de entorno directamente. Las variables configurables son:

| Variable                    | Tipo | Por defecto |Descripción                                                                 |
|-------------------------------|---------|----------------|-------------------------------------------------------------------------------|
| MAX_NUMBER_OF_MESSAGES         | int     | 5              | Número máximo de mensajes a recibir por llamada.                              |
| WAIT_TIME_SECONDS              | int     | 10             | Tiempo de espera para recibir mensajes.                                       |
| VISIBILITY_TIMEOUT             | int     | 30             | Tiempo de visibilidad del mensaje en la cola.                                 |
| AWS_CLIENT_URL                 | str     | None           | URL del cliente de AWS (útil para LocalStack u otros entornos locales).       |
| AWS_REGION_NAME                | str     | us-east-1      | Región de AWS.                                                                |
| SLEEP_BETWEEN_MESSAGES_SECONDS | float   | 0.1            | Tiempo de espera entre mensajes.                                              |
| ERROR_SLEEP_SECONDS            | int     | 5              | Tiempo de espera en caso de error.                                            |

> ℹ️ **Tip:** Usa `.env` para mantener tu configuración fuera del código fuente.

---
## 🏷️ Definición de Topics

Los topics deben seguir la convención:

```
COMPANY.SERVICE.VERSION.MESSAGE_TYPE.RESOURCE_NAME.(EVENT_COMMAND_NAME)
```

- **COMPANY:** Nombre de la empresa (`Finkargo` para internos).
- **SERVICE:** Servicio generador del evento.
- **VERSION:** Versión del topic.
- **MESSAGE_TYPE:** `command` o `event`.
- **RESOURCE_NAME:** Entidad relacionada.
- **EVENT_COMMAND_NAME:** Verbo en pasado para eventos (`created`), infinitivo para comandos (`create`).

**Regex de validación:**

```
TOPIC_REGEX = "^[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+\.\d+\.(command|event)\.[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+$"
```

Basado en: [Topics Definition | AsyncApi](https://github.com/fmvilas/topic-definition)

---

## 🛠️ TODO

- [ ]  Mejorar el control de excepciones en la publicación de eventos.
- [ ]  Agregar más ejemplos de integración con otros frameworks.
- [ ]  Documentar casos de uso avanzados.
- [ ]  Implementar métricas y logging para los eventos publicados y consumidos.
- [ ]  Añadir pruebas unitarias y de integración.
- [ ]  Mejorar la documentación en inglés y español.

---

## 📝 Buenas Prácticas

- Usa nombres de eventos claros y consistentes.
- Implementa deduplicación para evitar procesar eventos repetidos.
- Mantén tus handlers simples y enfocados en una sola responsabilidad.
- Documenta tus eventos y handlers.

---
## 📄 Licencia

Este proyecto está bajo la licencia MIT.


