Python 多进程(Multiprocessing)示例

更新于 2026-01-13

Anish Singh Wali, Pankaj Kumar 2022-08-04

如今,并行处理(Parallel processing)正受到越来越多的关注。如果你还不了解并行处理,可以从维基百科中学习相关知识。随着 CPU 制造商在处理器中不断增加核心数量,编写并行代码已成为提升程序性能的一种绝佳方式。Python 引入了 multiprocessing 模块,使我们能够编写并行代码。要理解该模块的主要动机,我们需要先掌握一些并行编程的基础知识。阅读完本文后,希望你能对这一主题有所了解。


Python 多进程中的 Process、Queue 和 Lock

Python 的 multiprocessing 模块中包含大量用于构建并行程序的类。其中,三个最基本的类是 ProcessQueueLock。这些类将帮助你构建并行程序。但在详细介绍它们之前,我们先从一段简单的代码开始。

要让并行程序真正发挥作用,你需要知道你的电脑有多少个 CPU 核心。Python 的 multiprocessing 模块可以让你轻松获取这一信息。以下这段简单代码将打印出你电脑的 CPU 核心数量:

import multiprocessing

print("Number of cpu : ", multiprocessing.cpu_count())

以下输出结果会因你的电脑而异。以我为例,我的 CPU 核心数为 8。

Python multiprocessing 示例:CPU 核心数


Python multiprocessing 的 Process 类

Python multiprocessing 的 Process 类是一种抽象,它会启动另一个 Python 进程,让该进程运行指定代码,并提供一种方式让父应用程序控制其执行。

Process 类有两个重要的方法:start()join()

首先,我们需要编写一个函数,该函数将由子进程执行。然后,我们需要实例化一个 Process 对象。创建 Process 对象本身并不会立即执行任何操作,只有当我们调用 start() 方法时,进程才会真正开始运行并返回结果。之后,我们通过调用 join() 方法来等待该进程完成。

如果没有调用 join(),进程将保持空闲状态且不会终止。因此,如果你创建了许多进程但没有正确终止它们,可能会导致系统资源耗尽,甚至需要手动杀死这些进程。

另外需要注意的是:如果你想向目标函数传递参数,必须使用 args 关键字参数。

以下代码有助于理解 Process 类的用法:

from multiprocessing import Process


def print_func(continent='Asia'):
    print('The name of continent is : ', continent)

if __name__ == "__main__":  # 确保代码在主函数下运行
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)  # 不带参数实例化
    procs.append(proc)
    proc.start()

    # 带参数实例化进程
    for name in names:
        # print(name)
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()

    # 等待所有进程完成
    for proc in procs:
        proc.join()

上述代码的输出如下所示:

Python multiprocessing 示例:Process 类


Python multiprocessing 的 Queue 类

如果你具备基本的计算机数据结构知识,你应该已经了解 队列(Queue)的概念。Python multiprocessing 模块提供的 Queue 类正是一个先进先出(FIFO)的数据结构。它可以存储任何可被 pickle 序列化的 Python 对象(尽管简单的对象效果最好),在多个进程之间共享数据时非常有用。

队列特别适用于作为参数传递给 Process 的 target 函数,以便该进程可以消费数据。我们可以使用 put() 方法向队列插入数据,使用 get() 方法从队列中取出数据。

下面是一个快速示例:

from multiprocessing import Queue

colors = ['red', 'green', 'blue', 'black']
cnt = 1
# 实例化一个队列对象
queue = Queue()
print('pushing items to queue:')
for color in colors:
    print('item no: ', cnt, ' ', color)
    queue.put(color)
    cnt += 1

print('\npopping items from queue:')
cnt = 0
while not queue.empty():
    print('item no: ', cnt, ' ', queue.get())
    cnt += 1

Python multiprocessing 队列


Python multiprocessing 的 Lock 类

Lock 类的任务非常简单:它允许某段代码获取锁,从而确保在该锁被释放之前,其他进程无法执行类似的代码。

因此,Lock 类主要有两个任务:

  1. 获取锁(使用 acquire() 方法)
  2. 释放锁(使用 release() 方法)

Python 多进程完整示例

在这个 Python 多进程示例中,我们将把前面学到的所有知识整合在一起。

假设我们有一些任务需要完成。为了完成这些任务,我们将使用多个进程。为此,我们会维护两个队列:

  • 一个用于存放待处理的任务(tasks_to_accomplish
  • 另一个用于记录已完成任务的日志(tasks_that_are_done

然后我们实例化多个进程来完成这些任务。

注意:Python 的 Queue 类已经是线程安全和进程安全的(即内部已同步)。这意味着我们不需要再使用 Lock 类来防止多个进程同时访问同一个队列对象。因此,在本例中无需使用 Lock。

以下是完整实现:我们将任务添加到队列中,创建并启动多个进程,然后使用 join() 等待所有进程完成。最后,我们从第二个队列中打印出完成日志。

from multiprocessing import Lock, Process, Queue, current_process
import time
import queue  # 导入 queue 模块以使用 queue.Empty 异常


def do_job(tasks_to_accomplish, tasks_that_are_done):
    while True:
        try:
            '''
                尝试从队列中获取任务。
                get_nowait() 函数在队列为空时会抛出 queue.Empty 异常。
                也可以使用 get(False) 实现相同功能。
            '''
            task = tasks_to_accomplish.get_nowait()
        except queue.Empty:
            break
        else:
            '''
                如果没有抛出异常,则将任务完成信息
                添加到 tasks_that_are_done 队列中
            '''
            print(task)
            tasks_that_are_done.put(task + ' is done by ' + current_process().name)
            time.sleep(.5)
    return True


def main():
    number_of_task = 10
    number_of_processes = 4
    tasks_to_accomplish = Queue()
    tasks_that_are_done = Queue()
    processes = []

    for i in range(number_of_task):
        tasks_to_accomplish.put("Task no " + str(i))

    # 创建进程
    for w in range(number_of_processes):
        p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
        processes.append(p)
        p.start()

    # 等待所有进程完成
    for p in processes:
        p.join()

    # 打印输出结果
    while not tasks_that_are_done.empty():
        print(tasks_that_are_done.get())

    return True


if __name__ == '__main__':
    main()

根据任务数量的不同,这段代码可能需要一些时间才能显示输出。每次运行的输出顺序可能会有所不同。

Python multiprocessing 示例


Python multiprocessing 的 Pool(进程池)

Python multiprocessing 的 Pool 类可用于在多个输入值上并行执行某个函数,将输入数据分发到多个进程中(即数据并行)。

下面是一个简单的 Python multiprocessing Pool 示例:

from multiprocessing import Pool

import time

work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])


def work_log(work_data):
    print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
    time.sleep(int(work_data[1]))
    print(" Process %s Finished." % work_data[0])


def pool_handler():
    p = Pool(2)
    p.map(work_log, work)


if __name__ == '__main__':
    pool_handler()

下图展示了上述程序的输出。注意,进程池大小为 2,因此 work_log 函数的两个实例会并行执行。当其中一个函数执行完成后,它会自动获取下一个待处理的任务,依此类推。

Python multiprocessing Pool


以上就是关于 Python multiprocessing 模块的全部内容。 参考:官方文档