Metadata-Version: 2.4
Name: s3py-helper
Version: 0.0.12
Summary: Unified file storage component with S3 and FastDFS fusion support
Author-email: ferdinand <gongjpa@gmail.com>
License-Expression: Apache-2.0 OR MIT
Keywords: aio,fastdfs,s3,storage
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.12
Requires-Dist: aioboto3>=13.0.0
Requires-Dist: aiofdfs>=0.0.1
Requires-Dist: aiofiles>=24.0.0
Requires-Dist: fastapi>=0.100.0
Requires-Dist: pydantic>=2.0.0
Provides-Extra: dev
Requires-Dist: pytest-asyncio>=0.24.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Description-Content-Type: text/markdown

# s3py-helper

## 1. Component Overview

s3py-helper is a unified asynchronous file storage component that supports both S3-protocol object storage and the FastDFS distributed file system. It targets the Python 3.12+ async ecosystem and is built on aioboto3 (for S3) and aiofdfs (for FastDFS).

The component adopts a layered architecture. The `s3raw` package provides raw operations for the S3 protocol, the `fastdfs` package provides raw operations for FastDFS, and the `fusion` package implements automatic routing between the two at the upper layer. The core design of the fusion layer is **automatic routing based on the `file_id` prefix**: `file_id`s starting with `s3:` are routed to the S3 backend, while all others are routed to the FastDFS backend. Upload operations uniformly go to S3 and automatically have the `s3:` prefix added. Download, delete, and metadata query operations automatically determine the source backend based on the `file_id`. This design allows business code to work with a single set of fusion service interfaces, transparently operating on both storage backends, and supports packaging files from mixed sources in batch download scenarios.
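The prefix rule above can be sketched as a small pure function. This is a hypothetical illustration of the routing decision, not the actual `FileIdRouterImpl` API; the function names here are assumptions:

```python
def route_backend(file_id: str) -> str:
    """Route a fusion-layer file_id to a backend by its prefix.

    file_ids starting with "s3:" go to the S3 backend; everything else
    (e.g. FastDFS paths like "group1/M00/...") goes to FastDFS.
    """
    return "s3" if file_id.startswith("s3:") else "fastdfs"


def strip_fusion_prefix(file_id: str) -> str:
    """Remove the fusion-layer "s3:" prefix to recover the raw bucket/key id."""
    return file_id.removeprefix("s3:")
```

For example, `route_backend("s3:my-bucket/a.pdf")` yields `"s3"`, while a FastDFS path such as `"group1/M00/00/00/x.pdf"` yields `"fastdfs"`.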

The component manages the lifecycle of all asynchronous clients (S3Client, FastdfsClient) using a context manager pattern. Service instances are assembled through dependency injection, facilitating testing and substitution.

## 2. Integration and Configuration Guide

### Dependency Installation

The project uses uv or pip for dependency management. Core dependencies include aioboto3 (S3 async client), aiofdfs (FastDFS async client), pydantic (configuration models), aiofiles (async file operations), and fastapi (UploadFile support).

```bash
pip install s3py-helper
```

### S3 Protocol Storage Configuration

Configure S3-compatible object storage services (such as MinIO, AWS S3, etc.) using `S3Config`. `endpoint` is the service address, `access_key_id` and `secret_access_key` are the authentication credentials, `default_bucket` specifies the default bucket name, and `region` is the region identifier, defaulting to `us-east-1`.

```python
from s3py_helper.storage import S3Config

s3_config = S3Config(
    endpoint="http://192.168.30.36:9001",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    default_bucket="my-bucket",
    region="us-east-1",
)
```

### FastDFS Configuration

Configure FastDFS connection parameters using `FastdfsConfig`. `tracker_servers` is a list of tracker server addresses (format `host:port`); `connect_timeout` and `network_timeout` control the connection timeout and the network read/write timeout (in seconds), respectively; `store_path_index` specifies the storage path index (default `-1`, meaning automatic selection).

```python
from s3py_helper.storage import FastdfsConfig

fastdfs_config = FastdfsConfig(
    tracker_servers=["192.168.30.36:22122"],
    connect_timeout=2,
    network_timeout=30,
)
```

### Fusion Mode Configuration

`FusionConfig` controls the fusion layer's access permissions to the FastDFS backend. `enable_fastdfs_download` controls whether downloading files from FastDFS is allowed, `enable_fastdfs_delete` controls whether deleting files on FastDFS is allowed. S3 operations are always available and not restricted by this configuration. When the business only requires read-only access to FastDFS, you can set `enable_fastdfs_delete` to `False`.

```python
from s3py_helper.storage import FusionConfig

fusion_config = FusionConfig(
    enable_fastdfs_download=True,
    enable_fastdfs_delete=True,
)
```

### File Validation Configuration

`FileTypeConfig` is used for file type validation during uploads, supporting both whitelist and blacklist modes. When the whitelist is non-empty, only extensions in the whitelist are allowed. The blacklist always takes effect. Additionally, `FileValidatorImpl` supports limiting file size via the `max_file_size` parameter. If validation is not required, it can be omitted or default values can be passed.

```python
from s3py_helper.storage import FileTypeConfig, FileValidatorImpl

file_type_config = FileTypeConfig(
    whitelist=["pdf", "jpg", "png", "zip"],
    blacklist=["exe", "bat"],
)
validator = FileValidatorImpl(file_type_config=file_type_config, max_file_size=100 * 1024 * 1024)
```

## 3. Guide for Fusion Usage of FastDFS and S3

The fusion package provides unified interfaces for upload, download, deletion, and metadata querying. Business code does not need to worry about whether the underlying storage is S3 or FastDFS. All fusion services are assembled from raw services built using `S3Client` and `FastdfsClient`.

### Service Initialization

The following code demonstrates how to initialize all fusion services. Each client's lifecycle must be managed using the `async with` context manager. Service instances can be reused while the clients are alive.

```python
from s3py_helper.storage import (
    S3Config, FastdfsConfig, FusionConfig,
    FileValidatorImpl, FileIdRouterImpl,
)
from s3py_helper.storage.s3raw.client.s3_client import S3Client
from s3py_helper.storage.s3raw.service.upload_service import S3RawUploadServiceImpl
from s3py_helper.storage.s3raw.service.download_service import S3RawDownloadServiceImpl
from s3py_helper.storage.s3raw.service.delete_service import S3RawDeleteServiceImpl
from s3py_helper.storage.s3raw.service.info_service import S3RawInfoServiceImpl
from s3py_helper.storage.fastdfs.client.fastdfs_client import FastdfsClient
from s3py_helper.storage.fastdfs.service.download_service import FastdfsDownloadServiceImpl
from s3py_helper.storage.fastdfs.service.delete_service import FastdfsDeleteServiceImpl
from s3py_helper.storage.fastdfs.service.info_service import FastdfsInfoServiceImpl
from s3py_helper.storage.fusion.service.upload_service import FusionUploadServiceImpl
from s3py_helper.storage.fusion.service.download_service import FusionDownloadServiceImpl
from s3py_helper.storage.fusion.service.delete_service import FusionDeleteServiceImpl
from s3py_helper.storage.fusion.service.info_service import FusionInfoServiceImpl

async def setup_fusion_services():
    s3_client = S3Client(s3_config)
    fdfs_client = FastdfsClient(fastdfs_config)
    # Enter the client contexts manually; the caller is responsible for calling
    # __aexit__ on both clients once the services are no longer needed.
    await s3_client.__aenter__()
    await fdfs_client.__aenter__()

    router = FileIdRouterImpl()

    # S3 raw services
    s3_upload = S3RawUploadServiceImpl(s3_client, s3_config)
    s3_download = S3RawDownloadServiceImpl(s3_client)
    s3_delete = S3RawDeleteServiceImpl(s3_client)
    s3_info = S3RawInfoServiceImpl(s3_client)

    # FastDFS raw services
    fdfs_download = FastdfsDownloadServiceImpl(fdfs_client)
    fdfs_delete = FastdfsDeleteServiceImpl(fdfs_client)
    fdfs_info = FastdfsInfoServiceImpl(fdfs_client)

    # Fusion services
    fusion_upload = FusionUploadServiceImpl(s3_upload, FileValidatorImpl(), router)
    fusion_download = FusionDownloadServiceImpl(s3_download, fdfs_download, router, fusion_config)
    fusion_delete = FusionDeleteServiceImpl(s3_delete, fdfs_delete, router, fusion_config)
    fusion_info = FusionInfoServiceImpl(s3_info, fdfs_info, router)

    return fusion_upload, fusion_download, fusion_delete, fusion_info
```

### File Upload

The fusion upload service always uploads files to S3. Before uploading, it performs file name validity checks, file type validation, and file size validation. After the upload is complete, it automatically adds the `s3:` prefix to the `file_id`. The returned `FileAttach` contains the fusion-format `file_id` (e.g., `s3:my-bucket/folder/uuid_filename.pdf`) and the original file name.

`upload_bytes` is used for uploading in-memory byte data, `upload_file` is used for uploading FastAPI `UploadFile` objects (streaming internally to avoid loading the entire file into memory), `upload_stream` is used for uploading custom binary streams or async iterators, and `upload_files` supports batch uploading multiple `UploadFile` objects. All methods support an optional `folder` parameter to specify the prefix directory for the S3 object key, and a `bucket` parameter to override the default bucket.

```python
# Upload byte data
attach = await fusion_upload.upload_bytes("report.pdf", content, folder="reports")

# Upload FastAPI UploadFile (streaming)
attach = await fusion_upload.upload_file(upload_file, folder="uploads")

# Upload custom stream
import io
stream = io.BytesIO(content)
attach = await fusion_upload.upload_stream("data.csv", stream, len(content), folder="data")
```
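Since `upload_stream` also accepts an async iterator (per the description above), an in-memory byte source can be wrapped like this. The `chunked` helper below is an illustrative assumption, and the commented `upload_stream` call simply mirrors the documented positional arguments:

```python
import asyncio


async def chunked(data: bytes, chunk_size: int = 64 * 1024):
    """Yield `data` in fixed-size chunks, simulating an async byte source."""
    for i in range(0, len(data), chunk_size):
        yield data[i : i + chunk_size]
        await asyncio.sleep(0)  # yield control so other tasks can run


async def collect_chunks(content: bytes):
    # Hypothetical call matching the documented signature
    # (file name, stream/async iterator, total size, optional folder):
    # attach = await fusion_upload.upload_stream(
    #     "data.csv", chunked(content), len(content), folder="data"
    # )
    return [c async for c in chunked(content, chunk_size=4)]
```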

### File Download

The fusion download service automatically routes to the corresponding backend based on the `file_id` prefix. `download_bytes` reads the entire file into memory and returns `bytes`, suitable for small files. `download_to_stream` streams the data into the specified `BinaryIO` output stream, suitable for writing download content directly to a file or buffer. `download_to_generator` returns an asynchronous generator that yields data in chunks, suitable for streaming response scenarios.

```python
# Download as byte data
data = await fusion_download.download_bytes("s3:my-bucket/reports/uuid_report.pdf")

# Download to output stream
with open("output.pdf", "wb") as f:
    await fusion_download.download_to_stream("s3:my-bucket/reports/uuid_report.pdf", f)

# Streaming download (async iterator)
async for chunk in fusion_download.download_to_generator(some_file_id):
    process(chunk)
```

#### Mixed-Source Batch Download as Zip

`download_files_as_zip` supports packaging multiple files from S3 and FastDFS into a single ZIP archive written to an output stream. Each file is specified via `FileDownload` with its `file_id` and an optional display name. The service automatically identifies the source backend and streams each file's content into the ZIP archive. When `file_name` is missing, the filename part from the `file_id` is used as the entry name in the ZIP archive. Duplicate entry names are automatically suffixed with a sequence number to avoid overwrites.

```python
from s3py_helper.storage import FileDownload
import io

files = [
    FileDownload(file_id="s3:my-bucket/reports/uuid_report.pdf", file_name="report.pdf"),
    FileDownload(file_id="group1/M00/00/00/old_contract.pdf", file_name="contract.pdf"),
]
buf = io.BytesIO()
await fusion_download.download_files_as_zip(files, buf)
```
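The duplicate-name handling described above can be sketched as a small dedupe helper. This is a hypothetical illustration of the behavior; the suffix format the package actually uses may differ:

```python
def dedupe_entry_names(names: list[str]) -> list[str]:
    """Append a sequence number to repeated ZIP entry names so later
    files do not overwrite earlier ones (e.g. "a.pdf" -> "a_1.pdf")."""
    seen: dict[str, int] = {}
    result: list[str] = []
    for name in names:
        if name not in seen:
            seen[name] = 0
            result.append(name)
        else:
            seen[name] += 1
            stem, dot, ext = name.rpartition(".")
            if dot:
                result.append(f"{stem}_{seen[name]}.{ext}")
            else:
                result.append(f"{name}_{seen[name]}")
    return result
```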

For scenarios where the ZIP stream needs to be sent directly to a FastAPI `StreamingResponse`, the `stream_zip` method can be used. It returns an asynchronous generator that internally uses a queue to parallelize ZIP writing and network sending.

```python
from starlette.responses import StreamingResponse

async def download_zip(request):
    files = [FileDownload(file_id="s3:my-bucket/doc.pdf", file_name="doc.pdf")]
    return StreamingResponse(
        fusion_download.stream_zip(files),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=files.zip"},
    )
```

### File Deletion

`delete_file` automatically routes to the S3 or FastDFS backend for deletion based on the `file_id` prefix. S3 deletion is idempotent (deleting a non-existent object does not raise an error). FastDFS deletion is controlled by `FusionConfig.enable_fastdfs_delete`; when disabled, a `DeleteNotSupportedException` is raised.

```python
# Delete S3 file
await fusion_delete.delete_file("s3:my-bucket/reports/uuid_report.pdf")

# Delete FastDFS file (requires enable_fastdfs_delete=True)
await fusion_delete.delete_file("group1/M00/00/00/some_file.pdf")
```

### Metadata Query

`get_file_info` routes to the corresponding backend based on the `file_id` prefix to query metadata, returning a unified `FileBasicInfo` object. The S3 backend uses `head_object` to get file size, last modification time, storage class, and custom metadata. The FastDFS backend uses `get_meta_data` to get the file name and size. For S3 files, the returned `file_id` retains the fusion-layer `s3:` prefix, making it convenient for subsequent operations to use directly. A `FileNotFoundException` is raised when the file does not exist.

```python
info = await fusion_info.get_file_info("s3:my-bucket/reports/uuid_report.pdf")
print(info.file_name)    # File name
print(info.file_size)    # File size (bytes)
print(info.last_modify_time)  # Last modification time (S3)
print(info.storage_class)     # Storage class (S3)
print(info.metadata)          # Custom metadata (S3)
```

## 4. Guide for Using Pure S3 Protocol Object Storage

The s3raw package provides direct operation interfaces for the S3 protocol without routing logic or file validation, suitable for scenarios requiring only S3 storage or scenarios needing finer control.

### Client and Service Initialization

All S3 raw services share the same `S3Client` instance. The client manages the lifecycle of the underlying connection pool via the `async with` context manager. `S3RawUploadServiceImpl` requires `S3Config` to resolve the default bucket, while other services only depend on the client instance.

```python
from s3py_helper.storage.s3raw.client.s3_client import S3Client
from s3py_helper.storage.s3raw.service.upload_service import S3RawUploadServiceImpl
from s3py_helper.storage.s3raw.service.download_service import S3RawDownloadServiceImpl
from s3py_helper.storage.s3raw.service.delete_service import S3RawDeleteServiceImpl
from s3py_helper.storage.s3raw.service.info_service import S3RawInfoServiceImpl

async with S3Client(s3_config) as s3:
    upload_svc = S3RawUploadServiceImpl(s3, s3_config)
    download_svc = S3RawDownloadServiceImpl(s3)
    delete_svc = S3RawDeleteServiceImpl(s3)
    info_svc = S3RawInfoServiceImpl(s3)
```

The returned `file_id` format is `bucket/key` (without a prefix). Subsequent operations can use this `file_id` directly.
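Since the raw `file_id` is just `bucket/key`, it can be split back apart when the bucket and object key are needed separately. This is an illustrative helper, not part of the package API:

```python
def parse_raw_file_id(file_id: str) -> tuple[str, str]:
    """Split a raw S3 file_id of the form "bucket/key" into (bucket, key).
    The key itself may contain further slashes (folder prefixes)."""
    bucket, sep, key = file_id.partition("/")
    if not sep or not bucket or not key:
        raise ValueError(f"not a bucket/key file_id: {file_id!r}")
    return bucket, key
```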

### Upload

`upload_bytes` uploads byte data to S3. `upload_stream` supports uploading from a `BinaryIO` or async iterator. `upload_file` accepts a FastAPI `UploadFile` object and streams it to save memory. `upload_files` supports batch uploading. All methods return a `FileAttach`, where `file_id` is in the format `bucket/key` and `file_name` is the original file name. When uploading, the S3 object key is automatically generated in the format `[folder/]uuid_prefix_filename`, where `uuid_prefix` is a 16-character random hexadecimal string ensuring uniqueness.

```python
# Byte upload
attach = await upload_svc.upload_bytes("report.pdf", content, folder="reports")
# attach.file_id → "my-bucket/reports/a1b2c3d4e5f60708_report.pdf"

# Stream upload
import io
attach = await upload_svc.upload_stream("data.bin", io.BytesIO(content), len(content))

# UploadFile upload (streaming, does not load entire file into memory)
attach = await upload_svc.upload_file(upload_file, bucket="other-bucket")
```

### Download

`download_bytes` reads the entire content at once and returns `bytes`, suitable for small files. `download_to_stream` writes data in chunks to the specified `BinaryIO` output stream, keeping memory usage bounded. `download_to_generator` returns an asynchronous generator that yields fixed-size chunks (64 KB by default), suitable for streaming responses or large-file processing. Internally, it adapts to different versions of the aiobotocore response body interface, automatically falling back for versions that do not support chunked reading.

```python
# Download as bytes
data = await download_svc.download_bytes("my-bucket/reports/uuid_report.pdf")

# Download to file
with open("output.pdf", "wb") as f:
    await download_svc.download_to_stream("my-bucket/reports/uuid_report.pdf", f)

# Streaming read
async for chunk in download_svc.download_to_generator("my-bucket/reports/uuid_report.pdf"):
    process(chunk)
```

#### Multi-File Packaged Download

`download_files_as_zip` streams multiple S3 files one by one into a ZIP archive, without loading all files into memory simultaneously. Each file is specified by a `FileDownload` containing its `file_id` and an optional display name. Entry names within the ZIP are path-sanitized (defending against Zip Slip), and sequence numbers are automatically added for duplicate names.

`stream_zip` is an asynchronous generator wrapper for FastAPI `StreamingResponse`. It internally uses `asyncio.Queue` to parallelize ZIP writing and data production, making it suitable for direct return as a streaming response body to the client.

```python
from s3py_helper.storage import FileDownload

# Write to buffer
files = [
    FileDownload(file_id="my-bucket/docs/a.pdf", file_name="Document A.pdf"),
    FileDownload(file_id="my-bucket/docs/b.pdf", file_name="Document B.pdf"),
]
buf = io.BytesIO()
await download_svc.download_files_as_zip(files, buf)

# Direct streaming response
from starlette.responses import StreamingResponse

@app.get("/download")
async def download():
    return StreamingResponse(
        download_svc.stream_zip(files),
        media_type="application/zip",
    )
```

### Deletion

`delete_file` deletes an S3 object using its `file_id`. S3 deletion operations are idempotent; deleting a non-existent object does not raise an error. The `file_id` format is `bucket/key`.

```python
await delete_svc.delete_file("my-bucket/reports/uuid_report.pdf")
```

### Metadata Query

`get_file_info` retrieves metadata of an object via the S3 `head_object` operation, returning a `FileBasicInfo`. This includes the file size, last modification time, storage class (e.g., STANDARD), and custom metadata. A `FileNotFoundException` is raised if the object does not exist.

```python
info = await info_svc.get_file_info("my-bucket/reports/uuid_report.pdf")
print(info.file_size)         # File size (bytes)
print(info.last_modify_time)  # Last modification time
print(info.storage_class)     # Storage class
print(info.metadata)          # Custom metadata dictionary
```

### Multipart Upload

`S3Client` also provides low-level interfaces for multipart uploads, suitable for very large files where manual control over the part logic is needed. The workflow is: `create_multipart_upload` initializes the upload and returns an `UploadId`; `upload_part` is called repeatedly to upload the parts; `complete_multipart_upload` assembles the parts into the final object. At any point before completion, `abort_multipart_upload` can be called to cancel the upload and clean up the parts uploaded so far.

```python
async with S3Client(s3_config) as s3:
    # Initiate multipart upload
    result = await s3.create_multipart_upload("my-bucket", "large-file.bin")
    upload_id = result["UploadId"]

    # Upload parts
    parts = []
    # read_chunks is a user-supplied helper that yields fixed-size chunks from the file
    for part_num, chunk in enumerate(read_chunks("large-file.bin", 8 * 1024 * 1024), start=1):
        result = await s3.upload_part("my-bucket", "large-file.bin", upload_id, part_num, chunk)
        parts.append({"PartNumber": part_num, "ETag": result["ETag"]})

    # Complete assembly
    await s3.complete_multipart_upload("my-bucket", "large-file.bin", upload_id, parts)
```

## 5. Guide for Integration with FastAPI

This section provides a complete FastAPI application example demonstrating how to integrate fusion storage services within request handling. The core idea is to use FastAPI's dependency injection system to manage client lifecycles and service instances, allowing route functions to simply declare dependencies for direct usage.

### Dependency Injection and Service Management

It is recommended to provide fusion services to routes via FastAPI's dependency injection (`Depends`). Clients are created once at application startup and released at shutdown, avoiding re-establishing connections for each request. The following example holds client instances in module-level globals and assembles lightweight service objects per request through a plain factory function (`get_services`), which route handlers can call directly or wire in with `Depends(get_services)`.

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, UploadFile, File
from starlette.responses import StreamingResponse

from s3py_helper.storage import (
    S3Config, FastdfsConfig, FusionConfig, FileTypeConfig,
    FileValidatorImpl, FileIdRouterImpl, FileDownload,
)
from s3py_helper.storage.s3raw.client.s3_client import S3Client
from s3py_helper.storage.s3raw.service.upload_service import S3RawUploadServiceImpl
from s3py_helper.storage.s3raw.service.download_service import S3RawDownloadServiceImpl
from s3py_helper.storage.s3raw.service.delete_service import S3RawDeleteServiceImpl
from s3py_helper.storage.s3raw.service.info_service import S3RawInfoServiceImpl
from s3py_helper.storage.fastdfs.client.fastdfs_client import FastdfsClient
from s3py_helper.storage.fastdfs.service.download_service import FastdfsDownloadServiceImpl
from s3py_helper.storage.fastdfs.service.delete_service import FastdfsDeleteServiceImpl
from s3py_helper.storage.fastdfs.service.info_service import FastdfsInfoServiceImpl
from s3py_helper.storage.fusion.service.upload_service import FusionUploadServiceImpl
from s3py_helper.storage.fusion.service.download_service import FusionDownloadServiceImpl
from s3py_helper.storage.fusion.service.delete_service import FusionDeleteServiceImpl
from s3py_helper.storage.fusion.service.info_service import FusionInfoServiceImpl

# ---- Configuration ----

s3_config = S3Config(
    endpoint="http://192.168.30.36:9001",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    default_bucket="my-bucket",
)
fastdfs_config = FastdfsConfig(tracker_servers=["192.168.30.36:22122"])
fusion_config = FusionConfig(enable_fastdfs_download=True, enable_fastdfs_delete=True)

# ---- Client Lifecycle ----

_s3_client: S3Client | None = None
_fdfs_client: FastdfsClient | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create clients on application startup, release on shutdown."""
    global _s3_client, _fdfs_client
    _s3_client = S3Client(s3_config)
    await _s3_client.__aenter__()
    _fdfs_client = FastdfsClient(fastdfs_config)
    await _fdfs_client.__aenter__()
    yield
    await _s3_client.__aexit__(None, None, None)
    await _fdfs_client.__aexit__(None, None, None)

app = FastAPI(lifespan=lifespan)

# ---- Assemble Services (via Dependency Injection) ----

def _build_services():
    """Assemble fusion service instances from initialized clients."""
    router = FileIdRouterImpl()
    s3_up = S3RawUploadServiceImpl(_s3_client, s3_config)
    s3_dl = S3RawDownloadServiceImpl(_s3_client)
    s3_del = S3RawDeleteServiceImpl(_s3_client)
    s3_info = S3RawInfoServiceImpl(_s3_client)
    fdfs_dl = FastdfsDownloadServiceImpl(_fdfs_client)
    fdfs_del = FastdfsDeleteServiceImpl(_fdfs_client)
    fdfs_info = FastdfsInfoServiceImpl(_fdfs_client)
    return dict(
        upload=FusionUploadServiceImpl(s3_up, FileValidatorImpl(), router),
        download=FusionDownloadServiceImpl(s3_dl, fdfs_dl, router, fusion_config),
        delete=FusionDeleteServiceImpl(s3_del, fdfs_del, router, fusion_config),
        info=FusionInfoServiceImpl(s3_info, fdfs_info, router),
    )

def get_services():
    return _build_services()
```

### File Upload Endpoint

The upload endpoint receives an `UploadFile`, streams it to S3 via the fusion service, and returns the fusion-format `file_id`.

```python
@app.post("/files/upload")
async def upload_file(file: UploadFile = File(...)):
    svc = get_services()
    attach = await svc["upload"].upload_file(file, folder="uploads")
    return {"file_id": attach.file_id, "file_name": attach.file_name}


@app.post("/files/batch-upload")
async def batch_upload(files: list[UploadFile] = File(...)):
    svc = get_services()
    results = await svc["upload"].upload_files(files, folder="batch")
    return [{"file_id": r.file_id, "file_name": r.file_name} for r in results]
```

### File Download Endpoints

Single-file downloads retrieve content via `download_bytes` and return it directly. Batch downloads use `stream_zip` to return a streaming ZIP response: a background task writes ZIP data into a queue while the response body reads from it, keeping memory usage bounded.

```python
from starlette.responses import Response

@app.get("/files/{file_id:path}/download")
async def download_file(file_id: str):
    svc = get_services()
    data = await svc["download"].download_bytes(file_id)
    return Response(content=data, media_type="application/octet-stream")


@app.post("/files/download-zip")
async def download_zip(body: list[FileDownload]):
    svc = get_services()
    return StreamingResponse(
        svc["download"].stream_zip(body),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=files.zip"},
    )
```

The return value of `stream_zip` can be used directly as the content body of a `StreamingResponse`. The list of `FileDownload` objects passed by the requester can contain both S3 files (with `s3:` prefix) and FastDFS files; the fusion layer automatically identifies the source and pulls data from the corresponding backend.

### File Deletion and Metadata Query Endpoints

The deletion and metadata query endpoints directly accept fusion-format `file_id`s. The services automatically route based on the prefix.

```python
@app.delete("/files/{file_id:path}")
async def delete_file(file_id: str):
    svc = get_services()
    await svc["delete"].delete_file(file_id)
    return {"detail": "deleted"}


@app.get("/files/{file_id:path}/info")
async def get_file_info(file_id: str):
    svc = get_services()
    info = await svc["info"].get_file_info(file_id)
    return {
        "file_id": info.file_id,
        "file_name": info.file_name,
        "file_size": info.file_size,
        "last_modify_time": info.last_modify_time.isoformat() if info.last_modify_time else None,
    }
```

### Exception Handling

The component defines a clear exception hierarchy. It is recommended to register global exception handlers in FastAPI to map storage exceptions to appropriate HTTP status codes. The base class `StorageException` maps to 500, `FileNotFoundException` maps to 404, `InvalidArgumentException` and `InvalidFileTypeException` map to 400, and `DeleteNotSupportedException` and `RouteNotSupportedException` map to 403.

```python
from fastapi.responses import JSONResponse
from s3py_helper.storage import (
    StorageException, FileNotFoundException,
    InvalidArgumentException, InvalidFileTypeException,
    DeleteNotSupportedException, RouteNotSupportedException,
)

@app.exception_handler(FileNotFoundException)
async def not_found_handler(request, exc):
    return JSONResponse(status_code=404, content={"detail": str(exc)})

@app.exception_handler(InvalidArgumentException)
@app.exception_handler(InvalidFileTypeException)
async def bad_request_handler(request, exc):
    return JSONResponse(status_code=400, content={"detail": str(exc)})

@app.exception_handler(DeleteNotSupportedException)
@app.exception_handler(RouteNotSupportedException)
async def forbidden_handler(request, exc):
    return JSONResponse(status_code=403, content={"detail": str(exc)})

@app.exception_handler(StorageException)
async def storage_error_handler(request, exc):
    return JSONResponse(status_code=500, content={"detail": str(exc)})
```