Metadata-Version: 2.4
Name: tosfsspec
Version: 2026.4.22
Summary: TOS implementation for pythonic file-system interface
Author-email: xiangshijian <xiangshijian@bytedance.com>
Classifier: Development Status :: 4 - Beta
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Topic :: Utilities
Requires-Python: <3.14,>=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: tos>=2.8.0
Requires-Dist: tosnativeclient>=1.1.16
Requires-Dist: fsspec>=2023.5.0
Dynamic: license-file

# tosfsspec - 火山引擎 TOS 文件系统接口

## 概述

`tosfsspec` 是基于 Python `fsspec` (Filesystem Specification) 规范实现的火山引擎对象存储（TOS）文件系统接口。通过
`tosfsspec`，用户可以使用标准的 Python 文件操作方式（如 `open`, `read`, `write`）直接访问 TOS 中的数据，同时支持与
Pandas、Dask、Ray、PyArrow 等主流数据分析和 AI 框架无缝集成。

## 产品优势

* **标准接口**：完全遵循 `fsspec` 标准，与 Python 生态系统高度兼容。
* **高性能**：底层核心逻辑使用 Rust 编写的 `tosnativeclient`，提供高性能的并发 IO 操作。
* **生态集成**：原生支持 Pandas、Ray、PyTorch 等框架，无需额外适配即可读取 TOS 数据。
* **易用性**：提供类似于本地文件系统的操作体验，简化了对象存储的使用复杂度。

## 安装方式

使用 pip 安装 `tosfsspec`：

```bash
pip install tosfsspec
```

验证安装是否成功：

```bash
pip show tosfsspec
```

## 快速开始

### 1. 配置访问凭证

在使用 `tosfsspec` 之前，您需要准备好火山引擎 TOS 的访问凭证（Access Key ID 和 Secret Access Key）、Endpoint 和 Region 信息。

初始化 `TosFileSystem` 示例：

```python
import tosfsspec

# 初始化文件系统对象
fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    endpoint='https://tos-cn-beijing.volces.com',  # 替换为您的 Endpoint
    region='cn-beijing'  # 替换为您的 Region
)
```

### 2. 基本文件操作

#### 列举文件

```python
# 列举存储桶中的文件
files = fs.ls('tos://my-bucket/data/')
print(files)
```

#### 写入数据

```python
# 直接写入字符串数据
with fs.open('tos://my-bucket/test.txt', 'w') as f:
    f.write('Hello, TOS!')

# 上传本地文件到 TOS
fs.put_file('local_data.csv', 'tos://my-bucket/data/remote_data.csv')
```

#### 读取数据

```python
# 读取 TOS 文件内容
with fs.open('tos://my-bucket/test.txt', 'r') as f:
    content = f.read()
    print(content)

# 下载 TOS 文件到本地
fs.get_file('tos://my-bucket/data/remote_data.csv', 'local_data.csv')
```

## TosFileSystem 初始化详解

`TosFileSystem` 是与 TOS 交互的核心入口，支持多种参数配置以适应不同的网络环境和性能需求。

### 初始化参数

| 参数名                             | 类型     | 必填 | 默认值    | 说明                                               |
|:--------------------------------|:-------|:---|:-------|:-------------------------------------------------|
| `region`                        | `str`  | 是  | -      | TOS 存储桶所在的区域，例如 `cn-beijing`。                    |
| `endpoint`                      | `str`  | 否  | `''`   | TOS 服务端点，例如 `https://tos-cn-beijing.volces.com`。 |
| `key`                           | `str`  | 否  | `''`   | 访问密钥 Access Key ID。                              |
| `secret`                        | `str`  | 否  | `''`   | 访问密钥 Secret Access Key。                          |
| `part_size`                     | `int`  | 否  | `8MB`  | 分片上传/下载的大小（字节）。对于大文件，增大此值可减少 API 调用次数并提升吞吐量。     |
| `max_retry_count`               | `int`  | 否  | `3`    | 请求失败时的最大重试次数。                                    |
| `shared_prefetch_tasks`         | `int`  | 否  | `32`   | 数据预取时的最大并发任务数。                                   |
| `shared_upload_part_tasks`      | `int`  | 否  | `32`   | 分片上传时的最大并发任务数。                                   |
| `shared_upload_part_copy_tasks` | `int`  | 否  | `32`   | 分片拷贝时的最大并发任务数。                                   |
| `enable_crc`                    | `bool` | 否  | `True` | 是否开启 CRC64 数据完整性校验。关闭可降低 CPU 开销，但会降低数据安全性。       |

### 初始化示例

#### 1. 基础初始化

使用显式的 AK/SK 进行初始化。

```python
from tosfsspec import TosFileSystem

fs = TosFileSystem(
    region='cn-beijing',
    endpoint='https://tos-cn-beijing.volces.com',
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY'
)
```

#### 2. 性能调优初始化

针对高吞吐场景，增加分片大小和并发数，并关闭 CRC 校验。

```python
fs = TosFileSystem(
    region='cn-beijing',
    endpoint='https://tos-cn-beijing.volces.com',
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    part_size=16 * 1024 * 1024,  # 16MB 分片
    shared_upload_part_tasks=64,  # 提高上传并发
    enable_crc=False  # 关闭 CRC 校验
)
```

## 常用功能使用示例

### 示例 1：在 Python 环境中直接使用

在 Python 环境中可直接使用 `tosfsspec.TosFileSystem` 的常用 API 读写 TOS，类似本地文件系统的使用方式。

```python
import os

import tosfsspec

fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    endpoint='http://tos-cn-beijing.volces.com',
    region='cn-beijing'
)

bucket = 'my-bucket'
local_file_path = 'localfile.txt'
remote_file_path = f'tos://{bucket}/remote_file.txt'

# create a local file to upload
with open(local_file_path, 'w') as f:
    f.write('Hello TOSFS.')

# upload to tos
fs.put_file(local_file_path, remote_file_path)
print(f'Uploaded {local_file_path} to {remote_file_path}')

# download from tos
downloaded_file_path = 'downloaded_file.txt'
fs.get_file(remote_file_path, downloaded_file_path)
print(f'Downloaded {remote_file_path} to {downloaded_file_path}')

# read content from downloaded local file
with open(downloaded_file_path, 'r') as f:
    content = f.read()
    print(f'Content of {downloaded_file_path}: {content}')

# delete tos file
fs.rm(remote_file_path)

# write to tos
with fs.open(remote_file_path, 'w') as f:
    f.write('Hello TOSFS.')

# read from tos
with fs.open(remote_file_path, 'r') as f:
    tos_content = f.read()
    print(f'Content of {remote_file_path}: {tos_content}')

# clean local files
os.remove(local_file_path)
os.remove(downloaded_file_path)

```

### 示例 2：Ray 分布式计算集成

Ray 可以利用 `tosfsspec.TosFileSystem` 在分布式集群中直接加载 TOS 数据进行处理。

```python
import ray
import tosfsspec

# 初始化 Ray (通常在集群环境中已初始化)
ray.init()

bucket = 'my-bucket'

fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    endpoint='http://tos-cn-beijing.volces.com',
    region='cn-beijing'
)

# csv file path
input_csv_path = f'tos://{bucket}/input.csv'
output_csv_path = f'tos://{bucket}/output.csv'

with fs.open(input_csv_path, 'w') as f:
    f.write('id,name,age\n1,John Doe,30\n2,Jane Smith,25\n3,Bob Johnson,40\n')

ds = ray.data.read_csv(input_csv_path, filesystem=fs)

# processing data
# ds = ds.map_batches(your_processing_function)

ds.repartition(1).write_csv(output_csv_path, filesystem=fs)
```

### 示例 3：PyArrow 集成

PyArrow 可以通过 `filesystem` 参数使用 `tosfsspec.TosFileSystem`，实现高效读写 TOS 中的数据。

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import tosfsspec

bucket = 'my-bucket'

fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    endpoint='http://tos-cn-beijing.volces.com',
    region='cn-beijing'
)

# 生成 Parquet 数据集
table = pa.table({'year': [2020, 2022, 2021, 2022, 2019, 2021],
                  'n_legs': [2, 2, 4, 4, 5, 100],

                  'animal': ['Flamingo', 'Parrot', 'Dog', 'Horse',
                             'Brittle stars', 'Centipede']})
pq.write_table(table, f'tos://{bucket}/data.parquet', filesystem=fs)

# 读取 Parquet 数据集
dataset = ds.dataset(f'tos://{bucket}/data.parquet', filesystem=fs)

# 输出数据集信息
print(f'Rows: {dataset.count_rows()}')
print(dataset.schema)

# 转换为 Pandas DataFrame
df = dataset.to_table().to_pandas()
print(df.head())

```

### 示例 4：PyTorch 集成

PyTorch 可以在数据集加载和 Checkpoint 读写等场景集成 `tosfsspec.TosFileSystem`。

```python
import fsspec
import torch
import tosfsspec
from torchdata.datapipes.iter import FSSpecFileLister, FSSpecFileOpener

bucket = 'my-bucket'

# 数据集加载
fsspec.register_implementation('tos-new', 'tosfsspec.TosFileSystem', )
kwargs = {
    'key': 'YOUR_ACCESS_KEY',
    'secret': 'YOUR_SECRET_KEY',
    'endpoint': 'http://tos-cn-beijing.volces.com',
    'region': 'cn-beijing'
}

file_lister = FSSpecFileLister(root=f'tos-new://{bucket}/dataset/', **kwargs)
iterable_dataset = FSSpecFileOpener(file_lister, mode='rb', **kwargs)

for _, item in iterable_dataset:
    data = item.read()

# Checkpoint 读写
fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    endpoint='http://tos-cn-beijing.volces.com',
    region='cn-beijing'
)

model = torch.nn.Linear(10, 10)
with fs.open(f'tos://{bucket}/models/model.pt', 'wb') as f:
    torch.save(model.state_dict(), f)

with fs.open(f'tos://{bucket}/models/model.pt', 'rb') as f:
    state_dict = torch.load(f)
    # print(state_dict)
    model.load_state_dict(state_dict)
```

### 示例 5：Pandas 集成

Pandas 可以利用`tosfsspec.TosFileSystem`直接从 TOS 读取或向 TOS 写入各种格式的数据（如 CSV、Parquet 等），无需先将文件下载到本地。

```python

# 从 TOS 读取 CSV 文件
import fsspec
import pandas as pd
import tosfsspec

fsspec.register_implementation('tos-new', 'tosfsspec.TosFileSystem', )

# 准备一个 CSV 文件在 TOS 上
fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    region='cn-beijing'
)
bucket = 'my-bucket'
csv_path = f'tos-new://{bucket}/datasets/iris.csv'
csv_content = 'sepal_length,sepal_width,petal_length,petal_width,species\n5.1,3.5,1.4,0.2,setosa\n'
with fs.open(csv_path, 'wb') as f:
    f.write(csv_content.encode())

# 配置 storage_options
storage_options = {
    'key': 'YOUR_ACCESS_KEY',
    'secret': 'YOUR_SECRET_KEY',
    'region': 'cn-beijing'
}

# 使用 pandas 直接读取
df = pd.read_csv(csv_path, storage_options=storage_options)

print('Read csv data from TOS:')
print(df)

# 将 DataFrame 写入 TOS 的 Parquet 文件
# 创建一个 DataFrame
data = {'col1': [1, 2], 'col2': [3, 4]}
df_to_write = pd.DataFrame(data=data)

# 定义要写入的 Parquet 文件路径
parquet_path = f'tos-new://{bucket}/output/data.parquet'

# 直接写入 TOS，需要安装 pyarrow 或 fastparquet
df_to_write.to_parquet(parquet_path, storage_options=storage_options, index=False)

print(f'Write data to: {parquet_path}')
```

### 示例 6：Dask 集成

Dask 是一个用于并行计算的灵活库，它天然支持`fsspec`兼容的文件系统。结合`tosfsspec.TosFileSystem`后，你可以使用
dask.dataframe 一次性并行读取 TOS 上的多个 CSV 文件，构建一个分布式 DataFrame。

```python
import dask.dataframe as dd
import fsspec

fsspec.register_implementation('tos-new', 'tosfsspec.TosFileSystem', )

bucket = 'my-bucket'

# 配置 storage_options
storage_options = {
    'key': 'YOUR_ACCESS_KEY',
    'secret': 'YOUR_SECRET_KEY',
    'region': 'cn-beijing'
}

# 文件路径支持通配符
dask_path = f'tos-new://{bucket}/stock_data/part.*.csv'

# 使用 Dask 读取数据，Dask 会自动处理通配符并并行加载
ddf = dd.read_csv(dask_path, storage_options=storage_options)

# Dask 是惰性计算的，此时数据还未真正加载
print('Dask DataFrame:', ddf.npartitions)

# 执行计算，例如计算平均值，这会触发实际的 I/O 操作
mean_value = ddf.some_column.mean().compute()

print('Avg:', mean_value)
```
