背景 在学习 Race 或原子操作时,往往会有一个很经典的示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 #include < stdio.h> #include < stdlib.h> #include < errno.h> #include < pthread.h> static long long counter = 0 ;static long long loops = 0 ;static void * worker(void * arg) { (void)arg; for (long long i = 0 ; i < loops; i+ + ) { counter + = 1 ; / / 故意不加锁:会丢更新 } return NULL ; } static long long parse_ll(const char * s, const char * name) { errno = 0 ; char * end = NULL ; long long v = strtoll(s, & end , 10 ); if (errno != 0 || end = = s || * end != '\0' || v < 0 ) { fprintf(stderr, "invalid %s: %s\n", name, s); exit(1 ); } return v; } int main(int argc, char * * argv) { if (argc != 3 ) { fprintf(stderr, "usage: %s <loops> <threads>\n", argv[0 ]); return 1 ; } loops = parse_ll(argv[1 ], "loops"); long long num_threads_ll = parse_ll(argv[2 ], "threads"); if (num_threads_ll <= 0 || num_threads_ll > 1000000 ) { fprintf(stderr, "threads out of range: %lld\n", num_threads_ll); return 1 ; } int num_threads = (int )num_threads_ll; pthread_t * threads = (pthread_t * )malloc(sizeof(pthread_t) * (size_t)num_threads); if (! threads) { perror("malloc"); return 1 ; } printf("Initial value: %lld\n", counter); for (int i = 0 ; i < num_threads; i+ + ) { int rc = pthread_create(& threads[i], NULL , worker, NULL ); if (rc != 0 ) { fprintf(stderr, "pthread_create failed (rc=%d)\n", rc); free (threads); return 1 ; } } for (int i = 0 ; i < num_threads; i+ + ) { pthread_join(threads[i], NULL ); } long long expected = loops * num_threads_ll; printf("Expected: %lld\n", expected); printf("Final value: %lld\n", counter); printf("Lost updates: %lld\n", expected - counter); free (threads); return 0 ; }
用 ./race 2000 5 来运行它,最终会创建 5 个线程并发对一个全局变量自增,因为这个「自增」操作不是原子的,所以最终的更新结果往往是一个小于 10000 的数字
但是,如果把这个代码翻译成 Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import sysimport threadingcounter = 0 loops = 0 def worker (): global counter for _ in range (loops): counter += 1 def main (): global loops if len (sys.argv) != 3 : print ("usage: python race.py <loops> <threads>" , file=sys.stderr) sys.exit(1 ) loops = int (sys.argv[1 ]) num_threads = int (sys.argv[2 ]) threads = [] print ("Initial value:" , counter) for _ in range (num_threads): t = threading.Thread(target=worker) threads.append(t) t.start() for t in threads: t.join() print ("Expected:" , loops * num_threads) print ("Final value:" , counter) print ("Lost updates:" , loops * num_threads - counter) if __name__ == "__main__" : main()
你会发现无论执行 python race.py 2000 5 多少次,最终的结果永远是稳定的 10000 —— 竞争消失了
GIL 没错,稍有经验的人几乎可以一下子反应过来这是 GIL 的锅!
Python 3.10 起引入了一个优化 ,并不是所有操作都会去进行获取 + 释放 GIL 的操作。而我们想要产生竞态的核心逻辑 [LOAD_GLOBAL counter, LOAD_CONST 1, INPLACE_ADD, STORE_GLOBAL counter] 正好都不在这些 GIL 操作上,因此上述代码看似是两个线程并行,但实际上是分开执行的,所以并没有产生竞争、没有同时读写一个全局变量的情况出现,导致无法复现
解决办法 关闭 GIL 既然它是 GIL 导致的,那我们关了 GIL 不就好了🤔 Python 3.13 起开始有了一个 free-threaded 版本(常称为 python3.13t ),将 GIL 剔除了,所以我们如果直接用这种不含 GIL 的发行版去执行上面的程序,可以发现竞态成功复现了
引入 GIL 切换 既然问题出在我们的代码「过于简单」以至于不会给 GIL 释放锁的机会,那我们就手动给它引入释放的机会就好了,最简单方法就是利用「函数返回」—— 我们只需要将 += 1 替换成一个函数调用即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import sysimport threadingimport timecounter = 0 loops = 0 def one (): return 1 def worker (): global counter for _ in range (loops): counter += one() def main (): global loops if len (sys.argv) != 3 : print ("usage: python race.py <loops> <threads>" , file=sys.stderr) sys.exit(1 ) loops = int (sys.argv[1 ]) num_threads = int (sys.argv[2 ]) threads = [] print ("Initial value:" , counter) for _ in range (num_threads): t = threading.Thread(target=worker) threads.append(t) t.start() for t in threads: t.join() print ("Expected:" , loops * num_threads) print ("Final value:" , counter) print ("Lost updates:" , loops * num_threads - counter) if __name__ == "__main__" : main()
counter += one() 的指令是 [LOAD_GLOBAL counter, CALL_FUNCTION x, INPLACE_ADD, STORE_GLOBAL counter] ,而其中的 CALL_FUNCTION 会触发 GIL 切换检查,即在读写的过程中给了 GIL 释放锁的机会、允许了竞争
不过注意一点,因为现在 GIL 是 time-based switching,默认要 5ms 才会触发一次线程切换,所以每轮多跑几次,例如用 python race.py 200000 5 - 少了的话在现代 CPU 上可能根本来不及满足切换的条件程序就运行完了
参考 https://www.reddit.com/r/learnprogramming/comments/16mlz4h/race_condition_doesnt_happen_from_python_310/