在Python中,当我们使用multiprocessing模块实现并发时,与多线程(threading)不同,子进程拥有独立的内存空间。这意味着父进程中定义的普通变量(如列表、字典、普通对象实例)不会自动且安全地在子进程间共享。如果尝试直接共享和修改这些变量,轻则导致数据不一致,重则可能引发资源竞争导致的死锁或程序崩溃。
为什么多进程直接共享变量是危险的?
多进程并发依赖于操作系统机制(如Unix上的fork),每个进程都有自己的内存地址空间。当你启动一个新进程时,数据通常被复制(copy-on-write),而不是共享。即使是可变数据类型,如果它们不是专门设计用于进程间通信(IPC)的,直接访问和修改将是不同步且不可靠的。
如果进程试图操作某些复杂资源(如文件句柄、数据库连接)或复杂的可变数据结构,而这些结构又没有为跨进程访问设计,就会出现以下问题:
- 数据不一致(Race Conditions): 多个进程试图同时读写一个变量,由于没有锁机制,最终结果是不可预测的。
- 死锁/资源耗尽: 尤其是在共享复杂资源时,如果不使用正确的同步原语,进程可能相互等待对方释放资源,导致程序卡住。
解决方案:使用IPC机制实现安全共享
Python的multiprocessing模块提供了专门用于进程间通信和共享状态的工具。
对于简单的数值类型(如整数、浮点数)或C语言结构体数组,我们应该使用multiprocessing.Value和multiprocessing.Array。
实践示例:安全地共享和更新计数器
以下示例展示了如何使用Value来安全地共享一个整数计数器,并使用Lock来保证在任何时刻只有一个进程能够修改它,从而避免竞态条件。
import multiprocessing
import time
import os
# 目标函数:对共享计数器进行累加
def increment_counter(shared_value, lock):
process_id = os.getpid()
print(f"进程 {process_id} 启动...")
# 模拟大量的累加操作
for _ in range(100000):
# 使用锁(Lock)确保操作的原子性
with lock:
# 访问和修改共享变量
current_value = shared_value.value
current_value += 1
shared_value.value = current_value
print(f"进程 {process_id} 完成,当前值: {shared_value.value}")
if __name__ == '__main__':
# 1. 创建共享变量
# 'i' 表示 signed integer (有符号整数)
shared_counter = multiprocessing.Value('i', 0)
# 2. 创建锁 (用于同步访问)
lock = multiprocessing.Lock()
num_processes = 5
processes = []
start_time = time.time()
# 启动进程
for i in range(num_processes):
# 将共享变量和锁作为参数传递给子进程
p = multiprocessing.Process(target=increment_counter, args=(shared_counter, lock))
processes.append(p)
p.start()
# 等待所有子进程完成
for p in processes:
p.join()
end_time = time.time()
# 预期结果是 5 * 100000 = 500000
print(f"\n所有进程执行完毕,耗时: {end_time - start_time:.4f} 秒")
print(f"最终共享计数器的值: {shared_counter.value}")
重点回顾:Value 和 Lock 的作用
- ****multiprocessing.Value(typecode, initial_value): 这会创建一个存储在共享内存中的对象。它不是Python原生类型,而是专门用于跨进程通信的包装器。通过.value属性来访问实际存储的数据。
- ****multiprocessing.Lock(): 这是一个同步原语。当一个进程获取锁后,其他进程必须等待锁释放才能进入临界区(即with lock:代码块)。这确保了对共享数据的修改操作(读-改-写三步)是原子性的,从而防止了竞态条件导致的错误累加。
进阶:共享复杂数据结构
如果需要共享列表、字典等更复杂的Python对象,推荐使用multiprocessing.Manager。
Manager创建了一个服务进程,该进程管理着共享对象,并通过代理的方式让其他进程可以安全地访问和修改这些对象。Manager会自动处理必要的锁和同步机制。
from multiprocessing import Process, Manager
def worker(d, l):
d[1] = 'a'
d['2'] = 2
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
# 创建一个共享的字典和列表
shared_dict = manager.dict()
shared_list = manager.list(range(10))
p = Process(target=worker, args=(shared_dict, shared_list))
p.start()
p.join()
print(f"共享字典: {shared_dict}")
print(f"共享列表 (已反转): {shared_list}")
通过使用这些内置的IPC工具,我们可以有效地避免多进程共享变量带来的死锁和数据损坏风险,实现健壮的Python并发程序。
汤不热吧