Metadata-Version: 2.4
Name: flexible_thread_pool
Version: 0.3
Summary: flexible_thread_pool ，auto expand thread and reduce threads. both support sync and asyncio,fast than concurrent.futures.ThreadpoolExecutor
Home-page: https://github.com/ydf0509/flexible_thread_pool
Author: bfzs
Author-email: ydf0509@sohu.com
Maintainer: ydf
Maintainer-email: ydf0509@sohu.com
License: BSD License
Keywords: thread pool,async,asyncio,auto scala,flexible_thread_pool
Platform: all
Classifier: Development Status :: 4 - Beta
Classifier: Operating System :: OS Independent
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: BSD License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: Implementation
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Topic :: Software Development :: Libraries
Description-Content-Type: text/markdown
Requires-Dist: nb_log
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: keywords
Dynamic: license
Dynamic: maintainer
Dynamic: maintainer-email
Dynamic: platform
Dynamic: requires-dist
Dynamic: summary

# flexible_thread_pool

> 自动弹性伸缩线程池，完美兼容 sync / async 函数，性能是官方 `concurrent.futures.ThreadPoolExecutor` 的 **200%+**。

[![PyPI](https://img.shields.io/pypi/v/flexible_thread_pool)](https://pypi.org/project/flexible_thread_pool/)
[![Python](https://img.shields.io/pypi/pyversions/flexible_thread_pool)](https://pypi.org/project/flexible_thread_pool/)
[![License](https://img.shields.io/pypi/l/flexible_thread_pool)](https://github.com/ydf0509/flexible_thread_pool/blob/main/LICENSE)

---

## 特性

- ✅ **弹性伸缩** — 根据任务提交频率和执行耗时，自动扩缩线程数，无需人工估算
- ✅ **返回 Future** — `submit()` 返回标准 `concurrent.futures.Future`，支持 `result()` 获取返回值
- ✅ **支持 map** — 内置 `Executor.map` 方法，批量提交更便捷
- ✅ **完美兼容 asyncio** — 自动检测 `async def` 函数并使用事件循环调度，无需手动 `await`
- ✅ **协程跨 Loop 安全** — 每个工作线程拥有独立事件循环（`threading.local`），彻底解决 *"attached to a different loop"* 错误
- ✅ **指定外部 Loop** — 支持通过 `specify_async_loop` 参数绑定外部事件循环，满足连接池等特殊场景
- ✅ **有界队列背压** — 严格队列大小限制，防止内存溢出
- ✅ **自然生命周期** — 利用 `daemon=False` + `MIN_WORKERS`，脚本任务完成后自动退出

---

## 安装

```bash
pip install flexible_thread_pool
```

依赖：`nb_log`

---

## 快速开始

### 基本用法

```python
from flexible_thread_pool import FlexibleThreadPool


def sync_task(x):
    return x * 2


pool = FlexibleThreadPool(max_workers=100)

# submit 返回 Future，可获取结果
fut = pool.submit(sync_task, 42)
print(fut.result())  # 84

# 批量提交（自动处理超时）
results = pool.map(sync_task, [1, 2, 3, 4], timeout=5)
for res in results:
    print(res)
```

### 异步函数支持

```python
import asyncio
from flexible_thread_pool import FlexibleThreadPool


async def async_task(x):
    await asyncio.sleep(0.1)
    return x * 2


pool = FlexibleThreadPool(max_workers=100)

# 自动调度 async def 函数
fut = pool.submit(async_task, 21)
print(fut.result())  # 42
```

> 无需手动创建事件循环，线程池自动为每个工作线程分配独立的 `asyncio.new_event_loop()`，避免跨线程 loop 冲突。

### 指定外部事件循环

某些异步库（如 aiohttp、asyncpg）要求连接池与事件循环绑定在同一线程。可以通过 `specify_async_loop` 解决：

```python
import asyncio
from flexible_thread_pool import FlexibleThreadPool

loop = asyncio.new_event_loop()

pool = FlexibleThreadPool(
    max_workers=100,
    specify_async_loop=loop,
    is_auto_start_specify_async_loop_in_child_thread=True,
)
```

### 自动退出 vs 常驻进程

| 类名 | MIN_WORKERS | 行为 |
|------|------------|------|
| `FlexibleThreadPool` | `0` | 任务全部完成后自动退出 |
| `FlexibleThreadPoolMinWorkers0` | `0` | 同上，显式别名 |
| `FlexibleThreadPoolMinWorkers1` | `1` | 至少保留 1 个常驻线程，进程永不退出 |

```python
from flexible_thread_pool import (
    FlexibleThreadPool,
    FlexibleThreadPoolMinWorkers1,
)

# 脚本结束后自动退出
pool = FlexibleThreadPool(max_workers=100)
pool.MIN_WORKERS = 0  # 默认就是 0

# 常驻进程，永不结束
pool = FlexibleThreadPoolMinWorkers1(max_workers=100)

# 或动态设置
pool.MIN_WORKERS = 1
```

---

## 为什么选择 FlexibleThreadPool？

### 官方线程池的缺陷

```python
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(500)


def f(x):
    time.sleep(10)  # 模拟 IO 阻塞
    print(x)


for i in range(10000):
    time.sleep(100)  # 模拟低频提交
    pool.submit(f, i)
```

- **低峰期浪费**：即使每隔 100 秒才提交一个任务，官方线程池依然会创建 500 个线程
- **无自动缩容**：流量高峰过后，线程数不会自动降低

### FlexibleThreadPool 的智能策略

| 场景 | 官方 ThreadPoolExecutor | FlexibleThreadPool |
|------|------------------------|--------------------|
| 任务稀疏（隔 100s 提交一次） | 盲目扩容到 500 线程 | 只开 1 个线程 |
| 流量高峰（9:00-10:00） → 低峰 | 始终保持 500 线程 | 高峰 500，低峰自动降至 ~5 |
| 函数耗时降低（100s → 1s） | 保持 500 线程 | 自动从 500 降至 ~100 |
| 函数耗时增加（1s → 100s） | 保持 500 线程 | 自动从 ~100 升至 500 |

**FlexibleThreadPool 无需用户分析函数耗时，自动计算出最合理的线程数量。**

---

## 性能对比

在 **Win11 + AMD R5 4600U** 单核单进程测试：

```
线程池                          | 吞吐量 (ops/s)
-------------------------------|---------------
concurrent.futures.ThreadPoolExecutor |  ~10000
FlexibleThreadPool              |  ~30000+  🚀
```

性能提升约 **200%**，且线程数越高峰值吞吐优势越明显。

---

## API

### `FlexibleThreadPool(max_workers, work_queue_maxsize, specify_async_loop, is_auto_start_specify_async_loop_in_child_thread)`

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `max_workers` | `int` | `cpu_count * 5` | 最大线程数 |
| `work_queue_maxsize` | `int` | `10` | 工作队列最大长度（背压控制） |
| `specify_async_loop` | `AbstractEventLoop` | `None` | 指定外部事件循环 |
| `is_auto_start_specify_async_loop_in_child_thread` | `bool` | `True` | 是否自动启动指定 loop |

### 实例方法

- **`submit(func, *args, **kwargs)` → `Future`** — 提交任务，返回 Future 对象
- **`map(func, *iterables, timeout, chunksize)`** — 批量提交（继承自 `Executor.map`）
- **`shutdown(wait=True)`** — 优雅关闭（当前依赖 KEEP_ALIVE_TIME 自动清理，无需显式调用）

### 实例属性

| 属性 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `KEEP_ALIVE_TIME` | `float` | `5.0` | 空闲线程存活超时（秒） |
| `MIN_WORKERS` | `int` | `0` | 最小常驻线程数 |
| `max_workers` | `int` | — | 最大线程数（构造函数传入） |

---

## 链接

- GitHub: [https://github.com/ydf0509/flexible_thread_pool](https://github.com/ydf0509/flexible_thread_pool)
- PyPI: [https://pypi.org/project/flexible_thread_pool/](https://pypi.org/project/flexible_thread_pool/)
- 另一个弹性线程池实现: [threadpool_executor_shrink_able](https://github.com/ydf0509/threadpool_executor_shrink_able)

---

*如有问题或建议，欢迎提交 [Issue](https://github.com/ydf0509/flexible_thread_pool/issues) 或 PR。*
