Metadata-Version: 2.1
Name: bosfs
Version: 1.0.3
Summary: fsspec filesystem for BOS
Author: 
Author-email: yangdongdong01@baidu.com
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: fsspec
Requires-Dist: bce-python-sdk

# fsspec-bosfs

fsspec-bosfs (bosfs for short) is a Python file system implementation for convenient access to Baidu Cloud Object Storage (BOS). With bosfs, you can use the standard fsspec interface to work with data stored in BOS.


## Installation
Install bosfs via pip:
```bash
pip install bosfs
```
Verify the installation:
```bash
pip show bosfs
```
## Quick Start
### Configure Access Credentials
Two methods are supported, listed below in order of priority; configuring either one is sufficient.
1. Initialize BOSFileSystem by specifying parameters:
```python
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='{your ak}', secret_key='{your sk}', sts_token=None)
```

2. Configure environment variables:
```bash
export BCE_ACCESS_KEY_ID=xxx
export BCE_SECRET_ACCESS_KEY=xxx
export BOS_ENDPOINT=xxx
```
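The two credential sources can be combined roughly as follows. This is a hypothetical sketch, not bosfs's actual resolution logic: the helper name `resolve_bos_config` is made up, and it assumes that explicitly passed values take priority over the environment variables listed above.

```python
import os
from typing import Mapping, Optional

# Environment variable names as listed in this README.
ENV_ACCESS_KEY = "BCE_ACCESS_KEY_ID"
ENV_SECRET_KEY = "BCE_SECRET_ACCESS_KEY"
ENV_ENDPOINT = "BOS_ENDPOINT"


def resolve_bos_config(
    endpoint: Optional[str] = None,
    access_key: Optional[str] = None,
    secret_key: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> dict:
    """Hypothetical helper: explicit arguments win; environment variables fill the gaps.

    The priority order is an assumption, not confirmed bosfs behaviour.
    """
    env = os.environ if env is None else env
    config = {
        "endpoint": endpoint or env.get(ENV_ENDPOINT),
        "access_key": access_key or env.get(ENV_ACCESS_KEY),
        "secret_key": secret_key or env.get(ENV_SECRET_KEY),
    }
    # Fail early with a readable message instead of an authentication error later.
    missing = [name for name, value in config.items() if not value]
    if missing:
        raise ValueError(f"missing BOS settings: {', '.join(missing)}")
    return config
```

The resulting dict could then be passed as keyword arguments, e.g. `bosfs.BOSFileSystem(**resolve_bos_config())`, since `endpoint`, `access_key` and `secret_key` are the parameter names shown in method 1.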
### Examples
1. List data on BOS using bosfs
```python
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
res = fs.ls('/mybucket/')
print(res)
```
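With `fs.ls(path, detail=True)`, fsspec filesystems return a list of dicts (with `name`, `size` and `type` keys, where `size` may be `None` if unknown) instead of plain names. Assuming bosfs follows this fsspec convention, a detailed listing could be summarized as below; `summarize_listing` is a hypothetical helper, and the entries are hand-written samples, not real bucket output.

```python
from typing import Any, Dict, Iterable, Mapping


def summarize_listing(entries: Iterable[Mapping[str, Any]]) -> Dict[str, int]:
    """Count files and directories in an fsspec-style detailed listing.

    Each entry is assumed to look like an item from fsspec's ls(path, detail=True).
    Entries whose type is neither "file" nor "directory" are ignored.
    """
    summary = {"files": 0, "directories": 0, "total_bytes": 0}
    for entry in entries:
        kind = entry.get("type")
        if kind == "directory":
            summary["directories"] += 1
        elif kind == "file":
            summary["files"] += 1
            # Treat a missing or None size as 0.
            summary["total_bytes"] += entry.get("size") or 0
    return summary


# Hand-written sample entries standing in for fs.ls("/mybucket/", detail=True).
sample = [
    {"name": "mybucket/README.md", "size": 2048, "type": "file"},
    {"name": "mybucket/data", "size": 0, "type": "directory"},
    {"name": "mybucket/file1", "size": 10 * 2**20, "type": "file"},
]
print(summarize_listing(sample))  # prints {'files': 2, 'directories': 1, 'total_bytes': 10487808}
```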
2. Read data from BOS using bosfs
```python
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
with fs.open('/mybucket/README.md') as f:
    print(f.readline())
```
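In fsspec, `open()` defaults to binary mode (`"rb"`), so `f.readline()` in the example above returns `bytes`. fsspec's generic `open` also accepts text modes such as `"r"` and wraps the file in `io.TextIOWrapper`; assuming bosfs inherits that behaviour, the same decoding can also be done by hand on any binary file object. The helper `iter_text_lines` is hypothetical and is demonstrated here on an in-memory buffer.

```python
import io
from typing import BinaryIO, Iterator


def iter_text_lines(binary_file: BinaryIO, encoding: str = "utf-8") -> Iterator[str]:
    """Yield decoded text lines from a binary file object without reading it all at once."""
    text = io.TextIOWrapper(binary_file, encoding=encoding, newline=None)
    try:
        yield from text
    finally:
        # Detach so the wrapper does not close the caller's file when it is discarded.
        # Note: the wrapper reads ahead, so stopping early leaves the underlying
        # position past the last yielded line.
        text.detach()


buf = io.BytesIO("# fsspec-bosfs\ncafé\n".encode("utf-8"))
for line in iter_text_lines(buf):
    print(line, end="")
```

With bosfs, the loop would go inside the `with fs.open('/mybucket/README.md') as f:` block of the read example, iterating over `iter_text_lines(f)`.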
    
3. Write data to BOS using bosfs
```python
import bosfs
import os

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
object_name = "/mybucket/file1"  # object paths include the bucket name
data = os.urandom(10 * 2**20)  # 10 MB of random bytes
with fs.open(object_name, "wb") as f_wb:
    f_wb.write(data)
```
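The write example builds the whole 10 MB payload in memory before writing it. When the data comes from another stream, such as a large local file, it can instead be copied in fixed-size chunks into the writable file returned by `fs.open(path, "wb")`. The sketch below is generic Python, not a bosfs API: `copy_in_chunks` and its 4 MB default chunk size are assumptions, and it is shown with in-memory buffers. fsspec's generic `put()` method may also cover the local-file case, if bosfs supports it.

```python
from typing import BinaryIO


def copy_in_chunks(src: BinaryIO, dst: BinaryIO, chunk_size: int = 4 * 2**20) -> int:
    """Copy src into dst chunk by chunk and return the number of bytes copied.

    dst can be any writable binary file object, for example (assumed usage)
    one returned by fs.open("/mybucket/file1", "wb").
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    total = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:  # b"" signals end of stream
            break
        dst.write(chunk)
        total += len(chunk)
    return total
```

With bosfs this might be used as `with open("local.bin", "rb") as src, fs.open("/mybucket/file1", "wb") as dst: copy_in_chunks(src, dst)`, so that only one chunk of the local file is held in memory at a time.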
For more usage, see the [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/index.html).

## Performance

### Test Results:
Sequential read/write performance for a single file (speeds in MB/s, where 1 MB = 2^20 bytes as in the test code below):

| File size (MB) | Write speed (MB/s) | Read speed (MB/s) | Write time (s) | Read time (s) |
|---------------:|-------------------:|------------------:|---------------:|--------------:|
| 1   | 9.672178  | 4.640667  | 0.104893 | 0.216561 |
| 4   | 24.867180 | 15.987999 | 0.160897 | 0.250713 |
| 10  | 30.681564 | 30.890233 | 0.343196 | 0.325457 |
| 128 | 75.634190 | 93.547005 | 1.692568 | 1.369047 |
| 512 | 80.359467 | 97.840121 | 6.371391 | 5.233074 |

Concurrent read/write performance for multiple files (speeds are aggregate throughput; times are total wall-clock time for all files):

| Files | File size (MB) | Threads | Write speed (MB/s) | Read speed (MB/s) | Write time (s) | Read time (s) |
|------:|---------------:|--------:|-------------------:|------------------:|---------------:|--------------:|
| 1000  | 4 | 16 | 381.91 | 301.56 | 10.47 | 13.26 |


### Test Environment:
- Test machine: 16 CPU cores, 32 GB memory, 3 Gbps network bandwidth
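The speed figures are data size divided by elapsed time, with 1 MB = 2^20 bytes as in the test code below, while the bandwidth above is quoted in gigabits per second. The hypothetical helpers below only make that arithmetic explicit: the 1000-file concurrent write works out to 4000 MB / 10.47 s ≈ 382 MB/s, and 381.91 MB/s corresponds to roughly 3.2 Gbit/s.

```python
MB = 2**20  # bytes per "MB", matching size_mb * 2**20 in the test code


def aggregate_throughput_mb_s(num_files: int, file_size_mb: float, elapsed_s: float) -> float:
    """Total MB moved divided by wall-clock seconds (same formula as the parallel test)."""
    if elapsed_s <= 0:
        raise ValueError("elapsed_s must be positive")
    return num_files * file_size_mb / elapsed_s


def mb_s_to_gbit_s(mb_per_s: float) -> float:
    """Convert MB/s (2**20-byte MB) to Gbit/s (10**9 bits) for comparison with link speed."""
    return mb_per_s * MB * 8 / 1e9


print(f"{aggregate_throughput_mb_s(1000, 4, 10.47):.2f} MB/s")  # prints 382.04 MB/s
print(f"{mb_s_to_gbit_s(381.91):.2f} Gbit/s")  # prints 3.20 Gbit/s
```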

### Test Code:
```python
import bosfs
import os
import time
import numpy as np
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

def test_bosfs_performance(bucket_name, file_sizes=None, num_iterations=3, plot_results=True):
    """
    Test the read/write performance of BosFS
    
    Parameters:
    bucket_name: BOS bucket name
    file_sizes: List of file sizes (in MB) to test
    num_iterations: Number of times to repeat the test for each file size
    plot_results: Whether to plot performance charts
    """
    # Initialize BosFS
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
    
    # Use default file sizes if not specified
    if file_sizes is None:
        file_sizes = [1, 10, 50, 100, 250, 500]  # MB
    
    # Create a structure to store results
    results = {
        'file_size_mb': [],
        'write_speed_mbps': [],
        'read_speed_mbps': [],
        'write_time_s': [],
        'read_time_s': []
    }
    
    # Test for each file size
    for size_mb in file_sizes:
        print(f"\nTesting file size: {size_mb} MB")
        size_bytes = size_mb * 2**20
        
        write_speeds = []
        read_speeds = []
        write_times = []
        read_times = []
        
        # Repeat the test multiple times for average performance
        for i in range(num_iterations):
            # Generate random data
            print(f"  Iteration {i+1}/{num_iterations}: Generating random data...")
            data = os.urandom(size_bytes)
            
            # Construct a unique object name
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            object_name = f"{bucket_name}/perf_test_{size_mb}mb_{timestamp}_{i}.dat"
            
            # Test write performance
            print(f"  Iteration {i+1}/{num_iterations}: Testing write performance...")
            write_start = time.time()
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            write_end = time.time()
            
            write_time = write_end - write_start
            write_speed = size_mb / write_time  # MB/s
            
            write_times.append(write_time)
            write_speeds.append(write_speed)
            
            # Test read performance
            print(f"  Iteration {i+1}/{num_iterations}: Testing read performance...")
            read_start = time.time()
            with fs.open(object_name, "rb") as f_rb:
                read_data = f_rb.read()
            read_end = time.time()
            
            read_time = read_end - read_start
            read_speed = size_mb / read_time  # MB/s
            
            read_times.append(read_time)
            read_speeds.append(read_speed)
            
            # Verify data integrity (compare content, not just length)
            if read_data != data:
                print(f"  Warning: data read back ({len(read_data)} bytes) does not match data written ({len(data)} bytes)!")
            
            # Clean up test file
            try:
                fs.rm(object_name)
                print(f"  Deleted test file: {object_name}")
            except Exception as e:
                print(f"  Error deleting file: {e}")
        
        # Calculate average performance
        avg_write_speed = np.mean(write_speeds)
        avg_read_speed = np.mean(read_speeds)
        avg_write_time = np.mean(write_times)
        avg_read_time = np.mean(read_times)
        
        # Store results
        results['file_size_mb'].append(size_mb)
        results['write_speed_mbps'].append(avg_write_speed)
        results['read_speed_mbps'].append(avg_read_speed)
        results['write_time_s'].append(avg_write_time)
        results['read_time_s'].append(avg_read_time)
        
        print(f"  Average write speed: {avg_write_speed:.2f} MB/s, Average write time: {avg_write_time:.2f} seconds")
        print(f"  Average read speed: {avg_read_speed:.2f} MB/s, Average read time: {avg_read_time:.2f} seconds")
    
    # Create DataFrame for analysis
    results_df = pd.DataFrame(results)
    
    # Print results table
    print("\nPerformance test results:")
    print(results_df.to_string(index=False))
    
    if plot_results:
        # Plot performance charts
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # Speed chart
        ax1.plot(results['file_size_mb'], results['write_speed_mbps'], 'b-o', label='Write Speed')
        ax1.plot(results['file_size_mb'], results['read_speed_mbps'], 'r-o', label='Read Speed')
        ax1.set_xlabel('File Size (MB)')
        ax1.set_ylabel('Speed (MB/s)')
        ax1.set_title('BosFS Read/Write Speed')
        ax1.legend()
        ax1.grid(True)
        
        # Time chart
        ax2.plot(results['file_size_mb'], results['write_time_s'], 'b-o', label='Write Time')
        ax2.plot(results['file_size_mb'], results['read_time_s'], 'r-o', label='Read Time')
        ax2.set_xlabel('File Size (MB)')
        ax2.set_ylabel('Time (s)')
        ax2.set_title('BosFS Read/Write Time')
        ax2.legend()
        ax2.grid(True)
        
        plt.tight_layout()
        plt.savefig('bosfs_performance.png')
        plt.show()
    
    return results_df


def test_bosfs_parallel_performance(bucket_name, file_size_mb=10, num_files=5, num_threads=4):
    """
    Test the parallel read/write performance of BosFS
    
    Parameters:
    bucket_name: BOS bucket name
    file_size_mb: Size of each file (MB)
    num_files: Number of files to write/read
    num_threads: Number of parallel threads
    """
    from concurrent.futures import ThreadPoolExecutor
    
    # Initialize BosFS
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
    
    file_size_bytes = file_size_mb * 2**20
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    
    # Generate list of file names
    object_names = [f"{bucket_name}/parallel_test_{timestamp}_{i}.dat" for i in range(num_files)]
    
    # Generate random data
    print(f"Generating {num_files} {file_size_mb} MB random files...")
    data_list = [os.urandom(file_size_bytes) for _ in range(num_files)]
    
    # Parallel write function
    def write_file(args):
        object_name, data = args
        try:
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            return True
        except Exception as e:
            print(f"Error writing {object_name}: {e}")
            return False
    
    # Parallel read function
    def read_file(object_name):
        try:
            with fs.open(object_name, "rb") as f_rb:
                data = f_rb.read()
            return len(data)
        except Exception as e:
            print(f"Error reading {object_name}: {e}")
            return 0
    
    # Test parallel write
    print(f"\nTesting parallel write ({num_threads} threads)...")
    write_start = time.time()
    
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        results = list(tqdm(
            executor.map(write_file, zip(object_names, data_list)),
            total=num_files,
            desc="Write Progress"
        ))
    
    write_end = time.time()
    write_time = write_end - write_start
    total_data_size_mb = file_size_mb * num_files
    write_speed = total_data_size_mb / write_time
    
    print(f"Parallel write complete: {sum(results)}/{num_files} files successful")
    print(f"Total write time: {write_time:.2f} seconds")
    print(f"Total data size: {total_data_size_mb} MB")
    print(f"Average write speed: {write_speed:.2f} MB/s")
    
    # Test parallel read
    print(f"\nTesting parallel read ({num_threads} threads)...")
    read_start = time.time()
    
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        read_sizes = list(tqdm(
            executor.map(read_file, object_names),
            total=num_files,
            desc="Read Progress"
        ))
    
    read_end = time.time()
    read_time = read_end - read_start
    read_speed = total_data_size_mb / read_time
    
    print(f"Parallel read complete: {sum(1 for size in read_sizes if size > 0)}/{num_files} files successful")
    print(f"Total read time: {read_time:.2f} seconds")
    print(f"Average read speed: {read_speed:.2f} MB/s")
    
    # Clean up test files
    print("\nCleaning up test files...")
    for object_name in tqdm(object_names, desc="Deleting files"):
        try:
            fs.rm(object_name)
        except Exception as e:
            print(f"Error deleting {object_name}: {e}")
    
    return {
        'parallel_write_speed_mbps': write_speed,
        'parallel_read_speed_mbps': read_speed,
        'parallel_write_time_s': write_time,
        'parallel_read_time_s': read_time
    }


if __name__ == "__main__":
    # Set BOS bucket name
    bucket_name = "your-bucket-name"
    
    # Run single file read/write performance test
    print("=" * 80)
    print("Starting BosFS Single File Read/Write Performance Test")
    print("=" * 80)
    
    # Custom file size list (MB)
    file_sizes = [1, 10, 50, 100]
    
    # Run performance test
    results_df = test_bosfs_performance(bucket_name, file_sizes=file_sizes, num_iterations=3)
    
    # Run parallel performance test
    print("\n" + "=" * 80)
    print("Starting BosFS Parallel Read/Write Performance Test")
    print("=" * 80)
    
    parallel_results = test_bosfs_parallel_performance(
        bucket_name, 
        file_size_mb=4,     # Each file is 4MB
        num_files=1000,     # Total of 1000 files
        num_threads=16      # Use 16 threads
    )
    
    # Save results to CSV file
    results_df.to_csv('bosfs_performance_results.csv', index=False)
    print("\nTest results saved to bosfs_performance_results.csv")
```
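The benchmark holds both the written and the read-back data in memory, so it can compare them directly. For objects too large for that, for example when checking an uploaded local file against its copy in BOS, one option is to compare SHA-256 digests computed chunk by chunk. This is a hypothetical sketch using only `hashlib`; it works with any binary file objects, such as a local file and one returned by `fs.open(path, "rb")`.

```python
import hashlib
from typing import BinaryIO


def sha256_of_stream(f: BinaryIO, chunk_size: int = 4 * 2**20) -> str:
    """Return the hex SHA-256 of a binary file object, reading it chunk by chunk."""
    digest = hashlib.sha256()
    # iter() with a sentinel keeps calling f.read(chunk_size) until it returns b"".
    for chunk in iter(lambda: f.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


def streams_match(a: BinaryIO, b: BinaryIO, chunk_size: int = 4 * 2**20) -> bool:
    """True if both streams yield identical bytes (compared via SHA-256 digests)."""
    return sha256_of_stream(a, chunk_size) == sha256_of_stream(b, chunk_size)
```

For example, `streams_match(local_file, remote_file)` with `local_file = open("local.bin", "rb")` and `remote_file = fs.open("/mybucket/file1", "rb")` keeps memory use bounded by the chunk size.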


## Usage Limitations
- Async mode is not supported; bosfs provides a synchronous filesystem only.
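If bosfs has to be called from `asyncio` code, one workaround is to run its blocking methods in a thread pool. The sketch below is an assumption, not a bosfs feature: `run_blocking` is a hypothetical helper built on `loop.run_in_executor` (available on Python 3.8, which this package supports), and it assumes the filesystem object can be shared across threads, as the parallel benchmark above already does with `ThreadPoolExecutor`.

```python
import asyncio
from functools import partial
from typing import Any, Callable


async def run_blocking(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a blocking callable in the default thread pool and await its result."""
    loop = asyncio.get_running_loop()
    # run_in_executor does not forward keyword arguments, so bind them with partial.
    return await loop.run_in_executor(None, partial(func, *args, **kwargs))
```

Inside a coroutine this might look like `listing = await run_blocking(fs.ls, "/mybucket/")`; the event loop stays responsive while the request runs in a worker thread.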

## Versions
v1.0.3 - 2025-03-21
---
### New features
- Fix handling of the `detail` parameter in `fs.ls()`
### Breaking changes
- None

v1.0.2 - 2025-03-21
---
### New features
- Fix an error in `fs.info()`
### Breaking changes
- None

v1.0.1 - 2025-03-20
---
### New features
- Fix garbled (mis-encoded) text
### Breaking changes
- None

v1.0.0 - 2025-03-20
---
### New features
- First release
- Support for synchronous bosfs
### Breaking changes
- None
