Metadata-Version: 2.4
Name: surge-python
Version: 0.1.1
Summary: Python async SDK for Surge — high-speed pub/sub data pipeline for quant trading
Author: Surge Team
License-Expression: MIT
Project-URL: Homepage, https://github.com/surge-pubsub/surge
Project-URL: Documentation, https://github.com/surge-pubsub/surge
Project-URL: Repository, https://github.com/surge-pubsub/surge
Keywords: surge,pubsub,quant,trading,low-latency,pipeline,async
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Financial and Insurance Industry
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: System :: Networking
Classifier: Framework :: AsyncIO
Requires-Python: >=3.9
Description-Content-Type: text/markdown
Requires-Dist: protobuf>=4.0
Provides-Extra: dev
Requires-Dist: pytest; extra == "dev"
Requires-Dist: pytest-asyncio; extra == "dev"

# surge-python

**Surge** 的 Python 异步 SDK —— 为量化交易者打造的高速低延迟发布/订阅数据管道。

> P50 延迟 < 30μs | P99 延迟 < 100μs | 吞吐量 100K+ msg/s

## 安装

```bash
pip install surge-python
```

依赖：Python >= 3.9，protobuf >= 4.0

## 快速开始

```python
import asyncio
import json
from surge import Publisher, Subscriber

async def main():
    # ── 发布行情 ──
    pub = Publisher("127.0.0.1", 9800)
    await pub.connect()

    tick = json.dumps({"price": 15.20, "volume": 83400}).encode()
    await pub.send("md.sz.000001", tick)
    await pub.disconnect()

    # ── 订阅行情 ──
    sub = Subscriber("127.0.0.1", 9800)
    await sub.connect()
    await sub.subscribe(["md.sz.*"])

    async for msg in sub.messages():
        data = json.loads(msg.payload)
        print(f"{msg.topic} price={data['price']} latency={msg.latency_us:.1f}μs")

asyncio.run(main())
```

## 核心概念

Surge 采用 **Topic 发布/订阅** 模型：

- **Publisher** 向指定 Topic 发布消息（如 `md.sz.000001`）
- **Subscriber** 订阅 Topic 模式（支持通配符 `md.sz.*`），实时接收消息
- **SurgeClient** 是底层全功能客户端，同时支持发布和订阅

```
[行情源] ──publish──▶ [Surge Server] ──push──▶ [策略程序]
                        Topic 路由
                       通配符匹配
```

## API 参考

### Publisher — 发布者

纯发布场景的高级封装，内置自动重连。

```python
from surge import Publisher

pub = Publisher(host="127.0.0.1", port=9800, token=None)
await pub.connect()
```

| 方法 | 说明 |
|------|------|
| `await pub.connect()` | 连接到 Surge 服务端 |
| `await pub.send(topic, payload, headers=None)` | 发布消息，`payload` 为 `bytes`，`headers` 为可选 `dict[str, str]` |
| `await pub.send_batch(messages)` | 批量发布，`messages` 为 `list[tuple[str, bytes]]` |
| `await pub.disconnect()` | 断开连接 |
| `pub.connected` | 属性，返回连接状态 `bool` |

**示例：批量发布**

```python
messages = [
    ("md.sz.000001", b'{"price":15.20}'),
    ("md.sz.000002", b'{"price":28.50}'),
    ("md.sh.600519", b'{"price":1680.00}'),
]
await pub.send_batch(messages)
```

**示例：带 Headers 发布**

```python
await pub.send(
    "md.sz.000001",
    b'{"price":15.20,"volume":83400}',
    headers={"source": "ctp", "exchange": "SZSE"}
)
```

---

### Subscriber — 订阅者

纯订阅场景的高级封装，支持回调和异步迭代两种消费模式。

```python
from surge import Subscriber

sub = Subscriber(host="127.0.0.1", port=9800, token=None)
await sub.connect()
```

| 方法 | 说明 |
|------|------|
| `await sub.connect()` | 连接到 Surge 服务端 |
| `await sub.subscribe(topics, snapshot=False)` | 订阅 Topic 列表，支持通配符 `*`，`snapshot=True` 获取最新快照 |
| `sub.on_message(callback)` | 注册消息回调，回调接收 `SurgeMessage` 对象 |
| `sub.on_snapshot(callback)` | 注册快照回调，回调接收 `SurgeSnapshot` 对象 |
| `sub.on_disconnect(callback)` | 注册断连回调 |
| `async for msg in sub.messages()` | 异步迭代接收消息 |
| `await sub.disconnect()` | 断开连接 |
| `sub.connected` | 属性，返回连接状态 `bool` |

**示例：回调模式**

```python
import json

def handle_msg(msg):
    tick = json.loads(msg.payload)
    print(f"[{msg.topic}] price={tick['price']} latency={msg.latency_us:.1f}μs")

sub.on_message(handle_msg)
await sub.subscribe(["md.sz.*", "md.sh.*"])

# 保持运行
await asyncio.Event().wait()
```

**示例：异步迭代模式**

```python
await sub.subscribe(["md.sz.000001"], snapshot=True)

async for msg in sub.messages():
    if isinstance(msg, SurgeSnapshot):
        print(f"快照: {msg.topic} seq={msg.latest_sequence}")
    else:
        print(f"实时: {msg.topic} seq={msg.sequence}")
```

---

### SurgeClient — 底层全功能客户端

同时支持发布和订阅，提供完整控制能力。

```python
from surge import SurgeClient

client = SurgeClient(
    host="127.0.0.1",
    port=9800,
    token=None,                # Token 认证（可选）
    auto_reconnect=True,       # 断线自动重连
    reconnect_interval=1.0,    # 重连间隔（秒）
)
await client.connect()
```

| 方法 | 说明 |
|------|------|
| `await client.connect()` | 连接服务端，如设置 `token` 会自动完成认证 |
| `await client.publish(topic, payload, headers=None)` | 发布消息 |
| `await client.subscribe(topics, snapshot=False)` | 订阅 Topic 列表 |
| `await client.unsubscribe(topics=None)` | 取消订阅 |
| `client.on_message(callback)` | 注册消息回调 |
| `client.on_snapshot(callback)` | 注册快照回调 |
| `client.on_disconnect(callback)` | 注册断连回调 |
| `async for msg in client.messages()` | 异步迭代接收消息 |
| `await client.disconnect()` | 优雅断开连接 |
| `client.connected` | 属性，返回连接状态 `bool` |

**示例：同时发布和订阅**

```python
client = SurgeClient("127.0.0.1", 9800)
await client.connect()

# 订阅所有深市行情
client.on_message(lambda msg: print(msg.topic, msg.latency_us))
await client.subscribe(["md.sz.*"], snapshot=True)

# 同时发布
await client.publish("signal.buy", b'{"code":"000001","qty":1000}')
```

---

### 数据类型

#### SurgeMessage

收到的实时消息。

| 字段 | 类型 | 说明 |
|------|------|------|
| `topic` | `str` | Topic 名称，如 `md.sz.000001` |
| `sequence` | `int` | 消息序号（Topic 内递增） |
| `payload` | `bytes` | 消息体（通常为 JSON） |
| `timestamp` | `int` | 发布时间戳（纳秒） |
| `headers` | `dict` | 附加头信息 |
| `latency_us` | `float` | 端到端延迟（微秒） |

#### SurgeSnapshot

订阅时获取的最新快照。

| 字段 | 类型 | 说明 |
|------|------|------|
| `topic` | `str` | Topic 名称 |
| `latest_sequence` | `int` | 该 Topic 最新序号 |
| `payload` | `bytes` | 最新消息体 |

---

## Topic 通配符

| 模式 | 说明 | 示例 |
|------|------|------|
| 精确匹配 | 完全匹配 Topic 名 | `md.sz.000001` |
| `*` 通配 | 匹配单层任意字符 | `md.sz.*` 匹配 `md.sz.000001` |
| 多主题 | 传入列表订阅多个 | `["md.sz.*", "md.sh.*"]` |

## Token 认证

服务端开启 Token 认证时，客户端需传入 Token：

```python
pub = Publisher("127.0.0.1", 9800, token="your-secret-token")
await pub.connect()  # 自动完成认证
```

认证失败将抛出 `PermissionError`。

## 自动重连

所有客户端默认开启自动重连：

- 断线后自动尝试重连（间隔 1 秒）
- 重连成功后自动恢复之前的所有订阅
- 通过 `on_disconnect` 回调感知断连事件

```python
sub.on_disconnect(lambda: print("连接断开，正在自动重连..."))
```

如需关闭自动重连：

```python
client = SurgeClient("127.0.0.1", 9800, auto_reconnect=False)
```

## 日志

SDK 使用 Python 标准 `logging` 模块，logger 名称为 `surge`：

```python
import logging
logging.basicConfig(level=logging.DEBUG)

# 或仅开启 surge 日志
logging.getLogger("surge").setLevel(logging.DEBUG)
```

## 完整示例：A 股行情转发

```python
import asyncio
import json
from surge import Publisher, Subscriber

async def producer():
    """模拟行情源，发布 A 股 Tick 数据"""
    pub = Publisher("127.0.0.1", 9800)
    await pub.connect()

    ticks = [
        ("md.sz.000001", {"price": 15.20, "bid": 15.19, "ask": 15.21, "volume": 83400}),
        ("md.sh.600519", {"price": 1680.0, "bid": 1679.5, "ask": 1680.5, "volume": 1200}),
        ("md.sz.000858", {"price": 152.30, "bid": 152.28, "ask": 152.32, "volume": 5600}),
    ]

    for topic, data in ticks:
        await pub.send(topic, json.dumps(data).encode())
        print(f"[PUB] {topic}")

    await pub.disconnect()

async def consumer():
    """订阅深市所有行情，计算延迟"""
    sub = Subscriber("127.0.0.1", 9800)
    await sub.connect()
    await sub.subscribe(["md.sz.*"], snapshot=True)

    count = 0
    async for msg in sub.messages():
        tick = json.loads(msg.payload)
        print(f"[SUB] {msg.topic} price={tick['price']} latency={msg.latency_us:.1f}μs")
        count += 1
        if count >= 10:
            break

    await sub.disconnect()

async def main():
    await asyncio.gather(producer(), consumer())

asyncio.run(main())
```

## 许可证

MIT License
