跳到内容

云存储

Polars 可以读写 AWS S3、Azure Blob Storage 和 Google Cloud Storage。这三种存储提供商的 API 相同。

要从云存储读取,可能需要根据用例和云存储提供商安装额外的依赖项

$ pip install fsspec s3fs adlfs gcsfs
$ cargo add aws_sdk_s3 aws_config tokio --features tokio/full

从云存储读取

Polars 支持从云存储读取 Parquet、CSV、IPC 和 NDJSON 文件

read_parquet · read_csv · read_ipc

import polars as pl

source = "s3://bucket/*.parquet"

df = pl.read_parquet(source)

ParquetReader · CsvReader · IpcReader · 可在 csv 特性中使用 · 可在 parquet 特性中使用 · 可在 ipc 特性中使用

use aws_config::BehaviorVersion;
use polars::prelude::*;

#[tokio::main]
async fn main() {
    let bucket = "<YOUR_BUCKET>";
    let path = "<YOUR_PATH>";

    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let client = aws_sdk_s3::Client::new(&config);

    let object = client
        .get_object()
        .bucket(bucket)
        .key(path)
        .send()
        .await
        .unwrap();

    let bytes = object.body.collect().await.unwrap().into_bytes();

    let cursor = std::io::Cursor::new(bytes);
    let df = CsvReader::new(cursor).finish().unwrap();

    println!("{df:?}");
}

通过查询优化从云存储扫描

使用 pl.scan_* 函数从云存储读取可以受益于 谓词下推和投影下推,查询优化器会在文件下载前应用它们。这可以显著减少需要下载的数据量。通过调用 collect 触发查询评估。

import polars as pl

source = "s3://bucket/*.parquet"

df = pl.scan_parquet(source).filter(pl.col("id") < 100).select("id","value").collect()

云身份验证

Polars 能够自动加载某些云提供商的默认凭据配置。如果未能自动加载,可以手动配置 Polars 用于身份验证的凭据。可以通过以下几种方式完成:

使用 storage_options

  • 凭据可以作为配置键通过 storage_options 参数在一个字典中传递。

scan_parquet

import polars as pl

source = "s3://bucket/*.parquet"

storage_options = {
    "aws_access_key_id": "<secret>",
    "aws_secret_access_key": "<secret>",
    "aws_region": "us-east-1",
}
df = pl.scan_parquet(source, storage_options=storage_options).collect()

使用可用的 CredentialProvider* 工具类之一

  • 可能存在一个名为 pl.CredentialProvider* 的工具类,它提供了所需的身份验证功能。例如,pl.CredentialProviderAWS 支持选择 AWS 配置文件以及假定 IAM 角色。

scan_parquet · CredentialProviderAWS

lf = pl.scan_parquet(
    "s3://.../...",
    credential_provider=pl.CredentialProviderAWS(
        profile_name="..."
        assume_role={
            "RoleArn": f"...",
            "RoleSessionName": "...",
        }
    ),
)

df = lf.collect()

使用自定义的 credential_provider 函数

  • 某些环境可能需要自定义身份验证逻辑(例如 AWS IAM 角色链)。对于这些情况,可以提供一个 Python 函数供 Polars 使用以检索凭据。

scan_parquet

def get_credentials() -> pl.CredentialProviderFunctionReturn:
    expiry = None

    return {
        "aws_access_key_id": "...",
        "aws_secret_access_key": "...",
        "aws_session_token": "...",
    }, expiry


lf = pl.scan_parquet(
    "s3://.../...",
    credential_provider=get_credentials,
)

df = lf.collect()

  • Azure 示例

scan_parquet · CredentialProviderAzure

def credential_provider():
    credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
    token = credential.get_token("https://storage.azure.com/.default")

    return {"bearer_token": token.token}, token.expires_on


pl.scan_parquet(
    "abfss://...@.../...",
    credential_provider=credential_provider,
)

# Note that for the above case, this shortcut is also available:

pl.scan_parquet(
    "abfss://...@.../...",
    credential_provider=pl.CredentialProviderAzure(
        credentials=DefaultAzureCredential(exclude_managed_identity_credential=True)
    ),
)

使用 PyArrow 进行扫描

我们还可以使用 PyArrow 从云存储扫描。这对于 Hive 分区等分区数据集特别有用。

我们首先创建一个 PyArrow 数据集,然后从该数据集创建一个 LazyFrame

scan_pyarrow_dataset

import polars as pl
import pyarrow.dataset as ds

dset = ds.dataset("s3://my-partitioned-folder/", format="parquet")
(
    pl.scan_pyarrow_dataset(dset)
    .filter(pl.col("foo") == "a")
    .select(["foo", "bar"])
    .collect()
)

写入云存储

我们可以使用 Python 中的 s3fs(用于 S3)、adlfs(用于 Azure Blob Storage)和 gcsfs(用于 Google Cloud Storage)将 DataFrame 写入云存储。在此示例中,我们将 Parquet 文件写入 S3。

write_parquet

import polars as pl
import s3fs

df = pl.DataFrame({
    "foo": ["a", "b", "c", "d", "d"],
    "bar": [1, 2, 3, 4, 5],
})

fs = s3fs.S3FileSystem()
destination = "s3://bucket/my_file.parquet"

# write parquet
with fs.open(destination, mode='wb') as f:
    df.write_parquet(f)