Metadata-Version: 2.4
Name: pulse-mq
Version: 0.1.7
Summary: 基于 ZeroMQ 的轻量级消息队列系统
Requires-Python: >=3.10
Requires-Dist: alembic>=1.13.0
Requires-Dist: fastapi>=0.104.0
Requires-Dist: jinja2>=3.1.0
Requires-Dist: loguru>=0.7.0
Requires-Dist: msgpack>=1.0.0
Requires-Dist: passlib[bcrypt]>=1.7.4
Requires-Dist: pyarrow>=14.0.0
Requires-Dist: pyjwt>=2.8.0
Requires-Dist: python-dotenv>=1.0.0
Requires-Dist: python-multipart>=0.0.6
Requires-Dist: pyzmq>=25.0.0
Requires-Dist: sqlalchemy>=2.0.0
Requires-Dist: uvicorn[standard]>=0.24.0
Provides-Extra: dev
Requires-Dist: httpx>=0.25.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23.0; extra == 'dev'
Requires-Dist: pytest>=7.0.0; extra == 'dev'
Description-Content-Type: text/markdown

# PulseMQ

基于 ZeroMQ 的轻量级消息队列系统，配套 Grafana 风格的 Web 管理后台。

## 特性

- **双 Socket 架构** — ROUTER + PULL，消息延迟低
- **多格式支持** — 字符串、msgpack、PyArrow
- **Web 管理后台** — Grafana 暗色主题，ECharts 监控图表
- **权限管理** — 用户 token 认证，按 topic 控制推送/订阅权限，支持自动创建 topic
- **单进程部署** — 一条命令启动全部服务
- **PyPI 安装** — `pip install pulse-mq` 即可使用

## 快速安装

```bash
pip install pulse-mq
```

## 快速开始

### 方式一：命令行启动

```bash
# 复制配置文件
cp .env.example .env

# 启动服务（同时启动 ZMQ 消息服务 + HTTP 管理后台）
pulse-mq-server
```

首次启动会自动生成 admin 密码，输出到控制台和 `admin_credentials.txt`。

管理后台默认地址: `http://localhost:8080`

### 方式二：Python 代码启动

```python
from pulse_mq import PulseMQServer

server = PulseMQServer()  # 自动读取 .env 配置
server.run()              # 启动全部服务，阻塞运行（Ctrl+C 停止）
```

### 客户端使用

```python
from pulse_mq import PulseMQClient

client = PulseMQClient(
    host="localhost",
    port=5555,
    username="your-username",
    token="your-token",      # 在管理后台 Users 页面生成
    heartbeat_interval=30,   # 心跳间隔（秒），默认 30
)

# 订阅消息（回调方式）
def on_message(msg):
    print(f"Topic: {msg.topic}, Data: {msg.data}, Format: {msg.format}")

client.subscribe("metrics/cpu", callback=on_message)

# 运行（阻塞，处理心跳和消息接收）
client.run()
```

### 发布消息

```python
from pulse_mq import PulseMQClient

# 发布端也需要先连接
pub = PulseMQClient(host="localhost", port=5555, username="user1", token="your-token")
pub.run()  # 阻塞运行

# 注意：subscribe() 必须在 run() 之前调用，这样连接后会自动订阅
# publish() 在 run() 运行期间调用（通常在另一个线程）
pub.publish("metrics/cpu", data="42.5")
```

### 异步客户端

```python
import asyncio
from pulse_mq import PulseMQClient

async def main():
    client = PulseMQClient(host="localhost", port=5555, username="user1", token="your-token")
    client.subscribe("metrics/cpu", lambda msg: print(msg.data))
    await client.run_async()  # 异步运行

asyncio.run(main())
```

---

## PulseMQServer 参数

```python
from pulse_mq import PulseMQServer, Config

# 方式 1：自动读取 .env 配置
server = PulseMQServer()

# 方式 2：指定 .env 文件路径
server = PulseMQServer(env_file="/path/to/.env")

# 方式 3：编程方式传入配置
config = Config(
    host="0.0.0.0",
    http_port=8080,
    zmq_router_port=5555,
    zmq_pull_port=5556,
    db_type="sqlite",
    db_path="./pulse_mq.db",
    admin_password="my-password",
)
server = PulseMQServer(config=config)
```

### `server.start()` — 非阻塞启动

启动服务，非阻塞，立即返回。可继续调用 `server.publish()` 推送消息。

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `with_http` | `bool` | `True` | 是否同时启动 HTTP 管理后台 |

```python
# 启动全部服务（非阻塞）
server.start()

# 仅启动 ZMQ 消息服务，不启动 HTTP 管理后台
server.start(with_http=False)

# 启动后可以继续推送消息
server.start()
server.publish("alerts/system", data="CPU 过高", format="string")

# 需要自行保持进程运行（例如在其他框架中嵌入）
import time
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    server.stop()
```

### `server.run()` — 阻塞运行

启动服务并阻塞运行（适合独立脚本），Ctrl+C 停止。等同于 `start()` + 阻塞等待。

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `with_http` | `bool` | `True` | 是否同时启动 HTTP 管理后台 |

```python
server.run()              # 阻塞运行，Ctrl+C 停止
server.run(with_http=False)
```

### `server.publish()`

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `topic` | `str` | 必填 | Topic 名称，格式 `group/topic` |
| `data` | `str/bytes/dict/DataFrame` | 必填 | 消息数据 |
| `format` | `str` | `"string"` | 数据格式：`"string"` / `"msgpack"` / `"pyarrow"` |

```python
server.publish("alerts/system", data="CPU 过高", format="string")
server.publish("data/events", data={"key": "value"}, format="msgpack")
```

### `server.stop()`

停止服务，关闭所有 ZMQ socket 和线程。

---

## PulseMQClient 参数

```python
from pulse_mq import PulseMQClient

# 构造函数
client = PulseMQClient(
    host="localhost",          # 服务端地址
    port=5555,                 # ZMQ ROUTER 端口
    username="user1",          # 用户名（在管理后台创建）
    token="your-token",        # Token（在管理后台生成）
    heartbeat_interval=30,     # 心跳间隔（秒）
)

# 从环境变量读取
client = PulseMQClient.from_env()
```

### `PulseMQClient()` 构造参数

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `host` | `str` | `"localhost"` | 服务端地址 |
| `port` | `int` | `5555` | ZMQ ROUTER 端口 |
| `username` | `str` | `""` | 用户名（管理后台 Users 页面创建） |
| `token` | `str` | `""` | 用户 Token（管理后台 Users 页面生成） |
| `heartbeat_interval` | `int` | `30` | 心跳发送间隔（秒） |

### `client.subscribe(topic, callback)`

| 参数 | 类型 | 说明 |
|------|------|------|
| `topic` | `str` | 订阅的 Topic，格式 `group/topic` |
| `callback` | `Callable` | 回调函数，参数为 `PulseMQMessage` 对象 |

> **注意：** `subscribe()` 必须在 `run()` 之前调用。连接认证后会自动发送订阅请求。

```python
def on_message(msg):
    print(f"Topic: {msg.topic}")     # Topic 名称
    print(f"Data: {msg.data}")       # 消息数据（已反序列化）
    print(f"Format: {msg.format}")   # 数据格式："string"/"msgpack"/"pyarrow"

client.subscribe("metrics/cpu", callback=on_message)
```

### `client.publish(topic, data, format)`

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `topic` | `str` | 必填 | Topic 名称，格式 `group/topic` |
| `data` | `str/bytes/dict/DataFrame` | 必填 | 消息数据 |
| `format` | `str` | `"string"` | 数据格式 |

> **注意：** `publish()` 需要在 `run()` 之后调用（即连接建立后），否则会抛出 `RuntimeError`。

### `client.run()` / `client.run_async()`

| 方法 | 说明 |
|------|------|
| `client.run()` | 阻塞运行，处理心跳和消息接收（适合独立脚本） |
| `client.run_async()` | 异步运行，内部使用线程池（适合 asyncio 环境） |

### `client.unsubscribe(topic)`

取消订阅指定 topic。

### `client.disconnect()`

断开连接，关闭 ZMQ socket。

---

## 消息格式

| 格式 | 说明 | 示例 |
|------|------|------|
| `string` | 原生字符串，最长 4096 字节 | `publish("t/d", data="hello", format="string")` |
| `msgpack` | MessagePack 序列化 | `publish("t/d", data={"key": "val"}, format="msgpack")` |
| `pyarrow` | PyArrow IPC 格式（支持 DataFrame） | `publish("t/d", data=df, format="pyarrow")` |

## Topic 命名规则

格式: `group/topic`（强制一层 `/` 分隔）

- 允许字符: `a-z A-Z 0-9 _ - .`
- 每段长度: 1-128 字符
- 示例: `metrics/cpu`, `logs/app-error`, `data_1/test.topic`

## 权限说明

### 用户权限

| 权限 | 说明 |
|------|------|
| `is_active` | 用户是否启用，禁用后无法连接 |
| `can_create_topic` | 推送消息到不存在的 topic 时自动创建 |

### Topic 权限

通过管理后台 Permissions 页面设置：

| 权限 | 说明 |
|------|------|
| `can_pub` | 允许推送到该 topic |
| `can_sub` | 允许订阅该 topic |

> 推送消息到已有 topic 需要 `can_pub` 权限。如果 topic 不存在且用户有 `can_create_topic` 权限，则自动创建 topic 并推送成功。

---

## 配置项

所有配置通过 `.env` 文件或环境变量设置：

| 环境变量 | 默认值 | 说明 |
|---------|--------|------|
| `PULSE_HOST` | `0.0.0.0` | HTTP 服务监听地址 |
| `PULSE_HTTP_PORT` | `8080` | HTTP 服务端口 |
| `PULSE_ZMQ_ROUTER_PORT` | `5555` | ZMQ ROUTER 端口（客户端连接入口） |
| `PULSE_ZMQ_PULL_PORT` | `5556` | ZMQ PULL 端口（接收客户端推送的消息） |
| `PULSE_DB_TYPE` | `sqlite` | 数据库类型：`sqlite` 或 `mysql` |
| `PULSE_DB_PATH` | `./pulse_mq.db` | SQLite 文件路径（仅 sqlite 模式） |
| `PULSE_DB_URL` | - | MySQL 连接 URL（仅 mysql 模式，格式：`mysql+pymysql://user:pass@host:3306/db`） |
| `PULSE_QUEUE_SIZE` | `1000` | 每个 topic 内存队列保留的最大消息条数 |
| `PULSE_AUTH_CACHE_TTL` | `60` | 客户端认证缓存从数据库刷新的间隔（秒） |
| `PULSE_JWT_SECRET` | 自动生成 | JWT 签名密钥（留空则首次启动自动生成） |
| `PULSE_JWT_EXPIRE_HOURS` | `24` | JWT Token 过期时间（小时） |
| `PULSE_ADMIN_PASSWORD` | 自动生成 | admin 初始密码（留空则首次启动自动生成） |
| `PULSE_HEARTBEAT_TIMEOUT` | `60` | 服务端判定客户端断线的超时时间（秒） |
| `PULSE_STATS_RETENTION_DAYS` | `7` | 消息统计数据保留天数 |

### 客户端环境变量

客户端通过 `PulseMQClient.from_env()` 从环境变量读取配置：

| 环境变量 | 说明 |
|---------|------|
| `PULSE_CLIENT_HOST` | 服务端地址 |
| `PULSE_CLIENT_PORT` | ZMQ ROUTER 端口 |
| `PULSE_CLIENT_USERNAME` | 用户名 |
| `PULSE_CLIENT_TOKEN` | 用户 Token |

---

## 管理后台

启动服务后访问 `http://localhost:8080`，使用 admin 账号登录。

- **Dashboard** — 概览统计、Topic 监控图表、系统配置、快速开始代码
- **Topics** — Topic 列表、今日消息量、最近消息时间、7 天趋势图
- **Users** — 用户管理、Token 生成/复制、启停状态、创建 topic 权限
- **Permissions** — 权限矩阵，按用户/Topic 控制推送(P)和订阅(S)权限

## 数据库

默认使用 SQLite，可通过配置切换到 MySQL：

```bash
# SQLite（默认）
PULSE_DB_TYPE=sqlite
PULSE_DB_PATH=./pulse_mq.db

# MySQL
PULSE_DB_TYPE=mysql
PULSE_DB_URL=mysql+pymysql://user:password@host:3306/pulse_mq
```

## 开发

```bash
# 克隆项目
git clone https://github.com/your-username/pulse-mq.git
cd pulse-mq

# 创建虚拟环境
uv venv
source .venv/bin/activate  # Linux/macOS
# 或 .venv\Scripts\activate  # Windows

# 安装依赖
uv pip install -e ".[dev]"

# 运行测试
pytest tests/ -v
```

## 许可证

MIT License
