Jim Anderson
探索 Python 中的并发
在本节中,你将熟悉与并发相关的术语。你还会了解到,并发可以根据所要解决的问题采取不同的形式。最后,你将了解不同的并发模型如何在 Python 中实现。
什么是并发?
字典中对“并发”(concurrency)的定义是“同时发生”。在 Python 中,这些“同时发生”的事物有不同的名称,包括:
- 线程(Thread)
- 任务(Task)
- 进程(Process)
从高层次来看,它们都指代按顺序执行的一系列指令。你可以把它们想象成不同的“思维列车”。每列列车都可以在某些点被暂停,而 CPU(或大脑)可以切换到另一列继续处理。每列列车的状态都会被保存下来,以便在中断处恢复执行。
你可能会疑惑:为什么 Python 要用不同的词来描述同一个概念?事实上,只有在高层面观察时,线程、任务和进程才看起来相似。一旦深入细节,你会发现它们其实代表略有不同的东西。随着示例的推进,你会更清楚地看到它们之间的差异。
现在,我们再来看看“同时”这个部分。需要注意的是,只有多个系统进程才能让 Python 真正实现“同时”运行这些“思维列车”。
相比之下,线程和异步任务始终运行在单个处理器上,这意味着它们一次只能运行一个。它们只是巧妙地轮流执行,从而加快整体流程。尽管它们并非真正“同时”运行,但仍然属于并发的范畴。
线程、任务或进程的切换方式有所不同。在多线程方法中,操作系统知道每个线程的存在,并可随时中断它以运行另一个线程。进程也是如此。这种机制称为抢占式多任务处理(preemptive multitasking),因为操作系统可以在不通知程序的情况下抢占当前线程或进程进行切换。
抢占式多任务的优点在于,线程中的代码无需做任何特殊处理即可完成切换。但这也带来困难——切换可能发生在任意时刻,甚至在一条简单的 Python 语句(如 x = x + 1)中间。这是因为 Python 语句通常由多个底层字节码指令组成。
另一方面,异步任务使用协作式多任务处理(cooperative multitasking)。任务必须主动声明自己何时准备好被切换出去,而无需操作系统的介入。这意味着任务中的代码需要稍作修改才能实现这一点。
这样做的好处是,你始终知道任务会在哪里被切换出去,从而更容易推理执行流程。除非显式标记,否则任务不会在 Python 语句中间被切换。
什么是并行?
到目前为止,你看到的并发都是在单个处理器上发生的。那么,你那台配备多个 CPU 核心的新电脑呢?如何在 Python 中利用它们?答案是:执行多个独立的进程!
你可以把进程看作几乎是一个完全独立的程序(尽管技术上它被定义为包含内存、文件句柄等资源的集合)。另一种理解方式是:每个进程都在自己的 Python 解释器中运行。
由于它们是不同的进程,因此利用多进程(multiprocessing)的程序可以让每个“思维列车”运行在不同的 CPU 核心上。这意味着它们真的可以同时运行,非常棒!当然,这样做也会带来一些复杂性,但 Python 在大多数情况下都能很好地处理这些问题。
现在你已经了解了并发与并行的区别,可以回顾一下它们的不同之处,并确定哪些 Python 模块支持它们:
| Python 模块 | CPU 核心数 | 多任务类型 | 切换决策者 |
|---|---|---|---|
asyncio |
单核 | 协作式 | 任务自行决定何时交出控制权 |
threading |
单核 | 抢占式 | 操作系统决定何时切换 |
multiprocessing |
多核 | 抢占式 | 各进程在不同处理器上同时运行 |
注意:
threading和multiprocessing都是并发程序中相对底层的构建块。在实践中,你通常可以用concurrent.futures替代它们,它为这两个模块提供了更高层次的接口。而asyncio提供了一种略有不同的并发方法,稍后会详细介绍。
每种并发方式都有其适用场景。接下来,你将看到它们分别适用于哪些类型的程序。
并发在什么情况下有用?
并发对两类问题有显著帮助:
- I/O 密集型(I/O-Bound)
- CPU 密集型(CPU-Bound)
I/O 密集型问题会导致程序变慢,因为它频繁等待来自外部资源的输入/输出(I/O)。这类问题出现在程序与比 CPU 慢得多的设备交互时。
常见的慢速设备包括文件系统和网络连接。
相反,有些程序在不访问网络或文件的情况下进行大量计算。这类程序是 CPU 密集型的,因为限制其速度的是 CPU,而不是网络或文件系统。
在后续示例中,你会看到不同形式的并发对 I/O 密集型和 CPU 密集型程序的效果不同。添加并发会引入额外代码和复杂性,因此你需要判断潜在的速度提升是否值得付出这些代价。
以下是简要总结:
| I/O 密集型程序 | CPU 密集型程序 |
|---|---|
| 程序大部分时间在与慢速设备(如网络、硬盘、打印机)通信 | 程序大部分时间在执行 CPU 运算 |
| 加速方法:重叠等待时间 | 加速方法:在相同时间内完成更多计算 |
你将先看 I/O 密集型程序,再看 CPU 密集型程序的代码示例。
加速 I/O 密集型程序
本节聚焦于 I/O 密集型程序,以通过网络下载内容为例(例如从多个网站下载网页)。
同步版本(无并发)
首先是一个非并发版本(需安装第三方库 requests):
(venv) $ python -m pip install requests
# io_non_concurrent.py
import time
import requests
def main():
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.perf_counter()
download_all_sites(sites)
duration = time.perf_counter() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")
def download_all_sites(sites):
with requests.Session() as session:
for url in sites:
download_site(url, session)
def download_site(url, session):
with session.get(url) as response:
print(f"Read {len(response.content)} bytes from {url}")
if __name__ == "__main__":
main()
该程序简单明了:依次下载站点内容并打印大小。使用 Session 可复用连接,提升性能。
输出示例:
Downloaded 160 sites in 14.29 seconds
虽然简单易懂,但速度较慢。
多线程版本
使用 concurrent.futures.ThreadPoolExecutor:
# io_threads.py
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import requests
thread_local = threading.local()
def main():
sites = ["https://www.jython.org", "http://www.python.org"] * 80
start_time = time.perf_counter()
download_all_sites(sites)
duration = time.perf_counter() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")
def download_all_sites(sites):
with ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_site, sites)
def download_site(url):
session = get_session_for_thread()
with session.get(url) as response:
print(f"Read {len(response.content)} bytes from {url}")
def get_session_for_thread():
if not hasattr(thread_local, "session"):
thread_local.session = requests.Session()
return thread_local.session
if __name__ == "__main__":
main()
关键点:
- 使用 线程本地存储(
threading.local())为每个线程创建独立的Session,避免线程安全问题。 ThreadPoolExecutor管理线程池,自动分配任务。
输出示例:
Downloaded 160 sites in 3.19 seconds
速度提升约 4.5 倍!
异步版本(asyncio)
使用 aiohttp(非阻塞 HTTP 库):
(venv) $ python -m pip install aiohttp
# io_asyncio.py
import asyncio
import time
import aiohttp
async def main():
sites = ["https://www.jython.org", "http://olympus.realpython.org/dice"] * 80
start_time = time.perf_counter()
await download_all_sites(sites)
duration = time.perf_counter() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")
async def download_all_sites(sites):
async with aiohttp.ClientSession() as session:
tasks = [download_site(url, session) for url in sites]
await asyncio.gather(*tasks, return_exceptions=True)
async def download_site(url, session):
async with session.get(url) as response:
print(f"Read {len(await response.read())} bytes from {url}")
if __name__ == "__main__":
asyncio.run(main())
关键点:
- 所有函数变为
async def - 使用
await和async with - 单线程内通过事件循环实现高并发
输出示例:
Downloaded 160 sites in 0.49 seconds
比多线程快 6.5 倍,比同步快 30 倍以上!
注意:
asyncio需要所有 I/O 操作都是非阻塞的。若混用阻塞库(如requests),性能会急剧下降。
多进程版本(multiprocessing)
# io_processes.py
import atexit
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor
import requests
session: requests.Session
def main():
sites = ["https://www.jython.org", "http://olympus.realpython.org/dice"] * 80
start_time = time.perf_counter()
download_all_sites(sites)
duration = time.perf_counter() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")
def download_all_sites(sites):
with ProcessPoolExecutor(initializer=init_process) as executor:
executor.map(download_site, sites)
def download_site(url):
with session.get(url) as response:
name = multiprocessing.current_process().name
print(f"{name}: Read {len(response.content)} bytes from {url}")
def init_process():
global session
session = requests.Session()
atexit.register(session.close)
if __name__ == "__main__":
main()
输出示例:
Downloaded 160 sites in 3.43 seconds
比同步快,但不如多线程和异步。因为 I/O 密集型任务并不需要多核 CPU,进程创建开销反而成为瓶颈。
加速 CPU 密集型程序
现在转向 CPU 密集型任务:计算第 n 项斐波那契数(递归实现,效率极低)。
def fib(n):
return n if n < 2 else fib(n - 2) + fib(n - 1)
同步版本
# cpu_non_concurrent.py
import time
def main():
start_time = time.perf_counter()
for _ in range(20):
fib(35)
duration = time.perf_counter() - start_time
print(f"Computed in {duration} seconds")
def fib(n):
return n if n < 2 else fib(n - 2) + fib(n - 1)
if __name__ == "__main__":
main()
输出:
Computed in 35.36 seconds
多线程版本
# cpu_threads.py
import time
from concurrent.futures import ThreadPoolExecutor
def main():
start_time = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
executor.map(fib, [35] * 20)
duration = time.perf_counter() - start_time
print(f"Computed in {duration} seconds")
def fib(n): ...
输出:
Computed in 39.86 seconds
更慢了!因为受 GIL 限制,Python 线程无法真正并行执行 CPU 密集型任务。
异步版本
# cpu_asyncio.py
import asyncio
import time
async def main():
start_time = time.perf_counter()
tasks = [fib(35) for _ in range(20)]
await asyncio.gather(*tasks)
duration = time.perf_counter() - start_time
print(f"Computed in {duration} seconds")
async def fib(n):
return n if n < 2 else await fib(n - 2) + await fib(n - 1)
if __name__ == "__main__":
asyncio.run(main())
输出:
Computed in 86.50 seconds
最慢!因为 await 引入了大量上下文切换开销,而 CPU 计算本身无法被“挂起”。
多进程版本(唯一有效的方案)
# cpu_processes.py
import time
from concurrent.futures import ProcessPoolExecutor
def main():
start_time = time.perf_counter()
with ProcessPoolExecutor() as executor:
executor.map(fib, [35] * 20)
duration = time.perf_counter() - start_time
print(f"Computed in {duration} seconds")
def fib(n): ...
输出:
Computed in 10.02 seconds
提速 3.5 倍!因为每个进程在独立 CPU 核心上真正并行执行。
如何选择并发模型?
先确认是否真的需要并发
并发增加复杂性,可能导致难以调试的 bug。如 Donald Knuth 所言:“过早优化是万恶之源。”判断程序类型:
- I/O 密集型 → 优先
asyncio,其次threading - CPU 密集型 → 必须使用
multiprocessing
- I/O 密集型 → 优先
社区经验法则:
“能用
asyncio就用,必须用线程时再用。”注意限制:
asyncio需要所有库支持异步(如aiohttp)- 多进程涉及数据序列化(
pickle)和 IPC 开销
结论
你已学习了 Python 中的并发模型及其适用场景:
- 线程:适合 I/O 密集型,但受 GIL 限制
- 异步(
asyncio):I/O 密集型的最佳选择,单线程高并发 - 多进程:唯一能真正并行处理 CPU 密集型任务的方式
通过合理选择并发模型,你可以显著提升程序性能,充分利用系统资源。无论是优化网络爬虫还是数据处理管道,现在你都能自信地选择最适合的并发策略。