跳到内容

源和汇

扫描

在使用 LazyFrame API 时,优先使用 scan_* (例如 scan_parquetscan_csv 等) 而非 read_* 非常重要。Polars 的 scan 是惰性的,会延迟执行直到查询被收集。这样做的好处是 Polars 优化器可以将优化下推到读取器中。它们可以跳过读取不需要的列和行。另一个好处是,在流式执行期间,引擎可以在文件完全读取之前就开始处理批次数据。

接收器

接收器可以执行查询并将结果流式传输到存储(磁盘或云)。将数据接收到存储的好处是,您不一定需要将所有数据存储在 RAM 中,而是可以分批处理数据。

如果我们想将多个 CSV 文件转换为 Parquet,同时丢弃缺失数据,我们可以执行如下查询。我们使用一种分区策略,该策略定义了单个 Parquet 文件中允许的最大行数,在此之前我们会生成一个新的文件。

lf = scan_csv("my_dataset/*.csv").filter(pl.all().is_not_null())
lf.sink_parquet(
    pl.PartitionMaxSize(
        "my_table_{part}.parquet"
        max_size=512_000
    )
)

这将在磁盘上创建以下文件

my_table_0.parquet
my_table_1.parquet
...
my_table_n.parquet

多路复用接收器

接收器也可以多路复用。这意味着我们可以在一个查询中同时写入不同的接收器。在下面的代码片段中,我们获取一个 LazyFrame 并同时将其接收到两个接收器中。

# Some expensive computation
lf: LazyFrame 

q1 = lf.sink_parquet(.., lazy=True)
q2 = lf.sink_ipc(.., lazy=True)

lf.collect_all([q1, q2])