Python 的 asyncio:动手实践指南

更新于 2026-01-12

Leodanis Pozo Ramos 2025-07-30

初识异步 I/O

在深入探索 asyncio 之前,有必要将异步 I/O 与其他并发模型进行比较,以了解它在整个 Python 并发体系中的位置。以下是一些关键概念:

  • 并行(Parallelism):指同时执行多个操作。
  • 多进程(Multiprocessing):通过将任务分配到计算机的多个 CPU 核心上来实现并行。适用于 CPU 密集型任务,例如紧密循环和数学计算。
  • 并发(Concurrency):比并行更宽泛的概念,指多个任务可以以重叠的方式运行,但不一定同时执行。
  • 多线程(Threading):一种并发执行模型,多个线程轮流执行任务。一个进程中可包含多个线程。由于全局解释器锁(GIL)的存在,Python 与多线程的关系较为复杂(此处不展开)。

多线程适合 I/O 密集型任务。I/O 密集型任务的特点是大量时间花在等待输入/输出(I/O)完成上,而 CPU 密集型任务则从始至终都在高强度使用 CPU 核心。

Python 标准库长期以来通过 multiprocessingconcurrent.futuresthreading 模块支持上述模型。

现在,我们要引入一位新成员:近年来,异步 I/O(Async I/O)被更全面地集成到 CPython 中。该模型通过标准库的 asyncio 包以及 asyncawait 关键字实现。

注意:异步 I/O 并非新概念。它也存在于其他语言中,如 Go、C# 和 Rust。

Python 文档将 asyncio 描述为“用于编写并发代码的库”。但异步 I/O 既不是多线程,也不是多进程,也不基于这两者。

异步 I/O 是一种单线程、单进程的技术,采用协作式多任务处理(cooperative multitasking)。尽管只使用一个线程,它仍能给人一种并发执行的感觉。协程(coroutine,简称 coro)是异步 I/O 的核心特性,可以被并发调度,但它们本身并非天然并发。

再次强调:异步 I/O 是一种并发编程模型,但不是并行。它与多线程更接近,但又不同于两者,是并发生态系统中的独立成员。

还有一个术语需要澄清:“异步”(asynchronous)意味着什么?虽然没有严格定义,但就本教程而言,可理解为两个关键特性:

  1. 异步例程在等待结果时可以暂停执行,并允许其他例程在此期间运行。
  2. 异步代码通过协调多个异步例程,促进任务的并发执行。

异步 I/O 解析

初看之下,异步 I/O 可能显得反直觉甚至矛盾:如何在单线程、单 CPU 核心上实现并发?

Miguel Grinberg 在 PyCon 的演讲中用一个精妙的比喻解释了这一点:

国际象棋大师 Judit Polgár 举办一场车轮战展览,同时对阵 24 位业余选手。

假设条件

  • 24 位对手
  • Judit 每步棋耗时 5 秒
  • 对手每步平均耗时 55 秒
  • 每局平均 30 个回合(共 60 步)

同步模式:Judit 一局一局下完。每局耗时 (55 + 5) * 30 = 1800 秒(30 分钟),总耗时 24 * 30 = 720 分钟(12 小时)。

异步模式:Judit 在各桌间轮流行棋。她对每桌走一步后离开,让对手思考。走完所有 24 桌的第一步仅需 24 * 5 = 120 秒(2 分钟)。整场展览耗时 120 * 30 = 3600 秒(1 小时)。

整个过程中,只有一位 Judit,但她通过在等待对手思考时切换到其他棋局,将总时间从 12 小时缩短到 1 小时。

异步 I/O 正是应用了这一原理。程序的事件循环(event loop)管理多个任务,在某个任务等待 I/O 时(如网络请求、文件读写),切换到其他可执行的任务,从而避免空等。


异步 I/O 并不简单

虽然多线程代码容易出错,异步 I/O 避免了部分陷阱,但这并不意味着它很简单。

深入异步编程会遇到回调、协程、事件、传输层、协议、Future 等概念——仅术语就令人望而生畏。

不过,Python 的异步生态已大幅改善:asyncio API 已稳定,文档全面更新,高质量学习资源也越来越多。


使用 asyncio 实现异步 I/O

现在,我们来探索 Python 的异步实现:asyncio 包及其 async/await 关键字。

协程与协程函数

异步 I/O 的核心是协程(coroutine)——一种可暂停并在稍后恢复执行的对象。协程由协程函数(即异步函数)调用生成,使用 async def 定义。

先看一个同步版本的示例:

import time

def count():
    print("One")
    time.sleep(1)
    print("Two")
    time.sleep(1)

def main():
    for _ in range(3):
        count()

if __name__ == "__main__":
    start = time.perf_counter()
    main()
    elapsed = time.perf_counter() - start
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

输出:

One
Two
One
Two
One
Two
countsync.py executed in 6.03 seconds.

总耗时约 6 秒。

改用异步 I/O:

import asyncio

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")
    await asyncio.sleep(1)

async def main():
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    import time
    start = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - start
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

输出:

One
One
One
Two
Two
Two
countasync.py executed in 2.00 seconds.

总耗时仅 2 秒! 这体现了 asyncio 在 I/O 密集型任务中的高效性。

关键区别

  • time.sleep()阻塞调用,会冻结整个程序。
  • await asyncio.sleep()非阻塞调用,会将控制权交还给事件循环,允许其他任务运行。

asyncawait 关键字详解

正式定义如下:

  • async def:定义协程函数或异步生成器。
  • async with / async for:分别用于异步上下文管理和异步循环。
  • await:暂停当前协程的执行,将控制权交还事件循环,直到被等待的对象完成。
async def g():
    result = await f()  # 暂停 g(),等待 f() 返回结果
    return result

使用规则

  • async def 内可使用 awaitreturnyield(可选):
    • 普通协程函数可含 awaitreturn
    • yieldasync def 函数是异步生成器,需用 async for 迭代。
    • async def禁止使用 yield from(会引发 SyntaxError)。
  • await 只能async def 函数体内使用。

示例

async def f(x):
    y = await z(x)  # ✅ 合法
    return y

async def g(x):
    yield x  # ✅ 异步生成器

async def m(x):
    yield from gen(x)  # ❌ SyntaxError

def n(x):
    y = await z(x)  # ❌ SyntaxError(缺少 async def)
    return y

注意await f() 要求 f() 必须是可等待对象(awaitable),即协程或实现了 .__await__() 方法的对象。


更复杂的异步示例:随机数生成

import asyncio
import random

COLORS = (
    "\033[0m",   # 重置颜色
    "\033[36m",  # 青色
    "\033[91m",  # 红色
    "\033[35m",  # 品红
)

async def main():
    return await asyncio.gather(
        makerandom(1, 9),
        makerandom(2, 8),
        makerandom(3, 8),
    )

async def makerandom(delay, threshold=6):
    color = COLORS[delay]
    print(f"{color}Initiated makerandom({delay}).")
    while (number := random.randint(0, 10)) <= threshold:
        print(f"{color}makerandom({delay}) == {number} too low; retrying.")
        await asyncio.sleep(delay)
    print(f"{color}---> Finished: makerandom({delay}) == {number}" + COLORS[0])
    return number

if __name__ == "__main__":
    random.seed(444)
    r1, r2, r3 = asyncio.run(main())
    print(f"\nr1: {r1}, r2: {r2}, r3: {r3}")

虽然随机数生成是 CPU 密集型操作,但其开销可忽略。asyncio.sleep() 模拟了 I/O 等待,展示了异步模型的优势。


异步 I/O 事件循环

事件循环是异步编程的核心,它像一个无限循环,监控协程状态,在任务空闲时切换执行其他任务。

启动事件循环

现代 Python 推荐使用 asyncio.run()

asyncio.run(main())  # 自动创建、运行并关闭事件循环

也可在运行中获取当前循环:

loop = asyncio.get_running_loop()

关键点

  • 协程必须绑定到事件循环才能执行。
  • 默认情况下,事件循环运行在单线程、单 CPU 核心上。
  • 事件循环是可插拔的(pluggable)。例如,uvloop 提供了更快的替代实现。

常见误区:直接调用协程函数只会返回协程对象,不会执行:

routine = main()  # 返回 <coroutine object>
asyncio.run(routine)  # 才真正执行

asyncio REPL(Python 3.8+)

从 Python 3.8 起,可直接在交互式环境中使用 await

$ python -m asyncio
>>> async def hello():
...     await asyncio.sleep(1)
...     print("Hello!")
...
>>> await hello()  # 无需 asyncio.run()
Hello!

常见异步编程模式

1. 协程链式调用

将协程串联,前一个的结果作为后一个的输入:

import asyncio
import random
import time

async def main():
    user_ids = [1, 2, 3]
    start = time.perf_counter()
    await asyncio.gather(
        *(get_user_with_posts(uid) for uid in user_ids)
    )
    end = time.perf_counter()
    print(f"\n==> Total time: {end - start:.2f} seconds")

async def get_user_with_posts(user_id):
    user = await fetch_user(user_id)      # 第一步:获取用户
    await fetch_posts(user)               # 第二步:获取帖子

async def fetch_user(user_id):
    delay = random.uniform(0.5, 2.0)
    await asyncio.sleep(delay)
    return {"id": user_id, "name": f"User{user_id}"}

async def fetch_posts(user):
    delay = random.uniform(0.5, 2.0)
    await asyncio.sleep(delay)
    posts = [f"Post {i} by {user['name']}" for i in range(1, 3)]
    for post in posts:
        print(f" - {post}")

if __name__ == "__main__":
    random.seed(444)
    asyncio.run(main())

效果:同步需 ~7.6 秒,异步仅需 2.68 秒

2. 协程与队列集成

使用 asyncio.Queue 解耦生产者与消费者:

import asyncio
import random
import time

async def main():
    queue = asyncio.Queue()
    user_ids = [1, 2, 3]
    await asyncio.gather(
        producer(queue, user_ids),
        *(consumer(queue) for _ in user_ids),
    )

async def producer(queue, user_ids):
    async def fetch_user(uid):
        # ... 获取用户并放入队列
        await queue.put(user)
    await asyncio.gather(*(fetch_user(uid) for uid in user_ids))
    for _ in user_ids:
        await queue.put(None)  # 发送终止信号

async def consumer(queue):
    while True:
        user = await queue.get()
        if user is None: break
        # ... 处理用户帖子

if __name__ == "__main__":
    random.seed(444)
    asyncio.run(main())

优势:生产者和消费者完全解耦,系统更灵活可扩展。


其他异步特性

异步迭代与推导式

async def powers_of_two(stop=10):
    exponent = 0
    while exponent < stop:
        yield 2**exponent
        exponent += 1
        await asyncio.sleep(0.2)

async def main():
    # 异步循环
    g = []
    async for i in powers_of_two(5):
        g.append(i)
    
    # 异步推导式
    f = [j async for j in powers_of_two(5) if not (j // 3 % 5)]

异步上下文管理器(async with

常用于管理网络连接等资源:

import aiohttp

async def check(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(f"{url}: status -> {response.status}")

async def main():
    websites = ["https://realpython.com", "https://pycoders.com"]
    await asyncio.gather(*(check(url) for url in websites))

asyncio.run(main())

优势:确保资源(如 HTTP 连接)被正确释放,即使发生异常。


其他 asyncio 工具

asyncio.create_task()

将协程包装为 Task 对象,在后台并发执行:

task = asyncio.create_task(coro([3, 2, 1]))
result = await task

注意:未等待的 Task 会在事件循环结束时被取消。

asyncio.gather() vs asyncio.as_completed()

  • gather():等待所有任务完成,按输入顺序返回结果。
  • as_completed():按完成顺序返回任务,适合需要即时处理结果的场景。
# 按完成顺序处理
for task in asyncio.as_completed([task1, task2]):
    result = await task
    print(f"Completed: {result}")

异步异常处理(Python 3.11+)

使用 ExceptionGroupexcept* 处理多个并发异常:

async def main():
    results = await asyncio.gather(
        coro_a(), coro_b(), coro_c(),
        return_exceptions=True
    )
    exceptions = [e for e in results if isinstance(e, Exception)]
    if exceptions:
        raise ExceptionGroup("Errors", exceptions)

try:
    asyncio.run(main())
except* ValueError as eg:
    print(f"[ValueError] {eg.exceptions}")
except* TypeError as eg:
    print(f"[TypeError] {eg.exceptions}")

何时使用异步 I/O?

适用场景

  • I/O 密集型任务
    • 网络请求(客户端/服务器)
    • 数据库查询
    • 文件读写(需配合 aiofiles 等库)
  • 高并发场景:如聊天服务器、API 网关。
  • 资源受限环境:异步任务比线程更轻量,可轻松创建数千个。

不适用场景

  • CPU 密集型任务:应使用多进程。
  • 缺乏异步支持的库:如某些数据库驱动不支持 async/await

重要原则不要在异步函数中使用阻塞调用(如 time.sleep()、同步文件 I/O),否则会阻塞整个事件循环!


支持异步 I/O 的流行库

类别 库名
Web 框架 FastAPI, Starlette, Sanic, Quart, Tornado
ASGI 服务器 uvicorn, Hypercorn
HTTP 客户端 aiohttp, HTTPX
WebSocket websockets
数据库 Databases, Tortoise ORM, Motor (MongoDB)
工具库 aiofiles(异步文件), aiocache(缓存), pytest-asyncio(测试)

总结

你已掌握:

  • 并发模型的区别及 asyncio 的适用场景
  • 使用 async/await 编写、运行和链接协程
  • 通过 asyncio.run()gather()create_task() 管理事件循环和任务
  • 实现协程链式调用和基于队列的生产者-消费者模式
  • 使用 async forasync with 等高级特性
  • 集成第三方异步库构建高性能应用

异步 I/O 是构建高并发网络服务、Web API 和 I/O 密集型应用的强大工具。现在,你已准备好用它来开发现代化的 Python 应用了!