Python multiprocessing Module Tutorial
1. Process Fundamentals
1.1 What Is a Process
A process is the operating system's basic unit of resource allocation; each process has its own memory space, data stack, and other resources. Unlike threads, processes do not share memory with one another.
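A minimal sketch makes the isolation visible (the counter variable is illustrative): the child increments its own copy of a module-level variable, and the parent's copy never changes:

import multiprocessing

counter = 0  # lives in each process's own memory

def increment():
    global counter
    counter += 1  # touches only the child's copy
    print(f'In child:  counter = {counter}')  # 1

if __name__ == '__main__':
    p = multiprocessing.Process(target=increment)
    p.start()
    p.join()
    print(f'In parent: counter = {counter}')  # still 0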
1.2 Multiprocessing in Python
Because of the GIL (Global Interpreter Lock), Python threads cannot exploit multiple cores for CPU-bound tasks. The multiprocessing module sidesteps the GIL by spawning child processes, each with its own interpreter, enabling true parallel computation.
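A rough sketch of the effect on CPU-bound work (count_down and the iteration counts are illustrative; actual timings depend on your machine):

import multiprocessing
import time

def count_down(n):
    # Pure-Python busy loop: it holds the GIL, so threads gain nothing here
    while n > 0:
        n -= 1

if __name__ == '__main__':
    n = 10_000_000
    start = time.perf_counter()
    for _ in range(4):
        count_down(n)
    print(f'Sequential:  {time.perf_counter() - start:.2f}s')

    start = time.perf_counter()
    with multiprocessing.Pool(4) as pool:
        pool.map(count_down, [n] * 4)
    print(f'4 processes: {time.perf_counter() - start:.2f}s')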
1.3 Processes vs. Threads
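In brief:

- Memory: each process has its own address space; threads share their process's memory
- Creation and context-switch cost: processes are heavy, threads are light
- GIL: threads take turns executing Python bytecode; each process gets its own interpreter and GIL
- Communication: threads share state directly; processes need IPC (queues, pipes, shared memory)
- Robustness: a crashed child process leaves its siblings running; a misbehaving thread can take down the whole process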
2. Creating Processes
2.1 Basic Process Creation
import multiprocessing
import os

def worker():
    """Task executed by the child process."""
    print(f'Worker process ID: {os.getpid()}')
    print(f'Parent process ID: {os.getppid()}')

if __name__ == '__main__':
    print(f'Main process ID: {os.getpid()}')
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker)
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print("All processes completed")
2.2 Passing Arguments to a Process
import multiprocessing
import os

def worker(num, name):
    print(f'Process {name} ({num}) running in {os.getpid()}')

if __name__ == '__main__':
    for i in range(3):
        p = multiprocessing.Process(
            target=worker,
            args=(i,),
            kwargs={'name': f'worker-{i}'}
        )
        p.start()
        p.join()  # joining inside the loop runs the processes one at a time
2.3 Subclassing Process
import multiprocessing
import os
import time

class MyProcess(multiprocessing.Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f'{self.name} running in {os.getpid()}')
        time.sleep(2)

if __name__ == '__main__':
    processes = [MyProcess(f'Process-{i}') for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
3. Inter-Process Communication
3.1 Queue
import multiprocessing
import time

def producer(q):
    for i in range(5):
        q.put(i)
        print(f'Produced {i}')
        time.sleep(0.5)

def consumer(q):
    while True:
        item = q.get()
        if item is None:  # termination signal
            break
        print(f'Consumed {item}')
        time.sleep(1)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    prod = multiprocessing.Process(target=producer, args=(q,))
    cons = multiprocessing.Process(target=consumer, args=(q,))
    prod.start()
    cons.start()
    prod.join()
    q.put(None)  # send the termination signal
    cons.join()
3.2 Pipe
import multiprocessing
import time

def sender(conn):
    for i in range(5):
        conn.send(i)
        print(f'Sent {i}')
        time.sleep(0.5)
    conn.close()

def receiver(conn):
    while True:
        try:
            msg = conn.recv()
            print(f'Received {msg}')
        except EOFError:
            break

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
    p1.start()
    p2.start()
    # Close the parent's copies of both ends; otherwise the receiver
    # never sees EOFError, because recv() only raises it once every
    # handle to the sending end has been closed
    parent_conn.close()
    child_conn.close()
    p1.join()
    p2.join()
3.3 Shared Memory
3.3.1 Value and Array
import multiprocessing

def worker(n, a):
    # += on a shared Value/Array is a read-modify-write and is not atomic;
    # take the built-in lock to avoid lost updates
    with n.get_lock():
        n.value += 1
    with a.get_lock():
        for i in range(len(a)):
            a[i] += 1

if __name__ == '__main__':
    num = multiprocessing.Value('i', 0)  # 'i' = signed int
    arr = multiprocessing.Array('d', [0.0, 1.0, 2.0])  # 'd' = double
    processes = [
        multiprocessing.Process(target=worker, args=(num, arr))
        for _ in range(3)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f'Final value: {num.value}')
    print(f'Final array: {list(arr)}')
3.3.2 Shared-State Managers
import multiprocessing
import os

def worker(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        d = manager.dict()
        l = manager.list()
        processes = [
            multiprocessing.Process(target=worker, args=(d, l))
            for _ in range(3)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(f'Dict: {d}')
        print(f'List: {l}')
4. Process Synchronization
4.1 Lock
import multiprocessing
import os

def worker(lock, shared_value):
    with lock:
        shared_value.value += 1
        print(f'Process {os.getpid()} incremented to {shared_value.value}')

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    shared_value = multiprocessing.Value('i', 0)
    processes = [
        multiprocessing.Process(target=worker, args=(lock, shared_value))
        for _ in range(5)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
4.2 Semaphore
import multiprocessing
import time

def worker(sem, name):
    with sem:
        print(f'{name} acquired semaphore')
        time.sleep(2)
        print(f'{name} released semaphore')

if __name__ == '__main__':
    sem = multiprocessing.Semaphore(2)  # allow 2 processes in at a time
    processes = [
        multiprocessing.Process(target=worker, args=(sem, f'Process-{i}'))
        for i in range(5)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
4.3 Event
import multiprocessing
import time

def waiter(event):
    print('Waiting for event')
    event.wait()
    print('Event set, continuing')

def setter(event):
    time.sleep(3)
    print('Setting event')
    event.set()

if __name__ == '__main__':
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=waiter, args=(event,))
    p2 = multiprocessing.Process(target=setter, args=(event,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
4.4 Barrier
import multiprocessing

def worker(barrier, name):
    print(f'{name} waiting at barrier')
    barrier.wait()
    print(f'{name} passed barrier')

if __name__ == '__main__':
    barrier = multiprocessing.Barrier(3)  # waits until 3 processes arrive
    processes = [
        multiprocessing.Process(target=worker, args=(barrier, f'Process-{i}'))
        for i in range(3)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
5. Process Pools
5.1 Basic Pool Usage
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, range(10))
        print(results)
5.2 Asynchronous Methods
import multiprocessing
import time

def cube(x):
    time.sleep(1)
    return x ** 3

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        # apply_async - run a single task asynchronously
        result = pool.apply_async(cube, (5,))
        print(result.get(timeout=2))  # 125

        # map_async - like map, but asynchronous
        results = pool.map_async(cube, range(5))
        print(results.get())  # [0, 1, 8, 27, 64]

        # imap - lazy iterator
        for res in pool.imap(cube, range(5)):
            print(res)
5.3 Callbacks
import multiprocessing

def process_data(data):
    return data * 2

def callback(result):
    print(f'Callback got: {result}')

if __name__ == '__main__':
    with multiprocessing.Pool(2) as pool:
        for i in range(5):
            pool.apply_async(
                process_data,
                args=(i,),
                callback=callback
            )
        pool.close()
        pool.join()
5.4 Using ProcessPoolExecutor
The concurrent.futures module provides a higher-level process-pool interface. Compared with multiprocessing.Pool, it has the following advantages:
- A more consistent API (identical to ThreadPoolExecutor's interface)
- Support for the Future pattern
- Better exception handling
- Integration with asyncio
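The last point deserves a sketch (cpu_task and the input size are illustrative): an event loop can hand work to a ProcessPoolExecutor via loop.run_in_executor and stay responsive while it runs:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_task(x):
    # CPU-bound work that would otherwise stall the event loop
    return sum(i * i for i in range(x))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        # The coroutine suspends here; other tasks keep running
        result = await loop.run_in_executor(executor, cpu_task, 1_000_000)
        print(result)

if __name__ == '__main__':
    asyncio.run(main())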
5.4.1 Basic Usage
from concurrent.futures import ProcessPoolExecutor
import time

def square(x):
    time.sleep(0.5)
    return x * x

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Submit a single task
        future = executor.submit(square, 5)
        print(future.result())  # 25

        # Submit a batch of tasks
        results = list(executor.map(square, range(5)))
        print(results)  # [0, 1, 4, 9, 16]
5.4.2 Future Objects and Callbacks
from concurrent.futures import ProcessPoolExecutor
import time

def square(x):
    time.sleep(0.5)
    return x * x

def callback(future):
    print(f'Callback got: {future.result()}')

if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        futures = [executor.submit(square, i) for i in range(5)]
        for future in futures:
            future.add_done_callback(callback)
        # Wait for all tasks to complete
        for future in futures:
            print(f'Result: {future.result()}')
5.4.3 Exception Handling
from concurrent.futures import ProcessPoolExecutor

def might_fail(x):
    if x == 3:
        raise ValueError('Bad value')
    return x * x

if __name__ == '__main__':
    with ProcessPoolExecutor(2) as executor:
        futures = [executor.submit(might_fail, i) for i in range(5)]
        for future in futures:
            try:
                print(future.result())
            except ValueError as e:
                print(f'Error: {e}')
5.4.4 ProcessPoolExecutor vs. multiprocessing.Pool
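In brief:

- API: ProcessPoolExecutor mirrors ThreadPoolExecutor (submit, map, Future objects); Pool has its own apply/apply_async/map/imap family
- Lazy iteration: Pool offers imap and imap_unordered; ProcessPoolExecutor.map has a chunksize parameter but no unordered variant
- Exceptions: Future.result() re-raises the worker's exception at the call site; with Pool you get it from AsyncResult.get()
- Callbacks: Future.add_done_callback() versus Pool's callback/error_callback parameters
- asyncio: ProcessPoolExecutor plugs into loop.run_in_executor(); Pool does not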
5.4.5 Usage Recommendations
- Prefer ProcessPoolExecutor for new projects, unless you need a Pool-specific feature such as imap_unordered
- Set max_workers sensibly: usually the number of CPU cores; I/O-heavy workloads can go somewhat higher
- Use a with statement so resources are always released
- Mind task granularity: many tiny tasks drown in communication overhead (see the chunksize sketch after this list)
- Handle exceptions: always check Future.exception()
- Clean up resources: long-running applications should periodically restart worker processes to contain memory leaks, as in the batching example below
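On the granularity point, a minimal sketch (tiny_task and the sizes are illustrative): executor.map accepts a chunksize argument that ships items to workers in batches instead of one round-trip per item:

from concurrent.futures import ProcessPoolExecutor

def tiny_task(x):
    return x + 1

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        # chunksize batches items per IPC round-trip, cutting overhead
        results = list(executor.map(tiny_task, range(100_000), chunksize=1_000))
    print(results[-1])  # 100000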
And the batching pattern from the last recommendation:

from concurrent.futures import ProcessPoolExecutor

_cache = {}  # module-level cache: grows without bound inside each worker

def worker(x):
    # Simulate a memory leak
    _cache[x] = x * x
    return _cache[x]

if __name__ == '__main__':
    # Bad: a single long-lived executor can leak memory
    # with ProcessPoolExecutor(max_workers=4) as executor:
    #     executor.map(worker, range(10000))

    # Good: process in batches and rebuild the executor periodically
    chunk_size = 1000
    for i in range(0, 10000, chunk_size):
        with ProcessPoolExecutor(max_workers=4) as executor:
            list(executor.map(worker, range(i, i + chunk_size)))
6. Advanced Features
6.1 Custom Managers
import multiprocessing
import os
from multiprocessing.managers import BaseManager, BaseProxy

class MyListProxy(BaseProxy):
    # Expose __repr__ as well, so printing the proxy shows the list contents
    _exposed_ = ['append', 'count', 'index', '__repr__']

    def append(self, value):
        return self._callmethod('append', (value,))

    def count(self, value):
        return self._callmethod('count', (value,))

    def index(self, value):
        return self._callmethod('index', (value,))

class MyManager(BaseManager):
    pass

MyManager.register('MyList', list, proxytype=MyListProxy)

def worker(l):
    l.append(os.getpid())

if __name__ == '__main__':
    with MyManager() as manager:
        my_list = manager.MyList()
        processes = [
            multiprocessing.Process(target=worker, args=(my_list,))
            for _ in range(3)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(f'List contents: {my_list}')
6.2 Sharing State Between Processes
import multiprocessing
import os

def worker(state, lock):
    # Updates to a Namespace attribute are not atomic; guard the
    # read-modify-write with a lock
    with lock:
        state.value += 1
        print(f'Process {os.getpid()} incremented to {state.value}')

if __name__ == '__main__':
    # Use a server-process manager
    with multiprocessing.Manager() as manager:
        shared_state = manager.Namespace()
        shared_state.value = 0
        lock = manager.Lock()
        processes = [
            multiprocessing.Process(target=worker, args=(shared_state, lock))
            for _ in range(5)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(f'Final value: {shared_state.value}')
6.3 Sharing NumPy Arrays Between Processes
import multiprocessing
import numpy as np

def worker(arr):
    # Rebuild the numpy view inside the child: passing the numpy view
    # itself would be pickled as a plain copy under the 'spawn' start method
    with arr.get_lock():
        np_arr = np.frombuffer(arr.get_obj())
        np_arr[:] += 1  # modify the shared array in place

if __name__ == '__main__':
    # Create a shared array of doubles
    arr = multiprocessing.Array('d', 10)
    np_arr = np.frombuffer(arr.get_obj())  # numpy view over the shared buffer
    np_arr[:] = np.arange(10)  # initialize
    print('Before:', np_arr)
    processes = [
        multiprocessing.Process(target=worker, args=(arr,))
        for _ in range(3)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print('After:', np_arr)
7. Common Issues and Caveats
7.1 Common Problems
- Deadlock: processes wait on each other to release resources
  - Remedy: acquire locks in a fixed order and use timeouts
- Race conditions: several processes modify a shared resource at once
  - Remedy: use an appropriate synchronization primitive
- Startup overhead: creating a process costs far more than a thread
  - Remedy: reuse processes through a pool
- Memory usage: every process carries its own address space
  - Remedy: design data structures carefully and use shared memory
7.2 Things to Watch For
- Use the if __name__ == '__main__' guard: without it, child processes can re-execute the spawning code endlessly
- Prefer queues for communication: safer than shared memory
- Set the process count sensibly: usually the number of CPU cores
- Use process pools: avoid constantly creating and destroying processes
- Handle exceptions: a child's exception does not propagate to the parent automatically (see the exitcode sketch after this list)
- Clean up resources: make sure processes terminate properly and release what they hold
- Avoid shipping large data between processes: inter-process communication is expensive
- Consider alternatives: for simple tasks, concurrent.futures may be all you need
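On the exception point, a minimal sketch (may_crash is illustrative): after join(), a non-zero exitcode is often the parent's only clue that a child died on an unhandled exception:

import multiprocessing

def may_crash():
    raise RuntimeError('boom')  # the parent never sees this exception object

if __name__ == '__main__':
    p = multiprocessing.Process(target=may_crash)
    p.start()
    p.join()
    # An unhandled exception in the child yields exitcode 1
    print(f'exitcode = {p.exitcode}')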
7.3 Choosing Between Processes and Threads
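As a rule of thumb:

- CPU-bound work (number crunching, parsing, image processing): use processes to sidestep the GIL
- I/O-bound work (network calls, disk access): threads or asyncio are usually enough, since the GIL is released while waiting
- Need strong fault isolation or separate memory: processes
- Need cheap, frequent sharing of state: threads
- Simple parallelism: concurrent.futures lets you swap ThreadPoolExecutor and ProcessPoolExecutor with a one-line change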
8. Worked Examples
8.1 Processing Files in Parallel
import multiprocessing
import time

def process_file(filename):
    """Simulate processing a file."""
    print(f'Processing {filename}')
    time.sleep(1)  # simulate an expensive operation
    return f'{filename.upper()} processed'

if __name__ == '__main__':
    files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']
    with multiprocessing.Pool(2) as pool:
        results = pool.map(process_file, files)
        print('Results:', results)
8.2 Parallel Web Requests
import multiprocessing
import requests

def fetch_url(url):
    try:
        response = requests.get(url, timeout=5)
        return f'{url}: {len(response.content)} bytes'
    except Exception as e:
        return f'{url}: {str(e)}'

if __name__ == '__main__':
    urls = [
        'https://www.python.org',
        'https://www.google.com',
        'https://www.github.com',
        'https://www.example.com'
    ]
    with multiprocessing.Pool(4) as pool:
        results = pool.map(fetch_url, urls)
        for result in results:
            print(result)
8.3 Producer-Consumer Pattern
import multiprocessing
import time

def producer(queue, items):
    for item in items:
        print(f'Producing {item}')
        queue.put(item)
        time.sleep(0.5)
    queue.put(None)  # end-of-stream signal

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f'Consuming {item}')
        time.sleep(1)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    items = [f'item-{i}' for i in range(10)]
    prod = multiprocessing.Process(target=producer, args=(queue, items))
    cons = multiprocessing.Process(target=consumer, args=(queue,))
    prod.start()
    cons.start()
    prod.join()
    cons.join()
9. Performance Tuning
9.1 Reduce Inter-Process Communication
# Bad: one IPC round-trip per item
def worker(queue_in, queue_out):
    while True:
        data = queue_in.get()
        if data is None:
            break
        result = data * 2  # trivial computation
        queue_out.put(result)

# Good: batch items to amortize the IPC cost
def worker_batch(queue_in, queue_out):
    while True:
        data_list = queue_in.get()
        if data_list is None:
            break
        results = [d * 2 for d in data_list]
        queue_out.put(results)
9.2 Use Shared Memory to Avoid Copies
import multiprocessing

def worker(shared_arr):
    # Operate on shared memory directly; no data is copied between processes.
    # Hold the lock across the whole read-modify-write pass
    with shared_arr.get_lock():
        for i in range(len(shared_arr)):
            shared_arr[i] *= 2

if __name__ == '__main__':
    arr = multiprocessing.Array('d', [1.0, 2.0, 3.0, 4.0])
    processes = [
        multiprocessing.Process(target=worker, args=(arr,))
        for _ in range(2)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print('Result:', list(arr))
9.3 Pool Initializers
import multiprocessing
import os

def init_worker():
    """Initialize a worker process, e.g. load a model or open connections."""
    print(f'Worker {os.getpid()} initialized')

def task(x):
    return x * x

if __name__ == '__main__':
    # init_worker runs once in each worker process at startup
    with multiprocessing.Pool(
        processes=4,
        initializer=init_worker
    ) as pool:
        print(pool.map(task, range(10)))