Metadata-Version: 2.1
Name: bostorchconnector
Version: 1.1.0
Summary: bostorchconnector, a Python package with a precompiled shared library
Author: 
Author-email: 
Classifier: Programming Language :: Python :: 3
Classifier: Operating System :: OS Independent
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Provides-Extra: dcp
Requires-Dist: torch!=2.5.0,>=2.3.0; extra == "dcp"
Requires-Dist: tenacity; extra == "dcp"
Provides-Extra: lightning
Requires-Dist: lightning>=2.0; extra == "lightning"
Requires-Dist: packaging; extra == "lightning"

# bostorchconnector
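A high-throughput plugin designed for PyTorch training on datasets stored in Bos. With bostorchconnector you can efficiently access datasets in the cloud and read and write checkpoints.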

bostorchconnector implements PyTorch's [dataset primitives](https://pytorch.org/tutorials/beginner/basics/data_tutorial.html) interface.
It supports two kinds of datasets:
- [map-style datasets](https://pytorch.org/docs/stable/data.html#map-style-datasets) for random access
- [iterable-style datasets](https://pytorch.org/docs/stable/data.html#iterable-style-datasets) for streaming sequential reads

It also provides a checkpoint interface that reads from and writes to Bos directly, without staging data on local disk.

## Getting Started

### Prerequisites

- Linux
- Python 3.8 or later
- PyTorch >= 2.0

### Installation

```shell
pip install bostorchconnector
```

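### Configuration
Configure access credentials using any one of the following methods. They are listed in order of priority.
- A dedicated configuration file at `~/.baidubce/credentials`
- bcecmd, if it is installed and configured; its default configuration path is `~/.go-bcecli/credentials`
- Environment variables: `BCE_ACCESS_KEY_ID` and `BCE_SECRET_ACCESS_KEY`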

The credentials file has the following format:
```
[Defaults]
Ak= 
Sk= 
Sts=
```
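
To check which credential source is present on a machine, the lookup order described above can be sketched as follows. This is only an illustration: `find_credentials` is a hypothetical helper that is not part of bostorchconnector, the library's internal lookup may differ, and treating the list order as the priority order is an assumption taken from the note above.

```python
import configparser
import os
from pathlib import Path
from typing import Mapping, Optional, Tuple

# Candidate files, in the order listed above (assumed to be the priority order).
CREDENTIAL_FILES = ("~/.baidubce/credentials", "~/.go-bcecli/credentials")


def find_credentials(
    env: Mapping[str, str] = os.environ,
    files: Tuple[str, ...] = CREDENTIAL_FILES,
) -> Optional[Tuple[str, str, str]]:
    """Return (source, ak, sk) for the first usable source, or None."""
    for name in files:
        path = Path(name).expanduser()
        # Disable interpolation so a '%' inside a key is read literally.
        parser = configparser.ConfigParser(interpolation=None)
        if not parser.read(path):  # read() returns the files it could parse
            continue
        if parser.has_section("Defaults"):
            # configparser lowercases option names, so "Ak" is read as "ak".
            ak = parser.get("Defaults", "ak", fallback="").strip()
            sk = parser.get("Defaults", "sk", fallback="").strip()
            if ak and sk:
                return (str(path), ak, sk)
    ak = env.get("BCE_ACCESS_KEY_ID", "")
    sk = env.get("BCE_SECRET_ACCESS_KEY", "")
    if ak and sk:
        return ("environment", ak, sk)
    return None
```

Calling `find_credentials()` with no arguments inspects the two default file locations and the current environment. A result of `None` means none of the three sources provides both keys.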

### Examples

[API docs](http://)

#### Usage examples

Build a BosIterableDataset with the `from_prefix` method:
```py
from bostorchconnector import BosIterableDataset

# You need to update <BUCKET> and <PREFIX>
DATASET_URI="bos://<BUCKET>/<PREFIX>"
ENDPOINT="http://bj.bcebos.com"

iterable_dataset = BosIterableDataset.from_prefix(DATASET_URI, endpoint=ENDPOINT)

# Datasets are also iterators. 
for item in iterable_dataset:
    data = item.read()
    print(len(data))
    print(item.key)
```
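
A prefix often holds more than one kind of object. The sketch below filters items by key suffix before reading them. It relies only on the `key` attribute and `read()` method used in the loop above; `read_matching` is a hypothetical helper, not part of bostorchconnector.

```python
from typing import Iterable, Iterator, Tuple


def read_matching(items: Iterable, suffix: str) -> Iterator[Tuple[str, bytes]]:
    """Yield (key, content) for items whose key ends with `suffix`.

    Each item only needs a `key` attribute and a `read()` method.
    """
    for item in items:
        if item.key.endswith(suffix):
            # read() is called only for matching keys; it is never
            # invoked on the items that are skipped.
            yield item.key, item.read()
```

For example, `for key, data in read_matching(iterable_dataset, ".jpg"): ...` would walk the dataset above and hand back only the `.jpg` objects.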

Build a BosMapDataset with the `from_prefix` method:
```py
from bostorchconnector import BosMapDataset

# You need to update <BUCKET> and <PREFIX>
DATASET_URI="bos://<BUCKET>/<PREFIX>"
ENDPOINT="http://bj.bcebos.com"

map_dataset = BosMapDataset.from_prefix(DATASET_URI, endpoint=ENDPOINT)

# Randomly access an item in map_dataset.
item = map_dataset[0]

# Learn about bucket, key, and content of the object
bucket = item.bucket
key = item.key
content = item.read()
len(content)
```
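
Items expose `bucket` and `key` separately, while datasets are addressed with a single `bos://` URI. When both forms are needed, a small helper can convert a URI into its two parts. `split_bos_uri` below is a hypothetical helper written for illustration and assumes the `bos://<BUCKET>/<KEY>` layout used in these examples.

```python
from typing import Tuple


def split_bos_uri(uri: str) -> Tuple[str, str]:
    """Split 'bos://bucket/some/key' into ('bucket', 'some/key')."""
    prefix = "bos://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a bos:// URI: {uri!r}")
    # Everything up to the first '/' is the bucket; the rest is the key.
    # Plain string splitting keeps characters such as '#' or '?' in the key.
    bucket, _, key = uri[len(prefix):].partition("/")
    if not bucket:
        raise ValueError(f"missing bucket in URI: {uri!r}")
    return bucket, key
```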

Read and write a model checkpoint directly:
```py
from bostorchconnector import BosCheckpoint

import torchvision
import torch

CHECKPOINT_URI="bos://<BUCKET>/<KEY>/"
ENDPOINT="http://bj.bcebos.com"
checkpoint = BosCheckpoint(endpoint=ENDPOINT)

model = torchvision.models.resnet18()

# Save checkpoint to Bos
with checkpoint.writer(CHECKPOINT_URI + "epoch0.ckpt") as writer:
    torch.save(model.state_dict(), writer)

# Load checkpoint from Bos
with checkpoint.reader(CHECKPOINT_URI + "epoch0.ckpt") as reader:
    state_dict = torch.load(reader)

model.load_state_dict(state_dict)
```
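
When saving one checkpoint per epoch, the object URI has to be built for each save. The following sketch shows one possible naming scheme. `checkpoint_uri` is a hypothetical helper, and the zero-padded file name is a choice made here for illustration (the example above uses `epoch0.ckpt`).

```python
def checkpoint_uri(base_uri: str, epoch: int) -> str:
    """Build e.g. 'bos://bucket/run1/epoch0003.ckpt' from a base URI."""
    if epoch < 0:
        raise ValueError("epoch must be non-negative")
    # Zero-padding makes the names sort in epoch order (up to 9999 epochs).
    return f"{base_uri.rstrip('/')}/epoch{epoch:04d}.ckpt"
```

The result can be passed to `checkpoint.writer(...)` or `checkpoint.reader(...)` in place of the concatenated string.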

### Distributed Checkpoints

#### Overview

bostorchconnector supports PyTorch distributed checkpoints through the following classes:

- **BosStorageWriter**: implements PyTorch's `StorageWriter` interface.
- **BosStorageReader**: implements PyTorch's `StorageReader` interface.
- **BosFileSystem**: implements PyTorch's `FileSystemBase` interface.

Together they integrate Bos with PyTorch distributed checkpointing, so that distributed model checkpoints can be stored and loaded efficiently.

#### Prerequisites and Installation

Requires PyTorch 2.3 or later (the package metadata excludes 2.5.0). Install with the `dcp` extra:

```shell
pip install bostorchconnector[dcp]
```

#### Example

```py
from bostorchconnector.dcp import BosStorageWriter, BosStorageReader

import torchvision
import torch.distributed.checkpoint as DCP

# Configuration
CHECKPOINT_URI = "bos://<BUCKET>/<KEY>/"
ENDPOINT = "http://bj.bcebos.com"

model = torchvision.models.resnet18()

# Save a distributed checkpoint to Bos
bos_storage_writer = BosStorageWriter(
    endpoint=ENDPOINT,
    path=CHECKPOINT_URI,
    thread_count=4,  # Optional; number of IO threads used for writing
)
DCP.save(
    state_dict=model.state_dict(),
    storage_writer=bos_storage_writer,
)

# Load a distributed checkpoint from Bos
model = torchvision.models.resnet18()
model_state_dict = model.state_dict()
bos_storage_reader = BosStorageReader(
    endpoint=ENDPOINT,
    path=CHECKPOINT_URI,
)
DCP.load(
    state_dict=model_state_dict,
    storage_reader=bos_storage_reader,
)
model.load_state_dict(model_state_dict)
```

### Lightning Integration

bostorchconnector includes an integration with PyTorch Lightning. It provides `BosLightningCheckpoint`, which implements Lightning's `CheckpointIO` interface, so that checkpoints can be read from and written to Bos within PyTorch Lightning.

#### Installation

```shell
pip install bostorchconnector[lightning]
```

#### Example

```py
from lightning import Trainer
from bostorchconnector.lightning import BosLightningCheckpoint

# ...

bos_checkpoint_io = BosLightningCheckpoint(endpoint="http://bj.bcebos.com")
trainer = Trainer(
    plugins=[bos_checkpoint_io],
    default_root_dir="bos://<BUCKET>/<KEY_PREFIX>/"
)
trainer.fit(model)
```

### Using BosClient Directly

You can also use BosClient directly for custom streaming reads and writes.

```py
from bostorchconnector._bos_client import BosClient

ENDPOINT = "http://bj.bcebos.com"
BUCKET_NAME = "<BUCKET>"
OBJECT_KEY = "<KEY>"

bos_client = BosClient(endpoint=ENDPOINT)

# Write data to Bos
data = b"content" * 1048576
bos_writer = bos_client.put_object(bucket=BUCKET_NAME, key=OBJECT_KEY)
bos_writer.write(data)
bos_writer.close()

# Read data from Bos
bos_reader = bos_client.get_object(bucket=BUCKET_NAME, key=OBJECT_KEY)
data = bos_reader.read()
```
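
For payloads that should not be held in memory all at once, data can be fed to the writer piece by piece. The sketch below copies any binary file-like source into an object that has `write()` and `close()`, like the writer returned by `put_object()` above. `copy_in_chunks` is a hypothetical helper, and the assumption that the writer accepts several `write()` calls before `close()` is not confirmed by this document.

```python
from typing import BinaryIO


def copy_in_chunks(source: BinaryIO, writer, chunk_size: int = 8 * 1024 * 1024) -> int:
    """Copy `source` to `writer` chunk by chunk and return the bytes written.

    `writer` only needs write() and close(). It is closed only after
    every chunk has been written successfully.
    """
    total = 0
    while True:
        chunk = source.read(chunk_size)
        if not chunk:  # an empty read signals end of input
            break
        writer.write(chunk)
        total += len(chunk)
    writer.close()
    return total
```

With the client above, this could be used as `copy_in_chunks(open("model.bin", "rb"), bos_client.put_object(bucket=BUCKET_NAME, key=OBJECT_KEY))`, where `model.bin` is a placeholder file name. Error handling, such as aborting a partially written object, is left out because the writer's behavior on failure is not described here.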

