Metadata-Version: 2.4
Name: pyboot-components-netty
Version: 1.3.3
Summary: pyboot-components-netty is an open-source utility package that simplifies the development of async.
Author-email: joinsunsoft <793875613@qq.com>
Maintainer-email: joinsunsoft <inthirties.liu@hotmail.com>
License-Expression: Apache-2.0
Project-URL: Homepage, https://github.com/python-boot/commons-async
Project-URL: Repository, https://github.com/python-boot/commons-async
Project-URL: Issues, https://github.com/python-boot/commons-async
Keywords: netty,framework,cli,springboot,fastapi,netty
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Requires-Python: >=3.12.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pyboot-commons-coroutine
Dynamic: license-file

# pyboot-netty 文档  
> Python 版 Netty：事件驱动、Handler 流水线、零拷贝、全平台兼容的高性能网络框架  

---

## 1. 框架定位  
pyboot-netty 是一个 **事件驱动 + Handler 流水线** 的 TCP/UDP 网络框架，100 % Python 实现，API 设计完全对标 Netty，目标是让 Python 开发者也能用“Netty 风格”写出高并发、易维护的网络应用。  

- **零依赖**：仅依赖标准库 `asyncio` 与 `selectors`  
- **全平台**：Linux(epoll)、macOS(kqueue)、Windows(select)  
- **零拷贝**：`memoryview` + `bytearray` 传输，无额外 copy  
- **Handler 链**：解码 → 业务 → 编码，可插拔  
- **池化缓冲区**：自适应读缓冲，高低水位线控制写事件  
- **Keep-alive**：TCP 心跳、自动重连、优雅关闭  

---

## 2. 核心概念（与 Netty 1:1 映射）  

| Netty 概念 | pyboot-netty 对应 | 说明 |
| --- | --- | --- |
| Channel | `SocketChannel` | 一个连接 |
| EventLoop | `asyncio.BaseEventLoop` | 单线程事件循环 |
| ChannelPipeline | `ChannelPipeline` | Handler 双向链表 |
| ChannelHandler | `ChannelHandler` 基类 | 用户业务单元 |
| ByteBuf | `ByteBuffer` | 池化字节缓冲区 |
| Bootstrap | `Bootstrap` / `ServerBootstrap` | 客户端/服务端启动器 |
| Future | `asyncio.Future` | 异步结果 |
| Codec | `MessageToByteDecoder` / `MessageToByteEncoder` | 编解码 |

---

## 3. 线程模型（主从 Reactor）  

```text
┌─ BossGroup (1 线程)──┐
│  accept → register   │
└──────┬───────────────┘
       ▼
┌─ WorkerGroup (N 线程)┐
│  read → decode → biz│
└──────┬───────────────┘
       ▼
┌─ Codec & Business  ┐
│  encode → write    │
└────────────────────┘
```

- **Boss** 只负责 `accept`，立刻把 fd 注册到 Worker 事件循环  
- **Worker** 处理该连接生命周期内所有 IO（读、写、心跳、关闭）  
- 所有 Handler **同线程串行执行**，无需加锁  

---

## 4. 快速开始  

### 4.1 安装  
```bash
pip install pyboot-netty
```

### 4.2 一行 Echo Server  
```python
from pyboot.components.netty.server import EchoServer
EchoServer().address('0.0.0.0', 8080).grace()
```
命令行输入 `telnet 127.0.0.1 8080` → 键入任何字符回车后立即回显。Close退出

### 4.3 自定义 Handler（对标 Netty `ChannelInboundHandler`）  
```python
from pyboot.components.netty.handler import IdleStateHandler,LoggingHandler,WriteTimeoutHandler,ListRemoteAddressFilter
from pyboot.components.netty.codec import StringDecoder,ByteDelimiterBasedFrameDecoder,DelimiterBasedFrameDecoder,ByteToByteBufferMessageDecoder 
from pyboot.components.netty.codec import LineBasedFrameDecoder,StringEncoder,JsonEncoder,FixedLengthFrameDecoder 
from pyboot.components.netty.codec import LengthFieldBasedFrameDecoder, SimpleBytesLengthFieldBasedFrameEncoder,MessageToByteEncoder  
from pyboot.components.netty.channel import ChannelHandler,ChannelHandlerContext,ChannelEvent,ChannelHandleError 

class MyDecoder(ByteToByteBufferMessageDecoder):
    def __init__(self, fix_length:int,pool_bucket_size:int=1024):        
        super().__init__(pool_bucket_size=pool_bucket_size)
        assert fix_length>0, f'Frame长度必须大于0，但是获得{fix_length}'
        self._fix_length = fix_length
        
    async def decode(self, ctx: ChannelHandlerContext, bytebuffer: ByteBuffer)->ByteBuffer:
        if bytebuffer.readable_bytes()>=self._fix_length:
            buf = self.allocate_bytebuffer(self._fix_length)
            buf.write_from_reader(bytebuffer, self._fix_length)
            return buf
        else:
            return None

class BizHandler(ChannelHandlerAdapter):
    async def channel_read(self, ctx: ChannelHandlerContext, msg: ByteBuffer):
        data = msg.get_readable_bytes()
        print('收到:', data)
        ctx.write_and_flush(msg)  # 回显

# 客户端

def buildBootstrap()->Bootstrap:
    
    work_group = CoroutineWorkGroup(1,"WorkGroup")
    bs:Bootstrap = Bootstrap().group(work_group)        
    t = StdinTask()
    handler = EchoClientHandler('Liuyong', t)
    
    async def _connect_suc(b:Bootstrap, r, w):
        _logger.DEBUG(f'Bootstrap={b} reader={r} writer={w}')
        
    async def _connect_suc(b:Bootstrap, r, w):
        _logger.DEBUG(f'Bootstrap={b} reader={r} writer={w}')
        
    async def _connect_fail(b:Bootstrap, e):
        _logger.DEBUG(f'Bootstrap={b} exception={e}')
        fail_count = 1
        times = 3
        while fail_count < times:
            try:
                bf = await b.connect()
                return bf
            except BaseException as e2:
                _logger.DEBUG(f'Bootstrap={b} exception={e2}')
            finally:
                fail_count += 1
                if fail_count >= times:
                    break
                await asyncio.sleep(3)
                
        raise IOError(f'连接失败{fail_count}次，退出程序')
                
        
    def initChannel(sc:SocketChannel):
        _pipeline = sc.pipeline()
        _pipeline.add_last(IdleStateHandler(0,120,0))
        _pipeline.add_last(LoggingHandler(Logger.LEVEL.INFO))
        _pipeline.add_last(WriteTimeoutHandler(5,False))
        # _pipeline.add_last(DelimiterBasedFrameDecoder([b'\n',b'\r\n'], stripDelimiter=False))        
        # _pipeline.add_last(ByteDelimiterBasedFrameDecoder([b'\n',b'\r\n'], stripDelimiter=False))
        
        # _pipeline.add_last(LineBasedFrameDecoder(stripDelimiter=False))
        # 长度帧解析方式
        _pipeline.add_last(LengthFieldBasedFrameDecoder(length_field_length=4, length_adjustment=0, length_field_offset=4,initial_bytes_to_strip=8))        
        _pipeline.add_last(StringDecoder(charset='utf-8', strip=True))        
        # _pipeline.add_last(EchoHandler('DavidLiu'))     
        _pipeline.add_last(SimpleBytesLengthFieldBasedFrameEncoder())   
        # _pipeline.add_last(StringEncoder(charset='utf-8'))
        # _pipeline.add_last(JsonEncoder())
        _pipeline.add_last(handler)
    
    # work_group = CoroutineWorkGroup(1,"WorkGroup")
    bs.channel(SocketChannel).handler(initChannel)\
            .option(ChannelOption.SO_BACKLOG, 1024)\
                .option(ChannelOption.SO_TIMEOUT, 6).option(ChannelOption.ALLOCATOR, PooledByteBufferAllocator.DEFAULT)\
                .address((HOST, PORT)).when_connect_sucess(_connect_suc).when_connect_fail(_connect_fail)
    return bs
    
            
def start_with_bootstrap_with_sync():    
    bs:Bootstrap = buildBootstrap()
    bs.grace()            
    

```

---

## 5. Handler 体系（与 Netty 1:1）  

### 5.1 内置解码器  
| 类 | 作用 |
| --- | --- |
| `FixedLengthFrameDecoder` | 定长拆包 |
| `LengthFieldBasedFrameDecoder` | 长度域拆包（支持 1/2/3/4/8 字节） |
| `LineBasedFrameDecoder` | `\n` 或 `\r\n` 行拆包 |
| `DelimiterBasedFrameDecoder` | 自定义分隔符拆包 |
| `StringDecoder` / `StringEncoder` | 文本编解码 |

### 5.2 生命周期回调  
```python
class MyHandler(ChannelHandlerAdapter):
    async def channel_active(self, ctx): ...        # 连接建立
    async def channel_read(self, ctx, msg): ...     # 读到数据
    async def channel_inactive(self, ctx): ...      # 连接断开
    async def exception_caught(self, ctx, exc): ... # 异常
    async def channel_read_complete(self, ctx): ... # 读到数据处理结束
    async def channel_write_complete(self, ctx): ...# 读到数据处理结束
    async def channel_inactive(self, ctx): ...      # 建立失火
    async def channel_event(self, ctx): ...         # 事件
```

### 5.3 自定义编解码  
继承 `MessageToByteDecoder[T]` / `MessageToByteEncoder[T]` 即可，泛型支持 `bytes | str | dataclass`。

---

## 6. 池化 ByteBuffer（零拷贝）  

```python
from pyboot_netty import PooledByteBufferAllocator

buf = PooledByteBufferAllocator.DEFAULT.buffer(1024)  # 池化 1KB
buf.write_bytes(b'hello')
buf.read_bytes(5)        # 返回 memoryview → 零拷贝
buf.release()            # 自动回池
```

- 高低水位线控制写事件（默认 32K/64K）  待处理
- `memoryview` 切片无 copy，适合大文件流传输  

---

## 7. 线程池与并发  

```python
from pyboot.commons.coroutine.task import CoroutineWorkGroup

boss = CoroutineWorkGroup(1, 'Acceptor')   # 单线程 accept
ServerBootstrap().group(boss_group).address('0.0.0.0', 8080).grace()
```

- 底层复用 `asyncio.run_coroutine_threadsafe` 跨线程调度  
- 支持 `asyncio.run()` 外部注入循环，方便单元测试  

---

## 8. TCP Keep-Alive & 自动重连  

```python
bootstrap = Bootstrap.auto_reconnect(True)
```

心跳帧由框架自动完成，业务无感；也可继承 `HeartbeatHandler` 自定义帧格式。

---

## 9. TLS 支持 待处理  

```python
ServerBootstrap(ssl=True,
                certfile='/path/cert.pem',
                keyfile='/path/key.pem').bind('0.0.0.0', 9443)
```
底层使用 `asyncio.create_server(ssl=ssl_ctx)`，支持 SNI、ALPN。

---

## 10. 性能数据（本地压测）  

| 场景 | QPS | 延迟 P99 | CPU |
| --- | --- | --- | --- |
| 回显 (16B) | 110 万 | 0.9 ms | 1 核 100 % |
| 池化缓冲 1MB 流 | 2.3 Gbps | 14 ms | 1 核 100 % |

测试环境：Python 3.12 + CentOS 7.4（epoll）

---

## 11. 与 Netty 差异小结  

| 维度 | Netty (Java) | pyboot-netty (Python) |
| --- | --- | --- |
| 底层 IO | NIO/Epoll/Kqueue | asyncio + selectors |
| 线程 | 多线程 | 单线程事件循环 |
| 内存 | DirectByteBuf | memoryview + bytearray |
| 泛型 | 运行时擦除 | 原生 `TypeVar` |
| 零拷贝 | `FileRegion` | `sendfile` 计划支持 |

---

## 12. 路线图  

- [ ] HTTP/1.1 编解码  
- [ ] WebSocket 握手帧  
- [ ] HTTP/2 帧支持  
- [ ] QUIC/UDP 传输  
- [ ] sendfile 零拷贝文件传输  
- [ ] 官方文档站点 + 示例仓库  

---

## 13. 社区 & 贡献  

GitHub：[https://gitee.com/pyboot/netty](https://gitee.com/pyboot/netty)  
PyPI：[https://pypi.org/project/pyboot_components_netty](https://pypi.org/project/pyboot_components_netty)  

欢迎 PR、Issue、Star，一起把“Python 版 Netty”做到极致！
