
A data lake is a data storage approach built on clusters and distributed file systems that stores all of an enterprise's data in one place. The data lake serves as the raw-data zone: raw data (an exact copy of source-system data) is kept there and transformed into target data for tasks such as reporting, visual analytics, and machine learning.

Data in a data lake includes structured data (relational database data), semi-structured data (CSV, XML, JSON, etc.), unstructured data (emails, documents, PDFs), and binary data (images, audio, video), forming a centralized store that holds data in every form. The main data lake vendors are generally the hyperscale public cloud providers, such as Amazon AWS, Microsoft Azure, and Google Cloud Platform (GCP).

  • MinIO is a highly available, highly resilient open-source object storage system (OSS) implemented in Golang, designed to provide a highly scalable, high-performance distributed file store in the cloud. MinIO is fully compatible with Amazon's S3 object storage API.

  • Amazon S3 is an object storage service with a distributed architecture: data is stored in multiple physical locations to improve reliability and fault tolerance. S3 scales horizontally to meet storage needs of any size.

  • Google Cloud Storage (GCS) is Google's storage platform, offering high-performance object storage with excellent scalability, data availability, durability, and security. It lets you store objects, access any amount of data instantly from any storage class, integrate storage into applications through a single unified API, and easily optimize for price and performance.

  • Azure Blob Storage is Microsoft's object storage solution for the cloud. Blob Storage is best suited for storing massive amounts of unstructured data.

alcedo_pdbc.datalake

@File: init.py @Time: 2025-07-16 14:18 @Author: AI Lab Morgan

Classes

alcedo_pdbc.datalake.AzureBlob

The AzureBlob class creates an Azure Blob client object through which you can read, write, upload, and download data from Azure Blob Storage.

Parameters:

  • config (dict, required): Automatically loaded from the config file (YAML).

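Examples:

A minimal usage sketch; it assumes config is the dict loaded from your YAML config file.

>>> from alcedo_pdbc.datalake import AzureBlob
>>> azure_blob = AzureBlob(config)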
Functions
alcedo_pdbc.datalake.AzureBlob.download_file(container_name, blob_name, path_to_download='.') method descriptor

AzureBlob.download_file(self, container_name: str, blob_name: str, path_to_download: str = '.')

Takes a container name and blob name as arguments and downloads the file.

Parameters:

  • container_name (str, required): Container name.
  • blob_name (str, required): Blob name.
  • path_to_download (str): Save location. Defaults to '.'.

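Examples:

A sketch with hypothetical container and blob names.

>>> azure_blob.download_file(container_name="my-container",
...                          blob_name="reports/data.csv")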
alcedo_pdbc.datalake.AzureBlob.download_folder(container_name, blob_path, local_path_to_download='.') method descriptor

AzureBlob.download_folder(self, container_name: str, blob_path: str, local_path_to_download: str = '.')

Takes a container name and blob path as arguments and downloads the folder.

alcedo_pdbc.datalake.AzureBlob.read_as_dataframe(container_name, blob_name, pandas_args={}, polars_args={}, extension='csv', return_type='pandas') method descriptor

AzureBlob.read_as_dataframe(self, container_name: str, blob_name: str, pandas_args: Dict = {}, polars_args: Dict = {}, extension: str = 'csv', return_type: str = 'pandas')

Takes an Azure Storage account container name and blob name and returns a dataframe.

Parameters:

  • container_name (str, required): Container name of the Azure storage account.
  • blob_name (str, required): Blob name to read.
  • pandas_args (dict): Pandas arguments such as encoding, etc. Defaults to {}.
  • polars_args (dict): Polars arguments. Defaults to {}.
  • extension (str): Extension of the file; taken automatically from the blob_name parameter. Defaults to 'csv'.
  • return_type (str): Which dataframe type to return (pandas, polars, dask, etc.). Defaults to 'pandas'.

Returns:

  • DataFrame (``Pandas``, ``Polars`` or ``Dask``): The dataframe type selected by the return_type parameter.

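Examples:

A sketch with hypothetical names, assuming the blob is a CSV file.

>>> df = azure_blob.read_as_dataframe(container_name="my-container",
...                                   blob_name="reports/data.csv",
...                                   pandas_args={"encoding": "utf-8"})
>>> print(df.head())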

alcedo_pdbc.datalake.AzureBlob.upload_file(source_file_path, container_name, blob_name=None) method descriptor

AzureBlob.upload_file(self, source_file_path: str, container_name: str, blob_name: str = None)

Takes a source file path, container name, and blob name as arguments and uploads the file to Azure Blob Storage.

Parameters:

  • source_file_path (str, required): Source file path.
  • container_name (str, required): Container name.
  • blob_name (str): Blob name; if not given, the source filename is used as the blob name. Defaults to None.

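Examples:

A sketch with a hypothetical local path; blob_name is omitted, so the source filename is used.

>>> azure_blob.upload_file(source_file_path="./data.csv",
...                        container_name="my-container")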
alcedo_pdbc.datalake.AzureBlob.upload_folder(local_folder_path, container_name, blob_name) method descriptor

AzureBlob.upload_folder(self, local_folder_path: str, container_name: str, blob_name: str) -> None

Takes a local folder path, container name, and blob name as arguments and uploads the folder to Azure Blob Storage.

alcedo_pdbc.datalake.AzureBlob.write_dataframe(df, container_name, blob_name, overwrite=True, extension='csv', pandas_args={}, polars_args={}) method descriptor

AzureBlob.write_dataframe(self, df, container_name: str, blob_name: str, overwrite: bool = True, extension: str = 'csv', pandas_args: dict = {}, polars_args: dict = {})

Takes a DataFrame, container name, and filename as arguments and writes the dataframe to Azure Blob Storage.

Parameters:

  • df (DataFrame, required): Dataframe to be uploaded.
  • container_name (str, required): Container name of the Azure storage account.
  • blob_name (str, required): File name with extension.
  • overwrite (bool): Overwrite existing data. Defaults to True.
  • extension (str): Extension of the file; taken automatically from the filename parameter. Defaults to 'csv'.
  • pandas_args (dict): Pandas arguments such as encoding, etc. Defaults to {}.
  • polars_args (dict): Polars arguments. Defaults to {}.
alcedo_pdbc.datalake.GCS

The GCS class creates a Google Cloud Storage client object through which you can read, write, upload, and download data from Google Cloud Storage.

Parameters:

  • config (dict, required): Automatically loaded from the config file (YAML).

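Examples:

A minimal usage sketch; it assumes config is the dict loaded from your YAML config file.

>>> from alcedo_pdbc.datalake import GCS
>>> gcs = GCS(config)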
Functions
alcedo_pdbc.datalake.GCS.download_file(gcs_path=None, bucket=None, blob_name=None, path_to_download='.') method descriptor

GCS.download_file(self, gcs_path: str = None, bucket: str = None, blob_name: str = None, path_to_download: str = '.')

Takes a GCS path, or a bucket and blob name, as arguments and downloads the file.

Parameters:

  • gcs_path (str): GCS file path. Defaults to None.
  • bucket (str): GCS bucket name, used if gcs_path is not provided. Defaults to None.
  • blob_name (str): GCS blob name, used if gcs_path is not provided. Defaults to None.
  • path_to_download (str): Save location. Defaults to '.'.

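Examples:

A sketch with a hypothetical gs:// path; bucket and blob_name could be passed instead.

>>> gcs.download_file(gcs_path="gs://my-bucket/reports/data.csv",
...                   path_to_download=".")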
alcedo_pdbc.datalake.GCS.download_folder(gcs_path=None, bucket=None, blob_path=None, local_path_to_download='.') method descriptor

GCS.download_folder(self, gcs_path: str = None, bucket: str = None, blob_path: str = None, local_path_to_download: str = '.')

Takes a GCS path, or a bucket and blob path, as arguments and downloads the folder.

alcedo_pdbc.datalake.GCS.read_as_dataframe(gcs_path=None, bucket=None, blob_name=None, pandas_args={}, polars_args={}, extension='csv', return_type='pandas') method descriptor

GCS.read_as_dataframe(self, gcs_path: str = None, bucket: str = None, blob_name: str = None, pandas_args: Dict = {}, polars_args: Dict = {}, extension: str = 'csv', return_type: str = 'pandas')

Takes a GCS path as argument and returns a dataframe.

Parameters:

  • gcs_path (str): GCS path of the file to be loaded. To load multiple files, use gs://bucket/path/filename*; to load all files from a folder, use gs://bucket/folder/. Defaults to None.
  • bucket (str): GCS bucket name. Defaults to None.
  • blob_name (str): File name with extension. Defaults to None.
  • pandas_args (dict): Pandas arguments such as encoding, etc. Defaults to {}.
  • polars_args (dict): Polars arguments. Defaults to {}.
  • extension (str): Extension of the file; taken automatically from the gcs_path parameter. Defaults to 'csv'.
  • return_type (str): Which dataframe type to return (pandas, polars, dask, etc.). Defaults to 'pandas'.

Returns:

  • DataFrame (``Pandas``, ``Polars`` or ``Dask``): The dataframe type selected by the return_type parameter.

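Examples:

A sketch with hypothetical paths; the wildcard form loads every matching file into one dataframe (the files must share the same schema).

>>> df = gcs.read_as_dataframe(gcs_path="gs://my-bucket/reports/data.csv")
>>> df_all = gcs.read_as_dataframe(gcs_path="gs://my-bucket/reports/part*")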

alcedo_pdbc.datalake.GCS.upload_file(source_file_path, bucket, blob_name) method descriptor

GCS.upload_file(self, source_file_path: str, bucket: str, blob_name: str)

Takes a source file path, bucket, and blob name as arguments and uploads the file to GCS.

Parameters:

  • source_file_path (str, required): Source file path.
  • bucket (str, required): GCS bucket name.
  • blob_name (str, required): Blob name (destination file path).
alcedo_pdbc.datalake.GCS.upload_folder(local_folder_path, bucket, blob_path='') method descriptor

GCS.upload_folder(self, local_folder_path: str, bucket: str, blob_path: str = '')

Takes a local folder path, bucket, and blob path as arguments and uploads the folder to GCS.

alcedo_pdbc.datalake.GCS.write_dataframe(df, bucket, blob_name, extension='csv', pandas_args={}, polars_args={}) method descriptor

GCS.write_dataframe(self, df, bucket: str, blob_name: str, extension: str = 'csv', pandas_args: dict = {}, polars_args: dict = {})

Takes a DataFrame, bucket name, and blob name as arguments and writes the dataframe to GCS.

Parameters:

  • df (DataFrame, required): Dataframe to be uploaded.
  • bucket (str, required): GCS bucket name.
  • blob_name (str, required): File name with extension.
  • extension (str): Extension of the file; taken automatically from the filename parameter. Defaults to 'csv'.
  • pandas_args (dict): Pandas arguments such as encoding, etc. Defaults to {}.
  • polars_args (dict): Polars arguments. Defaults to {}.

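Examples:

A sketch with hypothetical names; the extension is inferred from blob_name.

>>> gcs.write_dataframe(df, bucket="my-bucket", blob_name="reports/data.csv")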
alcedo_pdbc.datalake.MinIO

The ailab100.alcedo_pdbc.datalake.MinIO class creates a MinIO S3 client instance for reading, writing, uploading, and downloading data from MinIO.

Parameters:

  • endpoint (str, required): Endpoint of the S3-compatible object storage service.
  • access_key (str, required): Access key of the object storage (may be empty for anonymous access).
  • secret_key (str, required): Secret key of the object storage (may be empty for anonymous access).
  • secure (bool): Set to True to enable HTTPS. Defaults to True.
  • region (str): Optional; set to override the auto-detected bucket region. Defaults to None.

Examples:

>>> from alcedo_pdbc.datalake import MinIO
>>> minio_client = MinIO('127.0.0.1:9000',
...                      access_key='Q3AM3UQ867SPQQA43P2F',
...                      secret_key='zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG')
Notes

When deployed, MinIO exposes two ports, 9000 and 9001: 9000 is the client connection port and 9001 is the web console port.

Functions
alcedo_pdbc.datalake.MinIO.bucket_exists(bucket_name) method descriptor

MinIO.bucket_exists(self, bucket_name: str) -> bool

The bucket_exists() method checks whether a bucket exists.

Parameters:

  • bucket_name (str, required): Bucket name.

Returns:

  • bool: True if the bucket exists, otherwise False.

Examples:

>>> if minio_client.bucket_exists("my-bucket"):
...     print("my-bucket exists")
... else:
...     print("my-bucket does not exist")
alcedo_pdbc.datalake.MinIO.download_file(minio_path=None, bucket=None, key=None, threads=4, local_path='.') method descriptor

MinIO.download_file(self, minio_path: str = None, bucket: str = None, key: str = None, threads: int = 4, local_path: str = '.')

The download_file() method downloads a file from MinIO, addressed either by the minio_path parameter or by the bucket and key parameters.

Parameters:

  • minio_path (str): MinIO file path. Optional; must not be empty if bucket and key are also empty. Defaults to None.
  • bucket (str): MinIO bucket name. Optional; must not be empty if minio_path is also empty. Defaults to None.
  • key (str): File name with extension. Defaults to None.
  • threads (int): Number of threads. Defaults to 4.
  • local_path (str): Optional; save location. Defaults to the current directory '.'.

Returns:

  • file (``CSV``, ``Excel``, ``JSON``, ``HTML``, ``HDF5``, ``Feather``, ``Parquet`` or ``Apache Avro``): The file is exported according to its file type and saved to the specified directory.

Examples:

>>> minio_client.download_file(minio_path="s3://datalake/datasets/3财务困境研究数据集/ST财务预警.csv",threads=4)
>>> df = pd.read_csv('./ST财务预警.csv')
>>> df.info()
alcedo_pdbc.datalake.MinIO.download_folder(minio_path=None, bucket=None, key=None, local_path_to_download='.') method descriptor

MinIO.download_folder(self, minio_path: str = None, bucket: str = None, key: str = None, local_path_to_download: str = '.')

Takes a MinIO path, or a bucket and key, as arguments and downloads the folder.

Parameters:

  • minio_path (str): MinIO path of the folder to download. Defaults to None.
  • bucket (str): Bucket name, used if minio_path is not provided. Defaults to None.
  • key (str): Key name, used if minio_path is not provided. Defaults to None.
  • local_path_to_download (str): Save location. Defaults to '.' (current directory).

Returns:

  • file (``CSV``, ``Excel``, ``JSON``, ``HTML``, ``HDF5``, ``Feather``, ``Parquet`` or ``Apache Avro``): Files are exported according to their file types and saved to the specified directory.

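Examples:

A sketch reusing the dataset folder from the download_file() example; the folder's files are saved under the current directory.

>>> minio_client.download_folder(minio_path="s3://datalake/datasets/3财务困境研究数据集/",
...                              local_path_to_download=".")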

alcedo_pdbc.datalake.MinIO.get_bucket_policy(bucket_name, prefix) method descriptor

MinIO.get_bucket_policy(self, bucket_name: str, prefix: str)

The get_bucket_policy() method retrieves the policy of a bucket.

Parameters:

  • bucket_name (str, required): Bucket name.
  • prefix (str, required): Object name prefix.

Returns:

  • Policy (Policy): A Policy enum value: Policy.READ_ONLY, Policy.WRITE_ONLY, Policy.READ_WRITE, or Policy.NONE.

Examples:

>>> policy = minio_client.get_bucket_policy(bucket_name="my-bucket",
...                                         prefix='/datasets/1客户订单数据集')
alcedo_pdbc.datalake.MinIO.list_buckets() method descriptor

MinIO.list_buckets(self)

The list_buckets() method returns information about all accessible buckets.

Returns:

  • buckets (list): A list of Bucket objects.

Examples:

>>> buckets = minio_client.list_buckets()
>>> for bucket in buckets:
...     print(bucket.name, bucket.creation_date)
alcedo_pdbc.datalake.MinIO.read_as_dataframe(minio_path=None, bucket=None, key=None, pandas_args={}, polars_args={}, extension='csv', return_type='pandas') method descriptor

MinIO.read_as_dataframe(self, minio_path: str = None, bucket: str = None, key: str = None, pandas_args: Dict = {}, polars_args: Dict = {}, extension: str = 'csv', return_type: str = 'pandas')

The read_as_dataframe() method reads a file from MinIO and returns it as the specified DataFrame type.

Note: when loading multiple files, or all files in a folder, the files must share the same schema.

Parameters:

  • minio_path (str): MinIO file path. Optional; must not be empty if bucket and key are also empty. To load multiple files, use "bucket/path/filename*"; to load all files in a folder, use "bucket/folder/.". Defaults to None.
  • bucket (str): MinIO bucket name. Optional; must not be empty if minio_path is also empty. Defaults to None.
  • key (str): File name with extension. Defaults to None.
  • pandas_args (dict): Pandas arguments such as encoding, etc. Defaults to {}.
  • polars_args (dict): Polars arguments. Defaults to {}.
  • extension (str): Optional; extension of the file, taken automatically from the minio_path parameter. Defaults to 'csv'.
  • return_type (str): Optional; which dataframe type to return: pandas, polars, or dask. Defaults to 'pandas'.

Returns:

  • DataFrame (``Pandas``, ``Polars`` or ``Dask``): The dataframe type selected by the return_type parameter.

Examples:

>>> data=minio_client.read_as_dataframe(minio_path="/datalake/datasets/3财务困境研究数据集/ST财务预警.csv")
>>> print(data)
>>> # Load all files in the folder; they must share the same schema
>>> data1=minio_client.read_as_dataframe(minio_path="/datalake/datasets/3财务困境研究数据集/")
alcedo_pdbc.datalake.MinIO.upload_file(source_file_path, bucket, key) method descriptor

MinIO.upload_file(self, source_file_path: str, bucket: str, key: str)

Takes a source file path, bucket, and key as arguments and uploads the file to MinIO.

Parameters:

  • source_file_path (str, required): Source file path.
  • bucket (str, required): Destination bucket.
  • key (str, required): Destination file path.

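Examples:

A sketch with a hypothetical local path and key; the bucket name reuses the one from earlier examples.

>>> minio_client.upload_file(source_file_path="./data.csv",
...                          bucket="datalake",
...                          key="datasets/data.csv")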
alcedo_pdbc.datalake.MinIO.upload_folder(local_folder_path, bucket, key) method descriptor

MinIO.upload_folder(self, local_folder_path: str, bucket: str, key: str) -> None

Takes a local folder path, bucket, and key as arguments and uploads the folder to MinIO.

Parameters:

  • local_folder_path (str, required): Local path of the folder to be uploaded.
  • bucket (str, required): Bucket name.
  • key (str, required): Key name.
alcedo_pdbc.datalake.MinIO.write_dataframe(df, bucket, key, extension='csv', pandas_args={}, polars_args={}) method descriptor

MinIO.write_dataframe(self, df, bucket: str, key: str, extension: str = 'csv', pandas_args: dict = {}, polars_args: dict = {}) -> None

Takes a DataFrame, bucket name, and filename as arguments and writes the dataframe to MinIO.

Parameters:

  • df (DataFrame, required): Dataframe to be uploaded.
  • bucket (str, required): Bucket name.
  • key (str, required): File name with extension.
  • extension (str): Extension of the file; taken automatically from the filename parameter. Defaults to 'csv'.
  • pandas_args (dict): Pandas arguments such as encoding, etc. Defaults to {}.
  • polars_args (dict): Polars arguments. Defaults to {}.

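Examples:

A sketch writing a small pandas dataframe; bucket and key are hypothetical.

>>> import pandas as pd
>>> df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
>>> minio_client.write_dataframe(df, bucket="datalake", key="datasets/demo.csv")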
alcedo_pdbc.datalake.S3

The S3 class creates an S3 client object through which you can read, write, upload, and download data from AWS S3.

Parameters:

  • config (dict, required): Automatically loaded from the config file (YAML).

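Examples:

A minimal usage sketch; it assumes config is the dict loaded from your YAML config file.

>>> from alcedo_pdbc.datalake import S3
>>> s3 = S3(config)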
Functions
alcedo_pdbc.datalake.S3.download_file(s3_path=None, bucket=None, key=None, local_path='.') method descriptor

S3.download_file(self, s3_path: str = None, bucket: str = None, key: str = None, local_path: str = '.')

Takes an S3 path, or a bucket and key, as arguments and downloads the file.

Parameters:

  • s3_path (str): S3 path from which to download the file. Defaults to None.
  • bucket (str): S3 bucket name, used if s3_path is not provided. Defaults to None.
  • key (str): S3 key name, used if s3_path is not provided. Defaults to None.
  • local_path (str): Save location. Defaults to '.' (current directory).

Returns:

  • file (``CSV``, ``Excel``, ``JSON``, ``HTML``, ``HDF5``, ``Feather``, ``Parquet`` or ``Apache Avro``): The file is exported according to its file type and saved to the specified directory.

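Examples:

A sketch with a hypothetical s3:// path; bucket and key could be passed instead.

>>> s3.download_file(s3_path="s3://my-bucket/reports/data.csv", local_path=".")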

alcedo_pdbc.datalake.S3.download_folder(s3_path=None, bucket=None, key=None, local_path_to_download='.') method descriptor

S3.download_folder(self, s3_path: str = None, bucket: str = None, key: str = None, local_path_to_download: str = '.')

Takes an S3 path, or a bucket and key, as arguments and downloads the folder.

Parameters:

  • s3_path (str): S3 path from which to download the folder. Defaults to None.
  • bucket (str): S3 bucket name, used if s3_path is not provided. Defaults to None.
  • key (str): S3 key name, used if s3_path is not provided. Defaults to None.
  • local_path_to_download (str): Save location. Defaults to '.' (current directory).
alcedo_pdbc.datalake.S3.read_as_dataframe(s3_path=None, bucket=None, key=None, pandas_args={}, polars_args={}, extension='csv', return_type='pandas') method descriptor

S3.read_as_dataframe(self, s3_path: str = None, bucket: str = None, key: str = None, pandas_args: Dict = {}, polars_args: Dict = {}, extension: str = 'csv', return_type: str = 'pandas')

Takes an S3 path as argument and returns a dataframe.

Parameters:

  • s3_path (str): S3 path of the file to be loaded. To load multiple files, use s3://bucket/path/filename*; to load all files from a folder, use s3://bucket/folder/. Defaults to None.
  • bucket (str): S3 bucket name. Defaults to None.
  • key (str): File name with extension. Defaults to None.
  • pandas_args (dict): Pandas arguments such as encoding, etc. Defaults to {}.
  • polars_args (dict): Polars arguments. Defaults to {}.
  • extension (str): Extension of the file; taken automatically from the s3_path parameter. Defaults to 'csv'.
  • return_type (str): Which dataframe type to return (pandas, polars, dask, etc.). Defaults to 'pandas'.

Returns:

  • DataFrame (``Pandas``, ``Polars`` or ``Dask``): The dataframe type selected by the return_type parameter.

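Examples:

A sketch with a hypothetical s3:// path; return_type switches the dataframe library.

>>> df = s3.read_as_dataframe(s3_path="s3://my-bucket/reports/data.csv")
>>> pl_df = s3.read_as_dataframe(s3_path="s3://my-bucket/reports/data.csv",
...                              return_type="polars")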

alcedo_pdbc.datalake.S3.upload_file(source_file_path, bucket, key) method descriptor

S3.upload_file(self, source_file_path: str, bucket: str, key: str)

Takes a source file path, bucket, and key as arguments and uploads the file to S3.

Parameters:

  • source_file_path (str, required): Source file path.
  • bucket (str, required): Destination bucket.
  • key (str, required): Destination file path.
alcedo_pdbc.datalake.S3.upload_folder(local_folder_path, bucket, key) method descriptor

S3.upload_folder(self, local_folder_path: str, bucket: str, key: str) -> None

Takes a local folder path, bucket, and key as arguments and uploads the folder to S3.

Parameters:

  • local_folder_path (str, required): Local path of the folder to be uploaded.
  • bucket (str, required): S3 bucket name.
  • key (str, required): S3 key name.
alcedo_pdbc.datalake.S3.write_dataframe(df, bucket, key, extension='csv', pandas_args={}, polars_args={}) method descriptor

S3.write_dataframe(self, df, bucket: str, key: str, extension: str = 'csv', pandas_args: dict = {}, polars_args: dict = {}) -> None

Takes a DataFrame, bucket name, and filename as arguments and writes the dataframe to S3.

Parameters:

  • df (DataFrame, required): Dataframe to be uploaded.
  • bucket (str, required): S3 bucket name.
  • key (str, required): File name with extension.
  • extension (str): Extension of the file; taken automatically from the filename parameter. Defaults to 'csv'.
  • pandas_args (dict): Pandas arguments such as encoding, etc. Defaults to {}.
  • polars_args (dict): Polars arguments. Defaults to {}.

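Examples:

A sketch writing a pandas dataframe; the dataframe, bucket, and key are hypothetical.

>>> s3.write_dataframe(df, bucket="my-bucket", key="reports/data.csv")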