Metadata-Version: 2.4
Name: s3py-helper
Version: 0.0.13
Summary: Unified file storage component with S3 and FastDFS fusion support
Author-email: ferdinand <gongjpa@gmail.com>
License-Expression: Apache-2.0 OR MIT
Keywords: aio,fastdfs,s3,storage
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.12
Requires-Dist: aioboto3>=13.0.0
Requires-Dist: aiofdfs>=0.0.1
Requires-Dist: aiofiles>=24.0.0
Requires-Dist: fastapi>=0.100.0
Requires-Dist: pydantic>=2.0.0
Provides-Extra: dev
Requires-Dist: pytest-asyncio>=0.24.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Description-Content-Type: text/markdown

# s3py-helper

## 1. Component Overview

s3py-helper is a unified asynchronous file storage component that supports both S3 protocol object storage and the FastDFS distributed file system. It targets the Python 3.12+ asynchronous ecosystem and is built on aioboto3 (for S3) and aiofdfs (for FastDFS).

The component adopts a layered architecture. The `s3raw` package provides raw operations for the S3 protocol, the `fastdfs` package provides raw operations for FastDFS, and the `fusion` package implements automatic routing between the two at the upper layer. The core design of the fusion layer is **automatic routing based on the `file_id` prefix**: `file_id`s starting with `s3:` are routed to the S3 backend, while all others are routed to the FastDFS backend.
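
The prefix rule itself is simple. As an illustration only (this is not the library's internal code, just the rule described above):

```python
S3_PREFIX = "s3:"

def routes_to_s3(file_id: str) -> bool:
    """Illustrative check: "s3:"-prefixed file_ids go to S3, all others to FastDFS."""
    return file_id.startswith(S3_PREFIX)
```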

The component supports three storage modes via `StorageModeConfig`: `s3` (S3 only), `fastdfs` (FastDFS only), and `hybrid` (both backends). In `hybrid` mode, upload operations go to S3 with the `s3:` prefix added, while download, delete, and metadata query operations automatically route based on the `file_id` prefix. In `s3` mode, all operations go to S3. In `fastdfs` mode, all operations go to FastDFS. The `StorageManager` class serves as the unified entry point, automatically creating the required clients and services based on the configured mode.

This design allows business code to work with a single set of fusion service interfaces, transparently operating on one or both storage backends, and supports packaging files from mixed sources in batch download scenarios. The `StorageManager` manages the lifecycle of all asynchronous clients using a context manager pattern.

## 2. Integration and Configuration Guide

### Dependency Installation

The project uses uv or pip for dependency management. Core dependencies include aioboto3 (S3 async client), aiofdfs (FastDFS async client), pydantic (configuration models), aiofiles (async file operations), and fastapi (UploadFile support).

```bash
pip install s3py-helper
```

### S3 Protocol Storage Configuration

Configure S3-compatible object storage services (such as MinIO, AWS S3, etc.) using `S3Config`. `endpoint` is the service address, `access_key_id` and `secret_access_key` are the authentication credentials, `default_bucket` specifies the default bucket name, and `region` is the region identifier, defaulting to `us-east-1`.

```python
from s3py_helper.storage import S3Config

s3_config = S3Config(
    endpoint="http://192.168.30.36:9001",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    default_bucket="my-bucket",
    region="us-east-1",
)
```

### FastDFS Configuration

Configure FastDFS connection parameters using `FastdfsConfig`. `tracker_servers` is a list of tracker server addresses (format `host:port`); `connect_timeout` and `network_timeout` control the connection timeout and the network read/write timeout (in seconds), respectively; `store_path_index` specifies the storage path index, defaulting to -1 for automatic selection.

```python
from s3py_helper.storage import FastdfsConfig

fastdfs_config = FastdfsConfig(
    tracker_servers=["192.168.30.36:22122"],
    connect_timeout=2,
    network_timeout=30,
)
```

### Storage Mode Configuration

`StorageModeConfig` defines which storage backend(s) to use via the `mode` field. Three modes are supported:

- `hybrid` (default): Both S3 and FastDFS. Upload goes to S3, download/delete/info auto-route by `file_id` prefix. Requires both `s3_config` and `fastdfs_config`.
- `s3`: S3 only. All operations go to S3. Requires `s3_config`. Accessing FastDFS-routed files raises `RouteNotSupportedException`.
- `fastdfs`: FastDFS only. All operations go to FastDFS. Requires `fastdfs_config`. Accessing S3-routed files raises `RouteNotSupportedException`.

```python
from s3py_helper.storage import StorageModeConfig, StorageMode

# Hybrid mode (default, both backends)
mode_config = StorageModeConfig(mode=StorageMode.HYBRID)

# S3 only mode
mode_config = StorageModeConfig(mode=StorageMode.S3)

# FastDFS only mode
mode_config = StorageModeConfig(mode=StorageMode.FASTDFS)
```

### File Validation Configuration

`FileTypeConfig` configures file type validation during uploads, supporting both whitelist and blacklist modes: when the whitelist is non-empty, only extensions on the whitelist are allowed, and the blacklist always takes effect. Additionally, `FileValidatorImpl` supports limiting file size via the `max_file_size` parameter. If validation is not required, the validator can be omitted or constructed with default values.

```python
from s3py_helper.storage import FileTypeConfig, FileValidatorImpl

file_type_config = FileTypeConfig(
    whitelist=["pdf", "jpg", "png", "zip"],
    blacklist=["exe", "bat"],
)
validator = FileValidatorImpl(file_type_config=file_type_config, max_file_size=100 * 1024 * 1024)
```
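
The precedence described above (blacklist always wins; a non-empty whitelist restricts what remains) can be sketched as follows. `is_extension_allowed` is an illustrative helper, not part of the library's API:

```python
def is_extension_allowed(file_name: str, whitelist: list[str], blacklist: list[str]) -> bool:
    """Illustrative validation rule: blacklist always applies; non-empty whitelist restricts."""
    ext = file_name.rsplit(".", 1)[-1].lower() if "." in file_name else ""
    if ext in blacklist:
        return False          # blacklist always takes effect
    if whitelist:
        return ext in whitelist  # non-empty whitelist: only listed extensions pass
    return True               # empty whitelist: everything not blacklisted passes
```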

## 3. Guide for Fusion Usage of FastDFS and S3

The fusion package provides unified interfaces for upload, download, deletion, and metadata querying. Business code does not need to worry about whether the underlying storage is S3 or FastDFS. All fusion services are assembled from raw services built using `S3Client` and `FastdfsClient`.

### Service Initialization

`StorageManager` is the recommended entry point for initializing all fusion services. It manages client lifecycles and automatically creates the required services based on the configured mode. Use the `async with` context manager to ensure proper resource cleanup.

```python
from s3py_helper.storage import (
    StorageManager, StorageModeConfig, StorageMode,
    S3Config, FastdfsConfig,
)

s3_config = S3Config(
    endpoint="http://192.168.30.36:9001",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    default_bucket="my-bucket",
)
fastdfs_config = FastdfsConfig(tracker_servers=["192.168.30.36:22122"])

async with StorageManager(
    mode_config=StorageModeConfig(mode=StorageMode.HYBRID),
    s3_config=s3_config,
    fastdfs_config=fastdfs_config,
) as manager:
    # Access services via properties
    fusion_upload = manager.upload_service
    fusion_download = manager.download_service
    fusion_delete = manager.delete_service
    fusion_info = manager.info_service
```

For S3-only or FastDFS-only modes, only the corresponding config is required:

```python
# S3 only
async with StorageManager(
    mode_config=StorageModeConfig(mode=StorageMode.S3),
    s3_config=s3_config,
) as manager:
    attach = await manager.upload_service.upload_bytes("report.pdf", content)

# FastDFS only
async with StorageManager(
    mode_config=StorageModeConfig(mode=StorageMode.FASTDFS),
    fastdfs_config=fastdfs_config,
) as manager:
    attach = await manager.upload_service.upload_bytes("report.pdf", content)
```

### File Upload

The fusion upload service routes based on the storage mode. In `s3` and `hybrid` modes, files are uploaded to S3 with the `s3:` prefix added to the `file_id`. In `fastdfs` mode, files are uploaded to FastDFS without the `s3:` prefix. Before uploading, it performs file name validity checks, file type validation, and file size validation. The returned `FileAttach` contains the fusion-format `file_id` and the original file name.

- `upload_bytes` uploads in-memory byte data.
- `upload_file` uploads a FastAPI `UploadFile` object, streaming internally to avoid loading the entire file into memory.
- `upload_stream` uploads a custom binary stream or async iterator.
- `upload_files` batch-uploads multiple `UploadFile` objects.

All methods support an optional `folder` parameter to specify the prefix directory for the S3 object key, a `bucket` parameter to override the default bucket, and a `metadata` parameter (`dict[str, str]`) to attach custom metadata to the uploaded file.

```python
# Upload byte data
attach = await fusion_upload.upload_bytes("report.pdf", content, folder="reports")

# Upload FastAPI UploadFile (streaming)
attach = await fusion_upload.upload_file(upload_file, folder="uploads")

# Upload custom stream
import io
stream = io.BytesIO(content)
attach = await fusion_upload.upload_stream("data.csv", stream, len(content), folder="data")

# Upload with custom metadata
attach = await fusion_upload.upload_bytes(
    "report.pdf", content, folder="reports",
    metadata={"author": "john", "category": "finance"},
)
```

### File Download

The fusion download service automatically routes to the corresponding backend based on the `file_id` prefix. `download_bytes` reads the entire file into memory and returns `bytes`, suitable for small files. `download_to_stream` streams the data into the specified `BinaryIO` output stream, suitable for writing download content directly to a file or buffer. `download_to_generator` returns an asynchronous generator that yields data in chunks, suitable for streaming response scenarios.

```python
# Download as byte data
data = await fusion_download.download_bytes("s3:my-bucket/reports/uuid_report.pdf")

# Download to output stream
with open("output.pdf", "wb") as f:
    await fusion_download.download_to_stream("s3:my-bucket/reports/uuid_report.pdf", f)

# Streaming download (async iterator)
async for chunk in fusion_download.download_to_generator(some_file_id):
    process(chunk)
```

#### Mixed-Source Batch Download as Zip

`download_files_as_zip` supports packaging multiple files from S3 and FastDFS into a single ZIP archive written to an output stream. Each file is specified via `FileDownload` with its `file_id` and an optional display name. The service automatically identifies the source backend and streams each file's content into the ZIP archive. When `file_name` is missing, the filename part from the `file_id` is used as the entry name in the ZIP archive. Duplicate entry names are automatically suffixed with a sequence number to avoid overwrites.

```python
from s3py_helper.storage import FileDownload
import io

files = [
    FileDownload(file_id="s3:my-bucket/reports/uuid_report.pdf", file_name="report.pdf"),
    FileDownload(file_id="group1/M00/00/00/old_contract.pdf", file_name="contract.pdf"),
]
buf = io.BytesIO()
await fusion_download.download_files_as_zip(files, buf)
```

For scenarios where the ZIP stream needs to be sent directly to a FastAPI `StreamingResponse`, the `stream_zip` method can be used. It returns an asynchronous generator that internally uses a queue to parallelize ZIP writing and network sending.

```python
from starlette.responses import StreamingResponse

async def download_zip(request):
    files = [FileDownload(file_id="s3:my-bucket/doc.pdf", file_name="doc.pdf")]
    return StreamingResponse(
        fusion_download.stream_zip(files),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=files.zip"},
    )
```
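
`stream_zip` is described as using a queue to decouple ZIP writing from network sending. A minimal sketch of that producer/consumer pattern, illustrative only (it uses in-memory blobs instead of backend downloads, and all names here are hypothetical):

```python
import asyncio
import io
import zipfile


class _QueueWriter(io.RawIOBase):
    """File-like sink that forwards every written chunk to an asyncio.Queue."""

    def __init__(self, queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
        self.queue = queue
        self.loop = loop

    def writable(self) -> bool:
        return True

    def write(self, b) -> int:
        # zipfile runs in a worker thread here; hand the chunk back to the event loop.
        asyncio.run_coroutine_threadsafe(self.queue.put(bytes(b)), self.loop).result()
        return len(b)


async def stream_zip_sketch(named_blobs: list[tuple[str, bytes]]):
    """Yield ZIP bytes as they are produced, without buffering the whole archive."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue(maxsize=8)  # bounds memory usage

    def produce() -> None:
        # ZipFile writes to the queue-backed sink; it handles unseekable output.
        with zipfile.ZipFile(_QueueWriter(queue, loop), "w", zipfile.ZIP_DEFLATED) as zf:
            for name, blob in named_blobs:
                zf.writestr(name, blob)
        asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()  # sentinel

    producer = loop.run_in_executor(None, produce)
    while (chunk := await queue.get()) is not None:
        yield chunk
    await producer  # surface any exception raised while writing
```

In the real service, the producer side would pull each file's content from S3 or FastDFS rather than from `named_blobs`.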

### File Deletion

`delete_file` automatically routes to the S3 or FastDFS backend for deletion based on the `file_id` prefix. S3 deletion is idempotent (deleting a non-existent object does not raise an error). In `s3` mode, accessing FastDFS-routed files raises a `RouteNotSupportedException`.

```python
# Delete S3 file
await fusion_delete.delete_file("s3:my-bucket/reports/uuid_report.pdf")

# Delete FastDFS file (hybrid/fastdfs mode only)
await fusion_delete.delete_file("group1/M00/00/00/some_file.pdf")
```

### Metadata Query

`get_file_info` routes to the corresponding backend based on the `file_id` prefix to query metadata, returning a unified `FileBasicInfo` object. The S3 backend uses `head_object` to get file size, last modification time, storage class, and custom metadata. The FastDFS backend uses `get_meta_data` to get the file name and size. For S3 files, the returned `file_id` retains the fusion-layer `s3:` prefix, making it convenient for subsequent operations to use directly. A `FileNotFoundException` is raised when the file does not exist.

```python
info = await fusion_info.get_file_info("s3:my-bucket/reports/uuid_report.pdf")
print(info.file_name)    # File name
print(info.file_size)    # File size (bytes)
print(info.last_modify_time)  # Last modification time (S3)
print(info.storage_class)     # Storage class (S3)
print(info.metadata)          # Custom metadata (S3)
```

## 4. Guide for Using Pure S3 Protocol Object Storage

The s3raw package provides direct operation interfaces for the S3 protocol without routing logic or file validation, suitable for scenarios requiring only S3 storage or scenarios needing finer control.

### Client and Service Initialization

All S3 raw services share the same `S3Client` instance. The client manages the lifecycle of the underlying connection pool via the `async with` context manager. `S3RawUploadServiceImpl` requires `S3Config` to resolve the default bucket, while other services only depend on the client instance.

```python
from s3py_helper.storage.s3raw.client.s3_client import S3Client
from s3py_helper.storage.s3raw.service.upload_service import S3RawUploadServiceImpl
from s3py_helper.storage.s3raw.service.download_service import S3RawDownloadServiceImpl
from s3py_helper.storage.s3raw.service.delete_service import S3RawDeleteServiceImpl
from s3py_helper.storage.s3raw.service.info_service import S3RawInfoServiceImpl

async with S3Client(s3_config) as s3:
    upload_svc = S3RawUploadServiceImpl(s3, s3_config)
    download_svc = S3RawDownloadServiceImpl(s3)
    delete_svc = S3RawDeleteServiceImpl(s3)
    info_svc = S3RawInfoServiceImpl(s3)
```

The returned `file_id` format is `bucket/key` (without a prefix). Subsequent operations can use this `file_id` directly.
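
Given that format, splitting a raw `file_id` back into its bucket and key is straightforward. An illustrative helper (not part of the library's API):

```python
def split_file_id(file_id: str) -> tuple[str, str]:
    """Split a raw "bucket/key" file_id into (bucket, key)."""
    bucket, _, key = file_id.partition("/")
    return bucket, key
```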

### Upload

`upload_bytes` uploads byte data to S3. `upload_stream` supports uploading from a `BinaryIO` or async iterator. `upload_file` accepts a FastAPI `UploadFile` object and streams it to save memory. `upload_files` supports batch uploading. All methods return a `FileAttach`, where `file_id` is in the format `bucket/key` and `file_name` is the original file name. When uploading, the S3 object key is automatically generated in the format `[folder/]uuid_prefix_filename`, where `uuid_prefix` is a 16-character random hexadecimal string ensuring uniqueness.

```python
# Byte upload
attach = await upload_svc.upload_bytes("report.pdf", content, folder="reports")
# attach.file_id → "my-bucket/reports/a1b2c3d4e5f6a7b8_report.pdf"

# Stream upload
import io
attach = await upload_svc.upload_stream("data.bin", io.BytesIO(content), len(content))

# UploadFile upload (streaming, does not load entire file into memory)
attach = await upload_svc.upload_file(upload_file, bucket="other-bucket")
```

### Download

`download_bytes` reads the entire content at once and returns `bytes`, suitable for small files. `download_to_stream` writes data in chunks to the specified `BinaryIO` output stream, keeping memory usage controllable. `download_to_generator` returns an asynchronous generator that yields data progressively in fixed block sizes (default 64KB), suitable for streaming responses or large file processing. Internally, it adapts to different versions of the aiobotocore response body interface, automatically falling back for versions that do not support chunked reading.

```python
# Download as bytes
data = await download_svc.download_bytes("my-bucket/reports/uuid_report.pdf")

# Download to file
with open("output.pdf", "wb") as f:
    await download_svc.download_to_stream("my-bucket/reports/uuid_report.pdf", f)

# Streaming read
async for chunk in download_svc.download_to_generator("my-bucket/reports/uuid_report.pdf"):
    process(chunk)
```

#### Multi-File Packaged Download

`download_files_as_zip` streams multiple S3 files one by one into a ZIP archive, without loading all files into memory simultaneously. Each file is specified by a `FileDownload` containing its `file_id` and an optional display name. Entry names within the ZIP are path-sanitized (defending against Zip Slip), and sequence numbers are automatically added for duplicate names.

`stream_zip` is an asynchronous generator wrapper for FastAPI `StreamingResponse`. It internally uses `asyncio.Queue` to parallelize ZIP writing and data production, making it suitable for direct return as a streaming response body to the client.

```python
from s3py_helper.storage import FileDownload

# Write to buffer
files = [
    FileDownload(file_id="my-bucket/docs/a.pdf", file_name="Document A.pdf"),
    FileDownload(file_id="my-bucket/docs/b.pdf", file_name="Document B.pdf"),
]
buf = io.BytesIO()
await download_svc.download_files_as_zip(files, buf)

# Direct streaming response
from starlette.responses import StreamingResponse

@app.get("/download")
async def download():
    return StreamingResponse(
        download_svc.stream_zip(files),
        media_type="application/zip",
    )
```
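
The entry-name handling described above (Zip Slip defense plus duplicate suffixing) amounts to something like the following sketch; both helpers are illustrative, not the library's actual code:

```python
import posixpath

def sanitize_zip_entry(name: str) -> str:
    """Drop root, drive, and parent references so entries cannot escape the archive."""
    name = name.replace("\\", "/")
    parts = [p for p in name.split("/") if p not in ("", ".", "..")]
    return posixpath.join(*parts) if parts else "unnamed"

def dedupe_entry_names(names: list[str]) -> list[str]:
    """Append a sequence number before the extension when entry names collide."""
    seen: dict[str, int] = {}
    result = []
    for name in names:
        if name in seen:
            seen[name] += 1
            stem, dot, ext = name.rpartition(".")
            result.append(f"{stem}_{seen[name]}.{ext}" if dot else f"{name}_{seen[name]}")
        else:
            seen[name] = 0
            result.append(name)
    return result
```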

### Deletion

`delete_file` deletes an S3 object using its `file_id`. S3 deletion operations are idempotent; deleting a non-existent object does not raise an error. The `file_id` format is `bucket/key`.

```python
await delete_svc.delete_file("my-bucket/reports/uuid_report.pdf")
```

### Metadata Query

`get_file_info` retrieves metadata of an object via the S3 `head_object` operation, returning a `FileBasicInfo`. This includes the file size, last modification time, storage class (e.g., STANDARD), and custom metadata. A `FileNotFoundException` is raised if the object does not exist.

```python
info = await info_svc.get_file_info("my-bucket/reports/uuid_report.pdf")
print(info.file_size)         # File size (bytes)
print(info.last_modify_time)  # Last modification time
print(info.storage_class)     # Storage class
print(info.metadata)          # Custom metadata dictionary
```

### Multipart Upload

`S3Client` also provides low-level interfaces for multipart uploads, suitable for very large file upload scenarios where manual control over part logic is needed. The complete workflow is: `create_multipart_upload` initializes the upload and obtains an UploadId -> `upload_part` is called multiple times to upload parts -> `complete_multipart_upload` completes the assembly. During the upload, `abort_multipart_upload` can be called to cancel and clean up already uploaded parts.

```python
def read_chunks(path: str, chunk_size: int):
    """Local helper: yield fixed-size chunks read from a file on disk."""
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk

async with S3Client(s3_config) as s3:
    # Initiate multipart upload
    result = await s3.create_multipart_upload("my-bucket", "large-file.bin")
    upload_id = result["UploadId"]

    # Upload parts (8 MB each; S3 requires every part except the last to be at least 5 MB)
    parts = []
    for part_num, chunk in enumerate(read_chunks("large-file.bin", 8 * 1024 * 1024), start=1):
        result = await s3.upload_part("my-bucket", "large-file.bin", upload_id, part_num, chunk)
        parts.append({"PartNumber": part_num, "ETag": result["ETag"]})

    # Complete assembly
    await s3.complete_multipart_upload("my-bucket", "large-file.bin", upload_id, parts)
```

## 5. Guide for Integration with FastAPI

This section provides a complete FastAPI application example demonstrating how to integrate fusion storage services within request handling. The core idea is to use FastAPI's dependency injection system to manage client lifecycles and service instances, allowing route functions to simply declare dependencies for direct usage.

### Dependency Injection and Service Management

It is recommended to hold a single `StorageManager` for the lifetime of the application: clients are created once at startup and released at shutdown, avoiding re-establishing connections for each request. The following example stores the manager in a module-level variable managed by a FastAPI `lifespan` handler and exposes it through a simple accessor, which can also be wrapped with `Depends` if you prefer declaring it as a route dependency.

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, UploadFile, File
from starlette.responses import StreamingResponse

from s3py_helper.storage import (
    StorageManager, StorageModeConfig, StorageMode,
    S3Config, FastdfsConfig, FileTypeConfig,
    FileDownload,
)

# ---- Configuration ----

s3_config = S3Config(
    endpoint="http://192.168.30.36:9001",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    default_bucket="my-bucket",
)
fastdfs_config = FastdfsConfig(tracker_servers=["192.168.30.36:22122"])

# ---- StorageManager Lifecycle ----

_manager: StorageManager | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create StorageManager on application startup, release on shutdown."""
    global _manager
    _manager = StorageManager(
        mode_config=StorageModeConfig(mode=StorageMode.HYBRID),
        s3_config=s3_config,
        fastdfs_config=fastdfs_config,
    )
    async with _manager:
        yield

app = FastAPI(lifespan=lifespan)

# ---- Access Services ----

def get_manager() -> StorageManager:
    assert _manager is not None, "StorageManager not initialized (lifespan has not run)"
    return _manager
```

### File Upload Endpoint

The upload endpoint receives an `UploadFile`, streams it to S3 via the fusion service, and returns the fusion-format `file_id`.

```python
@app.post("/files/upload")
async def upload_file(file: UploadFile = File(...)):
    attach = await get_manager().upload_service.upload_file(file, folder="uploads")
    return {"file_id": attach.file_id, "file_name": attach.file_name}


@app.post("/files/batch-upload")
async def batch_upload(files: list[UploadFile] = File(...)):
    results = await get_manager().upload_service.upload_files(files, folder="batch")
    return [{"file_id": r.file_id, "file_name": r.file_name} for r in results]
```

### File Download Endpoints

Single file downloads retrieve content via `download_bytes` and return it directly. Batch downloads use `stream_zip` to return a streaming ZIP response: a background task writes ZIP data into a queue while the response body reads from it, keeping memory usage bounded.

```python
from starlette.responses import Response

@app.get("/files/{file_id:path}/download")
async def download_file(file_id: str):
    data = await get_manager().download_service.download_bytes(file_id)
    return Response(content=data, media_type="application/octet-stream")


@app.post("/files/download-zip")
async def download_zip(body: list[FileDownload]):
    return StreamingResponse(
        get_manager().download_service.stream_zip(body),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=files.zip"},
    )
```

The return value of `stream_zip` can be used directly as the content body of a `StreamingResponse`. The list of `FileDownload` objects passed by the requester can contain both S3 files (with `s3:` prefix) and FastDFS files; the fusion layer automatically identifies the source and pulls data from the corresponding backend.

### File Deletion and Metadata Query Endpoints

The deletion and metadata query endpoints directly accept fusion-format `file_id`s. The services automatically route based on the prefix.

```python
@app.delete("/files/{file_id:path}")
async def delete_file(file_id: str):
    await get_manager().delete_service.delete_file(file_id)
    return {"detail": "deleted"}


@app.get("/files/{file_id:path}/info")
async def get_file_info(file_id: str):
    info = await get_manager().info_service.get_file_info(file_id)
    return {
        "file_id": info.file_id,
        "file_name": info.file_name,
        "file_size": info.file_size,
        "last_modify_time": info.last_modify_time.isoformat() if info.last_modify_time else None,
    }
```

### Exception Handling

The component defines a clear exception hierarchy. It is recommended to register global exception handlers in FastAPI to map storage exceptions to appropriate HTTP status codes. The base class `StorageException` maps to 500, `FileNotFoundException` maps to 404, `InvalidArgumentException` and `InvalidFileTypeException` map to 400, and `DeleteNotSupportedException` and `RouteNotSupportedException` map to 403.

```python
from fastapi.responses import JSONResponse
from s3py_helper.storage import (
    StorageException, FileNotFoundException,
    InvalidArgumentException, InvalidFileTypeException,
    DeleteNotSupportedException, RouteNotSupportedException,
)

@app.exception_handler(FileNotFoundException)
async def not_found_handler(request, exc):
    return JSONResponse(status_code=404, content={"detail": str(exc)})

@app.exception_handler(InvalidArgumentException)
@app.exception_handler(InvalidFileTypeException)
async def bad_request_handler(request, exc):
    return JSONResponse(status_code=400, content={"detail": str(exc)})

@app.exception_handler(DeleteNotSupportedException)
@app.exception_handler(RouteNotSupportedException)
async def forbidden_handler(request, exc):
    return JSONResponse(status_code=403, content={"detail": str(exc)})

@app.exception_handler(StorageException)
async def storage_error_handler(request, exc):
    return JSONResponse(status_code=500, content={"detail": str(exc)})
```