云存储
Polars 可以读写 AWS S3、Azure Blob Storage 和 Google Cloud Storage。这三种存储提供商的 API 相同。
要从云存储读取,可能需要根据用例和云存储提供商安装额外的依赖项
$ pip install fsspec s3fs adlfs gcsfs
$ cargo add aws_sdk_s3 aws_config tokio --features tokio/full
从云存储读取
Polars 支持从云存储读取 Parquet、CSV、IPC 和 NDJSON 文件
read_parquet
· read_csv
· read_ipc
import polars as pl
source = "s3://bucket/*.parquet"
df = pl.read_parquet(source)
ParquetReader
· CsvReader
· IpcReader
· 可在 csv 特性中使用 · 可在 parquet 特性中使用 · 可在 ipc 特性中使用
use aws_config::BehaviorVersion;
use polars::prelude::*;
#[tokio::main]
async fn main() {
let bucket = "<YOUR_BUCKET>";
let path = "<YOUR_PATH>";
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let client = aws_sdk_s3::Client::new(&config);
let object = client
.get_object()
.bucket(bucket)
.key(path)
.send()
.await
.unwrap();
let bytes = object.body.collect().await.unwrap().into_bytes();
let cursor = std::io::Cursor::new(bytes);
let df = CsvReader::new(cursor).finish().unwrap();
println!("{df:?}");
}
通过查询优化从云存储扫描
使用 pl.scan_*
函数从云存储读取可以受益于 谓词下推和投影下推,查询优化器会在文件下载前应用它们。这可以显著减少需要下载的数据量。通过调用 collect
触发查询评估。
import polars as pl
source = "s3://bucket/*.parquet"
df = pl.scan_parquet(source).filter(pl.col("id") < 100).select("id","value").collect()
云身份验证
Polars 能够自动加载某些云提供商的默认凭据配置。如果未能自动加载,可以手动配置 Polars 用于身份验证的凭据。可以通过以下几种方式完成:
使用 storage_options
- 凭据可以作为配置键通过
storage_options
参数在一个字典中传递。
import polars as pl
source = "s3://bucket/*.parquet"
storage_options = {
"aws_access_key_id": "<secret>",
"aws_secret_access_key": "<secret>",
"aws_region": "us-east-1",
}
df = pl.scan_parquet(source, storage_options=storage_options).collect()
使用可用的 CredentialProvider*
工具类之一
- 可能存在一个名为
pl.CredentialProvider*
的工具类,它提供了所需的身份验证功能。例如,pl.CredentialProviderAWS
支持选择 AWS 配置文件以及假定 IAM 角色。
scan_parquet
· CredentialProviderAWS
lf = pl.scan_parquet(
"s3://.../...",
credential_provider=pl.CredentialProviderAWS(
profile_name="..."
assume_role={
"RoleArn": f"...",
"RoleSessionName": "...",
}
),
)
df = lf.collect()
使用自定义的 credential_provider
函数
- 某些环境可能需要自定义身份验证逻辑(例如 AWS IAM 角色链)。对于这些情况,可以提供一个 Python 函数供 Polars 使用以检索凭据。
def get_credentials() -> pl.CredentialProviderFunctionReturn:
expiry = None
return {
"aws_access_key_id": "...",
"aws_secret_access_key": "...",
"aws_session_token": "...",
}, expiry
lf = pl.scan_parquet(
"s3://.../...",
credential_provider=get_credentials,
)
df = lf.collect()
- Azure 示例
scan_parquet
· CredentialProviderAzure
def credential_provider():
credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
token = credential.get_token("https://storage.azure.com/.default")
return {"bearer_token": token.token}, token.expires_on
pl.scan_parquet(
"abfss://...@.../...",
credential_provider=credential_provider,
)
# Note that for the above case, this shortcut is also available:
pl.scan_parquet(
"abfss://...@.../...",
credential_provider=pl.CredentialProviderAzure(
credentials=DefaultAzureCredential(exclude_managed_identity_credential=True)
),
)
使用 PyArrow 进行扫描
我们还可以使用 PyArrow 从云存储扫描。这对于 Hive 分区等分区数据集特别有用。
我们首先创建一个 PyArrow 数据集,然后从该数据集创建一个 LazyFrame
。
import polars as pl
import pyarrow.dataset as ds
dset = ds.dataset("s3://my-partitioned-folder/", format="parquet")
(
pl.scan_pyarrow_dataset(dset)
.filter(pl.col("foo") == "a")
.select(["foo", "bar"])
.collect()
)
写入云存储
我们可以使用 Python 中的 s3fs(用于 S3)、adlfs(用于 Azure Blob Storage)和 gcsfs(用于 Google Cloud Storage)将 DataFrame
写入云存储。在此示例中,我们将 Parquet 文件写入 S3。
import polars as pl
import s3fs
df = pl.DataFrame({
"foo": ["a", "b", "c", "d", "d"],
"bar": [1, 2, 3, 4, 5],
})
fs = s3fs.S3FileSystem()
destination = "s3://bucket/my_file.parquet"
# write parquet
with fs.open(destination, mode='wb') as f:
df.write_parquet(f)