跳到内容

多进程

TLDR: 如果您发现将 Python 内置的 multiprocessing 模块与 Polars 一起使用导致 Polars 报告多进程方法错误,您应该确保使用 spawn 作为启动方法,而不是 fork

from multiprocessing import get_context


def my_fun(s):
    print(s)


with get_context("spawn").Pool() as pool:
    pool.map(my_fun, ["input1", "input2", ...])

何时不应使用多进程

在我们深入细节之前,重要的是要强调 Polars 从一开始就被设计为使用您所有的 CPU 核心。它通过在单独的线程中并行执行可计算任务来实现这一点。例如,在 select 语句中请求两个表达式可以并行完成,结果只在最后合并。另一个例子是使用 group_by().agg(<expr>) 在组内聚合值,每个组都可以单独评估。在这些情况下,multiprocessing 模块不太可能提高您的代码性能。如果您将 Polars 与 GPU 引擎一起使用,也应避免手动使用多进程。当同时使用时,它们会争夺系统内存和处理能力,从而导致性能下降。

更多优化请参阅优化部分

何时应使用多进程

尽管 Polars 是多线程的,但其他库可能是单线程的。当其他库是瓶颈并且手头的问题是可并行的时,使用多进程来提高速度是合理的。

默认多进程配置的问题

总结

Python 多进程文档列出了创建进程池的三种方法

  1. spawn
  2. fork
  3. forkserver

fork 的描述是(截至 2022-10-15)

父进程使用 os.fork() 来 fork Python 解释器。子进程开始时,与父进程实际上是相同的。父进程的所有资源都被子进程继承。请注意,安全地 fork 多线程进程是存在问题的。

仅在 Unix 上可用。Unix 上的默认设置。

简而言之:Polars 是多线程的,旨在提供开箱即用的强大性能。因此,它不能与 fork 结合使用。如果您在 Unix 系统(Linux、BSD 等)上,除非您明确覆盖它,否则您正在使用 fork

您之前可能没有遇到这种情况的原因是纯 Python 代码和大多数 Python 库(大部分)是单线程的。或者,您在 Windows 或 MacOS 上,这些系统上 fork 甚至不可用作方法(对于 MacOS,直到 Python 3.7)。

因此,应该使用 spawnforkserverspawn 在所有平台上都可用,是最安全的选择,因此是推荐的方法。

示例

fork 的问题在于复制父进程。考虑下面的例子,这是在 Polars 问题跟踪器上发布的一个略有修改的例子

import multiprocessing
import polars as pl


def test_sub_process(df: pl.DataFrame, job_id):
    df_filtered = df.filter(pl.col("a") > 0)
    print(f"Filtered (job_id: {job_id})", df_filtered, sep="\n")


def create_dataset():
    return pl.DataFrame({"a": [0, 2, 3, 4, 5], "b": [0, 4, 5, 56, 4]})


def setup():
    # some setup work
    df = create_dataset()
    df.write_parquet("/tmp/test.parquet")


def main():
    test_df = pl.read_parquet("/tmp/test.parquet")

    for i in range(0, 5):
        proc = multiprocessing.get_context("spawn").Process(
            target=test_sub_process, args=(test_df, i)
        )
        proc.start()
        proc.join()

        print(f"Executed sub process {i}")


if __name__ == "__main__":
    setup()
    main()

使用 fork 作为方法而不是 spawn 将导致死锁。

fork 方法等同于调用 os.fork(),这是 POSIX 标准中定义的系统调用

进程应以单个线程创建。如果多线程进程调用 fork(),新进程应包含调用线程的副本及其整个地址空间,可能包括互斥量和其他资源的状态。因此,为了避免错误,子进程只能执行异步信号安全操作,直到调用其中一个 exec 函数。

相比之下,spawn 将创建一个全新的 Python 解释器,并且不会继承互斥量的状态。

那么代码示例中发生了什么?使用 pl.read_parquet 读取文件时,文件必须被锁定。然后调用 os.fork(),复制父进程的状态,包括互斥量。因此,所有子进程都将以已获取状态复制文件锁,使它们无限期地挂起,等待文件锁被释放,而这永远不会发生。

使这些问题调试起来棘手的是 fork 有时也能工作。将示例更改为不调用 pl.read_parquet

import multiprocessing
import polars as pl


def test_sub_process(df: pl.DataFrame, job_id):
    df_filtered = df.filter(pl.col("a") > 0)
    print(f"Filtered (job_id: {job_id})", df_filtered, sep="\n")


def create_dataset():
    return pl.DataFrame({"a": [0, 2, 3, 4, 5], "b": [0, 4, 5, 56, 4]})


def main():
    test_df = create_dataset()

    for i in range(0, 5):
        proc = multiprocessing.get_context("fork").Process(
            target=test_sub_process, args=(test_df, i)
        )
        proc.start()
        proc.join()

        print(f"Executed sub process {i}")


if __name__ == "__main__":
    main()

这工作正常。因此,在更大的代码库中调试这些问题(即不是这里的小玩具示例)可能会非常痛苦,因为一个看似无关的更改可能会破坏您的多进程代码。总的来说,除非有非常特殊的需求无法通过其他方式满足,否则绝不应将 fork 启动方法与多线程库一起使用。

fork 的优缺点

根据这个例子,您可能会想,为什么 Python 中一开始就提供了 fork

首先,可能是因为历史原因:spawn 是在 Python 3.4 中添加的,而 fork 从 Python 2.x 系列开始就是其一部分。

其次,spawnforkserver 有一些不适用于 fork 的限制,特别是所有参数都应该是可序列化的 (pickleable)。有关更多信息,请参阅 Python 多进程文档

第三,与 spawn 相比,创建新进程更快,因为 spawn 实际上是 fork + 通过调用 execv 创建一个没有锁的全新 Python 进程。因此 Python 文档中警告它较慢:spawn 的开销更大。然而,在几乎所有情况下,人们都希望使用多个进程来加速需要数分钟甚至数小时的计算,这意味着在宏观上,开销可以忽略不计。更重要的是,它实际上可以与多线程库结合使用。

第四,spawn 启动一个新进程,因此它需要代码是可导入的,这与 fork 不同。特别是,这意味着在使用 spawn 时,相关代码不应位于全局作用域中,例如在 Jupyter Notebooks 或普通脚本中。因此,在上面的示例中,我们定义了在其中 spawn 的函数,并从 __main__ 子句中运行这些函数。这对于典型项目来说不是问题,但在 Notebooks 中进行快速实验时可能会失败。

参考资料

  1. https://docs.pythonlang.cn/3/library/multiprocessing.html

  2. https://pythonspeed.com/articles/python-multiprocessing/

  3. https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html

  4. https://bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html