Leodanis Pozo Ramos 2025-07-30
初识异步 I/O
在深入探索 asyncio 之前,有必要将异步 I/O 与其他并发模型进行比较,以了解它在整个 Python 并发体系中的位置。以下是一些关键概念:
- 并行(Parallelism):指同时执行多个操作。
- 多进程(Multiprocessing):通过将任务分配到计算机的多个 CPU 核心上来实现并行。适用于 CPU 密集型任务,例如紧密循环和数学计算。
- 并发(Concurrency):比并行更宽泛的概念,指多个任务可以以重叠的方式运行,但不一定同时执行。
- 多线程(Threading):一种并发执行模型,多个线程轮流执行任务。一个进程中可包含多个线程。由于全局解释器锁(GIL)的存在,Python 与多线程的关系较为复杂(此处不展开)。
多线程适合 I/O 密集型任务。I/O 密集型任务的特点是大量时间花在等待输入/输出(I/O)完成上,而 CPU 密集型任务则从始至终都在高强度使用 CPU 核心。
Python 标准库长期以来通过 multiprocessing、concurrent.futures 和 threading 模块支持上述模型。
现在,我们要引入一位新成员:近年来,异步 I/O(Async I/O)被更全面地集成到 CPython 中。该模型通过标准库的 asyncio 包以及 async 和 await 关键字实现。
注意:异步 I/O 并非新概念。它也存在于其他语言中,如 Go、C# 和 Rust。
Python 文档将 asyncio 描述为“用于编写并发代码的库”。但异步 I/O 既不是多线程,也不是多进程,也不基于这两者。
异步 I/O 是一种单线程、单进程的技术,采用协作式多任务处理(cooperative multitasking)。尽管只使用一个线程,它仍能给人一种并发执行的感觉。协程(coroutine,简称 coro)是异步 I/O 的核心特性,可以被并发调度,但它们本身并非天然并发。
再次强调:异步 I/O 是一种并发编程模型,但不是并行。它与多线程更接近,但又不同于两者,是并发生态系统中的独立成员。
还有一个术语需要澄清:“异步”(asynchronous)意味着什么?虽然没有严格定义,但就本教程而言,可理解为两个关键特性:
- 异步例程在等待结果时可以暂停执行,并允许其他例程在此期间运行。
- 异步代码通过协调多个异步例程,促进任务的并发执行。
异步 I/O 解析
初看之下,异步 I/O 可能显得反直觉甚至矛盾:如何在单线程、单 CPU 核心上实现并发?
Miguel Grinberg 在 PyCon 的演讲中用一个精妙的比喻解释了这一点:
国际象棋大师 Judit Polgár 举办一场车轮战展览,同时对阵 24 位业余选手。
假设条件:
- 24 位对手
- Judit 每步棋耗时 5 秒
- 对手每步平均耗时 55 秒
- 每局平均 30 个回合(共 60 步)
同步模式:Judit 一局一局下完。每局耗时
(55 + 5) * 30 = 1800秒(30 分钟),总耗时24 * 30 = 720分钟(12 小时)。异步模式:Judit 在各桌间轮流行棋。她对每桌走一步后离开,让对手思考。走完所有 24 桌的第一步仅需
24 * 5 = 120秒(2 分钟)。整场展览耗时120 * 30 = 3600秒(1 小时)。
整个过程中,只有一位 Judit,但她通过在等待对手思考时切换到其他棋局,将总时间从 12 小时缩短到 1 小时。
异步 I/O 正是应用了这一原理。程序的事件循环(event loop)管理多个任务,在某个任务等待 I/O 时(如网络请求、文件读写),切换到其他可执行的任务,从而避免空等。
异步 I/O 并不简单
虽然多线程代码容易出错,异步 I/O 避免了部分陷阱,但这并不意味着它很简单。
深入异步编程会遇到回调、协程、事件、传输层、协议、Future 等概念——仅术语就令人望而生畏。
不过,Python 的异步生态已大幅改善:asyncio API 已稳定,文档全面更新,高质量学习资源也越来越多。
使用 asyncio 实现异步 I/O
现在,我们来探索 Python 的异步实现:asyncio 包及其 async/await 关键字。
协程与协程函数
异步 I/O 的核心是协程(coroutine)——一种可暂停并在稍后恢复执行的对象。协程由协程函数(即异步函数)调用生成,使用 async def 定义。
先看一个同步版本的示例:
import time
def count():
print("One")
time.sleep(1)
print("Two")
time.sleep(1)
def main():
for _ in range(3):
count()
if __name__ == "__main__":
start = time.perf_counter()
main()
elapsed = time.perf_counter() - start
print(f"{__file__} executed in {elapsed:0.2f} seconds.")
输出:
One
Two
One
Two
One
Two
countsync.py executed in 6.03 seconds.
总耗时约 6 秒。
改用异步 I/O:
import asyncio
async def count():
print("One")
await asyncio.sleep(1)
print("Two")
await asyncio.sleep(1)
async def main():
await asyncio.gather(count(), count(), count())
if __name__ == "__main__":
import time
start = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - start
print(f"{__file__} executed in {elapsed:0.2f} seconds.")
输出:
One
One
One
Two
Two
Two
countasync.py executed in 2.00 seconds.
总耗时仅 2 秒! 这体现了 asyncio 在 I/O 密集型任务中的高效性。
关键区别:
time.sleep()是阻塞调用,会冻结整个程序。await asyncio.sleep()是非阻塞调用,会将控制权交还给事件循环,允许其他任务运行。
async 与 await 关键字详解
正式定义如下:
async def:定义协程函数或异步生成器。async with/async for:分别用于异步上下文管理和异步循环。await:暂停当前协程的执行,将控制权交还事件循环,直到被等待的对象完成。
async def g():
result = await f() # 暂停 g(),等待 f() 返回结果
return result
使用规则
async def内可使用await、return或yield(可选):- 普通协程函数可含
await和return。 - 含
yield的async def函数是异步生成器,需用async for迭代。 async def中禁止使用yield from(会引发SyntaxError)。
- 普通协程函数可含
await只能在async def函数体内使用。
示例
async def f(x):
y = await z(x) # ✅ 合法
return y
async def g(x):
yield x # ✅ 异步生成器
async def m(x):
yield from gen(x) # ❌ SyntaxError
def n(x):
y = await z(x) # ❌ SyntaxError(缺少 async def)
return y
注意:
await f()要求f()必须是可等待对象(awaitable),即协程或实现了.__await__()方法的对象。
更复杂的异步示例:随机数生成
import asyncio
import random
COLORS = (
"\033[0m", # 重置颜色
"\033[36m", # 青色
"\033[91m", # 红色
"\033[35m", # 品红
)
async def main():
return await asyncio.gather(
makerandom(1, 9),
makerandom(2, 8),
makerandom(3, 8),
)
async def makerandom(delay, threshold=6):
color = COLORS[delay]
print(f"{color}Initiated makerandom({delay}).")
while (number := random.randint(0, 10)) <= threshold:
print(f"{color}makerandom({delay}) == {number} too low; retrying.")
await asyncio.sleep(delay)
print(f"{color}---> Finished: makerandom({delay}) == {number}" + COLORS[0])
return number
if __name__ == "__main__":
random.seed(444)
r1, r2, r3 = asyncio.run(main())
print(f"\nr1: {r1}, r2: {r2}, r3: {r3}")
虽然随机数生成是 CPU 密集型操作,但其开销可忽略。
asyncio.sleep()模拟了 I/O 等待,展示了异步模型的优势。
异步 I/O 事件循环
事件循环是异步编程的核心,它像一个无限循环,监控协程状态,在任务空闲时切换执行其他任务。
启动事件循环
现代 Python 推荐使用 asyncio.run():
asyncio.run(main()) # 自动创建、运行并关闭事件循环
也可在运行中获取当前循环:
loop = asyncio.get_running_loop()
关键点
- 协程必须绑定到事件循环才能执行。
- 默认情况下,事件循环运行在单线程、单 CPU 核心上。
- 事件循环是可插拔的(pluggable)。例如,
uvloop提供了更快的替代实现。
常见误区:直接调用协程函数只会返回协程对象,不会执行:
routine = main() # 返回 <coroutine object> asyncio.run(routine) # 才真正执行
asyncio REPL(Python 3.8+)
从 Python 3.8 起,可直接在交互式环境中使用 await:
$ python -m asyncio
>>> async def hello():
... await asyncio.sleep(1)
... print("Hello!")
...
>>> await hello() # 无需 asyncio.run()
Hello!
常见异步编程模式
1. 协程链式调用
将协程串联,前一个的结果作为后一个的输入:
import asyncio
import random
import time
async def main():
user_ids = [1, 2, 3]
start = time.perf_counter()
await asyncio.gather(
*(get_user_with_posts(uid) for uid in user_ids)
)
end = time.perf_counter()
print(f"\n==> Total time: {end - start:.2f} seconds")
async def get_user_with_posts(user_id):
user = await fetch_user(user_id) # 第一步:获取用户
await fetch_posts(user) # 第二步:获取帖子
async def fetch_user(user_id):
delay = random.uniform(0.5, 2.0)
await asyncio.sleep(delay)
return {"id": user_id, "name": f"User{user_id}"}
async def fetch_posts(user):
delay = random.uniform(0.5, 2.0)
await asyncio.sleep(delay)
posts = [f"Post {i} by {user['name']}" for i in range(1, 3)]
for post in posts:
print(f" - {post}")
if __name__ == "__main__":
random.seed(444)
asyncio.run(main())
效果:同步需 ~7.6 秒,异步仅需 2.68 秒。
2. 协程与队列集成
使用 asyncio.Queue 解耦生产者与消费者:
import asyncio
import random
import time
async def main():
queue = asyncio.Queue()
user_ids = [1, 2, 3]
await asyncio.gather(
producer(queue, user_ids),
*(consumer(queue) for _ in user_ids),
)
async def producer(queue, user_ids):
async def fetch_user(uid):
# ... 获取用户并放入队列
await queue.put(user)
await asyncio.gather(*(fetch_user(uid) for uid in user_ids))
for _ in user_ids:
await queue.put(None) # 发送终止信号
async def consumer(queue):
while True:
user = await queue.get()
if user is None: break
# ... 处理用户帖子
if __name__ == "__main__":
random.seed(444)
asyncio.run(main())
优势:生产者和消费者完全解耦,系统更灵活可扩展。
其他异步特性
异步迭代与推导式
async def powers_of_two(stop=10):
exponent = 0
while exponent < stop:
yield 2**exponent
exponent += 1
await asyncio.sleep(0.2)
async def main():
# 异步循环
g = []
async for i in powers_of_two(5):
g.append(i)
# 异步推导式
f = [j async for j in powers_of_two(5) if not (j // 3 % 5)]
异步上下文管理器(async with)
常用于管理网络连接等资源:
import aiohttp
async def check(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
print(f"{url}: status -> {response.status}")
async def main():
websites = ["https://realpython.com", "https://pycoders.com"]
await asyncio.gather(*(check(url) for url in websites))
asyncio.run(main())
优势:确保资源(如 HTTP 连接)被正确释放,即使发生异常。
其他 asyncio 工具
asyncio.create_task()
将协程包装为 Task 对象,在后台并发执行:
task = asyncio.create_task(coro([3, 2, 1]))
result = await task
注意:未等待的
Task会在事件循环结束时被取消。
asyncio.gather() vs asyncio.as_completed()
gather():等待所有任务完成,按输入顺序返回结果。as_completed():按完成顺序返回任务,适合需要即时处理结果的场景。
# 按完成顺序处理
for task in asyncio.as_completed([task1, task2]):
result = await task
print(f"Completed: {result}")
异步异常处理(Python 3.11+)
使用 ExceptionGroup 和 except* 处理多个并发异常:
async def main():
results = await asyncio.gather(
coro_a(), coro_b(), coro_c(),
return_exceptions=True
)
exceptions = [e for e in results if isinstance(e, Exception)]
if exceptions:
raise ExceptionGroup("Errors", exceptions)
try:
asyncio.run(main())
except* ValueError as eg:
print(f"[ValueError] {eg.exceptions}")
except* TypeError as eg:
print(f"[TypeError] {eg.exceptions}")
何时使用异步 I/O?
适用场景
- I/O 密集型任务:
- 网络请求(客户端/服务器)
- 数据库查询
- 文件读写(需配合
aiofiles等库)
- 高并发场景:如聊天服务器、API 网关。
- 资源受限环境:异步任务比线程更轻量,可轻松创建数千个。
不适用场景
- CPU 密集型任务:应使用多进程。
- 缺乏异步支持的库:如某些数据库驱动不支持
async/await。
重要原则:不要在异步函数中使用阻塞调用(如
time.sleep()、同步文件 I/O),否则会阻塞整个事件循环!
支持异步 I/O 的流行库
| 类别 | 库名 |
|---|---|
| Web 框架 | FastAPI, Starlette, Sanic, Quart, Tornado |
| ASGI 服务器 | uvicorn, Hypercorn |
| HTTP 客户端 | aiohttp, HTTPX |
| WebSocket | websockets |
| 数据库 | Databases, Tortoise ORM, Motor (MongoDB) |
| 工具库 | aiofiles(异步文件), aiocache(缓存), pytest-asyncio(测试) |
总结
你已掌握:
- 并发模型的区别及
asyncio的适用场景 - 使用
async/await编写、运行和链接协程 - 通过
asyncio.run()、gather()、create_task()管理事件循环和任务 - 实现协程链式调用和基于队列的生产者-消费者模式
- 使用
async for、async with等高级特性 - 集成第三方异步库构建高性能应用
异步 I/O 是构建高并发网络服务、Web API 和 I/O 密集型应用的强大工具。现在,你已准备好用它来开发现代化的 Python 应用了!