Jim Anderson
探索 Python 中的并发
在本节中,你将熟悉与并发相关的术语。你还会了解到,并发根据所要解决的问题不同,可以采取不同的形式。最后,你将发现不同的并发模型在 Python 中是如何实现的。
什么是并发?
字典中对“并发”(concurrency)的定义是“同时发生”。在 Python 中,这些“同时发生”的事物有不同的名称,包括:
- 线程(Thread)
- 任务(Task)
- 进程(Process)
从高层次来看,它们都指代按顺序执行的一系列指令。你可以把它们想象成不同的“思维轨迹”。每一条轨迹都可以在某些点被暂停,而 CPU(或大脑)可以切换到另一条轨迹上继续处理。每条轨迹的状态都会被保存下来,以便在中断处恢复执行。
你可能会疑惑:为什么 Python 要用不同的词来描述同一个概念?事实上,只有从高层次视角看时,线程、任务和进程才显得相似。一旦深入细节,你会发现它们实际上代表略有不同的东西。随着你逐步学习示例,你会更清楚地看到它们之间的区别。
现在,我们来关注“同时发生”这一部分。你需要小心一点,因为深入细节后你会发现:只有多个系统进程才能让 Python 真正实现“同时”运行这些思维轨迹。
相比之下,线程和异步任务始终运行在单个处理器上,这意味着它们一次只能运行一个。它们只是巧妙地轮流执行,从而加快整体进程。尽管它们并没有真正“同时”运行不同的思维轨迹,但仍然属于并发的范畴。
注意:在大多数其他编程语言中,线程通常可以并行运行。要了解为什么 Python 的线程不能做到这一点,请查阅《什么是 Python 全局解释器锁(GIL)?》。
如果你对更多细节感兴趣,还可以阅读《绕过 GIL 实现 Python 并行处理》,或者查看 Python 3.13 中引入的实验性“自由线程”(free threading)功能。
线程、任务或进程的轮换方式各不相同。在多线程方法中,操作系统实际上知道每个线程的存在,并可在任意时刻中断它,转而运行另一个线程。这种机制也适用于进程。这被称为抢占式多任务处理(preemptive multitasking),因为操作系统可以在不依赖程序自身的情况下抢占线程或进程以进行切换。
抢占式多任务处理的优点在于,线程中的代码无需做任何特殊处理即可实现切换。但这也带来了困难——切换可能发生在任何时刻,甚至可能在一条 Python 语句的中间(比如 x = x + 1)。这是因为 Python 语句通常由多个底层字节码指令组成。
另一方面,异步任务使用协作式多任务处理(cooperative multitasking)。任务必须主动声明自己何时准备好被切换出去,而无需操作系统的介入。这意味着任务中的代码需要稍作修改才能实现这一点。
这样做的好处是,你总是知道自己任务会在哪里被切换出去,从而使执行流程更容易推理。除非某条语句被明确标记为可中断,否则任务不会在 Python 语句中间被切换出去。稍后你会看到,这如何简化你的设计。
什么是并行?
到目前为止,你看到的并发都是在单个处理器上发生的。那么,你那台酷炫新笔记本上的多个 CPU 核心呢?如何在 Python 中利用它们?答案就是:执行独立的进程!
你可以把一个进程看作几乎是一个完全独立的程序(尽管从技术上讲,它通常被定义为包含内存、文件句柄等资源的集合)。一种理解方式是:每个进程都在自己的 Python 解释器中运行。
由于它们是不同的进程,因此在使用多进程(multiprocessing)的程序中,每个“思维轨迹”都可以运行在不同的 CPU 核心上。在不同核心上运行意味着它们真的可以同时执行,这非常棒!当然,这样做也会带来一些复杂性,但 Python 在大多数情况下都能很好地处理这些问题。
现在你已经了解了并发与并行的概念,可以回顾一下它们的区别,并确定哪些 Python 模块支持它们:
| Python 模块 | CPU 数量 | 多任务类型 | 切换决策方 |
|---|---|---|---|
asyncio |
单核 | 协作式 | 任务自行决定何时放弃控制权 |
threading |
单核 | 抢占式 | 操作系统决定何时切换任务(Python 外部) |
multiprocessing |
多核 | 抢占式 | 各进程在不同处理器上同时运行 |
在本教程的后续内容中,你将深入探索这些模块。
注意:
threading和multiprocessing在并发程序中属于相对底层的构建块。在实践中,你通常可以用concurrent.futures替代它们,后者为这两个模块提供了更高层次的接口。而asyncio提供了一种略有不同的并发方法,稍后你会深入探讨。
每种并发类型都有其适用场景。接下来,你将看看它们分别能帮助加速哪些类型的程序。
并发在什么情况下有用?
并发对以下两类问题能带来显著提升:
- I/O 密集型(I/O-Bound)
- CPU 密集型(CPU-Bound)
I/O 密集型问题会导致程序变慢,因为它频繁地等待来自外部资源的输入或输出(I/O)。这类问题通常出现在程序与比 CPU 慢得多的设备交互时。
比 CPU 慢的设备数不胜数,但你的程序通常只与其中少数几种打交道。最常见的慢速设备是文件系统和网络连接。
下图展示了一个 I/O 密集型操作的时间分布:

蓝色框表示程序正在工作的时间,红色框表示等待 I/O 操作完成的时间。该图并非按比例绘制,因为网络请求可能比 CPU 指令慢几个数量级,因此程序大部分时间都在等待——你的网页浏览器大部分时间就是这样度过的。
另一方面,有些程序在不访问网络或文件的情况下执行大量计算。这些是CPU 密集型程序,因为限制程序速度的资源是 CPU,而不是网络或文件系统。
下图展示了 CPU 密集型程序的时间分布:

在接下来的示例中,你会看到不同形式的并发对 I/O 密集型和 CPU 密集型程序的效果各不相同。为程序添加并发会引入额外的代码和复杂性,因此你需要判断潜在的加速是否值得付出额外的努力。学完本教程后,你应该有足够的信息来做出这个决定。
以下是快速总结,以澄清这一概念:
| I/O 密集型进程 | CPU 密集型进程 |
|---|---|
| 程序大部分时间在与慢速设备通信,如网络适配器、硬盘或打印机。 | 程序大部分时间在执行 CPU 运算。 |
| 加速方法是重叠等待这些设备的时间。 | 加速方法是在相同时间内完成更多计算。 |
你将先看 I/O 密集型程序,然后再看处理 CPU 密集型程序的代码。
加速 I/O 密集型程序
在本节中,你将专注于 I/O 密集型程序和一个常见问题:通过网络下载内容。在这个例子中,你将从几个网站下载网页,但实际上可以是任何网络流量。使用网页只是为了更方便地可视化和设置。
同步版本
你将从一个非并发版本的任务开始。注意,该程序需要第三方 Requests 库。因此,你应该先在激活的虚拟环境中运行以下命令:
(venv) $ python -m pip install requests
这个版本的程序完全不使用并发:
io_non_concurrent.py
import time
import requests
def main():
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.perf_counter()
download_all_sites(sites)
duration = time.perf_counter() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")
def download_all_sites(sites):
with requests.Session() as session:
for url in sites:
download_site(url, session)
def download_site(url, session):
with session.get(url) as response:
print(f"Read {len(response.content)} bytes from {url}")
if __name__ == "__main__":
main()
如你所见,这是一个相当简短的程序。它只是从地址列表下载网站内容并打印其大小。
有一点值得注意:你使用了 requests 的 Session 对象。虽然可以直接调用 requests.get(),但创建 Session 对象可以让库在请求之间保留状态并重用连接,从而加快速度。
你在 download_all_sites() 中创建会话,然后遍历站点列表,依次下载每个站点。最后,你打印出整个过程耗时,以便在后续示例中直观感受并发带来的提升。
该程序的处理流程图与上一节中的 I/O 密集型图非常相似。
注意:网络流量受多种因素影响,可能每秒都在变化。由于网络问题,这些测试的时间可能会翻倍。
为了排除干扰,你应该多次重复每个基准测试并取最快的结果。这样,不同版本程序之间的差异仍然清晰可见。
这个版本的最大优点是简单。编写和调试都相对较快,也更容易理解。只有一个“思维轨迹”,因此你可以预测下一步是什么以及它的行为方式。
但主要问题是,与你即将看到的其他解决方案相比,它相对较慢。以下是一个可能的最终输出示例:
(venv) $ python io_non_concurrent.py
Read 10966 from https://www.jython.org
Read 276 from http://olympus.realpython.org/dice
⋮
Downloaded 160 sites in 14.289619207382202 seconds
请注意,这些结果可能因你的网速、网络拥塞等因素而有很大差异。
如果程序只需两秒且很少运行,那么可能不值得添加并发。你可以就此止步。
但如果程序频繁运行?或者需要数小时才能完成?那么你将通过重写程序使用 Python 线程来引入并发。
多线程版本
正如你所料,使用多线程编写程序需要更多努力。但你可能会惊讶于基本情况下所需的额外工作其实很少。以下是利用 concurrent.futures 和 threading 模块的同一程序:
io_threads.py
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import requests
thread_local = threading.local()
def main():
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.perf_counter()
download_all_sites(sites)
duration = time.perf_counter() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")
def download_all_sites(sites):
with ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_site, sites)
def download_site(url):
session = get_session_for_thread()
with session.get(url) as response:
print(f"Read {len(response.content)} bytes from {url}")
def get_session_for_thread():
if not hasattr(thread_local, "session"):
thread_local.session = requests.Session()
return thread_local.session
if __name__ == "__main__":
main()
程序的整体结构相同,但高亮行显示了你需要做的更改。
在第 20 行,你创建了一个 ThreadPoolExecutor 实例来为你管理线程。这里你显式请求了 5 个工作线程。
注意:如何选择线程池中的线程数量?这个问题没有固定答案,正确数量因任务而异。
一般来说,对于 I/O 密集型问题,你不受 CPU 核心数量的限制。事实上,只要线程在等待数据而不是做实际工作,创建数百甚至数千个线程并不罕见。但最终会因线程切换的额外开销而出现收益递减。
建议进行一些实验。你可以随意调整这个数字,观察它如何影响整体执行时间。
创建 ThreadPoolExecutor 看起来很复杂,但分解后其实包含三个组件:
- Thread(线程):即前面提到的“思维轨迹”。
- Pool(池):创建一组可并发运行的线程。
- Executor(执行器):控制池中每个线程的运行方式和时机。
注意:当系统资源有限但仍需处理许多任务时,使用线程池很有优势。通过预先创建线程并在后续任务中重用,池减少了反复创建和销毁线程的开销。
标准库将 ThreadPoolExecutor 实现为上下文管理器,因此你可以使用 with 语法来管理线程池的创建和释放。
在这个多线程版本中,你让执行器代替你调用 download_site(),而不是在循环中手动调用。第 21 行的 executor.map() 方法负责将工作负载分配给可用线程,使每个线程可以并发处理不同的站点。该方法接受两个参数:
- 一个函数(用于处理每个数据项,如站点地址)
- 一个数据项集合(由该函数处理)
由于传递给执行器 .map() 方法的函数必须只接受一个参数,因此你在第 23 行修改了 download_site(),使其只接受 URL。但现在如何获取会话对象呢?
这是线程编程中一个有趣且棘手的问题。因为操作系统控制任务何时被中断以及何时切换到另一个任务,所以线程间共享的数据需要受到保护或保证线程安全,以避免意外行为或数据损坏。不幸的是,requests.Session() 不是线程安全的,这意味着一个线程可能会干扰另一个线程仍在使用的会话。
有几种策略可以使数据访问线程安全。一种是使用线程安全的数据结构,如 queue.Queue、multiprocessing.Queue 或 asyncio.Queue。这些对象使用底层原语(如锁对象)确保同一时间只有一个线程可以访问代码块或内存区域。你通过 ThreadPoolExecutor 间接使用了这种策略。
这里使用的另一种策略是线程本地存储(thread-local storage)。在第 7 行调用 threading.local() 时,你创建了一个类似全局变量的对象,但它是每个线程特有的。看起来有点奇怪,但你只需要创建一个这样的对象(而不是每个线程一个)。该对象本身会处理不同线程对其属性的访问隔离。
当调用 get_session_for_thread() 时,它查找的会话是运行该函数的特定线程所独有的。因此,每个线程在首次调用 get_session_for_thread() 时会创建一个会话,之后在其生命周期内都会复用该会话。
好了,是时候对你的多线程程序进行终极测试了:
(venv) $ python io_threads.py
Read 10966 from https://www.jython.org
Read 276 from http://olympus.realpython.org/dice
⋮
Downloaded 160 sites in 3.190047219999542 seconds
速度很快!记得非并发版本在最佳情况下用了 14 秒多。
其执行时序图如下:

程序使用多个线程同时向网站发起多个请求。这允许程序重叠等待时间,从而更快获得最终结果。太棒了!这正是目标。
多线程版本有什么问题吗?从示例可以看出,实现这一点需要更多代码,而且你必须仔细考虑哪些数据在线程间共享。
线程可能以微妙且难以察觉的方式交互。这些交互可能导致竞态条件(race conditions),进而引发随机、间歇性的 bug,这些 bug 通常很难发现。如果你不熟悉这个概念,可以查阅另一篇关于线程安全的教程中的竞态条件部分。
异步版本
通过并发运行线程,你将原始同步代码的总执行时间缩短了一个数量级。这已经相当了不起,但通过利用 Python 的 asyncio 模块(支持异步 I/O),你还能做得更好。
异步处理是一种非常适合 I/O 密集型任务的并发模型(因此得名 asyncio)。它通过事件循环、非阻塞操作和协程等机制,避免了线程间上下文切换的开销。或许令人惊讶的是,异步代码只需一个执行线程即可实现并发。
注意:如果这些概念对你来说很陌生,或需要快速复习,请查阅《Python 异步特性入门》和《Python 异步 IO 完全指南》。
简而言之,事件循环(event loop)控制每个异步任务的执行方式和时机。顾名思义,它会持续循环遍历你的任务并监控其状态。一旦当前任务开始等待 I/O 操作完成,循环就会暂停它并立即切换到另一个任务。反之,当预期事件发生时,循环最终会在下一次迭代中恢复被暂停的任务。
协程(coroutine)类似于线程,但更轻量级,挂起或恢复的成本更低。这使得你可以生成比线程多得多的协程,而不会带来显著的内存或性能开销。这种能力有助于解决 C10k 问题(高效处理一万个并发连接)。但有一个陷阱。
如果你想充分利用异步编程的优势,协程中不能有阻塞函数调用。阻塞调用是同步的,意味着它在等待数据到达时会阻止其他代码运行。相反,非阻塞调用可以主动放弃控制权,并在数据就绪时收到通知。
在 Python 中,通过调用异步函数(也称为协程函数,用 async def 定义而非普通 def)来创建协程对象。只有在异步函数体内,你才能使用 await 关键字,它会暂停协程的执行,直到被等待的任务完成:
import asyncio
async def main():
await asyncio.sleep(3.5)
这里,你将 main() 定义为异步函数,调用时会隐式返回一个协程对象。得益于 await 关键字,你的协程对 asyncio.sleep() 进行了非阻塞调用,模拟了 3.5 秒的延迟。在 main() 等待唤醒事件时,其他任务可能并发运行。
注意:要运行上述示例代码,你需要将
main()调用包装在asyncio.run()中,或在 Python 的asyncioREPL 中await main()。
现在你已基本了解异步 I/O 是什么,可以逐步分析示例代码的异步版本并弄清其工作原理。但由于你一直在使用的 Requests 库是阻塞的,现在必须切换到非阻塞的替代品,如专为 Python asyncio 设计的 aiohttp:
(venv) $ python -m pip install aiohttp
在虚拟环境中安装此库后,你可以在异步版本的代码中使用它:
io_asyncio.py
import asyncio
import time
import aiohttp
async def main():
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.perf_counter()
await download_all_sites(sites)
duration = time.perf_counter() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")
async def download_all_sites(sites):
async with aiohttp.ClientSession() as session:
tasks = [download_site(url, session) for url in sites]
await asyncio.gather(*tasks, return_exceptions=True)
async def download_site(url, session):
async with session.get(url) as response:
print(f"Read {len(await response.read())} bytes from {url}")
if __name__ == "__main__":
asyncio.run(main())
这个版本看起来与同步版本惊人地相似,这是 asyncio 的又一优势。但它是一把双刃剑。虽然它可能比多线程版本更容易推理,但在更复杂的场景中,asyncio 远非易事。
与非并发版本相比,最重要的区别包括:
- 第 1 行导入标准库中的
asyncio,这是在第 26 行运行异步main()函数所必需的。 - 第 4 行导入第三方
aiohttp库(已安装到虚拟环境中),它替换了之前的Requests。 - 第 6、16 和 21 行通过
async关键字将普通函数重新定义为异步函数。 - 第 12 行在
download_all_sites()前添加await,以便等待返回的协程对象完成。这会有效暂停main()函数,直到所有站点下载完毕。 - 第 17 和 22 行使用
async with语句为会话对象和响应分别创建异步上下文管理器。 - 第 18 行使用列表推导式创建任务列表,每个任务都是
download_site()返回的协程对象。注意,你不会await单个协程对象,否则会导致它们顺序执行。 - 第 19 行使用
asyncio.gather()并发运行所有任务,从而高效下载多个站点。 - 第 23 行在打印读取的字节数之前,
await会话的 HTTP GET 请求完成。
你可以跨所有任务共享会话,因此会话在此处作为上下文管理器创建。任务可以共享会话,因为它们都在同一线程上运行。一个任务不可能在会话处于不良状态时中断另一个任务。
这里有一个细微但重要的变化。还记得多线程示例中关于最优线程数的讨论吗?在多线程示例中,最优线程数并不明显。
asyncio 的一个很酷的优势是,它的扩展性远优于线程或 concurrent.futures。每个任务创建和运行所需的资源和时间远少于线程,因此创建和运行更多任务效果很好。本示例仅为每个要下载的站点创建一个单独任务,效果相当不错。
而且,它真的很快。异步版本是所有版本中最快的,优势明显:
(venv) $ python io_asyncio.py
Read 10966 bytes from https://www.jython.org
Read 10966 bytes from https://www.jython.org
⋮
Downloaded 160 sites in 0.49083488899850636 seconds
耗时不到半秒,比多线程版本快 7 倍,比非并发版本快 30 多倍!
注意:在同步版本中,你按确定顺序循环下载站点内容。在多线程版本中,你将任务调度控制权交给了操作系统,因此最终顺序看似随机。而异步版本可能显示完成时间的聚类,但由于网络条件变化,总体上也是非确定性的。
执行时序图与多线程示例非常相似,只是所有 I/O 请求都由同一线程完成:

有一个常见论点认为,在适当位置添加 async 和 await 是一种额外的复杂性。在某种程度上,这确实如此。但另一方面,它迫使你思考任务何时会被切换出去,这有助于你设计出更好的方案。
扩展性问题在这里也很突出。在多线程示例中,为每个站点创建一个线程明显比使用少量线程慢。而在 asyncio 示例中,即使有数百个任务也不会变慢。
不过,asyncio 目前也存在一些问题。你需要特殊的异步版本库才能充分发挥 asyncio 的优势。如果直接使用 Requests 下载站点,速度会慢很多,因为 Requests 无法通知事件循环它已被阻塞。随着时间推移和更多库拥抱 asyncio,这个问题正变得不那么重要。
另一个更微妙的问题是,如果某个任务不合作,协作式多任务的所有优势都会丧失。代码中的一个小错误可能导致任务长时间占用处理器,从而饿死其他需要运行的任务。如果任务不将控制权交还给事件循环,循环就无法介入。
考虑到这一点,你可以尝试一种截然不同的并发方法:使用多进程。
基于进程的版本
到目前为止,本教程中的所有并发示例都只在计算机的单个 CPU 或核心上运行。原因与 CPython 的当前设计和所谓的全局解释器锁(Global Interpreter Lock, GIL)有关。
本教程不会深入探讨 GIL 的原理和原因。现在只需知道,这个示例的同步、多线程和异步版本都只在单个 CPU 上运行。
multiprocessing 模块及其在 concurrent.futures 中的对应封装旨在打破这一障碍,让你的代码在多个 CPU 上运行。从高层次看,它通过为每个 CPU 创建一个新的 Python 解释器实例,然后将程序的一部分分配给它来运行。
可以想象,启动一个独立的 Python 解释器不像在当前解释器中启动新线程那么快。这是一种重量级操作,附带一些限制和困难,但对于合适的问题,它可以带来巨大差异。
与之前的方法不同,使用 multiprocessing 可以充分利用你那台酷炫新电脑的所有 CPU。以下是示例代码:
io_processes.py
import atexit
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor
import requests
session: requests.Session
def main():
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.perf_counter()
download_all_sites(sites)
duration = time.perf_counter() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")
def download_all_sites(sites):
with ProcessPoolExecutor(initializer=init_process) as executor:
executor.map(download_site, sites)
def download_site(url):
with session.get(url) as response:
name = multiprocessing.current_process().name
print(f"{name}:Read {len(response.content)} bytes from {url}")
def init_process():
global session
session = requests.Session()
atexit.register(session.close)
if __name__ == "__main__":
main()
这实际上与多线程示例非常相似,因为你利用了熟悉的 concurrent.futures 抽象,而不是直接使用 multiprocessing。快速浏览一下这段代码的作用:
- 第 8 行使用类型提示声明一个全局变量来保存会话对象。注意,这并未实际定义变量的值。
- 第 21 行将
ThreadPoolExecutor替换为concurrent.futures中的ProcessPoolExecutor,并传入下方定义的init_process()。 - 第 29-32 行定义了一个自定义初始化函数,每个进程在启动后不久都会调用它,确保每个进程初始化自己的会话。
- 第 32 行使用
atexit注册清理函数,确保进程停止时正确关闭会话,有助于防止潜在的内存泄漏。
这里发生的情况是,池创建了多个独立的 Python 解释器进程,并让每个进程在可迭代对象(此处是站点列表)中的某些项上运行指定函数。主进程与其他进程之间的通信由系统自动处理。
创建池实例的代码值得特别注意。首先,它没有指定池中要创建的进程数(尽管这是一个可选参数)。默认情况下,它会确定你计算机中的 CPU 数量并匹配之。这通常是最佳选择,此处也不例外。
对于 I/O 密集型问题,增加进程数并不会加快速度。实际上会变慢,因为设置和销毁所有这些进程的成本大于并行执行 I/O 请求的收益。
注意:如果你需要在进程间交换数据,则需要昂贵的进程间通信(IPC)和数据序列化,这会进一步增加总体成本。此外,序列化并非总是可行,因为 Python 在底层使用
pickle模块,而它仅支持少数数据类型。
接下来是初始化器部分。记住,池中的每个进程都有自己的内存空间。这意味着它们无法轻松共享会话对象等资源。你不希望每次调用函数时都创建新的 Session 实例——而是希望为每个进程创建一个。
初始化器函数参数正是为此设计的。虽然无法将初始化器的返回值传回 download_site(),但你可以初始化一个全局会话变量,为每个进程保存单一会话。由于每个进程都有自己的内存空间,每个进程的全局变量都是不同的。
基本上就是这样。其余代码与之前看到的非常相似。基于进程的版本确实需要一些额外设置,全局会话对象看起来也很奇怪。你必须花时间思考每个进程中会访问哪些变量。
虽然这个版本充分利用了计算机的 CPU 能力,但性能表现却出人意料地平淡:
(venv) $ python io_processes.py
ForkProcess-3:Read 10966 bytes from https://www.jython.org
ForkProcess-4:Read 276 bytes from http://olympus.realpython.org/dice
⋮
Downloaded 160 sites in 3.428215079999063 seconds
在配备四个 CPU 核心的计算机上,它比同步版本快约四倍。但它比多线程版本稍慢,远慢于异步版本。
该代码的执行时序图如下:

有几个独立进程并行执行。每个进程对应的时序图类似于本教程开头看到的非并发版本。
I/O 密集型问题并不是 multiprocessing 存在的原因。接下来进入下一节,看看 CPU 密集型示例。
加速 CPU 密集型程序
现在稍微转换一下思路。到目前为止的示例都处理 I/O 密集型问题。现在你将研究 CPU 密集型问题。如前所述,I/O 密集型问题大部分时间在等待外部操作(如网络调用)完成,而 CPU 密集型问题执行较少的 I/O 操作,其总执行时间取决于处理所需数据的速度。
在这个示例中,你将使用一个略显愚蠢的函数来创建一段在 CPU 上运行很长时间的代码。该函数使用递归方法计算第 n 个斐波那契数:
>>> def fib(n):
... return n if n < 2 else fib(n - 2) + fib(n - 1)
...
>>> for n in range(1, 11):
... print(f"fib({n:>2}) = {fib(n):,}")
...
fib( 1) = 1
fib( 2) = 1
fib( 3) = 2
fib( 4) = 3
fib( 5) = 5
fib( 6) = 8
fib( 7) = 13
fib( 8) = 21
fib( 9) = 34
fib(10) = 55
注意,随着函数计算更高的斐波那契数,结果值增长非常快。这种实现的递归性质导致对相同数字进行大量重复计算,需要大量处理时间。这使其成为 CPU 密集型任务的绝佳示例。
记住,这只是一个占位符,代表你实际执行的有用代码(如计算方程根或排序大型数据结构),这些代码需要长时间处理。
同步版本
首先,看看非并发版本的示例:
import time
def main():
start_time = time.perf_counter()
for _ in range(20):
fib(35)
duration = time.perf_counter() - start_time
print(f"Computed in {duration} seconds")
def fib(n):
return n if n < 2 else fib(n - 2) + fib(n - 1)
if __name__ == "__main__":
main()
这段代码在循环中调用 fib(35) 二十次。由于其实现的递归性质,函数会自我调用数亿次!所有这些都在单线程、单进程、单 CPU 上完成。
执行时序图如下:

与 I/O 密集型示例不同,CPU 密集型示例的运行时间通常相当一致。在同一台机器上,这个示例大约需要 35 秒:
(venv) $ python cpu_non_concurrent.py
Computed in 35.358853937003005 seconds
显然,你可以做得更好。毕竟,它只在单个 CPU 上运行,没有任何并发。接下来,看看如何改进它。
多线程版本
你认为用线程(或异步任务)重写这段代码会加速多少?
如果你回答“完全不会”,那就给自己一块饼干。如果回答“会变慢”,那就给自己两块饼干。
原因如下:在之前的 I/O 密集型示例中,大部分时间花在等待慢速操作完成上。线程和异步任务通过重叠等待时间(而非顺序执行)来加速。
而对于 CPU 密集型问题,没有等待。CPU 正以最快速度全力运算以解决问题。在 Python 中,线程和异步任务都在同一个 CPU、同一个进程上运行。这意味着单个 CPU 不仅要完成非并发代码的所有工作,还要承担设置线程或任务的额外开销。
以下是 CPU 密集型问题的多线程版本代码:
cpu_threads.py
import time
from concurrent.futures import ThreadPoolExecutor
def main():
start_time = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
executor.map(fib, [35] * 20)
duration = time.perf_counter() - start_time
print(f"Computed in {duration} seconds")
def fib(n):
return n if n < 2 else fib(n - 2) + fib(n - 1)
if __name__ == "__main__":
main()
这段代码与非并发版本相比改动很小。导入 concurrent.futures 后,你只需将数字循环改为创建线程池,并使用其 .map() 方法将数字发送给空闲的工作线程。
这与 I/O 密集型多线程代码的做法相同,但这里无需担心 Session 对象。
运行此代码时,你可能会看到以下输出:
(venv) $ python cpu_threads.py
Computed in 39.86391678399741 seconds
不出所料,它比同步版本多花了好几秒。
此时,你应该已经知道 CPU 密集型问题的异步版本会是什么结果。但为了完整性,你现在将测试它与其他版本的对比。
异步版本
实现这个 CPU 密集型问题的异步版本,需要将你的函数重写为使用 async def 定义的协程函数,并通过 await 获取它们的返回值:
cpu_asyncio.py
import asyncio
import time
async def main():
start_time = time.perf_counter()
tasks = [fib(35) for _ in range(20)]
await asyncio.gather(*tasks, return_exceptions=True)
duration = time.perf_counter() - start_time
print(f"Computed in {duration} seconds")
async def fib(n):
return n if n < 2 else await fib(n - 2) + await fib(n - 1)
if __name__ == "__main__":
asyncio.run(main())
你创建了二十个任务,并将它们传递给 asyncio.gather(),以让对应的协程并发运行。然而,实际上它们是顺序执行的,因为每个任务都会阻塞执行,直到前一个任务完成。
运行这段代码时,它的执行时间比你最初的同步版本超过两倍,也比多线程版本更慢:
(venv) $ python cpu_asyncio.py
Computed in 86.50057094899967 seconds
具有讽刺意味的是,对于 CPU 密集型问题,异步方法反而是最慢的;而对于 I/O 密集型问题,它却是最快的。这是因为此处不涉及任何 I/O 操作,也就没有需要等待的内容。事件循环的开销以及在每个 await 语句处发生的上下文切换,显著拖慢了整体执行速度。
在 Python 中,若要提升此类 CPU 密集型任务的性能,必须使用其他并发模型。接下来你将深入探讨这一点。
基于进程的版本
你终于来到了多进程真正大放异彩的部分。与其他并发模型不同,基于进程的并行性专门设计用于将繁重的 CPU 工作负载分配到多个 CPU 核心上。
以下是对应的代码:
import time
from concurrent.futures import ProcessPoolExecutor
def main():
start_time = time.perf_counter()
with ProcessPoolExecutor() as executor:
executor.map(fib, [35] * 20)
duration = time.perf_counter() - start_time
print(f"Computed in {duration} seconds")
def fib(n):
return n if n < 2 else fib(n - 2) + fib(n - 1)
if __name__ == "__main__":
main()
这段代码与斐波那契问题的多线程版本几乎完全相同——你实际上只修改了两行代码!你将 ThreadPoolExecutor 替换为了 ProcessPoolExecutor。
如前所述,池构造函数的可选参数 max_workers 值得关注。你可以用它来指定希望在池中创建和管理多少个进程。默认情况下,它会检测你的机器上有多少个 CPU 核心,并为每个核心创建一个进程。虽然这对你的简单示例效果很好,但在生产环境中,你可能希望对进程数量拥有更多控制权。
该版本的执行时间约为 10 秒,不到最初非并发实现的三分之一:
(venv) $ python cpu_processes.py
Computed in 10.020093858998735 seconds
这比你之前看到的其他选项要好得多,无疑是处理此类任务的最佳选择。
下图展示了其执行时序:

各个任务在独立的 CPU 核心上并行运行,从而实现了真正的并行执行。
当然,多进程也存在一些在此简单示例中并未显现的缺点。例如,有时很难将问题划分为多个可由各处理器独立处理的片段。
此外,许多解决方案需要进程之间进行更多的通信。这会给你的程序增加一些复杂性,而这些复杂性在非并发程序中是完全不需要考虑的。
何时使用并发:决策指南
你已经学习了很多内容,现在是时候回顾一些关键概念,并讨论一些决策点,以帮助你确定在项目中是否应使用并发模块,以及使用哪一种。
这一过程的第一步是判断是否应该使用并发模块。尽管本文中的示例让各个库看起来都很简单,但并发总是会带来额外的复杂性,并常常导致难以发现的 bug。
在确认存在明确的性能瓶颈之前,不要急于引入并发。正如 Donald Knuth 所言:“过早优化是编程中所有(或至少大部分)罪恶的根源。”
一旦你决定需要优化程序,下一步就是判断你的程序是 I/O 密集型 还是 CPU 密集型。记住:I/O 密集型程序大部分时间都在等待某些事件发生(如网络响应、磁盘读写等),而 CPU 密集型程序则尽可能快地处理数据或进行数值计算。
正如你所见,在 Python 中,CPU 密集型问题只有通过基于进程的并发才能真正受益。多线程和异步 I/O 对这类问题毫无帮助。
对于 I/O 密集型问题,Python 社区有一条经验法则:“能用 asyncio 就用 asyncio,不得已时才用 threading 或 concurrent.futures。”
asyncio 能为这类程序提供最佳的加速效果,但有时你会依赖一些尚未适配 asyncio 的关键库。请记住:任何不主动将控制权交还给事件循环的任务,都会阻塞所有其他任务的执行。
结论
你已经学习了 Python 中的并发机制,以及它如何提升程序的性能和响应能力。你探索了不同的并发模型,包括多线程、异步任务和多进程。通过实际示例,你深入了解了如何以及何时应用这些模型来优化 I/O 密集型和 CPU 密集型任务。
掌握并发对希望提升应用效率的 Python 开发者至关重要,尤其是在涉及大量 I/O 操作或计算密集型工作负载的场景中。通过选择合适的并发模型,你可以显著减少执行时间,并更高效地利用系统资源。
在本教程中,你学会了如何:
- 理解 Python 中不同形式的并发
- 为 I/O 密集型任务实现多线程和异步解决方案
- 利用多进程为 CPU 密集型任务实现真正的并行性
- 根据程序需求选择合适的并发模型
凭借这些技能,你现在能够分析自己的 Python 程序,并有效应用并发技术来解决性能瓶颈。无论是优化网络爬虫还是数据处理流水线,你都能自信地选择最佳的并发模型,以提升应用程序的性能。