跳到内容

表达式插件

表达式插件是创建用户定义函数的首选方式。它们允许您编译 Rust 函数并将其注册为 Polars 库中的表达式。Polars 引擎将在运行时动态链接您的函数,您的表达式将以几乎与原生表达式一样快的速度运行。请注意,这不会受到 Python 的任何干扰,因此不会有 GIL 争用。

它们将受益于默认表达式所具有的相同优势

  • 优化
  • 并行性
  • Rust 原生性能

为了开始,我们将了解创建自定义表达式所需的内容。

我们的第一个自定义表达式:猪拉丁语

对于我们的第一个表达式,我们将创建一个猪拉丁语转换器。猪拉丁语是一种有趣的语言游戏,其中每个单词的首字母被移除,添加到单词末尾,最后加上“ay”。所以单词“pig”会转换为“igpay”。

当然,我们已经可以使用表达式来完成,例如 col("name").str.slice(1) + col("name").str.slice(0, 1) + "ay",但为此创建一个专门的函数会表现更好,并能让我们了解插件。

设置

我们从一个新的库开始,其 Cargo.toml 文件如下

[package]
name = "expression_lib"
version = "0.1.0"
edition = "2021"

[lib]
name = "expression_lib"
crate-type = ["cdylib"]

[dependencies]
polars = { version = "*" }
pyo3 = { version = "*", features = ["extension-module", "abi3-py38"] }
pyo3-polars = { version = "*", features = ["derive"] }
serde = { version = "*", features = ["derive"] }

编写表达式

在这个库中,我们创建一个辅助函数,将 &str 转换为猪拉丁语,并且我们创建了一个将作为表达式暴露的函数。要暴露一个函数,我们必须添加 #[polars_expr(output_type=DataType)] 属性,并且该函数必须始终接受 inputs: &[Series] 作为其第一个参数。

// src/expressions.rs
use polars::prelude::*;
use pyo3_polars::derive::polars_expr;
use std::fmt::Write;

fn pig_latin_str(value: &str, output: &mut String) {
    if let Some(first_char) = value.chars().next() {
        write!(output, "{}{}ay", &value[1..], first_char).unwrap()
    }
}

#[polars_expr(output_type=String)]
fn pig_latinnify(inputs: &[Series]) -> PolarsResult<Series> {
    let ca = inputs[0].str()?;
    let out: StringChunked = ca.apply_into_string_amortized(pig_latin_str);
    Ok(out.into_series())
}

请注意,我们使用 apply_into_string_amortized 而不是 apply_values,以避免为每一行分配新的字符串。如果您的插件接受多个输入,按元素操作,并生成 String 输出,那么您可能需要查看 polars::prelude::arity 中的 binary_elementwise_into_string_amortized 实用函数。

这就是 Rust 端所需的一切。在 Python 端,我们必须设置一个与 Cargo.toml 中定义的名称相同的文件夹,在本例中为“expression_lib”。我们将在与 Rust src 文件夹相同的目录下创建一个名为 expression_lib 的文件夹,并创建一个 expression_lib/__init__.py 文件。最终的文件结构应该看起来像这样

├── 📁 expression_lib/  # name must match "lib.name" in Cargo.toml
|   └── __init__.py
|
├── 📁src/
|   ├── lib.rs
|   └── expressions.rs
|
├── Cargo.toml
└── pyproject.toml

然后我们创建一个新的 Language 类,它将为我们新的 expr.language 命名空间保存表达式。我们的表达式的函数名可以被注册。请注意,这个名称正确很重要,否则主 Polars 包无法解析函数名。此外,我们可以设置额外的关键字参数,向 Polars 解释此表达式的行为方式。在本例中,我们告诉 Polars 这个函数是按元素操作的。这允许 Polars 以批处理方式运行此表达式。而对于其他操作,这是不允许的,例如排序或切片。

# expression_lib/__init__.py
from pathlib import Path
from typing import TYPE_CHECKING

import polars as pl
from polars.plugins import register_plugin_function
from polars._typing import IntoExpr

PLUGIN_PATH = Path(__file__).parent

def pig_latinnify(expr: IntoExpr) -> pl.Expr:
    """Pig-latinnify expression."""
    return register_plugin_function(
        plugin_path=PLUGIN_PATH,
        function_name="pig_latinnify",
        args=expr,
        is_elementwise=True,
    )

然后我们可以在我们的环境中通过安装 maturin 并运行 maturin develop --release 来编译这个库。

就是这样。我们的表达式已准备好使用!

import polars as pl
from expression_lib import pig_latinnify

df = pl.DataFrame(
    {
        "convert": ["pig", "latin", "is", "silly"],
    }
)
out = df.with_columns(pig_latin=pig_latinnify("convert"))

另外,您可以注册自定义命名空间,这使您能够编写

out = df.with_columns(
    pig_latin=pl.col("convert").language.pig_latinnify(),
)

接受 kwargs

如果您想在 Polars 表达式中接受 kwargs(关键字参数),您只需定义一个 Rust struct 并确保它派生 serde::Deserialize

/// Provide your own kwargs struct with the proper schema and accept that type
/// in your plugin expression.
#[derive(Deserialize)]
pub struct MyKwargs {
    float_arg: f64,
    integer_arg: i64,
    string_arg: String,
    boolean_arg: bool,
}

/// If you want to accept `kwargs`. You define a `kwargs` argument
/// on the second position in you plugin. You can provide any custom struct that is deserializable
/// with the pickle protocol (on the Rust side).
#[polars_expr(output_type=String)]
fn append_kwargs(input: &[Series], kwargs: MyKwargs) -> PolarsResult<Series> {
    let input = &input[0];
    let input = input.cast(&DataType::String)?;
    let ca = input.str().unwrap();

    Ok(ca
        .apply_into_string_amortized(|val, buf| {
            write!(
                buf,
                "{}-{}-{}-{}-{}",
                val, kwargs.float_arg, kwargs.integer_arg, kwargs.string_arg, kwargs.boolean_arg
            )
                .unwrap()
        })
        .into_series())
}

在 Python 端,当我们注册插件时,可以传递 kwargs

def append_args(
    expr: IntoExpr,
    float_arg: float,
    integer_arg: int,
    string_arg: str,
    boolean_arg: bool,
) -> pl.Expr:
    """
    This example shows how arguments other than `Series` can be used.
    """
    return register_plugin_function(
        plugin_path=PLUGIN_PATH,
        function_name="append_kwargs",
        args=expr,
        kwargs={
            "float_arg": float_arg,
            "integer_arg": integer_arg,
            "string_arg": string_arg,
            "boolean_arg": boolean_arg,
        },
        is_elementwise=True,
    )

输出数据类型

输出数据类型当然不必是固定的。它们通常取决于表达式的输入类型。为了适应这种情况,您可以为 #[polars_expr()] 宏提供一个指向函数的 output_type_func 参数。此函数可以将输入字段 &[Field] 映射到输出 Field(名称和数据类型)。

在下面的代码片段中,有一个我们使用实用工具 FieldsMapper 来帮助进行这种映射的示例。

use polars_plan::dsl::FieldsMapper;

fn haversine_output(input_fields: &[Field]) -> PolarsResult<Field> {
    FieldsMapper::new(input_fields).map_to_float_dtype()
}

#[polars_expr(output_type_func=haversine_output)]
fn haversine(inputs: &[Series]) -> PolarsResult<Series> {
    let out = match inputs[0].dtype() {
        DataType::Float32 => {
            let start_lat = inputs[0].f32().unwrap();
            let start_long = inputs[1].f32().unwrap();
            let end_lat = inputs[2].f32().unwrap();
            let end_long = inputs[3].f32().unwrap();
            crate::distances::naive_haversine(start_lat, start_long, end_lat, end_long)?
                .into_series()
        }
        DataType::Float64 => {
            let start_lat = inputs[0].f64().unwrap();
            let start_long = inputs[1].f64().unwrap();
            let end_lat = inputs[2].f64().unwrap();
            let end_long = inputs[3].f64().unwrap();
            crate::distances::naive_haversine(start_lat, start_long, end_lat, end_long)?
                .into_series()
        }
        _ => polars_bail!(InvalidOperation: "only supported for float types"),
    };
    Ok(out)
}

这就是您入门所需了解的全部内容。请查看这个仓库,了解这一切是如何协同工作的,并参阅本教程,以获得更深入的理解。