Metadata-Version: 2.4
Name: infinity_bus
Version: 1.3.5
Summary: 基于 asyncio 的轻量级事件总线，支持发布/订阅、正则匹配、背压控制与优雅停机
Author: infinity_system: yin_bailiang
License: MIT
Project-URL: Repository, https://github.com/yinbailiang/event_bus
Keywords: event-bus,async,asyncio,pub-sub,publish-subscribe,event-driven
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Framework :: AsyncIO
Requires-Python: >=3.12
Description-Content-Type: text/markdown
License-File: LICENSE.md
Requires-Dist: aiosqlite
Requires-Dist: pydantic
Provides-Extra: test
Requires-Dist: pytest; extra == "test"
Requires-Dist: pytest-cov; extra == "test"
Requires-Dist: pytest-asyncio; extra == "test"
Provides-Extra: dev
Requires-Dist: interrogate; extra == "dev"
Requires-Dist: pre-commit; extra == "dev"
Requires-Dist: pytest; extra == "dev"
Requires-Dist: pytest-cov; extra == "dev"
Requires-Dist: pytest-asyncio; extra == "dev"
Dynamic: license-file

# InfinityBus — 异步事件总线

[![Test](https://github.com/yinbailiang/event_bus/actions/workflows/test.yml/badge.svg)](https://github.com/yinbailiang/event_bus/actions/workflows/test.yml)
[![License](https://img.shields.io/badge/license-MIT-green)](LICENSE.md)
[![PyPI Version](https://img.shields.io/pypi/v/infinity_bus)](https://pypi.org/project/infinity_bus/)
[![PyPI Downloads](https://img.shields.io/pypi/dm/infinity_bus)](https://pypi.org/project/infinity_bus/)
[![Supported Python](https://img.shields.io/pypi/pyversions/infinity_bus)](https://pypi.org/project/infinity_bus/)

基于 **asyncio** 的轻量级事件总线，实现发布/订阅模式，用于在异步应用中解耦组件间的通信。

## ✨ 特性

- **强类型负载校验** — 基于 Pydantic，发布时自动校验数据类型与结构
- **正则表达式订阅** — 支持灵活的事件名匹配规则
- **背压控制** — 队列大小与并发信号量双重限流，防止过载
- **超时保护** — 每个处理器可独立设置超时，避免单任务阻塞总线
- **错误隔离** — 单个处理器异常不影响其他处理器，错误通过内置事件统一上报
- **优雅停机** — 保证停止过程中已入队事件被完整处理，避免数据丢失
- **可观测性** — 提供活跃任务数、队列长度等监控指标

## 📦 安装

```bash
pip install infinity_bus
```

或从源码安装：

```bash
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[test]"
```

## 🚀 快速开始

### 基础发布/订阅

```python
import asyncio
from pydantic import BaseModel
from event_bus import (
    EventBus, EventDeclaration, EventHandler,
    EventRegistry, EventHandlerRegistry,
)

# 1. 定义负载
class MyPayload(BaseModel):
    message: str

# 2. 声明事件
class MyEvent(EventDeclaration):
    name = "my.event"
    payload_type = MyPayload

# 3. 实现处理器
class MyHandler(EventHandler):
    def __init__(self):
        super().__init__(subscriptions=["my.event"])

    async def handle(self, payload, bus_proxy, raw_event):
        print(f"Received: {payload.message}")

# 4. 组装并运行
async def main():
    reg = EventRegistry()
    reg.register(MyEvent)
    h_reg = EventHandlerRegistry()
    h_reg.register(MyHandler())

    async with EventBus(reg, h_reg) as bus:
        await bus.proxy("cli").publish("my.event", {"message": "Hello, EventBus!"})
        await asyncio.sleep(1)  # 等待处理器输出

asyncio.run(main())
```

### 请求-响应模式

```python
import asyncio
from pydantic import BaseModel
from event_bus import (
    EventBus, EventDeclaration, EventHandler,
    EventRegistry, EventHandlerRegistry,
)
from event_bus.templates.request import (
    request, RequestProtocol, ResponseProtocol,
)

# 1. 定义请求/响应负载
class GetUserRequest(RequestProtocol):
    user_id: int

class GetUserResponse(ResponseProtocol):
    user_name: str
    email: str

# 2. 声明事件
class GetUserRequestEvent(EventDeclaration):
    name = "user.get.request"
    payload_type = GetUserRequest

class GetUserResponseEvent(EventDeclaration):
    name = "user.get.response"
    payload_type = GetUserResponse

# 3. 实现服务端处理器
class GetUserHandler(EventHandler):
    def __init__(self):
        super().__init__(subscriptions=["user.get.request"])

    async def handle(self, payload, bus_proxy, raw_event):
        if not isinstance(payload, GetUserRequest):
            return
        resp = GetUserResponse(
            session_id=payload.session_id,
            request_id=payload.request_id,
            success=True,
            user_name="Alice",
            email="alice@example.com",
        )
        await bus_proxy.publish("user.get.response", resp)

# 4. 组装并运行
async def main():
    reg = EventRegistry()
    reg.register(GetUserRequestEvent)
    reg.register(GetUserResponseEvent)
    h_reg = EventHandlerRegistry()
    h_reg.register(GetUserHandler())

    async with EventBus(reg, h_reg) as bus:
        proxy = bus.proxy("cli")
        resp = await request(
            bus_proxy=proxy,
            req_event="user.get.request",
            req_data={"user_id": 123},
            resp_event="user.get.response",
            timeout=10.0,
        )
        resp.raise_if_failed()
        print(f"User: {resp.user_name} ({resp.email})")

asyncio.run(main())
```

## 🧱 架构

| 组件 | 职责 |
| - | - |
| **Event** | 运行时事件实例，含名称、负载、处理链追踪 |
| **EventDeclaration** | 事件类型元数据声明（名称 + 可选 Pydantic 负载模型） |
| **EventRegistry** | 集中管理已注册的事件声明，发布时校验 |
| **EventHandler** | 处理器基类，实现 `handle` 方法定义业务逻辑 |
| **EventHandlerRegistry** | 管理处理器实例，按事件名匹配处理器列表 |
| **EventBus** | 事件分发中枢：任务队列、并发控制、错误上报、生命周期 |
| **Middleware** | 中间件基类，洋葱管道：`before_publish` / `on_publish` 双钩子 |
| **MiddlewareChain** | 责任链管理器，按序包裹发布流程 |
| **templates** | 高级模板：`expect` 监听、`request` RPC、`pipe` 管道、`register` 批量注册 |
| **middlewares** | 内置中间件：日志(JSONL+SQLite)、限流、转换、屏蔽、递归防护 |

## 📚 文档

| 文档 | 内容 |
| - | - |
| [核心总览](docs/event_bus.md) | Event / EventDeclaration / EventHandler / EventBus / Middleware 核心概念 |
| [中间件系统](docs/middleware.md) | `Middleware` 基类、`MiddlewareChain` 洋葱管道 |
| [高级模板](docs/templates/templates.md) | `expect`、`request`、`pipe`、`register` 四大模板总览 |
| [内置中间件](docs/templates/middlewares/middlewares.md) | 日志、限流、转换、屏蔽、递归防护 |

## 🧪 测试

```bash
# 运行全部测试
pytest --cov=src -v

# 仅运行核心测试
pytest tests/event_bus_test.py -v

# 仅运行模板测试
pytest tests/templates/ -v
```

## 📄 许可证

[MIT](LICENSE.md)

## 此项目属于 InfinitySystem

![icon](docs/res/infinity_icon/256x256.png)
