Jim Anderson
什么是线程?
线程(Thread)是一种独立的执行流。这意味着你的程序可以同时做两件事情。但对大多数 Python 3 的实现来说,不同的线程实际上并不会真正同时执行:它们只是看起来在同时运行。
你可能会把多线程想象成多个处理器同时运行你的程序,各自独立完成任务。这几乎是对的——线程可能运行在不同的处理器上,但它们一次只能运行一个。
要让多个任务真正并行执行,你需要使用非标准的 Python 实现、用其他语言编写部分代码,或者使用 multiprocessing 模块(它会带来一些额外开销)。
由于 CPython(标准 Python 实现)的工作机制,多线程不一定能加速所有任务。这是因为全局解释器锁(GIL)限制了同一时间只能有一个 Python 线程执行。
- 适合使用线程的任务:大部分时间都在等待外部事件(如 I/O、网络请求等)。
- 不适合使用线程的任务:需要大量 CPU 计算、很少等待外部事件的 CPU 密集型任务。
注意:以上结论仅适用于标准 CPython 实现 + 纯 Python 代码。
- 如果你的线程是用 C 编写的,它可以释放 GIL 并真正并发执行。
- 如果你使用的是其他 Python 实现(如 PyPy、Jython),请查阅其文档了解线程行为。
- 对于 CPU 密集型任务,建议使用
multiprocessing模块。
即使多线程不能提升性能,合理使用线程也能改善程序结构清晰度。本文中的许多示例并非为了提速,而是为了让设计更简洁、更易于理解。
那么,别再空谈线程了,让我们开始使用它吧!
启动一个线程
现在你已经了解了线程的概念,接下来学习如何创建线程。Python 标准库提供了 threading 模块,其中包含本文将用到的大部分原语。Thread 类封装了线程,提供了简洁的接口。
要启动一个独立线程,你需要创建一个 Thread 实例并调用其 .start() 方法:
import logging
import threading
import time
def thread_function(name):
logging.info("Thread %s: starting", name)
time.sleep(2)
logging.info("Thread %s: finishing", name)
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
logging.info("Main : before creating thread")
x = threading.Thread(target=thread_function, args=(1,))
logging.info("Main : before running thread")
x.start()
logging.info("Main : wait for the thread to finish")
# x.join()
logging.info("Main : all done")
关键代码:
x = threading.Thread(target=thread_function, args=(1,))
x.start()
创建 Thread 时,需传入目标函数和参数列表。这里告诉线程运行 thread_function() 并传入参数 1。
本文使用连续整数作为线程名称。虽然
threading.get_ident()可返回唯一 ID,但通常不够简短易读。
thread_function() 只是记录日志并在中间休眠 2 秒。
运行结果(未取消注释 x.join()):
Main : before creating thread
Main : before running thread
Thread 1: starting
Main : wait for the thread to finish
Main : all done
Thread 1: finishing
注意:主线程结束后,子线程才完成。下一节将解释原因并讨论神秘的第 20 行代码。
守护线程(Daemon Threads)
在计算机科学中,守护进程(daemon)是在后台运行的进程。
Python 中的守护线程有特定含义:当程序退出时,守护线程会立即被终止。你可以将其理解为“无需显式关闭的后台线程”。
- 非守护线程:程序会等待所有非守护线程完成后再退出。
- 守护线程:程序退出时直接终止,无论其是否完成。
回顾之前的输出,最后两行很有趣:主线程打印 "all done" 后,程序会暂停约 2 秒才结束。这是因为 Python 在等待非守护线程完成。程序退出时,threading._shutdown() 会遍历所有运行中的线程,并对非守护线程调用 .join()。
这种行为通常是期望的,但也有其他选择。让我们试试守护线程:
x = threading.Thread(target=thread_function, args=(1,), daemon=True)
运行结果:
Main : before creating thread
Main : before running thread
Thread 1: starting
Main : wait for the thread to finish
Main : all done
最后一行缺失了!因为它是守护线程,主线程结束后立即被终止,thread_function() 没机会完成。
使用 join() 等待线程
守护线程虽方便,但有时我们需要等待线程完成而不退出程序。回到原始代码,取消注释第 20 行:
x.join()
.join() 会让当前线程(这里是主线程)暂停,直到目标线程完成。无论目标线程是守护线程还是普通线程,.join() 都会等待其结束。
处理多个线程
前面的例子只涉及两个线程:主线程和一个子线程。实际中常需要启动多个线程。先看“硬核”方式:
import logging
import threading
import time
def thread_function(name):
logging.info("Thread %s: starting", name)
time.sleep(2)
logging.info("Thread %s: finishing", name)
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
threads = list()
for index in range(3):
logging.info("Main : create and start thread %d.", index)
x = threading.Thread(target=thread_function, args=(index,))
threads.append(x)
x.start()
for index, thread in enumerate(threads):
logging.info("Main : before joining thread %d.", index)
thread.join()
logging.info("Main : thread %d done", index)
程序维护一个 Thread 对象列表,以便后续用 .join() 等待它们。
典型输出:
Main : create and start thread 0.
Thread 0: starting
Main : create and start thread 1.
Thread 1: starting
Main : create and start thread 2.
Thread 2: starting
Main : before joining thread 0.
Thread 2: finishing
Thread 1: finishing
Thread 0: finishing
Main : thread 0 done
Main : before joining thread 1.
Main : thread 1 done
Main : before joining thread 2.
Main : thread 2 done
注意:线程启动顺序固定,但完成顺序随机!这是由操作系统调度决定的,每次运行结果可能不同。设计多线程算法时必须考虑这一点。
使用 ThreadPoolExecutor
启动线程组有更简单的方法:ThreadPoolExecutor(Python 3.2+ 标准库 concurrent.futures 中提供)。
推荐用 with 语句作为上下文管理器:
import concurrent.futures
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
executor.map(thread_function, range(3))
代码创建含 3 个工作线程的池,并用 .map() 将 range(3) 中的每个元素分配给线程。
with 块结束时,ThreadPoolExecutor 会自动对池中所有线程调用 .join()。强烈建议使用上下文管理器,避免忘记 .join()。
注意:
ThreadPoolExecutor可能隐藏异常。例如,若函数不需要参数但.map()传入了参数,线程会抛出异常但程序静默退出,难以调试。
运行结果:
Thread 0: starting
Thread 1: starting
Thread 2: starting
Thread 1: finishing
Thread 0: finishing
Thread 2: finishing
再次看到线程完成顺序的不确定性。
竞态条件(Race Conditions)
多线程编程的难点之一是竞态条件:当两个或多个线程访问共享数据或资源时可能发生。
下面创建一个必然触发竞态条件的例子(实际中竞态条件往往偶发且难复现):
class FakeDatabase:
def __init__(self):
self.value = 0
def update(self, name):
logging.info("Thread %s: starting update", name)
local_copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
logging.info("Thread %s: finishing update", name)
FakeDatabase 模拟数据库操作:
- 从数据库读取值(复制到局部变量)
- 计算(加 1)
- 休眠(模拟耗时操作)
- 写回数据库
测试代码:
if __name__ == "__main__":
database = FakeDatabase()
logging.info("Testing update. Starting value is %d.", database.value)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
for index in range(2):
executor.submit(database.update, index)
logging.info("Testing update. Ending value is %d.", database.value)
预期结果:database.value 应为 2
实际结果:
Testing unlocked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing unlocked update. Ending value is 1.
单线程分析
单线程执行时:
- 读取
value=0到local_copy local_copy加 1 → 1- 休眠(无影响)
- 写回
value=1
双线程竞态
- 线程 0 读取
value=0→local_copy=0 - 线程 0 加 1 →
local_copy=1 - 线程 0 休眠,线程 1 被调度
- 线程 1 读取
value=0(尚未更新)→local_copy=0 - 线程 1 加 1 →
local_copy=1 - 线程 1 休眠,线程 0 恢复
- 线程 0 写回
value=1 - 线程 1 恢复,写回
value=1(覆盖线程 0 的结果)
两个线程互相覆盖结果,导致最终值为 1 而非 2。
此例通过
time.sleep()强制触发竞态,实际中竞态可能在任何指令间发生(如x = x + 1的读写之间)。
使用锁(Lock)解决竞态
解决竞态的核心思路:确保同一时间只有一个线程能执行临界区代码(读-改-写)。Python 提供 threading.Lock(其他语言称 mutex,即互斥锁)。
锁就像通行证:同一时间只能有一个线程持有。其他线程必须等待持有者释放锁。
基本操作:
.acquire():获取锁(若已被占用则阻塞).release():释放锁
重要:若线程获取锁后永不释放,程序将死锁。Python 的 Lock 支持上下文管理器(with 语句),可自动释放锁。
修复后的 FakeDatabase
class FakeDatabase:
def __init__(self):
self.value = 0
self._lock = threading.Lock()
def locked_update(self, name):
logging.info("Thread %s: starting update", name)
with self._lock:
local_copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
logging.info("Thread %s: finishing update", name)
运行结果:
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing locked update. Ending value is 2.
开启 DEBUG 日志可见:
- 线程 0 获取锁后休眠
- 线程 1 尝试获取锁但被阻塞
- 线程 0 释放锁后,线程 1 才能获取
这就是锁提供的互斥(Mutual Exclusion)。
死锁(Deadlock)
使用锁时常见问题:死锁。当线程已持有锁却再次尝试获取同一锁时会发生:
import threading
l = threading.Lock()
l.acquire()
l.acquire() # 程序在此挂起!
死锁通常由两种原因引起:
- 实现缺陷:锁未正确释放(使用上下文管理器可大幅降低风险)
- 设计问题:工具函数可能被已持有锁或未持有锁的函数调用
Python 提供 RLock(可重入锁)解决第二种情况:允许同一线程多次获取锁,但需释放相同次数。
生产者-消费者模型
生产者-消费者问题是经典的线程同步案例。设想:
- 生产者:从网络读取消息(突发性到达)
- 消费者:将消息写入数据库(速度较慢)
- 管道(Pipeline):连接两者
使用锁的解决方案
import random
SENTINEL = object()
def producer(pipeline):
for index in range(10):
message = random.randint(1, 101)
logging.info("Producer got message: %s", message)
pipeline.set_message(message, "Producer")
pipeline.set_message(SENTINEL, "Producer") # 结束信号
def consumer(pipeline):
message = 0
while message is not SENTINEL:
message = pipeline.get_message("Consumer")
if message is not SENTINEL:
logging.info("Consumer storing message: %s", message)
class Pipeline:
def __init__(self):
self.message = 0
self.producer_lock = threading.Lock()
self.consumer_lock = threading.Lock()
self.consumer_lock.acquire() # 初始禁止消费
def get_message(self, name):
self.consumer_lock.acquire() # 等待消息
message = self.message
self.producer_lock.release() # 允许生产
return message
def set_message(self, message, name):
self.producer_lock.acquire() # 等待管道空
self.message = message
self.consumer_lock.release() # 允许消费
工作原理:
- 初始状态:
consumer_lock已锁定,producer_lock未锁定 - 生产者调用
set_message():- 获取
producer_lock(若管道满则阻塞) - 设置消息
- 释放
consumer_lock(唤醒消费者)
- 获取
- 消费者调用
get_message():- 获取
consumer_lock(若无消息则阻塞) - 读取消息
- 释放
producer_lock(唤醒生产者)
- 获取
局限性:管道只能容纳单个消息,无法应对消息突发。
使用队列(Queue)的改进方案
为支持多消息缓冲,使用 queue.Queue(线程安全):
import queue
import threading
def producer(queue, event):
while not event.is_set():
message = random.randint(1, 101)
logging.info("Producer got message: %s", message)
queue.put(message)
logging.info("Producer received event. Exiting")
def consumer(queue, event):
while not event.is_set() or not queue.empty():
message = queue.get()
logging.info("Consumer storing message: %s (size=%d)", message, queue.qsize())
logging.info("Consumer received event. Exiting")
if __name__ == "__main__":
pipeline = queue.Queue(maxsize=10)
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event)
time.sleep(0.1)
event.set() # 触发退出
关键改进:
- 使用
threading.Event通知线程退出(替代SENTINEL) queue.Queue内置线程安全机制,无需手动加锁maxsize=10限制队列容量,防止内存溢出
运行特点:
- 生产者可快速填充队列
- 消费者按队列深度处理消息
- 主线程触发事件后,生产者立即退出,消费者处理完剩余消息后退出
其他线程原语
信号量(Semaphore)
threading.Semaphore 是带原子操作的计数器:
.acquire():计数减 1(若为 0 则阻塞).release():计数加 1
常用于限制资源访问数量(如数据库连接池)。
定时器(Timer)
threading.Timer 延迟执行函数:
t = threading.Timer(30.0, my_function)
t.start() # 30 秒后在新线程调用 my_function
t.cancel() # 取消定时器
栅栏(Barrier)
threading.Barrier 同步固定数量的线程:
barrier = threading.Barrier(3) # 等待 3 个线程
# 每个线程调用
barrier.wait() # 阻塞直到 3 个线程都到达
适用于线程初始化同步等场景。
总结
你已掌握 Python 多线程的核心概念:
- 线程的适用场景(I/O 密集型)与局限(GIL 限制 CPU 并行)
- 基础操作:创建、启动、等待线程
- 同步机制:
Lock解决竞态,Queue简化生产者-消费者模型 - 高级原语:
Semaphore、Timer、Barrier - 常见陷阱:死锁、竞态条件、异常处理
多线程能提升程序结构清晰度,但需谨慎处理并发问题。对于 CPU 密集型任务,请考虑 multiprocessing 模块。