跳到内容

查询多路复用

源和目标页面,我们已经讨论了多路复用作为将查询拆分为多个目标的方法。本页将深入探讨此概念,因为它在将LazyFrame与过程式编程结构结合时非常重要。

在处理急切(eager)DataFrame时,将状态保存在临时变量中是很常见的。让我们看下面的例子。下面我们创建一个DataFrame,其中包含10个随机顺序的唯一元素(这样Polars就不会对排序键触发任何快速路径)。

np.random.seed(0)
a = np.arange(0, 10)
np.random.shuffle(a)
df = pl.DataFrame({"n": a})
print(df)
shape: (10, 1)
┌─────┐
│ n   │
│ --- │
│ i64 │
╞═════╡
│ 2   │
│ 8   │
│ 4   │
│ 9   │
│ 1   │
│ 6   │
│ 7   │
│ 3   │
│ 0   │
│ 5   │
└─────┘

急切(Eager)

如果您使用 Polars 的急切(eager)API,创建一个变量并遍历该临时DataFrame会得到您期望的结果,因为分组(group-by)的结果存储在df1中。尽管输出顺序不稳定,但这无关紧要,因为它会急切地进行评估。因此,以下代码片段不会引发错误,并且断言会通过。

# A group-by doesn't guarantee order
df1 = df.group_by("n").len()

# Take the lower half and the upper half in a list
out = [df1.slice(offset=i * 5, length=5) for i in range(2)]

# Assert df1 is equal to the sum of both halves
pl.testing.assert_frame_equal(df1, pl.concat(out))

惰性模式 (Lazy)

现在,如果我们天真地尝试将其与LazyFrame结合使用,这将失败。

lf1 = df.lazy().group_by("n").len()

out = [lf1.slice(offset=i * 5, length=5).collect() for i in range(2)]

pl.testing.assert_frame_equal(lf1.collect(), pl.concat(out))
AssertionError: DataFrames are different (value mismatch for column 'n')
[left]:  [9, 2, 0, 5, 3, 1, 7, 8, 6, 4]
[right]: [0, 9, 6, 8, 2, 5, 4, 3, 1, 7]

这失败的原因是lf1不包含df.lazy().group_by("n").len()的物化结果,它而是将查询计划保存在该变量中。

这意味着每次我们从此LazyFrame分支并调用collect时,我们都会重新评估分组。除了开销大之外,如果您假设输出是稳定的(此处并非如此),这还会导致意想不到的结果。

在上面的例子中,您实际上评估了 2 个查询计划

计划 1

计划 2

合并查询计划

为了规避这一点,我们必须让 Polars 有机会在一次优化和执行过程中查看所有查询计划。这可以通过将发散的LazyFrame传递给collect_all函数来完成。

lf1 = df.lazy().group_by("n").len()

out = [lf1.slice(offset=i * 5, length=5) for i in range(2)]
results = pl.collect_all([lf1] + out)

pl.testing.assert_frame_equal(results[0], pl.concat(results[1:]))

如果我们使用pl.explain_all解释组合查询,我们还可以观察到它们在单个“SINK_MULTIPLE”评估下共享,并且优化器已经识别出查询的部分来自相同的子计划,这由插入的“CACHE”节点表示。

SINK_MULTIPLE
  PLAN 0:
    CACHE[id: 7fac092154a0, cache_hits: 2]
      AGGREGATE[maintain_order: false]
        [len()] BY [col("n")]
        FROM
        DF ["n"]; PROJECT */1 COLUMNS
  PLAN 1:
    SLICE[offset: 0, len: 5]
      CACHE[id: 7fac092154a0, cache_hits: 2]
        AGGREGATE[maintain_order: false]
          [len()] BY [col("n")]
          FROM
          DF ["n"]; PROJECT */1 COLUMNS
  PLAN 2:
    SLICE[offset: 5, len: 5]
      CACHE[id: 7fac092154a0, cache_hits: 2]
        AGGREGATE[maintain_order: false]
          [len()] BY [col("n")]
          FROM
          DF ["n"]; PROJECT */1 COLUMNS
END SINK_MULTIPLE

因此,使用pl.collect_all将相关的子计划组合到单个执行单元中,可以大幅提高性能,并允许发散的查询计划、存储临时表和更过程式的编程风格。