在 Python 中复现 Race Condition

背景

在学习 Race 或原子操作时,往往会有一个很经典的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>

static long long counter = 0;
static long long loops = 0;

static void *worker(void *arg) {
(void)arg;
for (long long i = 0; i < loops; i++) {
counter += 1; // 故意不加锁:会丢更新
}
return NULL;
}

static long long parse_ll(const char *s, const char *name) {
errno = 0;
char *end = NULL;
long long v = strtoll(s, &end, 10);
if (errno != 0 || end == s || *end != '\0' || v < 0) {
fprintf(stderr, "invalid %s: %s\n", name, s);
exit(1);
}
return v;
}

int main(int argc, char **argv) {
if (argc != 3) {
fprintf(stderr, "usage: %s <loops> <threads>\n", argv[0]);
return 1;
}

loops = parse_ll(argv[1], "loops");
long long num_threads_ll = parse_ll(argv[2], "threads");

if (num_threads_ll <= 0 || num_threads_ll > 1000000) {
fprintf(stderr, "threads out of range: %lld\n", num_threads_ll);
return 1;
}

int num_threads = (int)num_threads_ll;

pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * (size_t)num_threads);
if (!threads) {
perror("malloc");
return 1;
}

printf("Initial value: %lld\n", counter);

for (int i = 0; i < num_threads; i++) {
int rc = pthread_create(&threads[i], NULL, worker, NULL);
if (rc != 0) {
fprintf(stderr, "pthread_create failed (rc=%d)\n", rc);
free(threads);
return 1;
}
}

for (int i = 0; i < num_threads; i++) {
pthread_join(threads[i], NULL);
}

long long expected = loops * num_threads_ll;
printf("Expected: %lld\n", expected);
printf("Final value: %lld\n", counter);
printf("Lost updates: %lld\n", expected - counter);

free(threads);
return 0;
}

./race 2000 5 来运行它,最终会创建 5 个线程并发对一个全局变量自增,因为这个「自增」操作不是原子的,所以最终的更新结果往往是一个小于 10000 的数字

但是,如果把这个代码翻译成 Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import sys
import threading


counter = 0
loops = 0


def worker():
global counter
for _ in range(loops):
counter += 1


def main():
global loops

if len(sys.argv) != 3:
print("usage: python race.py <loops> <threads>", file=sys.stderr)
sys.exit(1)

loops = int(sys.argv[1])
num_threads = int(sys.argv[2])

threads = []

print("Initial value:", counter)

for _ in range(num_threads):
t = threading.Thread(target=worker)
threads.append(t)
t.start()

for t in threads:
t.join()

print("Expected:", loops * num_threads)
print("Final value:", counter)
print("Lost updates:", loops * num_threads - counter)


if __name__ == "__main__":
main()

你会发现无论执行 python race.py 2000 5 多少次,最终的结果永远是稳定的 10000 —— 竞争消失了

GIL

没错,稍有经验的人几乎可以一下子反应过来这是 GIL 的锅!

Python 3.10 起引入了一个优化,并不是所有操作都会去进行获取 + 释放 GIL 的操作。而我们想要产生竞态的核心逻辑 [LOAD_GLOBAL counter, LOAD_CONST 1, INPLACE_ADD, STORE_GLOBAL counter] 正好都不在这些 GIL 操作上,因此上述代码看似是两个线程并行,但实际上是分开执行的,所以并没有产生竞争、没有同时读写一个全局变量的情况出现,导致无法复现

解决办法

关闭 GIL

既然它是 GIL 导致的,那我们关了 GIL 不就好了🤔 Python 3.13 起开始有了一个 free-threaded 版本(常称为 python3.13t ),将 GIL 剔除了,所以我们如果直接用这种不含 GIL 的发行版去执行上面的程序,可以发现竞态成功复现了

引入 GIL 切换

既然问题出在我们的代码「过于简单」以至于不会给 GIL 释放锁的机会,那我们就手动给它引入释放的机会就好了,最简单方法就是利用「函数返回」—— 我们只需要将 += 1 替换成一个函数调用即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import sys
import threading
import time


counter = 0
loops = 0

def one():
return 1


def worker():
global counter
for _ in range(loops):
counter += one()


def main():
global loops

if len(sys.argv) != 3:
print("usage: python race.py <loops> <threads>", file=sys.stderr)
sys.exit(1)

loops = int(sys.argv[1])
num_threads = int(sys.argv[2])

threads = []

print("Initial value:", counter)

for _ in range(num_threads):
t = threading.Thread(target=worker)
threads.append(t)
t.start()

for t in threads:
t.join()

print("Expected:", loops * num_threads)
print("Final value:", counter)
print("Lost updates:", loops * num_threads - counter)


if __name__ == "__main__":
main()

counter += one() 的指令是 [LOAD_GLOBAL counter, CALL_FUNCTION x, INPLACE_ADD, STORE_GLOBAL counter] ,而其中的 CALL_FUNCTION 会触发 GIL 切换检查,即在读写的过程中给了 GIL 释放锁的机会、允许了竞争

不过注意一点,因为现在 GIL 是 time-based switching,默认要 5ms 才会触发一次线程切换,所以每轮多跑几次,例如用 python race.py 200000 5 - 少了的话在现代 CPU 上可能根本来不及满足切换的条件程序就运行完了

参考

https://www.reddit.com/r/learnprogramming/comments/16mlz4h/race_condition_doesnt_happen_from_python_310/