Metadata-Version: 2.4
Name: FUploader
Version: 1.0.0
Summary: 通用文件写入、打包、OSS 上传管线 SDK
Author: data-team
License: MIT
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: aio-pika>=9.4.0
Requires-Dist: redis>=5.0.0
Requires-Dist: zstandard>=0.22.0
Provides-Extra: cos
Requires-Dist: cos-python-sdk-v5>=1.9.0; extra == "cos"
Provides-Extra: oss
Requires-Dist: oss2>=2.18.0; extra == "oss"
Provides-Extra: s3
Requires-Dist: boto3>=1.34.0; extra == "s3"
Provides-Extra: adapters
Requires-Dist: FUploader[cos,oss,s3]; extra == "adapters"

# FUploader

通用的「消费消息 → 写本地文件 → 打包归档 → 上传 OSS」管线 SDK。

## 核心流程

```
 MessageSource          MetaWriter                  MetaPacker
 ────────────         ──────────────            ─────────────────
 RabbitMQ ──► 解析消息 ──► 写盘到 slot ──► 封口 ──► 打包 .tar.zst ──► 上传 OSS ──► 清理本地
                          │
                          ▼
                    Redis (StateStore)
                    · slot 计数 / 封口阈值
                    · 待打包队列
                    · 分布式锁
```

## 架构设计

### 三层抽象接口

SDK 通过抽象接口解耦外部依赖，方便替换不同中间件：

| 接口 | 职责 | 内置实现 |
|------|------|----------|
| `MessageSource` | 消息消费、ACK/NACK | `RabbitMQSource` (aio-pika) |
| `StateStore` | slot 状态、队列、分布式锁 | `RedisStateStore` (redis-py async) |
| `ObjectStore` | 文件上传、存在性检查 | `OssUploader` (COS / 阿里云 OSS / S3) |

### 四种策略注入

业务逻辑通过 Callable 注入，无需继承任何类：

| 策略 | 签名 | 作用 |
|------|------|------|
| `MessageParser` | `(bytes) -> dict` | 将原始消息字节解析为结构化数据 |
| `FileNameGenerator` | `(dict) -> str` | 根据数据生成写入文件名 |
| `Packer` | `(Path) -> Path \| None` | 将目录打包为归档文件 |
| `RemoteKeyGenerator` | `(str) -> str` | 根据归档文件名生成远端 OSS key |

### Slot 分片机制

- 按文件名 hash 取模分配到不同 slot 目录（`slot_0_0`、`slot_1_0` ...）
- 每个 slot 有独立的 `active_dir_id` 和 `active_count`
- 支持多进程并行写入同一存储目录，互不冲突

### 封口（Seal）机制

- 每个 slot 目录内文件数达到 `pack_threshold` 时自动触发封口
- 当前目录被标记为 ready，加入打包队列；同时创建新序号目录（如 `slot_0_0` → `slot_0_1`）
- 阈值检查通过 Redis Lua 脚本保证原子性

### 残留恢复

启动时自动扫描 `storage_root`，找出非当前活跃的 `slot_*` 目录，重新加入待打包队列，防止进程崩溃后数据丢失。

## 安装

```bash
# 基础安装
pip install FUploader

# 按需安装 OSS 后端
pip install FUploader[cos]      # 腾讯云 COS
pip install FUploader[oss]      # 阿里云 OSS
pip install FUploader[s3]       # AWS S3 / MinIO
pip install FUploader[adapters] # 全部 OSS 后端
```

## 快速开始

以下示例展示抖音评论数据写入管线的完整用法（源码见 `examples/douyin_writer.py`）：

```python
import asyncio
import json
from pathlib import Path

from file_uploader import (
    FileWriterConfig,
    FileWriterPipeline,
    OssProvider,
    OssUploader,
    RabbitMQSource,
    RedisStateStore,
    tar_zstd_packer,
)


# 1. 业务策略函数
def douyin_message_parser(raw_body: bytes) -> dict:
    return json.loads(raw_body.decode("utf-8"))


def douyin_file_name_generator(data: dict) -> str:
    comment_id = data.get("cid", "unknown")
    return f"comment_{comment_id}.json"


def douyin_remote_key_generator(archive_name: str) -> str:
    from datetime import datetime
    date_str = datetime.now().strftime("%Y%m%d")
    return f"douyin/comments/{date_str}/{archive_name}"


async def main():
    # 2. 配置
    config = FileWriterConfig(
        slot_count=16,
        pack_threshold=1000,
        storage_root=Path("/data/douyin_output"),
        save_timeout=5.0,
        packer_concurrency=4,
        packer_interval=2.0,
        packer_max_retries=3,
    )

    # 3. Redis 状态存储
    state_store = RedisStateStore(
        host="localhost", port=6379, db=0,
        key_prefix="file_uploader:douyin:",
    )
    await state_store.connect()

    # 4. RabbitMQ 消息源
    message_source = RabbitMQSource(
        host="localhost", port=5672,
        user="guest", password="guest",
        queue_name="douyin_comment_queue",
    )

    # 5. OSS 上传器（腾讯云 COS）
    object_store = OssUploader(
        provider=OssProvider.COS,
        bucket="douyin-data-1250000000",
        region="ap-guangzhou",
        endpoint="cos.ap-guangzhou.myqcloud.com",
        access_key_id="AKID...",
        access_key_secret="...",
    )

    # 6. 构建 Pipeline
    pipeline = (
        FileWriterPipeline
        .with_config(config)
        .with_state_store(state_store)
        .with_message_source(message_source)
        .with_object_store(object_store)
        .with_message_parser(douyin_message_parser)
        .with_file_name_generator(douyin_file_name_generator)
        .with_packer(tar_zstd_packer)
        .with_remote_key_generator(douyin_remote_key_generator)
    )

    # 7. 启动 & 等待终止信号
    await pipeline.start()
    await pipeline.wait()  # 阻塞直到 SIGINT / SIGTERM
    await pipeline.stop()


if __name__ == "__main__":
    asyncio.run(main())
```

## 配置参考

`FileWriterConfig` 所有字段：

| 字段 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `storage_root` | `Path` | **必填** | 本地存储根目录 |
| `slot_count` | `int` | `1` | 分片数（多进程写文件时避免文件锁竞争） |
| `pack_threshold` | `int` | `1000` | 每个目录最多文件数，触发封口（seal） |
| `write_concurrency` | `int` | `10` | 写文件 worker 协程数 |
| `packer_concurrency` | `int` | `2` | 打包上传 worker 协程数 |
| `save_timeout` | `float` | `30.0` | 单文件写入超时（秒） |
| `packer_interval` | `float` | `1.0` | 打包轮询间隔（秒） |
| `packer_max_retries` | `int` | `3` | 打包失败最大重试次数（0=不重试，直接丢弃） |
| `batch_size` | `int` | `100` | 批量 ACK 阈值 |
| `flush_interval` | `float` | `5.0` | 时间窗口 flush（秒） |
| `prefetch_count` | `int` | `50` | 消息预取数 |
| `resume_orphan_archives` | `bool` | `True` | 启动时是否恢复孤儿目录 |

## 核心组件

### FileWriterPipeline

Builder 模式的一站式入口，封装完整的生命周期管理：

| 方法 | 说明 |
|------|------|
| `.with_config(config)` | 注入 `FileWriterConfig` 配置 |
| `.with_state_store(store)` | 注入 `StateStore` 实现 |
| `.with_message_source(source)` | 注入 `MessageSource` 实现 |
| `.with_object_store(store)` | 注入 `ObjectStore` 实现 |
| `.with_message_parser(parser)` | 注入消息解析策略 |
| `.with_file_name_generator(gen)` | 注入文件名生成策略 |
| `.with_packer(packer)` | 注入打包策略 |
| `.with_remote_key_generator(gen)` | 注入远端 key 生成策略 |
| `.no_recovery()` | 禁用启动时残留恢复 |
| `await pipeline.start()` | 启动 Pipeline |
| `await pipeline.wait()` | 阻塞直到收到 SIGINT / SIGTERM |
| `await pipeline.stop()` | 优雅关闭 |

### MetaWriter

文件写入服务，负责：

1. 通过 `MessageSource` 消费消息
2. 调用 `MessageParser` 解析为结构化数据
3. 调用 `FileNameGenerator` 生成文件名
4. 按文件名 hash 取模路由到对应 slot 目录
5. 将数据序列化为 JSON 写入磁盘
6. 原子递增 slot 计数并检查封口阈值
7. 达到阈值时封口（seal），将目录加入打包队列

### MetaPacker

打包上传服务，负责：

1. 轮询待打包队列
2. 通过分布式锁防止重复打包
3. 调用 `Packer` 将目录打包为归档文件
4. 调用 `RemoteKeyGenerator` 生成远端 key
5. 通过 `ObjectStore` 上传到 OSS
6. 上传成功后清理本地归档和源目录
7. 打包失败时指数退避重试，全部失败后丢弃

### ResidualRecovery

启动期间执行残留恢复：

- 扫描 `storage_root` 下所有 `slot_*` 目录
- 排除当前活跃目录（`active_dir_id`）
- 跳过空目录
- 将孤儿目录重新加入待打包队列

## 自定义策略

所有策略都是普通函数（支持同步/异步），无需继承框架基类：

```python
# 自定义消息解析器（异步版本）
async def my_parser(raw_body: bytes) -> dict:
    data = json.loads(raw_body)
    # 数据清洗、补全...
    return data

# 自定义打包器（可替换 tar.zstd）
async def my_packer(source_dir: Path) -> Path | None:
    """7z 压缩示例"""
    archive = source_dir.with_suffix(".7z")
    result = await run_7z(source_dir, archive)
    return archive if result else None

# 注入到 Pipeline
pipeline = (
    FileWriterPipeline
    .with_config(config)
    # ...其他配置...
    .with_message_parser(my_parser)
    .with_packer(my_packer)
)
```

## 适配器

### RabbitMQSource

基于 `aio-pika` 的 RabbitMQ 消费实现：

```python
source = RabbitMQSource(
    host="localhost", port=5672,
    vhost="/", user="guest", password="guest",
    queue_name="my_queue",
    prefetch_count=50,
    requeue_on_nack=True,
)
```

### RedisStateStore

基于 `redis.asyncio` 的状态存储实现。使用 Lua 脚本保证阈值检查和分布式锁的原子性：

```python
store = RedisStateStore(
    host="localhost", port=6379, db=0,
    password="",
    key_prefix="file_uploader:task1:",
    lock_ttl=300,
)
await store.connect()
```

### OssUploader

支持三种 OSS 后端的统一上传接口，各后端依赖按需安装：

```python
# 腾讯云 COS → pip install FUploader[cos]
uploader = OssUploader(
    provider=OssProvider.COS,
    bucket="my-bucket-1250000000",
    region="ap-guangzhou",
    endpoint="cos.ap-guangzhou.myqcloud.com",
    access_key_id="AKID...",
    access_key_secret="...",
)

# 阿里云 OSS → pip install FUploader[oss]
uploader = OssUploader(
    provider=OssProvider.ALIYUN_OSS,
    bucket="my-bucket",
    endpoint="oss-cn-hangzhou.aliyuncs.com",
    access_key_id="LTAI...",
    access_key_secret="...",
)

# AWS S3 / MinIO → pip install FUploader[s3]
uploader = OssUploader(
    provider=OssProvider.S3,
    bucket="my-bucket",
    region="us-east-1",
    endpoint="https://s3.amazonaws.com",
    access_key_id="AKIA...",
    access_key_secret="...",
)
```

## 项目结构

```
src/file_uploader/
├── __init__.py              # 公开 API 导出
├── config.py                # FileWriterConfig 配置类
├── pipeline.py              # FileWriterPipeline 入口 + Builder
├── interfaces/              # 抽象接口层
│   ├── message_source.py    #   MessageSource 抽象
│   ├── object_store.py      #   ObjectStore 抽象
│   ├── state_store.py       #   StateStore 抽象 + SlotRuntimeState
│   └── strategies.py        #   MessageParser / FileNameGenerator / Packer / RemoteKeyGenerator 类型
├── adapters/                # 适配器实现
│   ├── rabbitmq_source.py   #   RabbitMQSource
│   ├── redis_state_store.py #   RedisStateStore
│   └── oss_uploader.py      #   OssUploader (COS / OSS / S3)
├── packers/                 # 打包策略
│   └── tar_zstd.py          #   tar + zstd 打包器
├── services/                # 核心服务
│   ├── meta_writer.py       #   MetaWriter 写入服务
│   └── meta_packer.py       #   MetaPacker 打包上传服务
└── recovery/                # 残留恢复
    └── residual.py          #   ResidualRecovery
```

## License

MIT
