GIL 与并发

GIL(全局解释器锁)

是什么?

GIL(Global Interpreter Lock)是 CPython 中的一把互斥锁,同一时刻只允许一个线程执行 Python 字节码

为什么存在?

CPython 的内存管理(引用计数)不是线程安全的。GIL 简化了 CPython 的实现,避免竞态条件导致引用计数出错。

GIL 的影响

场景影响
CPU 密集型(计算、加密)多线程无法利用多核,性能可能还不如单线程
I/O 密集型(网络、文件)多线程有效,I/O 等待期间 GIL 会释放
# GIL 会在以下情况释放:
# 1. 执行 I/O 操作时
# 2. 执行 time.sleep() 时
# 3. 调用 C 扩展(如 numpy)时(视实现)
# 4. 每执行 sys.getswitchinterval()(默认 5ms)切换一次

GIL 底层实现(CPython 源码层面)

GIL 的数据结构

GIL 本质是 Python/ceval_gil.c 中一个全局的条件变量 + 互斥锁:

/* 伪代码,简化自 CPython 源码 */
struct _gil_runtime_state {
    unsigned long interval;      // 切换间隔(微秒,默认 5000 = 5ms)
    _Py_atomic_address locked;   // 原子变量:0=未锁,1=已锁
    unsigned long switch_number; // GIL 切换计数
    COND_T cond;                 // 条件变量:等待 GIL 的线程在此睡眠
    MUTEX_T mutex;               // 互斥锁:保护上面的条件变量
    int last_holder;             // 上次持有 GIL 的线程 ID
};

GIL 的获取(take_gil)

线程想运行 Python 代码时:
1. 检查 GIL 是否空闲(原子读 locked == 0)
2. 若空闲 → CAS 原子操作抢锁 → 成功则继续执行
3. 若已被占用 → 设置 gil_drop_request 标志 → 睡眠等待条件变量
4. 持有 GIL 的线程在每条字节码执行后检查 eval breaker(见下)
5. 检测到 gil_drop_request → 释放 GIL → 广播条件变量 → 等待线程被唤醒抢锁

Eval Breaker(求值中断器)

这是 Python 3.2 新 GIL 的核心机制,避免旧版 GIL 的”饥饿”问题:

每执行一条字节码,CPython 会检查一个原子标志位 eval_breaker:
  eval_breaker = gil_drop_request | pending_calls | signals

如果 eval_breaker 为真:
  → 检查是否需要释放 GIL(gil_drop_request)
  → 检查是否有挂起的信号(signals)
  → 检查是否有挂起的调用(pending_calls)

旧 GIL 的问题(Python 3.1 以前):
  每执行 100 条字节码(sys.getcheckinterval())就强制切换
  → 大量无效唤醒,多核下性能更差("thundering herd")

新 GIL(Python 3.2+):
  只有其他线程主动请求时才释放
  → 减少无效切换,但单个线程可持有更久(5ms)

Python 3.13 Free-Threaded 模式(PEP 703)

# Python 3.13 实验性支持无 GIL 模式
# 编译时加 --disable-gil 选项,或安装 python3.13t
 
# 为了线程安全,做了大量改造:
# 1. 引用计数改为 biased reference counting(每个线程有本地计数)
# 2. 对象增加 per-object lock
# 3. dict/list 等内置类型加了细粒度锁
# 4. 内存分配器改为 mimalloc(线程安全且高效)
 
import sys
print(sys._is_gil_enabled())  # Python 3.13t 中返回 False

如何绕过 GIL?

# 方案 1:multiprocessing(每个进程有独立 GIL)
from multiprocessing import Pool
 
def cpu_task(n):
    return sum(i * i for i in range(n))
 
with Pool(4) as p:
    results = p.map(cpu_task, [10**6] * 4)
 
# 方案 2:使用 C 扩展(numpy、pandas 等内部释放 GIL)
import numpy as np
arr = np.array([1, 2, 3])  # numpy 运算不受 GIL 限制
 
# 方案 3:使用 concurrent.futures
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    results = list(executor.map(cpu_task, [10**6] * 4))

Threading(多线程)

适合 I/O 密集型任务。

import threading
import time
 
def download(url):
    print(f"Downloading {url}")
    time.sleep(1)  # 模拟 I/O
    print(f"Done {url}")
 
urls = ["url1", "url2", "url3"]
threads = [threading.Thread(target=download, args=(u,)) for u in urls]
 
for t in threads:
    t.start()
for t in threads:
    t.join()
 
# 总耗时约 1s,而非 3s(I/O 期间 GIL 释放)

线程安全问题:

import threading
 
counter = 0
lock = threading.Lock()
 
def increment():
    global counter
    for _ in range(100000):
        with lock:   # 必须加锁!
            counter += 1
 
threads = [threading.Thread(target=increment) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)  # 200000(加锁后正确)

Multiprocessing(多进程)

适合 CPU 密集型任务。

from multiprocessing import Process, Queue, Pool
 
# Pool + map:最常用
def square(x):
    return x * x
 
with Pool(processes=4) as pool:
    result = pool.map(square, range(10))
print(result)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
 
# 进程间通信:Queue
def producer(q):
    q.put("hello")
 
def consumer(q):
    print(q.get())
 
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start(); p2.start()
p1.join(); p2.join()

Asyncio(协程)

适合高并发 I/O,单线程,通过事件循环调度。

核心概念

  • async def:定义协程函数
  • await:挂起当前协程,让出控制权
  • asyncio.gather():并发运行多个协程
  • 事件循环(Event Loop):调度器,驱动协程执行
import asyncio
import aiohttp  # 异步 HTTP 库
 
async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()
 
async def main():
    urls = ["http://example.com"] * 5
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results
 
asyncio.run(main())

asyncio 事件循环底层实现

事件循环的核心:_run_once()

asyncio 的事件循环本质是一个死循环,每次迭代调用 _run_once()

while True:
    _run_once():
        1. 执行所有 _ready 队列中的回调(已就绪的 callback)
        2. 计算下次 I/O 等待超时时间(取最近的 scheduled 定时器)
        3. 调用 selector.select(timeout) → 阻塞等待 I/O 事件
           (Linux: epoll, macOS: kqueue, Windows: IOCP)
        4. 将 I/O 就绪的回调放入 _ready 队列
        5. 处理到期的定时器(scheduled → _ready)

await 的字节码原理

import dis
 
async def my_coro():
    await asyncio.sleep(1)
 
dis.dis(my_coro)
# GET_AWAITABLE   ← 获取 awaitable 对象
# YIELD_VALUE     ← 挂起,将控制权交回事件循环
# RESUME          ← 被事件循环恢复后继续执行

协程是一个状态机:每次 await 就是在某个 yield 点暂停,等 Future 完成后被事件循环 .send(result) 恢复:

# 手动模拟 await 的工作方式(概念演示)
coro = my_coro()
try:
    future = coro.send(None)   # 驱动协程到第一个 yield
    # future 是 asyncio.sleep(1) 返回的 Future 对象
    # 事件循环注册:1秒后调用 future.set_result()
    # future.set_result() 触发 coro.send(result) 恢复执行
except StopIteration as e:
    result = e.value           # 协程执行完毕,取返回值

Task、Future、Coroutine 的关系

Coroutine(协程对象)
  └── 被 asyncio.create_task() 包装成 Task
        └── Task 是 Future 的子类
              └── Future 持有:
                    - _state: PENDING / CANCELLED / FINISHED
                    - _result: 完成时的结果
                    - _callbacks: 完成时要调用的回调列表

当 I/O 就绪时:
  selector 返回事件
  → 调用注册的回调
  → Future.set_result()
  → 触发所有 _callbacks
  → Task._step() 被调用
  → coro.send(result) 恢复协程执行

selector 底层(OS 层面)

asyncio 默认 selector:
  Linux   → epoll(水平触发)
  macOS   → kqueue
  Windows → IOCP(ProactorEventLoop)

epoll 工作模式:
  epoll_create() → 创建 epoll 实例
  epoll_ctl(EPOLL_CTL_ADD, fd, EPOLLIN) → 注册 fd
  epoll_wait(timeout) → 阻塞等待,返回就绪的 fd 列表

这就是为什么 asyncio 能用单线程处理数万个并发连接:
  不需要为每个连接开线程,OS 替你监听所有 fd

asyncio vs threading

asynciothreading
并发模型协作式(主动让出)抢占式(OS 调度)
线程数单线程多线程
开销极低(协程切换)较高(线程切换 + GIL)
适用高并发 I/OI/O 密集,有阻塞调用
陷阱不能有阻塞代码!共享状态需加锁

面试高频题

Q: Python 多线程能利用多核吗?

不能(对于纯 Python 代码)。因为 GIL 同一时刻只允许一个线程执行字节码。要利用多核需用 multiprocessing 或调用释放 GIL 的 C 扩展。

Q: threadingmultiprocessing 如何选择?

  • I/O 密集(网络请求、文件读写) → threadingasyncio
  • CPU 密集(数值计算、图像处理) → multiprocessing

Q: asyncio.gather vs asyncio.wait 区别?

# gather:所有任务并发,返回结果列表(保序)
results = await asyncio.gather(coro1(), coro2())
 
# wait:更灵活,可设置 return_when 条件
done, pending = await asyncio.wait(
    [coro1(), coro2()],
    return_when=asyncio.FIRST_COMPLETED
)