用户定义的 Python 函数
Polars 表达式非常强大和灵活,因此与其他库相比,对自定义 Python 函数的需求大大减少。不过,您可能仍需要将表达式的状态传递给第三方库,或者将您的黑盒函数应用于 Polars 中的数据。
在本文档的这一部分,我们将使用两个 API 来实现此目的
-
map_elements
:在Series
中的每个值上分别调用一个函数。 -
map_batches
:始终将完整的Series
传递给函数。
使用 map_elements()
处理单个值
让我们从最简单的情况开始:我们希望单独处理 Series
中的每个值。这是我们的数据
df = pl.DataFrame(
{
"keys": ["a", "a", "b", "b"],
"values": [10, 7, 1, 23],
}
)
print(df)
let df = df!(
"keys" => &["a", "a", "b", "b"],
"values" => &[10, 7, 1, 23],
)?;
println!("{}", df);
shape: (4, 2)
┌──────┬────────┐
│ keys ┆ values │
│ --- ┆ --- │
│ str ┆ i64 │
╞══════╪════════╡
│ a ┆ 10 │
│ a ┆ 7 │
│ b ┆ 1 │
│ b ┆ 23 │
└──────┴────────┘
我们将对每个单独的值调用 math.log()
import math
def my_log(value):
return math.log(value)
out = df.select(pl.col("values").map_elements(my_log, return_dtype=pl.Float64))
print(out)
shape: (4, 1)
┌──────────┐
│ values │
│ --- │
│ f64 │
╞══════════╡
│ 2.302585 │
│ 1.94591 │
│ 0.0 │
│ 3.135494 │
└──────────┘
虽然这有效,但 map_elements()
有两个问题
- 限于单个项: 通常,您会希望进行对整个
Series
进行操作的计算,而不是逐个处理单个项。 - 性能开销: 即使您确实想单独处理每个项,为每个单独的项调用函数也很慢;所有这些额外的函数调用会增加大量的开销。
让我们先解决第一个问题,然后再看看如何解决第二个问题。
使用 map_batches()
处理整个 Series
我们想对整个 Series
的内容运行一个自定义函数。为了演示目的,假设我们想要计算 Series
的均值与每个值之间的差值。
我们可以使用 map_batches()
API 在完整的 Series
或 group_by()
中的单个组上运行此函数
def diff_from_mean(series):
# This will be very slow for non-trivial Series, since it's all Python
# code:
total = 0
for value in series:
total += value
mean = total / len(series)
return pl.Series([value - mean for value in series])
# Apply our custom function to a full Series with map_batches():
out = df.select(pl.col("values").map_batches(diff_from_mean))
print("== select() with UDF ==")
print(out)
# Apply our custom function per group:
print("== group_by() with UDF ==")
out = df.group_by("keys").agg(pl.col("values").map_batches(diff_from_mean))
print(out)
== select() with UDF ==
shape: (4, 1)
┌────────┐
│ values │
│ --- │
│ f64 │
╞════════╡
│ -0.25 │
│ -3.25 │
│ -9.25 │
│ 12.75 │
└────────┘
== group_by() with UDF ==
shape: (2, 2)
┌──────┬───────────────┐
│ keys ┆ values │
│ --- ┆ --- │
│ str ┆ list[f64] │
╞══════╪═══════════════╡
│ a ┆ [1.5, -1.5] │
│ b ┆ [-11.0, 11.0] │
└──────┴───────────────┘
使用用户定义函数进行快速操作
纯 Python 实现的问题在于它很慢。通常,如果您想获得快速结果,您会希望最小化您调用的 Python 代码量。
为了最大限度地提高速度,您需要确保使用用编译语言编写的函数。对于数值计算,Polars 支持 NumPy 定义的一对接口,称为 “ufuncs” 和 “通用 ufuncs”。前者对每个项目单独运行,后者接受整个 NumPy 数组,从而实现更灵活的操作。
NumPy 和其他库(如 SciPy)附带了您可以与 Polars 一起使用的预编写 ufuncs。例如
import numpy as np
out = df.select(pl.col("values").map_batches(np.log))
print(out)
shape: (4, 1)
┌──────────┐
│ values │
│ --- │
│ f64 │
╞══════════╡
│ 2.302585 │
│ 1.94591 │
│ 0.0 │
│ 3.135494 │
└──────────┘
请注意,我们可以使用 map_batches()
,因为 numpy.log()
能够对单个项目和整个 NumPy 数组运行。这意味着它将比我们最初的示例运行得快得多,因为我们只有一个 Python 调用,然后所有处理都在快速的低级语言中进行。
示例:使用 Numba 的快速自定义函数
NumPy 提供的预编写函数很有用,但我们的目标是编写自己的函数。例如,假设我们想要一个上面 diff_from_mean()
示例的快速版本。在 Python 中编写此函数的最简单方法是使用 Numba,它允许您在 Python 的(一个子集)中编写自定义函数,同时仍然获得编译代码的好处。
特别是,Numba 提供了一个名为 @guvectorize
的装饰器。它通过将 Python 函数编译成快速机器码来创建一个通用 ufunc,从而使其能够被 Polars 使用。
在以下示例中,diff_from_mean_numba()
将在导入时编译成快速机器码,这将花费一些时间。之后,所有对该函数的调用都将快速运行。Series
在传递给函数之前将被转换为 NumPy 数组
from numba import float64, guvectorize, int64
# This will be compiled to machine code, so it will be fast. The Series is
# converted to a NumPy array before being passed to the function. See the
# Numba documentation for more details:
# https://numba.readthedocs.io/en/stable/user/vectorize.html
@guvectorize([(int64[:], float64[:])], "(n)->(n)")
def diff_from_mean_numba(arr, result):
total = 0
for value in arr:
total += value
mean = total / len(arr)
for i, value in enumerate(arr):
result[i] = value - mean
out = df.select(pl.col("values").map_batches(diff_from_mean_numba))
print("== select() with UDF ==")
print(out)
out = df.group_by("keys").agg(pl.col("values").map_batches(diff_from_mean_numba))
print("== group_by() with UDF ==")
print(out)
== select() with UDF ==
shape: (4, 1)
┌────────┐
│ values │
│ --- │
│ f64 │
╞════════╡
│ -0.25 │
│ -3.25 │
│ -9.25 │
│ 12.75 │
└────────┘
== group_by() with UDF ==
shape: (2, 2)
┌──────┬───────────────┐
│ keys ┆ values │
│ --- ┆ --- │
│ str ┆ list[f64] │
╞══════╪═══════════════╡
│ b ┆ [-11.0, 11.0] │
│ a ┆ [1.5, -1.5] │
└──────┴───────────────┘
调用通用 ufunc 时不允许缺失数据
在传递给 diff_from_mean_numba()
这样的用户定义函数之前,Series
将被转换为 NumPy 数组。不幸的是,NumPy 数组没有缺失数据的概念。如果原始 Series
中存在缺失数据,这意味着结果数组实际上不会与 Series
匹配。
如果您逐项计算结果,这并不重要。例如,numpy.log()
是对每个单独的值分别调用的,因此这些缺失值不会改变计算。但是,如果用户定义函数的结果依赖于 Series
中的多个值,那么缺失值应该如何处理就不清楚了。
因此,当调用通用 ufuncs(例如用 @guvectorize
装饰的 Numba 函数)时,如果您尝试传入带有缺失数据的 Series
,Polars 将会引发错误。如何去除缺失数据?在调用自定义函数之前,可以 填充它 或 丢弃它。
组合多个列值
如果您想将多列传递给用户定义的函数,可以使用 Struct
,这在另一个章节中详细介绍。基本思想是将多列组合成一个 Struct
,然后函数可以从中提取回这些列
# Add two arrays together:
@guvectorize([(int64[:], int64[:], float64[:])], "(n),(n)->(n)")
def add(arr, arr2, result):
for i in range(len(arr)):
result[i] = arr[i] + arr2[i]
df3 = pl.DataFrame({"values1": [1, 2, 3], "values2": [10, 20, 30]})
out = df3.select(
# Create a struct that has two columns in it:
pl.struct(["values1", "values2"])
# Pass the struct to a lambda that then passes the individual columns to
# the add() function:
.map_batches(
lambda combined: add(
combined.struct.field("values1"), combined.struct.field("values2")
)
)
.alias("add_columns")
)
print(out)
shape: (3, 1)
┌─────────────┐
│ add_columns │
│ --- │
│ f64 │
╞═════════════╡
│ 11.0 │
│ 22.0 │
│ 33.0 │
└─────────────┘
流式计算
将完整的 Series
传递给用户定义的函数是有代价的:它可能会占用大量内存,因为其内容会被复制到 NumPy 数组中。您可以使用 is_elementwise=True
参数来 map_batches
将结果流式传输到函数中,这意味着它可能无法一次性获取所有值。
注意
如果 is_elementwise
参数设置不正确,可能会导致错误的结果。如果您设置 is_elementwise=True
,请确保您的函数实际上是逐元素操作的(例如“计算每个值的对数”)——例如,我们的示例函数 diff_from_mean()
就不是。
返回类型
自定义 Python 函数通常是黑盒;Polars 不知道您的函数在做什么或它将返回什么。因此,返回数据类型是自动推断的。我们通过等待第一个非空值来实现这一点。该值将用于确定结果 Series
的类型。
Python 类型到 Polars 数据类型的映射如下
int
->Int64
float
->Float64
bool
->Boolean
str
->String
list[tp]
->List[tp]
(其中内部类型以相同规则推断)dict[str, [tp]]
->struct
Any
->object
(始终避免这种情况)
Rust 类型映射如下
i32
ori64
->Int64
f32
orf64
->Float64
bool
->Boolean
String
orstr
->String
Vec<tp>
->List[tp]
(其中内部类型以相同规则推断)
如果您想覆盖推断的类型,可以将 return_dtype
参数传递给 map_batches
。