Metadata-Version: 2.1
Name: bosfs
Version: 1.0.0
Summary: fsspec filesystem for BOS
Author: 
Author-email: yangdongdong01@baidu.com
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: fsspec
Requires-Dist: bce-python-sdk

# fsspec-bosfs

fsspec-bosfs (hereafter bosfs) is a Python filesystem implementation for convenient access to the Baidu Cloud Object Storage (BOS) service. With bosfs, you can work with data stored in BOS through the standard fsspec interface.

## Installation
Install bosfs with pip:
```
pip install bosfs
```
Verify the installation:
```
pip show bosfs
```
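To run the same check from Python instead of the shell, the standard-library `importlib.metadata` module (Python 3.8+) can report the installed version of a distribution. A minimal sketch; `installed_version` is a hypothetical helper name, not part of bosfs:

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is not installed."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    v = installed_version("bosfs")
    print(f"bosfs {v}" if v else "bosfs is not installed")
```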
## Quick Start
### Configure access credentials
Two methods are supported. Configuring either one is enough; neither takes priority over the other.
1. Pass the credentials as arguments when initializing `BOSFileSystem`:
```python
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='{your ak}', secret_key='{your sk}', sts_token=None)
```

2. Set environment variables:
```bash
export BCE_ACCESS_KEY_ID=xxx
export BCE_SECRET_ACCESS_KEY=xxx
export BOS_ENDPOINT=xxx
```
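If you want to read these environment variables yourself and pass them to the constructor explicitly (method 1), the following sketch collects them into keyword arguments. The mapping of each variable to the `endpoint`, `access_key` and `secret_key` parameters is an assumption based on the names, and `bos_kwargs_from_env` is a hypothetical helper, not part of bosfs:

```python
import os
from typing import Dict, Mapping, Optional

# Assumed mapping: environment variable -> BOSFileSystem keyword argument.
ENV_TO_KWARG = {
    "BOS_ENDPOINT": "endpoint",
    "BCE_ACCESS_KEY_ID": "access_key",
    "BCE_SECRET_ACCESS_KEY": "secret_key",
}


def bos_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Collect BOS credentials from the environment; raise KeyError if any is missing."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_KWARG if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {kwarg: env[name] for name, kwarg in ENV_TO_KWARG.items()}
```

Usage would then be `fs = bosfs.BOSFileSystem(**bos_kwargs_from_env())`; failing early with a clear message avoids sending requests with empty credentials.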
### Examples
1. List data on BOS with bosfs:
```python
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
res = fs.ls('/mybucket/')
print(res)
```
2. Read data from BOS with bosfs:
```python
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
with fs.open('/mybucket/README.md') as f:
    print(f.readline())
```
    
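3. Write data to BOS with bosfs:
```python
import os

import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
# Object paths take the form /<bucket>/<key>, as in the examples above
object_name = "/mybucket/file1"
data = os.urandom(10 * 2**20)  # 10 MiB of random bytes
# Call open() on the filesystem instance, not on the bosfs module
with fs.open(object_name, "wb") as f_wb:
    f_wb.write(data)
```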
For more usage, see the [fsspec](https://filesystem-spec.readthedocs.io/en/latest/index.html) documentation.
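The write example builds the whole payload in memory. For large local files, a common pattern is to stream them in fixed-size chunks into the file object returned by `fs.open(path, "wb")` (or out of one opened with `"rb"`). The sketch below uses only the standard `read`/`write` file-object methods, so it is demonstrated with `io.BytesIO`; `copy_stream` and the 8 MiB default chunk size are illustrative choices, not bosfs settings. The standard library's `shutil.copyfileobj` does the same copy; this version also returns the byte count.

```python
import io
from typing import BinaryIO


def copy_stream(src: BinaryIO, dst: BinaryIO, chunk_size: int = 8 * 2**20) -> int:
    """Copy src to dst in chunks of at most chunk_size bytes; return the bytes copied."""
    total = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:  # empty bytes means end of stream
            break
        dst.write(chunk)
        total += len(chunk)
    return total


if __name__ == "__main__":
    src = io.BytesIO(b"x" * 1000)
    dst = io.BytesIO()
    print(copy_stream(src, dst, chunk_size=64))  # 1000
```

With bosfs this would look like `with open("big.bin", "rb") as src, fs.open("/mybucket/big.bin", "wb") as dst: copy_stream(src, dst)`, where `big.bin` is a placeholder name.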

## Read/Write Performance

### Benchmark results:
Single-file sequential read/write performance (1 MB = 2^20 bytes, as in the test code):

| File size (MB) | Write speed (MB/s) | Read speed (MB/s) | Write time (s) | Read time (s) |
|---------------:|-------------------:|------------------:|---------------:|--------------:|
|              1 |           9.672178 |          4.640667 |       0.104893 |      0.216561 |
|              4 |          24.867180 |         15.987999 |       0.160897 |      0.250713 |
|             10 |          30.681564 |         30.890233 |       0.343196 |      0.325457 |
|            128 |          75.634190 |         93.547005 |       1.692568 |      1.369047 |
|            512 |          80.359467 |         97.840121 |       6.371391 |      5.233074 |

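If this table was produced by the test code below, the speed columns are means of per-iteration speeds (`np.mean(write_speeds)`), while the time columns are means of per-iteration times. Since the mean of `size / t` is never smaller than `size / mean(t)`, dividing the file size by the time column will not exactly reproduce the speed column: for 1 MB, 1 / 0.104893 ≈ 9.53 MB/s versus the 9.67 MB/s reported. A small illustration with made-up timings:

```python
from statistics import mean
from typing import Sequence, Tuple


def speed_summaries(size_mb: float, times_s: Sequence[float]) -> Tuple[float, float]:
    """Return (mean of per-run speeds, size / mean time), both in MB/s."""
    mean_of_speeds = mean(size_mb / t for t in times_s)
    speed_of_mean_time = size_mb / mean(times_s)
    return mean_of_speeds, speed_of_mean_time


if __name__ == "__main__":
    # Hypothetical timings for three runs of a 4 MB file.
    a, b = speed_summaries(4, [1.0, 2.0, 4.0])
    print(f"{a:.3f} {b:.3f}")  # 2.333 1.714
```

The gap grows with run-to-run variance, so noisy small-file runs diverge the most.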
Multi-file concurrent read/write performance:

| Files | File size (MB) | Threads | Write speed (MB/s) | Read speed (MB/s) | Write time (s) | Read time (s) |
|------:|---------------:|--------:|-------------------:|------------------:|---------------:|--------------:|
|  1000 |              4 |      16 |             381.91 |            301.56 |          10.47 |         13.26 |

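In the concurrent test, throughput is the total data volume divided by the wall-clock time of the whole batch (`total_data_size_mb / write_time` in the test code below), not a sum of per-thread speeds. Re-deriving the table row with that formula:

```python
def aggregate_throughput(num_files: int, file_size_mb: float, wall_time_s: float) -> float:
    """Total MB transferred divided by the wall-clock time of the whole batch."""
    return num_files * file_size_mb / wall_time_s


if __name__ == "__main__":
    # Figures from the table row: 1000 files x 4 MB.
    print(f"write: {aggregate_throughput(1000, 4, 10.47):.1f} MB/s")  # 382.0
    print(f"read:  {aggregate_throughput(1000, 4, 13.26):.1f} MB/s")  # 301.7
```

The small differences from 381.91 and 301.56 MB/s are consistent with the times in the table being rounded to two decimals.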

### Test environment:
- Test machine: 16 CPU cores, 32 GB RAM, 3 Gbps network bandwidth

### Test code:
```python
import bosfs
import os
import time
import numpy as np
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

def test_bosfs_performance(bucket_name, file_sizes=None, num_iterations=3, plot_results=True):
    """
    Benchmark BosFS read/write performance.
    
    Args:
    bucket_name: BOS bucket name
    file_sizes: list of file sizes to test, in MB (1 MB = 2**20 bytes)
    num_iterations: number of repetitions per file size
    plot_results: whether to plot the results
    """
    # Initialize BosFS
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
    
    # Fall back to default file sizes if none are given
    if file_sizes is None:
        file_sizes = [1, 10, 50, 100, 250, 500]  # MB
    
    # Result storage
    results = {
        'file_size_mb': [],
        'write_speed_mbps': [],
        'read_speed_mbps': [],
        'write_time_s': [],
        'read_time_s': []
    }
    
    # Benchmark each file size
    for size_mb in file_sizes:
        print(f"\nTesting file size: {size_mb} MB")
        size_bytes = size_mb * 2**20
        
        write_speeds = []
        read_speeds = []
        write_times = []
        read_times = []
        
        # Repeat several times to get an average
        for i in range(num_iterations):
            # Generate random data
            print(f"  Iteration {i+1}/{num_iterations}: generating random data...")
            data = os.urandom(size_bytes)
            
            # Build a unique object name
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            object_name = f"{bucket_name}/perf_test_{size_mb}mb_{timestamp}_{i}.dat"
            
            # Measure write performance
            print(f"  Iteration {i+1}/{num_iterations}: measuring write performance...")
            write_start = time.perf_counter()  # monotonic, high-resolution timer
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            write_end = time.perf_counter()
            
            write_time = write_end - write_start
            write_speed = size_mb / write_time  # MB/s
            
            write_times.append(write_time)
            write_speeds.append(write_speed)
            
            # Measure read performance
            print(f"  Iteration {i+1}/{num_iterations}: measuring read performance...")
            read_start = time.perf_counter()
            with fs.open(object_name, "rb") as f_rb:
                read_data = f_rb.read()
            read_end = time.perf_counter()
            
            read_time = read_end - read_start
            read_speed = size_mb / read_time  # MB/s
            
            read_times.append(read_time)
            read_speeds.append(read_speed)
            
            # Verify data integrity (size and content)
            if read_data != data:
                print(f"  Warning: read data ({len(read_data)} bytes) does not match written data ({len(data)} bytes)!")
            
            # Clean up the test file
            try:
                fs.rm(object_name)
                print(f"  Deleted test file: {object_name}")
            except Exception as e:
                print(f"  Error deleting file: {e}")
        
        # Compute averages
        avg_write_speed = np.mean(write_speeds)
        avg_read_speed = np.mean(read_speeds)
        avg_write_time = np.mean(write_times)
        avg_read_time = np.mean(read_times)
        
        # Store results
        results['file_size_mb'].append(size_mb)
        results['write_speed_mbps'].append(avg_write_speed)
        results['read_speed_mbps'].append(avg_read_speed)
        results['write_time_s'].append(avg_write_time)
        results['read_time_s'].append(avg_read_time)
        
        print(f"  Average write speed: {avg_write_speed:.2f} MB/s, average write time: {avg_write_time:.2f} s")
        print(f"  Average read speed: {avg_read_speed:.2f} MB/s, average read time: {avg_read_time:.2f} s")
    
    # Build a DataFrame for analysis
    results_df = pd.DataFrame(results)
    
    # Print the results table
    print("\nBenchmark results:")
    print(results_df.to_string(index=False))
    
    if plot_results:
        # Plot the results
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # Speed chart
        ax1.plot(results['file_size_mb'], results['write_speed_mbps'], 'b-o', label='Write Speed')
        ax1.plot(results['file_size_mb'], results['read_speed_mbps'], 'r-o', label='Read Speed')
        ax1.set_xlabel('File Size (MB)')
        ax1.set_ylabel('Speed (MB/s)')
        ax1.set_title('BosFS Read/Write Speed')
        ax1.legend()
        ax1.grid(True)
        
        # Time chart
        ax2.plot(results['file_size_mb'], results['write_time_s'], 'b-o', label='Write Cost')
        ax2.plot(results['file_size_mb'], results['read_time_s'], 'r-o', label='Read Cost')
        ax2.set_xlabel('File Size (MB)')
        ax2.set_ylabel('Cost (Second)')
        ax2.set_title('BosFS Read/Write Cost (Second)')
        ax2.legend()
        ax2.grid(True)
        
        plt.tight_layout()
        plt.savefig('bosfs_performance.png')
        plt.show()
    
    return results_df


def test_bosfs_parallel_performance(bucket_name, file_size_mb=10, num_files=5, num_threads=4):
    """
    Benchmark BosFS parallel read/write performance.
    
    Args:
    bucket_name: BOS bucket name
    file_size_mb: size of each file, in MB
    num_files: number of files to write/read
    num_threads: number of worker threads
    """
    from concurrent.futures import ThreadPoolExecutor
    
    # Initialize BosFS
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
    
    file_size_bytes = file_size_mb * 2**20
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    
    # Generate the list of object names
    object_names = [f"{bucket_name}/parallel_test_{timestamp}_{i}.dat" for i in range(num_files)]
    
    # Generate random data
    print(f"Generating {num_files} random files of {file_size_mb} MB each...")
    data_list = [os.urandom(file_size_bytes) for _ in range(num_files)]
    
    # Write one file (run in parallel)
    def write_file(args):
        object_name, data = args
        try:
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            return True
        except Exception as e:
            print(f"Error writing {object_name}: {e}")
            return False
    
    # Read one file (run in parallel)
    def read_file(object_name):
        try:
            with fs.open(object_name, "rb") as f_rb:
                data = f_rb.read()
            return len(data)
        except Exception as e:
            print(f"Error reading {object_name}: {e}")
            return 0
    
    # Measure parallel write performance
    print(f"\nTesting parallel writes ({num_threads} threads)...")
    write_start = time.perf_counter()
    
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        results = list(tqdm(
            executor.map(write_file, zip(object_names, data_list)),
            total=num_files,
            desc="Writing"
        ))
    
    write_end = time.perf_counter()
    write_time = write_end - write_start
    total_data_size_mb = file_size_mb * num_files
    write_speed = total_data_size_mb / write_time
    
    print(f"Parallel write finished: {sum(results)}/{num_files} files succeeded")
    print(f"Total write time: {write_time:.2f} s")
    print(f"Total data size: {total_data_size_mb} MB")
    print(f"Aggregate write throughput: {write_speed:.2f} MB/s")
    
    # Measure parallel read performance
    print(f"\nTesting parallel reads ({num_threads} threads)...")
    read_start = time.perf_counter()
    
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        read_sizes = list(tqdm(
            executor.map(read_file, object_names),
            total=num_files,
            desc="Reading"
        ))
    
    read_end = time.perf_counter()
    read_time = read_end - read_start
    read_speed = total_data_size_mb / read_time
    
    # A read counts as successful only if the full object came back
    print(f"Parallel read finished: {sum(1 for size in read_sizes if size == file_size_bytes)}/{num_files} files succeeded")
    print(f"Total read time: {read_time:.2f} s")
    print(f"Aggregate read throughput: {read_speed:.2f} MB/s")
    
    # Clean up the test files
    print("\nCleaning up test files...")
    for object_name in tqdm(object_names, desc="Deleting files"):
        try:
            fs.rm(object_name)
        except Exception as e:
            print(f"Error deleting {object_name}: {e}")
    
    return {
        'parallel_write_speed_mbps': write_speed,
        'parallel_read_speed_mbps': read_speed,
        'parallel_write_time_s': write_time,
        'parallel_read_time_s': read_time
    }


if __name__ == "__main__":
    # Set the BOS bucket name
    bucket_name = "your-bucket-name"
    
    # Run the single-file read/write benchmark
    print("=" * 80)
    print("Starting BosFS single-file read/write benchmark")
    print("=" * 80)
    
    # Custom list of file sizes (MB)
    file_sizes = [1, 10, 50, 100]
    
    # Run the benchmark
    results_df = test_bosfs_performance(bucket_name, file_sizes=file_sizes, num_iterations=3)
    
    # Run the parallel benchmark
    print("\n" + "=" * 80)
    print("Starting BosFS parallel read/write benchmark")
    print("=" * 80)
    
    parallel_results = test_bosfs_parallel_performance(
        bucket_name, 
        file_size_mb=4,     # 4 MB per file
        num_files=1000,     # 1000 files in total
        num_threads=16      # 16 worker threads
    )
    
    # Save the single-file results to a CSV file
    results_df.to_csv('bosfs_performance_results.csv', index=False)
    print("\nResults saved to bosfs_performance_results.csv")
    print(f"Parallel benchmark results: {parallel_results}")
```


## Limitations
bosfs does not currently support async mode.
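Because bosfs is synchronous only, an asyncio application can still use it by running each blocking call in a worker thread; the benchmark above already calls one `fs` instance from a thread pool. A minimal sketch using the standard `loop.run_in_executor` (available on Python 3.8, unlike `asyncio.to_thread`, which needs 3.9). `run_blocking` is a hypothetical helper, and `slow_listing` is a stand-in for a bosfs call such as `fs.ls`:

```python
import asyncio
import functools
import time
from typing import Any, Callable, List


async def run_blocking(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a synchronous callable in the default thread pool without blocking the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


def slow_listing(prefix: str) -> List[str]:
    # Stand-in for a blocking call such as fs.ls(prefix).
    time.sleep(0.1)
    return [f"{prefix}a.txt", f"{prefix}b.txt"]


async def main() -> None:
    # Both stand-in calls run concurrently in worker threads.
    results = await asyncio.gather(
        run_blocking(slow_listing, "/mybucket/"),
        run_blocking(slow_listing, "/otherbucket/"),
    )
    print(results)


if __name__ == "__main__":
    asyncio.run(main())
```

This only moves blocking I/O off the event loop; it is not native async support, and concurrency is bounded by the executor's thread count.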

## Release Notes
v1.0.0 - 2025-03-20
---
### New features
- Initial release
- Synchronous (sync) bosfs support

### Breaking changes
- None
