Python 多线程入门

更新于 2026-01-12

Jim Anderson

什么是线程?

线程(Thread)是一种独立的执行流。这意味着你的程序可以同时做两件事情。但对大多数 Python 3 的实现来说,不同的线程实际上并不会真正同时执行:它们只是看起来在同时运行。

你可能会把多线程想象成多个处理器同时运行你的程序,各自独立完成任务。这几乎是对的——线程可能运行在不同的处理器上,但它们一次只能运行一个

要让多个任务真正并行执行,你需要使用非标准的 Python 实现、用其他语言编写部分代码,或者使用 multiprocessing 模块(它会带来一些额外开销)。

由于 CPython(标准 Python 实现)的工作机制,多线程不一定能加速所有任务。这是因为全局解释器锁(GIL)限制了同一时间只能有一个 Python 线程执行。

  • 适合使用线程的任务:大部分时间都在等待外部事件(如 I/O、网络请求等)。
  • 不适合使用线程的任务:需要大量 CPU 计算、很少等待外部事件的 CPU 密集型任务。

注意:以上结论仅适用于标准 CPython 实现 + 纯 Python 代码。

  • 如果你的线程是用 C 编写的,它可以释放 GIL 并真正并发执行。
  • 如果你使用的是其他 Python 实现(如 PyPy、Jython),请查阅其文档了解线程行为。
  • 对于 CPU 密集型任务,建议使用 multiprocessing 模块。

即使多线程不能提升性能,合理使用线程也能改善程序结构清晰度。本文中的许多示例并非为了提速,而是为了让设计更简洁、更易于理解。

那么,别再空谈线程了,让我们开始使用它吧!


启动一个线程

现在你已经了解了线程的概念,接下来学习如何创建线程。Python 标准库提供了 threading 模块,其中包含本文将用到的大部分原语。Thread 类封装了线程,提供了简洁的接口。

要启动一个独立线程,你需要创建一个 Thread 实例并调用其 .start() 方法:

import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")

关键代码:

x = threading.Thread(target=thread_function, args=(1,))
x.start()

创建 Thread 时,需传入目标函数和参数列表。这里告诉线程运行 thread_function() 并传入参数 1

本文使用连续整数作为线程名称。虽然 threading.get_ident() 可返回唯一 ID,但通常不够简短易读。

thread_function() 只是记录日志并在中间休眠 2 秒。

运行结果(未取消注释 x.join()

Main    : before creating thread
Main    : before running thread
Thread 1: starting
Main    : wait for the thread to finish
Main    : all done
Thread 1: finishing

注意:主线程结束后,子线程才完成。下一节将解释原因并讨论神秘的第 20 行代码。


守护线程(Daemon Threads)

在计算机科学中,守护进程(daemon)是在后台运行的进程。

Python 中的守护线程有特定含义:当程序退出时,守护线程会立即被终止。你可以将其理解为“无需显式关闭的后台线程”。

  • 非守护线程:程序会等待所有非守护线程完成后再退出。
  • 守护线程:程序退出时直接终止,无论其是否完成。

回顾之前的输出,最后两行很有趣:主线程打印 "all done" 后,程序会暂停约 2 秒才结束。这是因为 Python 在等待非守护线程完成。程序退出时,threading._shutdown() 会遍历所有运行中的线程,并对非守护线程调用 .join()

这种行为通常是期望的,但也有其他选择。让我们试试守护线程:

x = threading.Thread(target=thread_function, args=(1,), daemon=True)

运行结果

Main    : before creating thread
Main    : before running thread
Thread 1: starting
Main    : wait for the thread to finish
Main    : all done

最后一行缺失了!因为它是守护线程,主线程结束后立即被终止,thread_function() 没机会完成。


使用 join() 等待线程

守护线程虽方便,但有时我们需要等待线程完成而不退出程序。回到原始代码,取消注释第 20 行:

x.join()

.join() 会让当前线程(这里是主线程)暂停,直到目标线程完成。无论目标线程是守护线程还是普通线程,.join() 都会等待其结束。


处理多个线程

前面的例子只涉及两个线程:主线程和一个子线程。实际中常需要启动多个线程。先看“硬核”方式:

import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    threads = list()
    for index in range(3):
        logging.info("Main    : create and start thread %d.", index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Main    : before joining thread %d.", index)
        thread.join()
        logging.info("Main    : thread %d done", index)

程序维护一个 Thread 对象列表,以便后续用 .join() 等待它们。

典型输出

Main    : create and start thread 0.
Thread 0: starting
Main    : create and start thread 1.
Thread 1: starting
Main    : create and start thread 2.
Thread 2: starting
Main    : before joining thread 0.
Thread 2: finishing
Thread 1: finishing
Thread 0: finishing
Main    : thread 0 done
Main    : before joining thread 1.
Main    : thread 1 done
Main    : before joining thread 2.
Main    : thread 2 done

注意:线程启动顺序固定,但完成顺序随机!这是由操作系统调度决定的,每次运行结果可能不同。设计多线程算法时必须考虑这一点。


使用 ThreadPoolExecutor

启动线程组有更简单的方法:ThreadPoolExecutor(Python 3.2+ 标准库 concurrent.futures 中提供)。

推荐用 with 语句作为上下文管理器:

import concurrent.futures

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))

代码创建含 3 个工作线程的池,并用 .map()range(3) 中的每个元素分配给线程。

with 块结束时,ThreadPoolExecutor 会自动对池中所有线程调用 .join()强烈建议使用上下文管理器,避免忘记 .join()

注意ThreadPoolExecutor 可能隐藏异常。例如,若函数不需要参数但 .map() 传入了参数,线程会抛出异常但程序静默退出,难以调试。

运行结果

Thread 0: starting
Thread 1: starting
Thread 2: starting
Thread 1: finishing
Thread 0: finishing
Thread 2: finishing

再次看到线程完成顺序的不确定性。


竞态条件(Race Conditions)

多线程编程的难点之一是竞态条件:当两个或多个线程访问共享数据或资源时可能发生。

下面创建一个必然触发竞态条件的例子(实际中竞态条件往往偶发且难复现):

class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)

FakeDatabase 模拟数据库操作:

  1. 从数据库读取值(复制到局部变量)
  2. 计算(加 1)
  3. 休眠(模拟耗时操作)
  4. 写回数据库

测试代码

if __name__ == "__main__":
    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)

预期结果database.value 应为 2
实际结果

Testing unlocked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing unlocked update. Ending value is 1.

单线程分析

单线程执行时:

  1. 读取 value=0local_copy
  2. local_copy 加 1 → 1
  3. 休眠(无影响)
  4. 写回 value=1

双线程竞态

  1. 线程 0 读取 value=0local_copy=0
  2. 线程 0 加 1 → local_copy=1
  3. 线程 0 休眠,线程 1 被调度
  4. 线程 1 读取 value=0(尚未更新)→ local_copy=0
  5. 线程 1 加 1 → local_copy=1
  6. 线程 1 休眠,线程 0 恢复
  7. 线程 0 写回 value=1
  8. 线程 1 恢复,写回 value=1(覆盖线程 0 的结果)

两个线程互相覆盖结果,导致最终值为 1 而非 2。

此例通过 time.sleep() 强制触发竞态,实际中竞态可能在任何指令间发生(如 x = x + 1 的读写之间)。


使用锁(Lock)解决竞态

解决竞态的核心思路:确保同一时间只有一个线程能执行临界区代码(读-改-写)。Python 提供 threading.Lock(其他语言称 mutex,即互斥锁)。

锁就像通行证:同一时间只能有一个线程持有。其他线程必须等待持有者释放锁。

基本操作:

  • .acquire():获取锁(若已被占用则阻塞)
  • .release():释放锁

重要:若线程获取锁后永不释放,程序将死锁。Python 的 Lock 支持上下文管理器(with 语句),可自动释放锁。

修复后的 FakeDatabase

class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        with self._lock:
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
        logging.info("Thread %s: finishing update", name)

运行结果

Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing locked update. Ending value is 2.

开启 DEBUG 日志可见:

  • 线程 0 获取锁后休眠
  • 线程 1 尝试获取锁但被阻塞
  • 线程 0 释放锁后,线程 1 才能获取

这就是锁提供的互斥(Mutual Exclusion)。


死锁(Deadlock)

使用锁时常见问题:死锁。当线程已持有锁却再次尝试获取同一锁时会发生:

import threading
l = threading.Lock()
l.acquire()
l.acquire()  # 程序在此挂起!

死锁通常由两种原因引起:

  1. 实现缺陷:锁未正确释放(使用上下文管理器可大幅降低风险)
  2. 设计问题:工具函数可能被已持有锁或未持有锁的函数调用

Python 提供 RLock(可重入锁)解决第二种情况:允许同一线程多次获取锁,但需释放相同次数。


生产者-消费者模型

生产者-消费者问题是经典的线程同步案例。设想:

  • 生产者:从网络读取消息(突发性到达)
  • 消费者:将消息写入数据库(速度较慢)
  • 管道(Pipeline):连接两者

使用锁的解决方案

import random
SENTINEL = object()

def producer(pipeline):
    for index in range(10):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")
    pipeline.set_message(SENTINEL, "Producer")  # 结束信号

def consumer(pipeline):
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)

class Pipeline:
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()  # 初始禁止消费

    def get_message(self, name):
        self.consumer_lock.acquire()  # 等待消息
        message = self.message
        self.producer_lock.release()  # 允许生产
        return message

    def set_message(self, message, name):
        self.producer_lock.acquire()  # 等待管道空
        self.message = message
        self.consumer_lock.release()  # 允许消费

工作原理

  • 初始状态:consumer_lock 已锁定,producer_lock 未锁定
  • 生产者调用 set_message()
    1. 获取 producer_lock(若管道满则阻塞)
    2. 设置消息
    3. 释放 consumer_lock(唤醒消费者)
  • 消费者调用 get_message()
    1. 获取 consumer_lock(若无消息则阻塞)
    2. 读取消息
    3. 释放 producer_lock(唤醒生产者)

局限性:管道只能容纳单个消息,无法应对消息突发。


使用队列(Queue)的改进方案

为支持多消息缓冲,使用 queue.Queue(线程安全):

import queue
import threading

def producer(queue, event):
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        queue.put(message)
    logging.info("Producer received event. Exiting")

def consumer(queue, event):
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info("Consumer storing message: %s (size=%d)", message, queue.qsize())
    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)
        time.sleep(0.1)
        event.set()  # 触发退出

关键改进

  • 使用 threading.Event 通知线程退出(替代 SENTINEL
  • queue.Queue 内置线程安全机制,无需手动加锁
  • maxsize=10 限制队列容量,防止内存溢出

运行特点

  • 生产者可快速填充队列
  • 消费者按队列深度处理消息
  • 主线程触发事件后,生产者立即退出,消费者处理完剩余消息后退出

其他线程原语

信号量(Semaphore)

threading.Semaphore 是带原子操作的计数器:

  • .acquire():计数减 1(若为 0 则阻塞)
  • .release():计数加 1

常用于限制资源访问数量(如数据库连接池)。

定时器(Timer)

threading.Timer 延迟执行函数:

t = threading.Timer(30.0, my_function)
t.start()  # 30 秒后在新线程调用 my_function
t.cancel()  # 取消定时器

栅栏(Barrier)

threading.Barrier 同步固定数量的线程:

barrier = threading.Barrier(3)  # 等待 3 个线程
# 每个线程调用
barrier.wait()  # 阻塞直到 3 个线程都到达

适用于线程初始化同步等场景。


总结

你已掌握 Python 多线程的核心概念:

  • 线程的适用场景(I/O 密集型)与局限(GIL 限制 CPU 并行)
  • 基础操作:创建、启动、等待线程
  • 同步机制:Lock 解决竞态,Queue 简化生产者-消费者模型
  • 高级原语:SemaphoreTimerBarrier
  • 常见陷阱:死锁、竞态条件、异常处理

多线程能提升程序结构清晰度,但需谨慎处理并发问题。对于 CPU 密集型任务,请考虑 multiprocessing 模块。