多文件
处理多个文件。
Polars 可以根据您的需求和内存压力,以不同的方式处理多个文件。
让我们创建一些文件以便提供上下文
import polars as pl
df = pl.DataFrame({"foo": [1, 2, 3], "bar": [None, "ham", "spam"]})
for i in range(5):
df.write_csv(f"docs/assets/data/my_many_files_{i}.csv")
读取到单个 DataFrame
中
要将多个文件读取到单个 DataFrame
中,我们可以使用 globbing 模式
df = pl.read_csv("docs/assets/data/my_many_files_*.csv")
print(df)
shape: (15, 2)
┌─────┬──────┐
│ foo ┆ bar │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪══════╡
│ 1 ┆ null │
│ 2 ┆ ham │
│ 3 ┆ spam │
│ 1 ┆ null │
│ 2 ┆ ham │
│ … ┆ … │
│ 2 ┆ ham │
│ 3 ┆ spam │
│ 1 ┆ null │
│ 2 ┆ ham │
│ 3 ┆ spam │
└─────┴──────┘
为了解其工作原理,我们可以查看查询计划。下面我们看到所有文件都被单独读取并连接成一个 DataFrame
。Polars 会尝试并行化读取。
pl.scan_csv("docs/assets/data/my_many_files_*.csv").show_graph()
并行读取和处理
如果您的文件不必在单个表中,您也可以为每个文件构建查询计划,并在 Polars 线程池上并行执行它们。
所有查询计划的执行都是“非常并行”(embarrassingly parallel)的,不需要任何通信。
import glob
import polars as pl
queries = []
for file in glob.glob("docs/assets/data/my_many_files_*.csv"):
q = pl.scan_csv(file).group_by("bar").agg(pl.len(), pl.sum("foo"))
queries.append(q)
dataframes = pl.collect_all(queries)
print(dataframes)
[shape: (3, 3)
┌──────┬─────┬─────┐
│ bar ┆ len ┆ foo │
│ --- ┆ --- ┆ --- │
│ str ┆ u32 ┆ i64 │
╞══════╪═════╪═════╡
│ ham ┆ 1 ┆ 2 │
│ null ┆ 1 ┆ 1 │
│ spam ┆ 1 ┆ 3 │
└──────┴─────┴─────┘, shape: (3, 3)
┌──────┬─────┬─────┐
│ bar ┆ len ┆ foo │
│ --- ┆ --- ┆ --- │
│ str ┆ u32 ┆ i64 │
╞══════╪═════╪═════╡
│ null ┆ 1 ┆ 1 │
│ ham ┆ 1 ┆ 2 │
│ spam ┆ 1 ┆ 3 │
└──────┴─────┴─────┘, shape: (3, 3)
┌──────┬─────┬─────┐
│ bar ┆ len ┆ foo │
│ --- ┆ --- ┆ --- │
│ str ┆ u32 ┆ i64 │
╞══════╪═════╪═════╡
│ null ┆ 1 ┆ 1 │
│ spam ┆ 1 ┆ 3 │
│ ham ┆ 1 ┆ 2 │
└──────┴─────┴─────┘, shape: (3, 3)
┌──────┬─────┬─────┐
│ bar ┆ len ┆ foo │
│ --- ┆ --- ┆ --- │
│ str ┆ u32 ┆ i64 │
╞══════╪═════╪═════╡
│ spam ┆ 1 ┆ 3 │
│ null ┆ 1 ┆ 1 │
│ ham ┆ 1 ┆ 2 │
└──────┴─────┴─────┘, shape: (3, 3)
┌──────┬─────┬─────┐
│ bar ┆ len ┆ foo │
│ --- ┆ --- ┆ --- │
│ str ┆ u32 ┆ i64 │
╞══════╪═════╪═════╡
│ ham ┆ 1 ┆ 2 │
│ null ┆ 1 ┆ 1 │
│ spam ┆ 1 ┆ 3 │
└──────┴─────┴─────┘]