GIL 与并发
GIL(全局解释器锁)
是什么?
GIL(Global Interpreter Lock)是 CPython 中的一把互斥锁,同一时刻只允许一个线程执行 Python 字节码。
为什么存在?
CPython 的内存管理(引用计数)不是线程安全的。GIL 简化了 CPython 的实现,避免竞态条件导致引用计数出错。
GIL 的影响
| 场景 | 影响 |
|---|---|
| CPU 密集型(计算、加密) | 多线程无法利用多核,性能可能还不如单线程 |
| I/O 密集型(网络、文件) | 多线程有效,I/O 等待期间 GIL 会释放 |
# GIL 会在以下情况释放:
# 1. 执行 I/O 操作时
# 2. 执行 time.sleep() 时
# 3. 调用 C 扩展(如 numpy)时(视实现)
# 4. 每执行 sys.getswitchinterval()(默认 5ms)切换一次GIL 底层实现(CPython 源码层面)
GIL 的数据结构
GIL 本质是 Python/ceval_gil.c 中一个全局的条件变量 + 互斥锁:
/* 伪代码,简化自 CPython 源码 */
struct _gil_runtime_state {
unsigned long interval; // 切换间隔(微秒,默认 5000 = 5ms)
_Py_atomic_address locked; // 原子变量:0=未锁,1=已锁
unsigned long switch_number; // GIL 切换计数
COND_T cond; // 条件变量:等待 GIL 的线程在此睡眠
MUTEX_T mutex; // 互斥锁:保护上面的条件变量
int last_holder; // 上次持有 GIL 的线程 ID
};GIL 的获取(take_gil)
线程想运行 Python 代码时:
1. 检查 GIL 是否空闲(原子读 locked == 0)
2. 若空闲 → CAS 原子操作抢锁 → 成功则继续执行
3. 若已被占用 → 设置 gil_drop_request 标志 → 睡眠等待条件变量
4. 持有 GIL 的线程在每条字节码执行后检查 eval breaker(见下)
5. 检测到 gil_drop_request → 释放 GIL → 广播条件变量 → 等待线程被唤醒抢锁
Eval Breaker(求值中断器)
这是 Python 3.2 新 GIL 的核心机制,避免旧版 GIL 的”饥饿”问题:
每执行一条字节码,CPython 会检查一个原子标志位 eval_breaker:
eval_breaker = gil_drop_request | pending_calls | signals
如果 eval_breaker 为真:
→ 检查是否需要释放 GIL(gil_drop_request)
→ 检查是否有挂起的信号(signals)
→ 检查是否有挂起的调用(pending_calls)
旧 GIL 的问题(Python 3.1 以前):
每执行 100 条字节码(sys.getcheckinterval())就强制切换
→ 大量无效唤醒,多核下性能更差("thundering herd")
新 GIL(Python 3.2+):
只有其他线程主动请求时才释放
→ 减少无效切换,但单个线程可持有更久(5ms)
Python 3.13 Free-Threaded 模式(PEP 703)
# Python 3.13 实验性支持无 GIL 模式
# 编译时加 --disable-gil 选项,或安装 python3.13t
# 为了线程安全,做了大量改造:
# 1. 引用计数改为 biased reference counting(每个线程有本地计数)
# 2. 对象增加 per-object lock
# 3. dict/list 等内置类型加了细粒度锁
# 4. 内存分配器改为 mimalloc(线程安全且高效)
import sys
print(sys._is_gil_enabled()) # Python 3.13t 中返回 False如何绕过 GIL?
# 方案 1:multiprocessing(每个进程有独立 GIL)
from multiprocessing import Pool
def cpu_task(n):
return sum(i * i for i in range(n))
with Pool(4) as p:
results = p.map(cpu_task, [10**6] * 4)
# 方案 2:使用 C 扩展(numpy、pandas 等内部释放 GIL)
import numpy as np
arr = np.array([1, 2, 3]) # numpy 运算不受 GIL 限制
# 方案 3:使用 concurrent.futures
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
results = list(executor.map(cpu_task, [10**6] * 4))Threading(多线程)
适合 I/O 密集型任务。
import threading
import time
def download(url):
print(f"Downloading {url}")
time.sleep(1) # 模拟 I/O
print(f"Done {url}")
urls = ["url1", "url2", "url3"]
threads = [threading.Thread(target=download, args=(u,)) for u in urls]
for t in threads:
t.start()
for t in threads:
t.join()
# 总耗时约 1s,而非 3s(I/O 期间 GIL 释放)线程安全问题:
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock: # 必须加锁!
counter += 1
threads = [threading.Thread(target=increment) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 200000(加锁后正确)Multiprocessing(多进程)
适合 CPU 密集型任务。
from multiprocessing import Process, Queue, Pool
# Pool + map:最常用
def square(x):
return x * x
with Pool(processes=4) as pool:
result = pool.map(square, range(10))
print(result) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 进程间通信:Queue
def producer(q):
q.put("hello")
def consumer(q):
print(q.get())
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start(); p2.start()
p1.join(); p2.join()Asyncio(协程)
适合高并发 I/O,单线程,通过事件循环调度。
核心概念
async def:定义协程函数await:挂起当前协程,让出控制权asyncio.gather():并发运行多个协程- 事件循环(Event Loop):调度器,驱动协程执行
import asyncio
import aiohttp # 异步 HTTP 库
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = ["http://example.com"] * 5
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
asyncio.run(main())asyncio 事件循环底层实现
事件循环的核心:_run_once()
asyncio 的事件循环本质是一个死循环,每次迭代调用 _run_once():
while True:
_run_once():
1. 执行所有 _ready 队列中的回调(已就绪的 callback)
2. 计算下次 I/O 等待超时时间(取最近的 scheduled 定时器)
3. 调用 selector.select(timeout) → 阻塞等待 I/O 事件
(Linux: epoll, macOS: kqueue, Windows: IOCP)
4. 将 I/O 就绪的回调放入 _ready 队列
5. 处理到期的定时器(scheduled → _ready)
await 的字节码原理
import dis
async def my_coro():
await asyncio.sleep(1)
dis.dis(my_coro)
# GET_AWAITABLE ← 获取 awaitable 对象
# YIELD_VALUE ← 挂起,将控制权交回事件循环
# RESUME ← 被事件循环恢复后继续执行协程是一个状态机:每次 await 就是在某个 yield 点暂停,等 Future 完成后被事件循环 .send(result) 恢复:
# 手动模拟 await 的工作方式(概念演示)
coro = my_coro()
try:
future = coro.send(None) # 驱动协程到第一个 yield
# future 是 asyncio.sleep(1) 返回的 Future 对象
# 事件循环注册:1秒后调用 future.set_result()
# future.set_result() 触发 coro.send(result) 恢复执行
except StopIteration as e:
result = e.value # 协程执行完毕,取返回值Task、Future、Coroutine 的关系
Coroutine(协程对象)
└── 被 asyncio.create_task() 包装成 Task
└── Task 是 Future 的子类
└── Future 持有:
- _state: PENDING / CANCELLED / FINISHED
- _result: 完成时的结果
- _callbacks: 完成时要调用的回调列表
当 I/O 就绪时:
selector 返回事件
→ 调用注册的回调
→ Future.set_result()
→ 触发所有 _callbacks
→ Task._step() 被调用
→ coro.send(result) 恢复协程执行
selector 底层(OS 层面)
asyncio 默认 selector:
Linux → epoll(水平触发)
macOS → kqueue
Windows → IOCP(ProactorEventLoop)
epoll 工作模式:
epoll_create() → 创建 epoll 实例
epoll_ctl(EPOLL_CTL_ADD, fd, EPOLLIN) → 注册 fd
epoll_wait(timeout) → 阻塞等待,返回就绪的 fd 列表
这就是为什么 asyncio 能用单线程处理数万个并发连接:
不需要为每个连接开线程,OS 替你监听所有 fd
asyncio vs threading
| asyncio | threading | |
|---|---|---|
| 并发模型 | 协作式(主动让出) | 抢占式(OS 调度) |
| 线程数 | 单线程 | 多线程 |
| 开销 | 极低(协程切换) | 较高(线程切换 + GIL) |
| 适用 | 高并发 I/O | I/O 密集,有阻塞调用 |
| 陷阱 | 不能有阻塞代码! | 共享状态需加锁 |
面试高频题
Q: Python 多线程能利用多核吗?
不能(对于纯 Python 代码)。因为 GIL 同一时刻只允许一个线程执行字节码。要利用多核需用 multiprocessing 或调用释放 GIL 的 C 扩展。
Q: threading 和 multiprocessing 如何选择?
- I/O 密集(网络请求、文件读写) →
threading或asyncio - CPU 密集(数值计算、图像处理) →
multiprocessing
Q: asyncio.gather vs asyncio.wait 区别?
# gather:所有任务并发,返回结果列表(保序)
results = await asyncio.gather(coro1(), coro2())
# wait:更灵活,可设置 return_when 条件
done, pending = await asyncio.wait(
[coro1(), coro2()],
return_when=asyncio.FIRST_COMPLETED
)