跳到内容

用户定义的 Python 函数

Polars 表达式非常强大和灵活,因此与其他库相比,对自定义 Python 函数的需求大大减少。不过,您可能仍需要将表达式的状态传递给第三方库,或者将您的黑盒函数应用于 Polars 中的数据。

在本文档的这一部分,我们将使用两个 API 来实现此目的

  • map_elements:在 Series 中的每个值上分别调用一个函数。
  • map_batches:始终将完整的 Series 传递给函数。

使用 map_elements() 处理单个值

让我们从最简单的情况开始:我们希望单独处理 Series 中的每个值。这是我们的数据

df = pl.DataFrame(
    {
        "keys": ["a", "a", "b", "b"],
        "values": [10, 7, 1, 23],
    }
)
print(df)
let df = df!(
    "keys" => &["a", "a", "b", "b"],
    "values" => &[10, 7, 1, 23],
)?;
println!("{}", df);
shape: (4, 2)
┌──────┬────────┐
│ keys ┆ values │
│ ---  ┆ ---    │
│ str  ┆ i64    │
╞══════╪════════╡
│ a    ┆ 10     │
│ a    ┆ 7      │
│ b    ┆ 1      │
│ b    ┆ 23     │
└──────┴────────┘

我们将对每个单独的值调用 math.log()

import math


def my_log(value):
    return math.log(value)


out = df.select(pl.col("values").map_elements(my_log, return_dtype=pl.Float64))
print(out)

shape: (4, 1)
┌──────────┐
│ values   │
│ ---      │
│ f64      │
╞══════════╡
│ 2.302585 │
│ 1.94591  │
│ 0.0      │
│ 3.135494 │
└──────────┘

虽然这有效,但 map_elements() 有两个问题

  1. 限于单个项: 通常,您会希望进行对整个 Series 进行操作的计算,而不是逐个处理单个项。
  2. 性能开销: 即使您确实想单独处理每个项,为每个单独的项调用函数也很慢;所有这些额外的函数调用会增加大量的开销。

让我们先解决第一个问题,然后再看看如何解决第二个问题。

使用 map_batches() 处理整个 Series

我们想对整个 Series 的内容运行一个自定义函数。为了演示目的,假设我们想要计算 Series 的均值与每个值之间的差值。

我们可以使用 map_batches() API 在完整的 Seriesgroup_by() 中的单个组上运行此函数

def diff_from_mean(series):
    # This will be very slow for non-trivial Series, since it's all Python
    # code:
    total = 0
    for value in series:
        total += value
    mean = total / len(series)
    return pl.Series([value - mean for value in series])


# Apply our custom function to a full Series with map_batches():
out = df.select(pl.col("values").map_batches(diff_from_mean))
print("== select() with UDF ==")
print(out)

# Apply our custom function per group:
print("== group_by() with UDF ==")
out = df.group_by("keys").agg(pl.col("values").map_batches(diff_from_mean))
print(out)

== select() with UDF ==
shape: (4, 1)
┌────────┐
│ values │
│ ---    │
│ f64    │
╞════════╡
│ -0.25  │
│ -3.25  │
│ -9.25  │
│ 12.75  │
└────────┘
== group_by() with UDF ==
shape: (2, 2)
┌──────┬───────────────┐
│ keys ┆ values        │
│ ---  ┆ ---           │
│ str  ┆ list[f64]     │
╞══════╪═══════════════╡
│ a    ┆ [1.5, -1.5]   │
│ b    ┆ [-11.0, 11.0] │
└──────┴───────────────┘

使用用户定义函数进行快速操作

纯 Python 实现的问题在于它很慢。通常,如果您想获得快速结果,您会希望最小化您调用的 Python 代码量。

为了最大限度地提高速度,您需要确保使用用编译语言编写的函数。对于数值计算,Polars 支持 NumPy 定义的一对接口,称为 “ufuncs”“通用 ufuncs”。前者对每个项目单独运行,后者接受整个 NumPy 数组,从而实现更灵活的操作。

NumPy 和其他库(如 SciPy)附带了您可以与 Polars 一起使用的预编写 ufuncs。例如

import numpy as np

out = df.select(pl.col("values").map_batches(np.log))
print(out)

shape: (4, 1)
┌──────────┐
│ values   │
│ ---      │
│ f64      │
╞══════════╡
│ 2.302585 │
│ 1.94591  │
│ 0.0      │
│ 3.135494 │
└──────────┘

请注意,我们可以使用 map_batches(),因为 numpy.log() 能够对单个项目和整个 NumPy 数组运行。这意味着它将比我们最初的示例运行得快得多,因为我们只有一个 Python 调用,然后所有处理都在快速的低级语言中进行。

示例:使用 Numba 的快速自定义函数

NumPy 提供的预编写函数很有用,但我们的目标是编写自己的函数。例如,假设我们想要一个上面 diff_from_mean() 示例的快速版本。在 Python 中编写此函数的最简单方法是使用 Numba,它允许您在 Python 的(一个子集)中编写自定义函数,同时仍然获得编译代码的好处。

特别是,Numba 提供了一个名为 @guvectorize 的装饰器。它通过将 Python 函数编译成快速机器码来创建一个通用 ufunc,从而使其能够被 Polars 使用。

在以下示例中,diff_from_mean_numba() 将在导入时编译成快速机器码,这将花费一些时间。之后,所有对该函数的调用都将快速运行。Series 在传递给函数之前将被转换为 NumPy 数组

from numba import float64, guvectorize, int64


# This will be compiled to machine code, so it will be fast. The Series is
# converted to a NumPy array before being passed to the function. See the
# Numba documentation for more details:
# https://numba.readthedocs.io/en/stable/user/vectorize.html
@guvectorize([(int64[:], float64[:])], "(n)->(n)")
def diff_from_mean_numba(arr, result):
    total = 0
    for value in arr:
        total += value
    mean = total / len(arr)
    for i, value in enumerate(arr):
        result[i] = value - mean


out = df.select(pl.col("values").map_batches(diff_from_mean_numba))
print("== select() with UDF ==")
print(out)

out = df.group_by("keys").agg(pl.col("values").map_batches(diff_from_mean_numba))
print("== group_by() with UDF ==")
print(out)

== select() with UDF ==
shape: (4, 1)
┌────────┐
│ values │
│ ---    │
│ f64    │
╞════════╡
│ -0.25  │
│ -3.25  │
│ -9.25  │
│ 12.75  │
└────────┘
== group_by() with UDF ==
shape: (2, 2)
┌──────┬───────────────┐
│ keys ┆ values        │
│ ---  ┆ ---           │
│ str  ┆ list[f64]     │
╞══════╪═══════════════╡
│ b    ┆ [-11.0, 11.0] │
│ a    ┆ [1.5, -1.5]   │
└──────┴───────────────┘

调用通用 ufunc 时不允许缺失数据

在传递给 diff_from_mean_numba() 这样的用户定义函数之前,Series 将被转换为 NumPy 数组。不幸的是,NumPy 数组没有缺失数据的概念。如果原始 Series 中存在缺失数据,这意味着结果数组实际上不会与 Series 匹配。

如果您逐项计算结果,这并不重要。例如,numpy.log() 是对每个单独的值分别调用的,因此这些缺失值不会改变计算。但是,如果用户定义函数的结果依赖于 Series 中的多个值,那么缺失值应该如何处理就不清楚了。

因此,当调用通用 ufuncs(例如用 @guvectorize 装饰的 Numba 函数)时,如果您尝试传入带有缺失数据的 Series,Polars 将会引发错误。如何去除缺失数据?在调用自定义函数之前,可以 填充它丢弃它

组合多个列值

如果您想将多列传递给用户定义的函数,可以使用 Struct这在另一个章节中详细介绍。基本思想是将多列组合成一个 Struct,然后函数可以从中提取回这些列

# Add two arrays together:
@guvectorize([(int64[:], int64[:], float64[:])], "(n),(n)->(n)")
def add(arr, arr2, result):
    for i in range(len(arr)):
        result[i] = arr[i] + arr2[i]


df3 = pl.DataFrame({"values1": [1, 2, 3], "values2": [10, 20, 30]})

out = df3.select(
    # Create a struct that has two columns in it:
    pl.struct(["values1", "values2"])
    # Pass the struct to a lambda that then passes the individual columns to
    # the add() function:
    .map_batches(
        lambda combined: add(
            combined.struct.field("values1"), combined.struct.field("values2")
        )
    )
    .alias("add_columns")
)
print(out)

shape: (3, 1)
┌─────────────┐
│ add_columns │
│ ---         │
│ f64         │
╞═════════════╡
│ 11.0        │
│ 22.0        │
│ 33.0        │
└─────────────┘

流式计算

将完整的 Series 传递给用户定义的函数是有代价的:它可能会占用大量内存,因为其内容会被复制到 NumPy 数组中。您可以使用 is_elementwise=True 参数来 map_batches 将结果流式传输到函数中,这意味着它可能无法一次性获取所有值。

注意

如果 is_elementwise 参数设置不正确,可能会导致错误的结果。如果您设置 is_elementwise=True,请确保您的函数实际上是逐元素操作的(例如“计算每个值的对数”)——例如,我们的示例函数 diff_from_mean() 就不是。

返回类型

自定义 Python 函数通常是黑盒;Polars 不知道您的函数在做什么或它将返回什么。因此,返回数据类型是自动推断的。我们通过等待第一个非空值来实现这一点。该值将用于确定结果 Series 的类型。

Python 类型到 Polars 数据类型的映射如下

  • int -> Int64
  • float -> Float64
  • bool -> Boolean
  • str -> String
  • list[tp] -> List[tp] (其中内部类型以相同规则推断)
  • dict[str, [tp]] -> struct
  • Any -> object (始终避免这种情况)

Rust 类型映射如下

  • i32 or i64 -> Int64
  • f32 or f64 -> Float64
  • bool -> Boolean
  • String or str -> String
  • Vec<tp> -> List[tp] (其中内部类型以相同规则推断)

如果您想覆盖推断的类型,可以将 return_dtype 参数传递给 map_batches