Metadata-Version: 2.4
Name: pulse-mq
Version: 0.3.0
Summary: 基于 ZeroMQ 的消息队列系统，专为金融市场数据分发设计
Author: haifeng
License-Expression: MIT
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.13
Classifier: Operating System :: OS Independent
Requires-Python: >=3.13
Description-Content-Type: text/markdown
Requires-Dist: pyzmq>=26.0
Requires-Dist: aiosqlite>=0.20
Requires-Dist: bcrypt>=4.2
Requires-Dist: msgpack>=1.1
Requires-Dist: pyarrow>=18.0
Requires-Dist: pandas>=2.2
Requires-Dist: pyyaml>=6.0
Requires-Dist: zstandard>=0.23
Requires-Dist: lz4>=4.3
Requires-Dist: aiohttp>=3.9
Requires-Dist: PyJWT>=2.9
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.24; extra == "dev"
Requires-Dist: pytest-aiohttp>=1.1; extra == "dev"

# PulseMQ

基于 ZeroMQ 的高性能消息队列系统，专为金融市场数据分发设计。

## 特性

- **高性能**：基于 ZMQ ROUTER/XPUB 架构，P50 延迟 < 1ms
- **多格式**：支持 BYTES、STRING、DataFrame（MessagePack/Apache Arrow）
- **权限控制**：namespace 级角色鉴权（sub/pub/admin），全量内存缓存
- **管理后台**：JWT 认证，实时监控仪表盘，完整的 CRUD API
- **客户端 SDK**：Python 异步客户端，自动重连、心跳、订阅恢复
- **零配置**：无需配置文件即可启动，YAML/CLI 可选覆盖

## 快速开始

### 安装

```bash
pip install pulse-mq
```

<details>
<summary>从源码安装（开发模式）</summary>

```bash
# 克隆仓库
git clone https://github.com/yourname/pulse-mq.git
cd pulse-mq

# 创建虚拟环境（Python >= 3.13）
uv venv

# 激活虚拟环境
source .venv/Scripts/activate  # Windows
source .venv/bin/activate      # Linux/Mac

# 安装依赖
uv pip install -e ".[dev]"
```
</details>

### 启动 Broker

```bash
# 零配置启动（默认端口：PUB=5555, SUB=5556, Admin=8080）
pulsemq
```

启动后自动创建管理员账号，凭证保存在 `.pulsemq_admin` 文件中：

```
==================================================
  PulseMQ Broker v4.3.0
  PUB(XSUB) : tcp://0.0.0.0:5555
  SUB(XPUB) : tcp://0.0.0.0:5556
  Admin     : http://0.0.0.0:8080
  Health    : http://0.0.0.0:8080/health
==================================================
```

更多启动方式：

```bash
# 指定配置文件
pulsemq --config config.yaml

# CLI 覆盖个别参数（优先级：CLI > YAML > 默认值）
pulsemq --host 127.0.0.1 --pub-port 6000 --sub-port 6001 --admin-port 9090 --log-level debug

# 指定数据库路径
pulsemq --db-path /data/pulsemq.db
```

通过 Python 脚本启动：

```python
"""run_server.py — 用 Python 脚本启动 Broker"""
from pulse_mq import PulseMQServer

# 方式一：零配置
server = PulseMQServer()

# 方式二：关键字参数覆盖
# server = PulseMQServer(
#     pub_port=6000,
#     sub_port=6001,
#     admin_port=9090,
#     log_level="debug",
# )

# 方式三：使用完整 Config 对象
# from pulse_mq.config import Config, ServerConfig, StorageConfig, LoggingConfig
# config = Config(server=ServerConfig(pub_port=6000, sub_port=6001))
# server = PulseMQServer(config)

server.run()
```

### 访问管理后台

打开浏览器访问 `http://localhost:8080`，使用 `.pulsemq_admin` 中的账号密码登录。

## 使用示例

### 1. 完整流程：创建用户 → 授权 → 发布/订阅

以下示例展示从零开始的完整使用流程。

#### 1.1 启动 Broker

```bash
pulsemq
```

#### 1.2 通过管理 API 准备用户和权限

```python
"""prepare.py — 创建 namespace、用户并授权"""
import requests

BASE = "http://localhost:8080"

# 1. 登录获取 JWT（凭证来自 .pulsemq_admin 文件）
with open(".pulsemq_admin") as f:
    admin_creds = eval(f.read())  # 实际使用时按文件格式解析

resp = requests.post(f"{BASE}/login", json={
    "username": admin_creds["username"],
    "password": admin_creds["password"],
})
jwt_token = resp.json()["token"]
headers = {"Authorization": f"Bearer {jwt_token}"}

# 2. 创建 namespace
resp = requests.post(f"{BASE}/api/namespaces", headers=headers, json={
    "name": "market_data",
    "description": "股票行情数据",
})
print(f"namespace: {resp.json()}")

# 3. 创建发布者用户（mq_normal 类型，会自动生成 token）
resp = requests.post(f"{BASE}/api/users", headers=headers, json={
    "username": "publisher_01",
    "user_type": "mq_normal",
})
publisher = resp.json()
print(f"publisher token: {publisher['token']}")

# 4. 创建订阅者用户
resp = requests.post(f"{BASE}/api/users", headers=headers, json={
    "username": "subscriber_01",
    "user_type": "mq_normal",
})
subscriber = resp.json()
print(f"subscriber token: {subscriber['token']}")

# 5. 获取 namespace ID
resp = requests.get(f"{BASE}/api/namespaces", headers=headers)
ns_list = resp.json()
ns_id = next(ns["id"] for ns in ns_list if ns["name"] == "market_data")

# 6. 授权发布者在 market_data namespace 发布（pub 角色）
resp = requests.post(
    f"{BASE}/api/namespaces/{ns_id}/permissions",
    headers=headers,
    json={"user_id": publisher["id"], "role": "pub"},
)
print(f"授权 publisher: {resp.json()}")

# 7. 授权订阅者在 market_data namespace 订阅（sub 角色）
resp = requests.post(
    f"{BASE}/api/namespaces/{ns_id}/permissions",
    headers=headers,
    json={"user_id": subscriber["id"], "role": "sub"},
)
print(f"授权 subscriber: {resp.json()}")
```

#### 1.3 发布者：发送行情数据

```python
"""publisher.py — 发布股票行情数据"""
import asyncio
import json
import time
from pulse_mq import PulseMQClient


async def main():
    # 连接 Broker（使用管理后台生成的 token）
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="publisher_01",
        token="上面生成的 token",
    )
    await client.async_connect()
    print("发布者已连接")

    try:
        # 模拟发送 K 线数据（bytes 格式）
        for i in range(100):
            kline = json.dumps({
                "symbol": "sz000651",
                "type": "kline_1m",
                "open": 10.5 + i * 0.01,
                "high": 10.6 + i * 0.01,
                "low": 10.4 + i * 0.01,
                "close": 10.55 + i * 0.01,
                "volume": 10000 + i * 100,
                "timestamp": int(time.time() * 1000),
            }).encode("utf-8")

            await client.async_publish(
                namespace="market_data",
                topic="sz000651.kline.1m",
                data=kline,
                format="bytes",
            )
            print(f"已发送第 {i+1} 条 K 线数据")
            await asyncio.sleep(1)

        print("发布完成")
    finally:
        await client.async_close()


asyncio.run(main())
```

#### 1.4 订阅者：接收行情数据

```python
"""subscriber.py — 订阅股票行情数据"""
import asyncio
from pulse_mq import PulseMQClient


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="subscriber_01",
        token="上面生成的 token",
    )
    await client.async_connect()
    print("订阅者已连接")

    try:
        # 订阅 sz000651 所有 K 线周期（> 通配符匹配多级 topic）
        async for msg in client.async_subscribe(namespace="market_data", topic="sz000651.kline.>"):
            print(
                f"[{msg.namespace}] {msg.topic}: "
                f"{len(msg.data)} bytes"
            )
    finally:
        await client.async_close()


asyncio.run(main())
```

> **注意**：发布者和订阅者都使用 pub_port（5555），SDK 内部自动连接 sub_port（5556）接收消息。

### 2. 多格式发布

PulseMQ 支持 4 种数据格式：

```python
import asyncio
import json
import pandas as pd
from pulse_mq import PulseMQClient


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="demo",
        token="your-token",
    )
    await client.async_connect()

    try:
        # ── BYTES：原始字节流（适用于加密数据、二进制协议）──
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.raw",
            data=b"\x01\x02\x03\x04",
            format="bytes",
        )

        # ── STRING：UTF-8 文本 ──
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.signal",
            data=json.dumps({"action": "BUY", "price": 10.5}),
            format="string",
        )

        # ── DF_MSGPACK：小批量 DataFrame（< 1MB）──
        df_small = pd.DataFrame({
            "symbol": ["sz000651", "sz000651", "sz000651"],
            "price": [10.5, 10.6, 10.55],
            "volume": [100, 200, 150],
        })
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.snapshot",
            data=df_small,
            format="df_msgpack",
        )

        # ── DF_PYARROW：大批量 DataFrame（< 64MB，性能更好）──
        df_large = pd.DataFrame({
            "symbol": ["sz000651"] * 100000,
            "timestamp": pd.date_range("2025-01-01", periods=100000, freq="ms"),
            "price": (10.5 + __import__("random").random() * 0.1 for _ in range(100000)),
            "volume": (__import__("random").randint(100, 500) for _ in range(100000)),
        })
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.tick_batch",
            data=df_large,
            format="df_pyarrow",
        )

        print("所有格式发布完成")
    finally:
        await client.async_close()


asyncio.run(main())
```

### 3. 通配符订阅

Topic 支持两种通配符：

| 通配符 | 含义 | 示例 |
|--------|------|------|
| `*` | 匹配单级 | `sz000651.kline.*` 匹配 `sz000651.kline.1m`，不匹配 `sz000651.kline.1m.raw` |
| `>` | 匹配多级（仅限末尾） | `sz000651.>` 匹配 `sz000651.kline.1m`、`sz000651.trade` 等 |

```python
import asyncio
from pulse_mq import PulseMQClient


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="trader",
        token="your-token",
    )
    await client.async_connect()

    try:
        # 订阅单只股票的所有行情
        async for msg in client.async_subscribe(namespace="market_data", topic="sz000651.>"):
            print(f"[全行情] {msg.topic}")

        # 订阅所有股票的 1 分钟 K 线
        async for msg in client.async_subscribe(namespace="market_data", topic="*.kline.1m"):
            print(f"[1m K线] {msg.topic}")
    finally:
        await client.async_close()


asyncio.run(main())
```

### 4. 查询信息

```python
import asyncio
from pulse_mq import PulseMQClient


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="query_user",
        token="your-token",
    )
    await client.async_connect()

    try:
        # 查询可访问的 namespace 列表
        namespaces = await client.async_query_namespaces()
        print("Namespaces:", namespaces)
        # [{'name': 'market_data', 'topics': ['sz000651.kline.1m', ...], 'topic_count': 5}]

        # 查询指定 namespace 下的 topic 列表
        topics = await client.async_query_topics("market_data")
        print("Topics:", topics)
        # ['sz000651.kline.1m', 'sz000651.kline.5m', 'sz000651.trade']
    finally:
        await client.async_close()


asyncio.run(main())
```

### 5. 多订阅者并发

```python
"""multi_subscriber.py — 多个订阅者并发接收"""
import asyncio
from pulse_mq import PulseMQClient


async def subscriber(name: str, token: str, namespace: str, topic: str):
    """单个订阅者协程"""
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username=name,
        token=token,
    )
    await client.async_connect()
    try:
        async for msg in client.async_subscribe(namespace=namespace, topic=topic):
            print(f"[{name}] {msg.topic}: {len(msg.data)} bytes")
    finally:
        await client.async_close()


async def main():
    # 3 个订阅者并发运行
    await asyncio.gather(
        subscriber("sub_kline", "token-1", "market_data", "sz000651.kline.>"),
        subscriber("sub_trade", "token-2", "market_data", "sz000651.trade"),
        subscriber("sub_all", "token-3", "market_data", "sz000651.>"),
    )


asyncio.run(main())
```

### 6. 使用 YAML 配置启动

创建 `config.yaml`：

```yaml
server:
  host: "0.0.0.0"
  pub_port: 6000
  sub_port: 6001
  admin_port: 9090

storage:
  db_path: "/data/pulsemq.db"

auth:
  jwt_secret: "your-production-secret-key-at-least-32-bytes"
  jwt_expiry: 43200  # 12 小时

limits:
  max_connections_per_user: 20
  max_subscriptions_per_conn: 1000

broker:
  hwm: 50000

logging:
  dir: "/var/log/pulsemq"
  level: "info"
  max_days: 30
```

```bash
pulsemq --config config.yaml
```

### 7. 通过 curl 调用管理 API

```bash
# 登录获取 JWT
TOKEN=$(curl -s -X POST http://localhost:8080/login \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":"from_.pulsemq_admin"}' | python -c "import sys,json; print(json.load(sys.stdin)['token'])")

# 查看所有 namespace
curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/namespaces | python -m json.tool

# 创建 namespace
curl -s -X POST http://localhost:8080/api/namespaces \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"name":"trade_signal","description":"交易信号"}'

# 创建用户
curl -s -X POST http://localhost:8080/api/users \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"username":"signal_bot","user_type":"mq_normal"}'

# 授权（需先获取 namespace_id 和 user_id）
curl -s -X POST http://localhost:8080/api/namespaces/1/permissions \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"user_id":2,"role":"pub"}'

# 查看活跃连接
curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/connections

# 查看系统统计
curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/stats/system

# 健康检查（无需认证）
curl http://localhost:8080/health

# 查看审计日志
curl -s -H "Authorization: Bearer $TOKEN" \
  "http://localhost:8080/api/audit-logs?limit=20" | python -m json.tool
```

### 8. 自动重连与订阅恢复

客户端 SDK 内置自动重连机制（指数退避：1s → 2s → 4s → ...，上限 30s，最多 10 次）。重连成功后自动恢复所有活跃订阅。

```python
import asyncio
from pulse_mq import PulseMQClient


async def main():
    # reconnect=True（默认开启）
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="robust_sub",
        token="your-token",
    )
    await client.async_connect()
    try:
        async for msg in client.async_subscribe(namespace="market_data", topic="sz000651.>"):
            print(f"收到: {msg.topic}")
            # 即使 Broker 重启，客户端会自动重连并恢复订阅
    finally:
        await client.async_close()


asyncio.run(main())
```

如需禁用自动重连：

```python
client = PulseMQClient(
    host="localhost",
    pub_port=5555,
    sub_port=5556,
    username="user",
    token="token",
    reconnect=False,
)
```

## 架构

```
Publisher → ROUTER (port 5555) → [Broker] → XPUB (port 5556) → Subscriber
                                    │
                                    ├── AUTH 认证
                                    ├── PUB 消息转发
                                    ├── SUB 订阅管理
                                    └── QUERY 查询响应
```

### 消息格式

每条消息由 6 个 ZMQ frame 组成：

| Frame | 名称 | 说明 |
|-------|------|------|
| 0 | Header | 20 字节二进制（版本/类型/格式/标志/长度/计数/时间戳） |
| 1 | Namespace | UTF-8 字符串，默认 "default" |
| 2 | Topic | UTF-8 字符串，最多 3 级（如 `sz000651.kline.1m`） |
| 3 | Username | UTF-8 字符串 |
| 4 | Payload | 二进制载荷 |
| 5 | Reserved | 预留扩展 |

### 数据格式

| 格式 | 值 | 说明 | 大小限制 |
|------|---|------|---------|
| BYTES | 0x00 | 原始字节流 | 10 MB |
| STRING | 0x01 | UTF-8 文本 | 8 KB |
| DF_MSGPACK | 0x02 | DataFrame → MessagePack | 1 MB |
| DF_PYARROW | 0x03 | DataFrame → Apache Arrow IPC | 64 MB |

### 权限模型

权限控制在 namespace 级别，采用 3 级角色体系：

- `sub`：订阅权
- `pub`：发布权（包含 sub）
- `admin`：管理权（包含 pub）

用户类型：
- `super_admin`：后台管理 + 全部 MQ 权限
- `mq_super`：全部 MQ 权限（无后台管理）
- `mq_normal`：需通过 namespace_permissions 鉴权
- `admin_only`：仅后台管理

## 配置

### 零配置启动

不指定任何参数时使用默认值：

```yaml
server:
  host: "0.0.0.0"
  pub_port: 5555
  sub_port: 5556
  admin_port: 8080

storage:
  db_path: "./pulsemq.db"

auth:
  token_hash: "sha256"
  ping_interval: 30
  ping_timeout: 3

limits:
  max_connections_per_user: 10
  max_subscriptions_per_conn: 500
  cache_refresh_sec: 300

broker:
  hwm: 10000

logging:
  dir: "./logs"
  max_days: 7
  level: "info"
```

### YAML 配置文件

创建 `config.yaml`，只覆盖需要修改的项：

```yaml
server:
  pub_port: 6000
  admin_port: 9090

storage:
  db_path: "/data/pulsemq.db"
```

启动时指定配置文件：

```bash
pulsemq --config config.yaml
```

### CLI 参数

命令行参数优先级最高：`CLI > YAML > 默认值`

```bash
pulsemq --host 127.0.0.1 --pub-port 7000 --log-level debug
```

## API

### 健康检查

```bash
curl http://localhost:8080/health
# {"status":"ok","version":"4.3.0","uptime_seconds":1234,"connections":5}
```

### 管理 API

| 端点 | 方法 | 说明 |
|------|------|------|
| `/login` | POST | 登录获取 JWT |
| `/api/namespaces` | GET/POST | 命名空间列表/创建 |
| `/api/namespaces/{id}` | GET/PUT/DELETE | 命名空间详情/更新/删除 |
| `/api/namespaces/{id}/permissions` | POST | 授权 |
| `/api/topics` | GET | 主题列表（按 namespace 过滤） |
| `/api/topics/{id}` | GET/DELETE | 主题详情/删除 |
| `/api/users` | GET/POST | 用户列表/创建 |
| `/api/users/{id}` | PUT/DELETE | 用户更新/删除 |
| `/api/users/{id}/reset-token` | POST | 重置 token |
| `/api/connections` | GET | 活跃连接列表 |
| `/api/connections/disconnect-user` | POST | 断开用户连接 |
| `/api/stats/system` | GET | 系统统计 |
| `/api/stats/topics` | GET | 主题统计 |
| `/api/audit-logs` | GET | 审计日志 |

所有 API 需要在请求头中携带 JWT：

```bash
curl -H "Authorization: Bearer <token>" http://localhost:8080/api/namespaces
```

## 目录结构

```
pulse-mq/
├── pulse_mq/
│   ├── __init__.py          # 版本号
│   ├── errors.py            # 自定义异常
│   ├── config.py            # 配置系统
│   ├── protocol.py          # 协议编解码
│   ├── auth.py              # 认证（token/password）
│   ├── permission.py        # 权限缓存与鉴权
│   ├── stats.py             # 统计采集
│   ├── proxy.py             # ZMQ 消息代理
│   ├── broker.py            # Broker 主进程
│   ├── _client.py           # 客户端 SDK（内部模块）
│   ├── cli.py               # CLI 入口
│   ├── admin/
│   │   ├── app.py           # HTTP API
│   │   └── static/
│   │       └── index.html   # 管理后台前端
│   └── db/
│       ├── base.py          # 数据库抽象基类
│       ├── sqlite.py        # SQLite 适配器
│       ├── schema.py        # DDL 定义
│       └── queries.py       # CRUD 操作
├── tests/                   # 测试文件
├── docs/                    # 设计文档
├── pyproject.toml           # 项目配置
└── README.md
```

## 测试

```bash
# 运行全部测试
python -m pytest tests/ -v

# 运行特定模块测试
python -m pytest tests/test_broker.py -v
```

## 依赖

### 核心依赖

- `pyzmq` - ZeroMQ Python 绑定
- `aiosqlite` - 异步 SQLite
- `bcrypt` - 密码哈希
- `msgpack` - MessagePack 序列化
- `pyarrow` - Apache Arrow
- `pandas` - DataFrame 支持
- `pyyaml` - YAML 配置解析
- `zstandard` / `lz4` - 压缩算法
- `aiohttp` - HTTP 服务
- `PyJWT` - JWT 认证

### 开发依赖

- `pytest` - 测试框架
- `pytest-asyncio` - 异步测试支持
- `pytest-aiohttp` - aiohttp 测试支持

## 设计文档

| 文档 | 内容 |
|------|------|
| `docs/PulseMQ协议模型-v4.0.md` | 消息帧结构、数据格式、消息类型、Topic 规则、通配符 |
| `docs/消息队列权限模型.md` | 权限模型、认证机制、缓存策略、审计 |
| `docs/PulseMQ后台管理系统设计.md` | 管理后台功能、监控指标、仪表盘、SQLite 表结构 |
| `docs/PulseMQ Broker 运行设计.md` | 配置、启动流程、流控、健康检查、关闭、日志、客户端 SDK |

## 许可证

MIT
