查询多路复用
在源和目标页面,我们已经讨论了多路复用作为将查询拆分为多个目标的方法。本页将深入探讨此概念,因为它在将LazyFrame
与过程式编程结构结合时非常重要。
在处理急切(eager)DataFrame时,将状态保存在临时变量中是很常见的。让我们看下面的例子。下面我们创建一个DataFrame
,其中包含10个随机顺序的唯一元素(这样Polars就不会对排序键触发任何快速路径)。
np.random.seed(0)
a = np.arange(0, 10)
np.random.shuffle(a)
df = pl.DataFrame({"n": a})
print(df)
shape: (10, 1)
┌─────┐
│ n │
│ --- │
│ i64 │
╞═════╡
│ 2 │
│ 8 │
│ 4 │
│ 9 │
│ 1 │
│ 6 │
│ 7 │
│ 3 │
│ 0 │
│ 5 │
└─────┘
急切(Eager)
如果您使用 Polars 的急切(eager)API,创建一个变量并遍历该临时DataFrame
会得到您期望的结果,因为分组(group-by)的结果存储在df1
中。尽管输出顺序不稳定,但这无关紧要,因为它会急切地进行评估。因此,以下代码片段不会引发错误,并且断言会通过。
# A group-by doesn't guarantee order
df1 = df.group_by("n").len()
# Take the lower half and the upper half in a list
out = [df1.slice(offset=i * 5, length=5) for i in range(2)]
# Assert df1 is equal to the sum of both halves
pl.testing.assert_frame_equal(df1, pl.concat(out))
惰性模式 (Lazy)
现在,如果我们天真地尝试将其与LazyFrame
结合使用,这将失败。
lf1 = df.lazy().group_by("n").len()
out = [lf1.slice(offset=i * 5, length=5).collect() for i in range(2)]
pl.testing.assert_frame_equal(lf1.collect(), pl.concat(out))
AssertionError: DataFrames are different (value mismatch for column 'n')
[left]: [9, 2, 0, 5, 3, 1, 7, 8, 6, 4]
[right]: [0, 9, 6, 8, 2, 5, 4, 3, 1, 7]
这失败的原因是lf1
不包含df.lazy().group_by("n").len()
的物化结果,它而是将查询计划保存在该变量中。
这意味着每次我们从此LazyFrame
分支并调用collect
时,我们都会重新评估分组。除了开销大之外,如果您假设输出是稳定的(此处并非如此),这还会导致意想不到的结果。
在上面的例子中,您实际上评估了 2 个查询计划
计划 1
计划 2
合并查询计划
为了规避这一点,我们必须让 Polars 有机会在一次优化和执行过程中查看所有查询计划。这可以通过将发散的LazyFrame
传递给collect_all
函数来完成。
lf1 = df.lazy().group_by("n").len()
out = [lf1.slice(offset=i * 5, length=5) for i in range(2)]
results = pl.collect_all([lf1] + out)
pl.testing.assert_frame_equal(results[0], pl.concat(results[1:]))
如果我们使用pl.explain_all
解释组合查询,我们还可以观察到它们在单个“SINK_MULTIPLE”评估下共享,并且优化器已经识别出查询的部分来自相同的子计划,这由插入的“CACHE”节点表示。
SINK_MULTIPLE
PLAN 0:
CACHE[id: 7fac092154a0, cache_hits: 2]
AGGREGATE[maintain_order: false]
[len()] BY [col("n")]
FROM
DF ["n"]; PROJECT */1 COLUMNS
PLAN 1:
SLICE[offset: 0, len: 5]
CACHE[id: 7fac092154a0, cache_hits: 2]
AGGREGATE[maintain_order: false]
[len()] BY [col("n")]
FROM
DF ["n"]; PROJECT */1 COLUMNS
PLAN 2:
SLICE[offset: 5, len: 5]
CACHE[id: 7fac092154a0, cache_hits: 2]
AGGREGATE[maintain_order: false]
[len()] BY [col("n")]
FROM
DF ["n"]; PROJECT */1 COLUMNS
END SINK_MULTIPLE
因此,使用pl.collect_all
将相关的子计划组合到单个执行单元中,可以大幅提高性能,并允许发散的查询计划、存储临时表和更过程式的编程风格。