🐦⬛ ailab100.pdbc.datalake
A data lake is a way of storing data on clusters and distributed file systems in which all of an enterprise's data is stored in one unified place. The data lake is the raw-data zone: raw data (exact copies of source-system data) is transformed there into target data for tasks such as reporting, visual analytics, and machine learning.
The data in a data lake includes structured data (relational database data), semi-structured data (CSV, XML, JSON, etc.), unstructured data (emails, documents, PDFs), and binary data (images, audio, video), forming a centralized store that holds data in every form. The main data lake vendors are generally hyperscale public cloud providers such as Amazon AWS, Microsoft Azure, and Google Cloud Platform (GCP).
- MinIO is a highly available, resilient open-source object storage system (OSS) written in Golang that aims to provide a highly scalable, high-performance distributed file store for the cloud. MinIO is fully compatible with Amazon's S3 object storage API.
- Amazon S3 is an object storage service with a distributed architecture: data is stored in multiple physical locations to improve reliability and fault tolerance. S3 scales horizontally to meet storage needs of any size.
- Google Cloud Storage (GCS) is Google's storage platform. It provides high-performance object storage with excellent scalability, data availability, durability, and security. It lets you store objects and immediately access any amount of data from any storage class, integrate storage into applications through a single unified API, and easily optimize for price and performance.
- Azure Blob Storage is Microsoft's object storage solution for the cloud. Blob Storage is best suited for storing massive amounts of unstructured data.
alcedo_pdbc.datalake
@File :init.py @Time :2025-7-16 14:18 @Author:AI Lab Morgan
Classes
alcedo_pdbc.datalake.AzureBlob
The AzureBlob class creates an AzureBlob object through which you can read, write, upload, and download data from Azure Blob Storage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict | Automatically loaded from the config file (yaml) | required |
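Examples:
A minimal sketch of constructing the client, assuming the yaml config file has already been loaded into a dict; the key shown is hypothetical and must match your actual config:
>>> from alcedo_pdbc.datalake import AzureBlob
>>> config = {'connection_string': 'DefaultEndpointsProtocol=https;AccountName=...'}  # hypothetical key
>>> az = AzureBlob(config)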
Functions
alcedo_pdbc.datalake.AzureBlob.download_file(container_name, blob_name, path_to_download='.')
method descriptor
AzureBlob.download_file(self, container_name: str, blob_name: str, path_to_download='.')
Takes container name and blob name as arguments and downloads the file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| container_name | str | container name | required |
| blob_name | str | blob name | required |
| path_to_download | str | save location. Defaults to '.'. | '.' |
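Examples:
A minimal sketch, assuming a container 'raw-data' containing a blob 'sales.csv' (both names are hypothetical) and the az client from the class example:
>>> az.download_file('raw-data', 'sales.csv', path_to_download='./downloads')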
alcedo_pdbc.datalake.AzureBlob.download_folder(container_name, blob_path, local_path_to_download='.')
method descriptor
AzureBlob.download_folder(self, container_name: str, blob_path: str, local_path_to_download='.')
Takes container name and blob path as arguments and downloads the folder to a local path.
alcedo_pdbc.datalake.AzureBlob.read_as_dataframe(container_name, blob_name, pandas_args={}, polars_args={}, extension='csv', return_type='pandas')
method descriptor
AzureBlob.read_as_dataframe(self, container_name: str, blob_name: str, pandas_args: Dict = {}, polars_args: Dict = {}, extension='csv', return_type='pandas')
Takes Azure Storage account container name and blob name and returns a dataframe.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| container_name | str | Container name of the Azure storage account | required |
| blob_name | str | Blob name to read | required |
| pandas_args | dict | pandas arguments such as encoding, etc. | {} |
| extension | str | File extension; inferred automatically from the blob_name parameter. Defaults to 'csv'. | 'csv' |
| return_type | str | Which dataframe type to return ('pandas', 'polars', 'dask', etc.). Defaults to 'pandas'. | 'pandas' |
Returns:
| Name | Type | Description |
|---|---|---|
| DataFrame | ``Pandas``, ``Polars`` or ``Dask`` | A dataframe of the type selected by the return_type parameter |
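Examples:
A minimal sketch reading the same hypothetical blob first into pandas and then into polars, reusing the az client from the class example:
>>> df = az.read_as_dataframe('raw-data', 'sales.csv', pandas_args={'encoding': 'utf-8'})
>>> pl_df = az.read_as_dataframe('raw-data', 'sales.csv', return_type='polars')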
alcedo_pdbc.datalake.AzureBlob.upload_file(source_file_path, container_name, blob_name=None)
method descriptor
AzureBlob.upload_file(self, source_file_path: str, container_name: str, blob_name: str = None)
Takes source file path, container name and blob name as arguments and uploads the file to Azure Blob Storage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_file_path | str | source file path | required |
| container_name | str | container name | required |
| blob_name | str | blob name; if not given, the source filename is used as the blob name. Defaults to None. | None |
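Examples:
A minimal sketch with hypothetical names; in the first call blob_name is omitted, so the source filename 'report.xlsx' becomes the blob name:
>>> az.upload_file('./report.xlsx', 'raw-data')
>>> az.upload_file('./report.xlsx', 'raw-data', blob_name='2025/report.xlsx')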
alcedo_pdbc.datalake.AzureBlob.upload_folder(local_folder_path, container_name, blob_name)
method descriptor
AzureBlob.upload_folder(self, local_folder_path: str, container_name: str, blob_name: str) -> None
Takes local folder path, container name and blob name as arguments and uploads the folder to Azure Blob Storage.
alcedo_pdbc.datalake.AzureBlob.write_dataframe(df, container_name, blob_name, overwrite=True, extension='csv', pandas_args={}, polars_args={})
method descriptor
AzureBlob.write_dataframe(self, df, container_name: str, blob_name: str, overwrite=True, extension='csv', pandas_args={}, polars_args={})
Takes DataFrame, container name and filename as arguments and writes the dataframe to Azure Blob Storage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Dataframe to be uploaded | required |
| container_name | str | Container name of the Azure storage account | required |
| blob_name | str | file name with extension | required |
| overwrite | bool | Overwrite the existing data. Defaults to True. | True |
| extension | str | File extension; inferred automatically from the filename parameter. Defaults to 'csv'. | 'csv' |
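Examples:
A minimal sketch writing a small pandas DataFrame as a Parquet blob; container and blob names are hypothetical, and the extension is inferred from blob_name:
>>> import pandas as pd
>>> df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
>>> az.write_dataframe(df, 'raw-data', 'out/sample.parquet')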
alcedo_pdbc.datalake.GCS
The GCS class creates a GCS object through which you can read, write, upload, and download data from Google Cloud Storage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict | Automatically loaded from the config file (yaml) | required |
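Examples:
A minimal sketch of constructing the client, assuming the yaml config file has already been loaded into a dict (the contents, typically service-account credentials, are hypothetical):
>>> from alcedo_pdbc.datalake import GCS
>>> gcs = GCS(config)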
Functions
alcedo_pdbc.datalake.GCS.download_file(gcs_path=None, bucket=None, blob_name=None, path_to_download='.')
method descriptor
GCS.download_file(self, gcs_path: str = None, bucket: str = None, blob_name: str = None, path_to_download: str = '.')
Takes gcs path or (bucket and blob name) as arguments and downloads the file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gcs_path | str | GCS file path. Defaults to None. | None |
| bucket | str | GCS bucket name, used if gcs_path is not provided. Defaults to None. | None |
| blob_name | str | GCS blob name, used if gcs_path is not provided. Defaults to None. | None |
| path_to_download | str | save location. Defaults to '.'. | '.' |
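Examples:
A minimal sketch showing both calling styles; the bucket, blob, and paths are hypothetical:
>>> gcs.download_file(gcs_path='gs://my-bucket/data/sales.csv')
>>> gcs.download_file(bucket='my-bucket', blob_name='data/sales.csv', path_to_download='./downloads')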
alcedo_pdbc.datalake.GCS.download_folder(gcs_path=None, bucket=None, blob_path=None, local_path_to_download='.')
method descriptor
GCS.download_folder(self, gcs_path: str = None, bucket: str = None, blob_path: str = None, local_path_to_download: str = '.')
Takes gcs path or (bucket and blob path) as arguments and downloads the folder.
alcedo_pdbc.datalake.GCS.read_as_dataframe(gcs_path=None, bucket=None, blob_name=None, pandas_args={}, polars_args={}, extension='csv', return_type='pandas')
method descriptor
GCS.read_as_dataframe(self, gcs_path: str = None, bucket: str = None, blob_name: str = None, pandas_args: Dict = {}, polars_args: Dict = {}, extension='csv', return_type='pandas')
Takes a gcs path (or bucket and blob name) as arguments and returns a dataframe.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gcs_path | str | gcs path of the file to be loaded; to load multiple files, use gs://bucket/path/filename*; to load all files in a folder, use gs://bucket/folder/. | None |
| bucket | str | GCS bucket name | None |
| blob_name | str | file name with extension | None |
| pandas_args | dict | pandas arguments such as encoding, etc. | {} |
| extension | str | File extension; inferred automatically from the gcs_path parameter. Defaults to 'csv'. | 'csv' |
| return_type | str | Which dataframe type to return ('pandas', 'polars', 'dask', etc.). Defaults to 'pandas'. | 'pandas' |
Returns:
| Name | Type | Description |
|---|---|---|
| DataFrame | ``Pandas``, ``Polars`` or ``Dask`` | A dataframe of the type selected by the return_type parameter |
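Examples:
A minimal sketch with hypothetical gs:// paths; the wildcard form loads several files with the same schema at once:
>>> df = gcs.read_as_dataframe(gcs_path='gs://my-bucket/data/sales.csv')
>>> pl_df = gcs.read_as_dataframe(gcs_path='gs://my-bucket/data/sales_*', return_type='polars')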
alcedo_pdbc.datalake.GCS.upload_file(source_file_path, bucket, blob_name)
method descriptor
GCS.upload_file(self, source_file_path: str, bucket: str, blob_name: str)
Takes source file path, bucket and blob name as arguments and uploads the file to GCS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_file_path | str | Source file path | required |
| bucket | str | GCS bucket name | required |
| blob_name | str | Blob name (destination file path) | required |
alcedo_pdbc.datalake.GCS.upload_folder(local_folder_path, bucket, blob_path='')
method descriptor
GCS.upload_folder(self, local_folder_path: str, bucket: str, blob_path: str = '')
Takes local folder path, bucket and blob path as arguments and uploads the folder to GCS.
alcedo_pdbc.datalake.GCS.write_dataframe(df, bucket, blob_name, extension='csv', pandas_args={}, polars_args={})
method descriptor
GCS.write_dataframe(self, df, bucket, blob_name, extension='csv', pandas_args={}, polars_args={})
Takes DataFrame, bucket name and blob name as arguments and writes the dataframe to GCS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Dataframe to be uploaded | required |
| bucket | str | GCS bucket name | required |
| blob_name | str | file name with extension | required |
| extension | str | File extension; inferred automatically from the filename parameter. Defaults to 'csv'. | 'csv' |
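Examples:
A minimal sketch writing a small pandas DataFrame to a CSV blob; bucket and blob names are hypothetical:
>>> import pandas as pd
>>> df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
>>> gcs.write_dataframe(df, 'my-bucket', 'out/sample.csv')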
alcedo_pdbc.datalake.MinIO
The alcedo_pdbc.datalake.MinIO class creates a MinIO S3 client instance that implements read, write, upload, and download operations against MinIO.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | Endpoint of the S3-compatible object storage service. | required |
| access_key | str | Access key for the object storage (may be empty for anonymous access). | required |
| secret_key | str | Secret key for the object storage (may be empty for anonymous access). | required |
| secure | bool | Whether to use a secure (TLS) connection. Defaults to False. | False |
| region | str | Optional region name. Defaults to None. | None |
Examples:
>>> from alcedo_pdbc.datalake import MinIO
>>> minio_client = MinIO('127.0.0.1:9000', access_key='Q3AM3UQ867SPQQA43P2F',
...                      secret_key='zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG')
Notes
When deployed, MinIO exposes two ports, 9000 and 9001: 9000 is the client connection port and 9001 is the web console port.
Functions
alcedo_pdbc.datalake.MinIO.bucket_exists(bucket_name)
method descriptor
MinIO.bucket_exists(self, bucket_name: str) -> bool
The bucket_exists() method checks whether a bucket exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bucket_name | str | Bucket name | required |
Returns:
| Name | Type | Description |
|---|---|---|
| bool | bool | True if the bucket exists, otherwise False |
Examples:
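A minimal sketch reusing the minio_client from the class example above; the bucket name is hypothetical:
>>> minio_client.bucket_exists('my-bucket')
True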
alcedo_pdbc.datalake.MinIO.download_file(minio_path=None, bucket=None, key=None, threads=4, local_path='.')
method descriptor
MinIO.download_file(self, minio_path: str = None, bucket: str = None, key: str = None, threads: int = 4, local_path: str = '.')
The download_file() method downloads a file from MinIO, either via the minio_path parameter or via the bucket and key parameters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| minio_path | str | MinIO file path; optional, but minio_path and (bucket, key) cannot both be empty. | None |
| bucket | str | MinIO bucket name; optional, cannot be empty if minio_path is empty. | None |
| key | str | File name with extension. | None |
| threads | int | Number of threads. | 4 |
| local_path | str | Optional save path; defaults to the current directory '.'. | '.' |
Returns:
| Name | Type | Description |
|---|---|---|
| file | ``CSV``, ``Excel``, ``JSON``, ``HTML``, ``HDF5``, ``Feather``, ``Parquet`` or ``Apache Avro`` | The downloaded file; its format depends on the file's extension |
Examples:
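A minimal sketch showing both calling styles, reusing the minio_client from the class example; bucket, key, and paths are hypothetical:
>>> minio_client.download_file(minio_path='my-bucket/data/sales.csv')
>>> minio_client.download_file(bucket='my-bucket', key='data/sales.csv', local_path='./downloads')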
alcedo_pdbc.datalake.MinIO.download_folder(minio_path=None, bucket=None, key=None, local_path_to_download='.')
method descriptor
MinIO.download_folder(self, minio_path: str = None, bucket: str = None, key: str = None, local_path_to_download: str = '.')
Takes a MinIO path or (bucket and key name) as arguments and downloads the folder.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| minio_path | str | MinIO path from which to download the folder. Defaults to None. | None |
| bucket | str | Bucket name, used if minio_path is not provided. Defaults to None. | None |
| key | str | Key name, used if minio_path is not provided. Defaults to None. | None |
| local_path_to_download | str | save location. Defaults to '.' (current directory). | '.' |
Returns:
| Name | Type | Description |
|---|---|---|
| file | ``CSV``, ``Excel``, ``JSON``, ``HTML``, ``HDF5``, ``Feather``, ``Parquet`` or ``Apache Avro`` | The downloaded files; their format depends on each file's extension |
alcedo_pdbc.datalake.MinIO.get_bucket_policy(bucket_name, prefix)
method descriptor
MinIO.get_bucket_policy(self, bucket_name: str, prefix: str)
The get_bucket_policy() method retrieves the policy of a bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bucket_name | str | Bucket name. | required |
| prefix | str | Object name prefix. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Policy | Policy | A Policy enum value: Policy.READ_ONLY, Policy.WRITE_ONLY, Policy.READ_WRITE, or Policy.NONE. |
Examples:
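A minimal sketch with a hypothetical bucket and prefix; the return value is one of the Policy enum members listed above:
>>> policy = minio_client.get_bucket_policy('my-bucket', 'data/')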
alcedo_pdbc.datalake.MinIO.list_buckets()
method descriptor
MinIO.list_buckets(self)
Lists the buckets accessible to the client.
alcedo_pdbc.datalake.MinIO.read_as_dataframe(minio_path=None, bucket=None, key=None, pandas_args={}, polars_args={}, extension='csv', return_type='pandas')
method descriptor
MinIO.read_as_dataframe(self, minio_path: str = None, bucket: str = None, key: str = None, pandas_args: Dict = {}, polars_args: Dict = {}, extension='csv', return_type='pandas')
The read_as_dataframe() method reads a file (CSV by default) from MinIO and returns it as the specified DataFrame type.
Note: when loading multiple files or all files in a folder, the files must share the same data structure.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| minio_path | str | MinIO file path; optional, but minio_path and (bucket, key) cannot both be empty. To load multiple files, use "bucket/path/filename*"; to load all files in a folder, use "bucket/folder/." | None |
| bucket | str | MinIO bucket name; optional, cannot be empty if minio_path is empty. | None |
| key | str | File name with extension. | None |
| pandas_args | dict | pandas arguments such as encoding, etc. | {} |
| extension | str | Optional; the file extension, inferred automatically from the path. Defaults to 'csv'. | 'csv' |
| return_type | str | Optional; which dataframe type to return ('pandas', 'polars', 'dask', etc.). Defaults to 'pandas'. | 'pandas' |
Returns:
| Name | Type | Description |
|---|---|---|
| DataFrame | ``Pandas``, ``Polars`` or ``Dask`` | A dataframe of the type selected by the return_type parameter |
Examples:
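A minimal sketch with hypothetical paths; the wildcard form loads several files with the same schema at once:
>>> df = minio_client.read_as_dataframe(minio_path='my-bucket/data/sales.csv')
>>> pl_df = minio_client.read_as_dataframe(minio_path='my-bucket/data/sales_*', return_type='polars')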
alcedo_pdbc.datalake.MinIO.upload_file(source_file_path, bucket, key)
method descriptor
MinIO.upload_file(self, source_file_path: str, bucket: str, key: str)
Takes source file path, bucket and key as arguments and uploads the file to MinIO.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_file_path | str | source file path | required |
| bucket | str | destination bucket | required |
| key | str | destination file path | required |
alcedo_pdbc.datalake.MinIO.upload_folder(local_folder_path, bucket, key)
method descriptor
MinIO.upload_folder(self, local_folder_path: str, bucket: str, key: str) -> None
Takes local path, bucket and key as arguments and uploads the folder to MinIO.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| local_folder_path | str | local path of the folder to be uploaded | required |
| bucket | str | bucket name | required |
| key | str | key name | required |
alcedo_pdbc.datalake.MinIO.write_dataframe(df, bucket, key, extension='csv', pandas_args={}, polars_args={})
method descriptor
MinIO.write_dataframe(self, df, bucket: str, key: str, extension='csv', pandas_args={}, polars_args={}) -> None
Takes a DataFrame, bucket name and filename as arguments and writes the dataframe to MinIO.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Dataframe to be uploaded | required |
| bucket | str | Bucket name | required |
| key | str | file name with extension | required |
| extension | str | File extension; inferred automatically from the filename parameter. Defaults to 'csv'. | 'csv' |
alcedo_pdbc.datalake.S3
The S3 class creates an S3 object through which you can read, write, upload, and download data from AWS S3.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict | Automatically loaded from the config file (yaml) | required |
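Examples:
A minimal sketch of constructing the client, assuming the yaml config file has already been loaded into a dict (the contents, typically AWS credentials and a region, are hypothetical):
>>> from alcedo_pdbc.datalake import S3
>>> s3 = S3(config)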
Functions
alcedo_pdbc.datalake.S3.download_file(s3_path=None, bucket=None, key=None, local_path='.')
method descriptor
S3.download_file(self, s3_path: str = None, bucket: str = None, key: str = None, local_path: str = '.')
Takes an s3 path or (bucket and key name) as arguments and downloads the file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_path | str | S3 path from which to download the file. Defaults to None. | None |
| bucket | str | S3 bucket name, used if s3_path is not provided. Defaults to None. | None |
| key | str | S3 key name, used if s3_path is not provided. Defaults to None. | None |
| local_path | str | save location. Defaults to '.' (current directory). | '.' |
Returns:
| Name | Type | Description |
|---|---|---|
| file | ``CSV``, ``Excel``, ``JSON``, ``HTML``, ``HDF5``, ``Feather``, ``Parquet`` or ``Apache Avro`` | The downloaded file; its format depends on the file's extension |
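Examples:
A minimal sketch showing both calling styles; the bucket, key, and paths are hypothetical:
>>> s3.download_file(s3_path='s3://my-bucket/data/sales.csv')
>>> s3.download_file(bucket='my-bucket', key='data/sales.csv', local_path='./downloads')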
alcedo_pdbc.datalake.S3.download_folder(s3_path=None, bucket=None, key=None, local_path_to_download='.')
method descriptor
S3.download_folder(self, s3_path: str = None, bucket: str = None, key: str = None, local_path_to_download: str = '.')
Takes an s3 path or (bucket and key name) as arguments and downloads the folder.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_path | str | S3 path from which to download the folder. Defaults to None. | None |
| bucket | str | S3 bucket name, used if s3_path is not provided. Defaults to None. | None |
| key | str | S3 key name, used if s3_path is not provided. Defaults to None. | None |
| local_path_to_download | str | save location. Defaults to '.' (current directory). | '.' |
alcedo_pdbc.datalake.S3.read_as_dataframe(s3_path=None, bucket=None, key=None, pandas_args={}, polars_args={}, extension='csv', return_type='pandas')
method descriptor
S3.read_as_dataframe(self, s3_path: str = None, bucket: str = None, key: str = None, pandas_args: Dict = {}, polars_args: Dict = {}, extension='csv', return_type='pandas')
Takes an s3 path (or bucket and key) as arguments and returns a dataframe.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_path | str | s3 path of the file to be loaded; to load multiple files, use s3://bucket/path/filename*; to load all files in a folder, use s3://bucket/folder/. | None |
| bucket | str | S3 bucket name | None |
| key | str | file name with extension | None |
| pandas_args | dict | pandas arguments such as encoding, etc. | {} |
| extension | str | File extension; inferred automatically from the s3_path parameter. Defaults to 'csv'. | 'csv' |
| return_type | str | Which dataframe type to return ('pandas', 'polars', 'dask', etc.). Defaults to 'pandas'. | 'pandas' |
Returns:
| Name | Type | Description |
|---|---|---|
| DataFrame | ``Pandas``, ``Polars`` or ``Dask`` | A dataframe of the type selected by the return_type parameter |
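Examples:
A minimal sketch with hypothetical s3:// paths; the wildcard form loads several files with the same schema at once:
>>> df = s3.read_as_dataframe(s3_path='s3://my-bucket/data/sales.csv')
>>> pl_df = s3.read_as_dataframe(s3_path='s3://my-bucket/data/sales_*', return_type='polars')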
alcedo_pdbc.datalake.S3.upload_file(source_file_path, bucket, key)
method descriptor
S3.upload_file(self, source_file_path: str, bucket: str, key: str)
Takes source file path, bucket and key as arguments and uploads the file to S3.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_file_path | str | source file path | required |
| bucket | str | destination bucket | required |
| key | str | destination file path | required |
alcedo_pdbc.datalake.S3.upload_folder(local_folder_path, bucket, key)
method descriptor
S3.upload_folder(self, local_folder_path: str, bucket: str, key: str) -> None
Takes local path, bucket and key as arguments and uploads the folder to S3.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| local_folder_path | str | local path of the folder to be uploaded | required |
| bucket | str | s3 bucket name | required |
| key | str | s3 key name | required |
alcedo_pdbc.datalake.S3.write_dataframe(df, bucket, key, extension='csv', pandas_args={}, polars_args={})
method descriptor
S3.write_dataframe(self, df, bucket: str, key: str, extension='csv', pandas_args={}, polars_args={}) -> None
Takes a DataFrame, bucket name and filename as arguments and writes the dataframe to S3.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Dataframe to be uploaded | required |
| bucket | str | S3 bucket name | required |
| key | str | file name with extension | required |
| extension | str | File extension; inferred automatically from the filename parameter. Defaults to 'csv'. | 'csv' |
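Examples:
A minimal sketch writing a small pandas DataFrame as CSV; bucket and key are hypothetical:
>>> import pandas as pd
>>> df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
>>> s3.write_dataframe(df, 'my-bucket', 'out/sample.csv')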