多进程
TLDR: 如果您发现将 Python 内置的 multiprocessing
模块与 Polars 一起使用导致 Polars 报告多进程方法错误,您应该确保使用 spawn
作为启动方法,而不是 fork
。
from multiprocessing import get_context
def my_fun(s):
print(s)
with get_context("spawn").Pool() as pool:
pool.map(my_fun, ["input1", "input2", ...])
何时不应使用多进程
在我们深入细节之前,重要的是要强调 Polars 从一开始就被设计为使用您所有的 CPU 核心。它通过在单独的线程中并行执行可计算任务来实现这一点。例如,在 select
语句中请求两个表达式可以并行完成,结果只在最后合并。另一个例子是使用 group_by().agg(<expr>)
在组内聚合值,每个组都可以单独评估。在这些情况下,multiprocessing
模块不太可能提高您的代码性能。如果您将 Polars 与 GPU 引擎一起使用,也应避免手动使用多进程。当同时使用时,它们会争夺系统内存和处理能力,从而导致性能下降。
更多优化请参阅优化部分。
何时应使用多进程
尽管 Polars 是多线程的,但其他库可能是单线程的。当其他库是瓶颈并且手头的问题是可并行的时,使用多进程来提高速度是合理的。
默认多进程配置的问题
总结
Python 多进程文档列出了创建进程池的三种方法
- spawn
- fork
- forkserver
fork 的描述是(截至 2022-10-15)
父进程使用 os.fork() 来 fork Python 解释器。子进程开始时,与父进程实际上是相同的。父进程的所有资源都被子进程继承。请注意,安全地 fork 多线程进程是存在问题的。
仅在 Unix 上可用。Unix 上的默认设置。
简而言之:Polars 是多线程的,旨在提供开箱即用的强大性能。因此,它不能与 fork
结合使用。如果您在 Unix 系统(Linux、BSD 等)上,除非您明确覆盖它,否则您正在使用 fork
。
您之前可能没有遇到这种情况的原因是纯 Python 代码和大多数 Python 库(大部分)是单线程的。或者,您在 Windows 或 MacOS 上,这些系统上 fork
甚至不可用作方法(对于 MacOS,直到 Python 3.7)。
因此,应该使用 spawn
或 forkserver
。spawn
在所有平台上都可用,是最安全的选择,因此是推荐的方法。
示例
fork
的问题在于复制父进程。考虑下面的例子,这是在 Polars 问题跟踪器上发布的一个略有修改的例子
import multiprocessing
import polars as pl
def test_sub_process(df: pl.DataFrame, job_id):
df_filtered = df.filter(pl.col("a") > 0)
print(f"Filtered (job_id: {job_id})", df_filtered, sep="\n")
def create_dataset():
return pl.DataFrame({"a": [0, 2, 3, 4, 5], "b": [0, 4, 5, 56, 4]})
def setup():
# some setup work
df = create_dataset()
df.write_parquet("/tmp/test.parquet")
def main():
test_df = pl.read_parquet("/tmp/test.parquet")
for i in range(0, 5):
proc = multiprocessing.get_context("spawn").Process(
target=test_sub_process, args=(test_df, i)
)
proc.start()
proc.join()
print(f"Executed sub process {i}")
if __name__ == "__main__":
setup()
main()
使用 fork
作为方法而不是 spawn
将导致死锁。
fork 方法等同于调用 os.fork()
,这是 POSIX 标准中定义的系统调用
进程应以单个线程创建。如果多线程进程调用 fork(),新进程应包含调用线程的副本及其整个地址空间,可能包括互斥量和其他资源的状态。因此,为了避免错误,子进程只能执行异步信号安全操作,直到调用其中一个 exec 函数。
相比之下,spawn
将创建一个全新的 Python 解释器,并且不会继承互斥量的状态。
那么代码示例中发生了什么?使用 pl.read_parquet
读取文件时,文件必须被锁定。然后调用 os.fork()
,复制父进程的状态,包括互斥量。因此,所有子进程都将以已获取状态复制文件锁,使它们无限期地挂起,等待文件锁被释放,而这永远不会发生。
使这些问题调试起来棘手的是 fork
有时也能工作。将示例更改为不调用 pl.read_parquet
import multiprocessing
import polars as pl
def test_sub_process(df: pl.DataFrame, job_id):
df_filtered = df.filter(pl.col("a") > 0)
print(f"Filtered (job_id: {job_id})", df_filtered, sep="\n")
def create_dataset():
return pl.DataFrame({"a": [0, 2, 3, 4, 5], "b": [0, 4, 5, 56, 4]})
def main():
test_df = create_dataset()
for i in range(0, 5):
proc = multiprocessing.get_context("fork").Process(
target=test_sub_process, args=(test_df, i)
)
proc.start()
proc.join()
print(f"Executed sub process {i}")
if __name__ == "__main__":
main()
这工作正常。因此,在更大的代码库中调试这些问题(即不是这里的小玩具示例)可能会非常痛苦,因为一个看似无关的更改可能会破坏您的多进程代码。总的来说,除非有非常特殊的需求无法通过其他方式满足,否则绝不应将 fork
启动方法与多线程库一起使用。
fork 的优缺点
根据这个例子,您可能会想,为什么 Python 中一开始就提供了 fork
?
首先,可能是因为历史原因:spawn
是在 Python 3.4 中添加的,而 fork
从 Python 2.x 系列开始就是其一部分。
其次,spawn
和 forkserver
有一些不适用于 fork
的限制,特别是所有参数都应该是可序列化的 (pickleable)。有关更多信息,请参阅 Python 多进程文档。
第三,与 spawn
相比,创建新进程更快,因为 spawn
实际上是 fork
+ 通过调用 execv 创建一个没有锁的全新 Python 进程。因此 Python 文档中警告它较慢:spawn
的开销更大。然而,在几乎所有情况下,人们都希望使用多个进程来加速需要数分钟甚至数小时的计算,这意味着在宏观上,开销可以忽略不计。更重要的是,它实际上可以与多线程库结合使用。
第四,spawn
启动一个新进程,因此它需要代码是可导入的,这与 fork
不同。特别是,这意味着在使用 spawn
时,相关代码不应位于全局作用域中,例如在 Jupyter Notebooks 或普通脚本中。因此,在上面的示例中,我们定义了在其中 spawn
的函数,并从 __main__
子句中运行这些函数。这对于典型项目来说不是问题,但在 Notebooks 中进行快速实验时可能会失败。
参考资料
-
https://docs.pythonlang.cn/3/library/multiprocessing.html
-
https://pythonspeed.com/articles/python-multiprocessing/
-
https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html
-
https://bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html