> 技术文档 > 【Python】multiprocessing 模块:实现多进程并行计算_python multiprocessing

【Python】multiprocessing 模块:实现多进程并行计算_python multiprocessing

Python 的 multiprocessing 模块是一个标准库模块,用于实现多进程并行计算。它通过创建独立的进程,绕过 Python 的全局解释器锁(GIL),在多核 CPU 上实现真正的并行,特别适合 CPU 密集型任务(如数值计算、图像处理)。相比线程(threading 模块),multiprocessing 更适合需要高性能计算的场景。本文将详细介绍 multiprocessing 模块的定义、功能、用法、示例、应用场景、最佳实践和注意事项。


1. multiprocessing 模块的定义和原理

1.1 定义

multiprocessing 是一个跨平台的模块,提供创建和管理进程的 API,支持进程间通信(IPC)、同步机制和共享资源管理。它模仿了 threading 模块的接口,方便开发者从线程迁移到进程。

  • 核心功能

    • 进程创建:创建独立进程,运行指定函数或任务。
    • 进程池:管理一组工作进程,分配任务。
    • 进程通信:支持管道(Pipe)、队列(Queue)等 IPC 机制。
    • 同步原语:提供锁(Lock)、信号量(Semaphore)、事件(Event)等。
    • 共享内存:支持共享基本数据类型(Value)和数组(Array)。
    • 跨平台:在 Windows、Linux、macOS 上运行一致。
  • 依赖:标准库,无需额外安装。

1.2 原理

  • 进程 vs 线程
    • 进程:独立的内存空间,拥有自己的 Python 解释器和 GIL,适合 CPU 密集型任务。
    • 线程:共享内存空间,受 GIL 限制,适合 I/O 密集型任务。
  • GIL 绕过:每个进程有独立的 GIL,允许多核并行。
  • 进程创建
    • Linux/macOS:使用 fork(复制父进程),或 spawn(新进程)。
    • Windows:始终使用 spawn,启动新解释器。
  • 通信开销:进程间通信(如 Queue)比线程慢,需优化设计。

1.3 导入

import multiprocessing

2. multiprocessing 的核心组件和功能

2.1 进程创建(Process

通过 multiprocessing.Process 创建进程,运行指定函数。

  • 构造函数

    Process(target=None, args=(), kwargs={}, name=None, daemon=None)
    • target:目标函数。
    • args/kwargs:函数参数。
    • name:进程名称。
    • daemon:是否为守护进程(随主进程退出)。
  • 主要方法

    • start():启动进程。
    • join():等待进程结束。
    • terminate():强制终止进程。
    • is_alive():检查进程是否存活。

示例

import multiprocessingdef worker(num): print(f\"Worker {num} running in process {multiprocessing.current_process().name}\")if __name__ == \"__main__\": processes = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)] for p in processes: p.start() for p in processes: p.join()

输出(顺序可能不同):

Worker 0 running in process Process-1Worker 1 running in process Process-2Worker 2 running in process Process-3
  • 说明:创建 3 个进程,每个运行 worker 函数。

2.2 进程池(Pool

Pool 用于管理固定数量的进程,适合并行处理大量任务。

  • 构造函数

    Pool(processes=None, initializer=None, initargs=())
    • processes:进程数(默认 CPU 核心数)。
    • initializer:每个进程的初始化函数。
    • initargs:初始化函数参数。
  • 主要方法

    • map(func, iterable):并行执行 func 应用于 iterable,返回结果列表。
    • imap(func, iterable):惰性版本,返回迭代器。
    • apply(func, args=(), kwds={}):同步执行单任务。
    • apply_async(func, args=(), kwds={}):异步执行单任务。
    • close():关闭池,禁止新任务。
    • join():等待池内进程完成。

示例

from multiprocessing import Pooldef square(n): return n * nif __name__ == \"__main__\": with Pool(processes=4) as pool: results = pool.map(square, range(10)) print(results) # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2.3 进程通信

支持 PipeQueue 实现进程间数据交换。

Pipe
  • 双向或单向管道,适合两个进程通信。
  • 构造函数
    Pipe(duplex=True)
    • 返回 (conn1, conn2),两个连接对象。
    • duplex=True:双向;False:单向。

示例

from multiprocessing import Process, Pipedef sender(conn): conn.send(\"Hello from sender\") conn.close()def receiver(conn): print(conn.recv()) conn.close()if __name__ == \"__main__\": parent_conn, child_conn = Pipe() p1 = Process(target=sender, args=(child_conn,)) p2 = Process(target=receiver, args=(parent_conn,)) p1.start() p2.start() p1.join() p2.join()

输出

Hello from sender
Queue
  • 线程和进程安全的队列,适合多生产者/消费者场景。
  • 构造函数
    Queue(maxsize=0)
    • maxsize:最大容量(0 表示无限制)。

示例

from multiprocessing import Process, Queuedef producer(queue): queue.put(\"Data from producer\")def consumer(queue): print(queue.get())if __name__ == \"__main__\": queue = Queue() p1 = Process(target=producer, args=(queue,)) p2 = Process(target=consumer, args=(queue,)) p1.start() p2.start() p1.join() p2.join()

2.4 同步机制

提供锁、信号量等原语,确保进程安全访问共享资源。

Lock
  • 互斥锁,防止多个进程同时访问资源。
  • 示例
from multiprocessing import Process, Lockdef printer(lock, msg): with lock: print(msg)if __name__ == \"__main__\": lock = Lock() processes = [Process(target=printer, args=(lock, f\"Message {i}\")) for i in range(3)] for p in processes: p.start() for p in processes: p.join()
Semaphore
  • 控制有限资源的并发访问。
  • 示例
from multiprocessing import Process, Semaphoredef worker(sem, name): with sem: print(f\"{name} acquired resource\") # 模拟工作if __name__ == \"__main__\": sem = Semaphore(2) # 允许 2 个进程同时访问 processes = [Process(target=worker, args=(sem, f\"Worker {i}\")) for i in range(5)] for p in processes: p.start() for p in processes: p.join()
Event
  • 进程间信号通知。
  • 示例
from multiprocessing import Process, Eventimport timedef wait_for_event(event): event.wait() print(\"Event triggered\")if __name__ == \"__main__\": event = Event() p = Process(target=wait_for_event, args=(event,)) p.start() time.sleep(1) event.set() # 触发事件 p.join()

2.5 共享内存

通过 ValueArray 共享基本数据类型。

  • Value:单个共享值。
  • Array:共享数组。

示例

from multiprocessing import Process, Value, Arraydef modify(shared_num, shared_arr): shared_num.value += 1 for i in range(len(shared_arr)): shared_arr[i] += 1if __name__ == \"__main__\": num = Value(\"i\", 0) # 共享整数 arr = Array(\"i\", [1, 2, 3]) # 共享数组 p = Process(target=modify, args=(num, arr)) p.start() p.join() print(num.value) # 输出: 1 print(list(arr)) # 输出: [2, 3, 4]

3. 应用场景

  1. 数值计算

    • 并行处理矩阵运算、蒙特卡洛模拟。
    • 示例:计算大数组的平方。
  2. 图像处理

    • 并行处理图像滤波、特征提取。
    • 示例:批量应用卷积滤波。
  3. 机器学习

    • 并行训练模型或处理数据预处理。
    • 示例:并行特征提取。
  4. 数据处理

    • 并行处理 CSV 文件、数据库查询。
    • 示例:多进程解析日志文件。
  5. 爬虫

    • 并行抓取网页(注意网络限制)。
    • 示例:结合 urllib 并发下载。

4. 示例:多进程爬虫

结合 urllibQueue 实现并行网页抓取。

示例

import urllib.requestfrom multiprocessing import Process, Queuefrom urllib.error import URLErrordef fetch_url(queue, url): try: with urllib.request.urlopen(url) as response: content = response.read().decode(\"utf-8\") queue.put((url, len(content))) except URLError as e: queue.put((url, str(e)))def main(): urls = [\"https://example.com\", \"https://python.org\", \"https://invalid-url\"] queue = Queue() processes = [Process(target=fetch_url, args=(queue, url)) for url in urls] for p in processes: p.start() for p in processes: p.join() while not queue.empty(): url, result = queue.get() print(f\"{url}: {result}\")if __name__ == \"__main__\": main()

输出(示例):

https://example.com: 1256https://python.org: 50000https://invalid-url: [Errno 11001] getaddrinfo failed

5. 最佳实践

  1. 使用 if __name__ == \"__main__\":

    • 防止 Windows 和某些 Unix 系统重复导入模块。
    • 示例
      if __name__ == \"__main__\": p = Process(target=worker) p.start()
  2. 选择进程池

    • 对于批量任务,使用 Pool 简化管理。
    • 示例
      with Pool(4) as pool: results = pool.map(func, data)
  3. 优化通信

    • 尽量减少进程间通信,使用共享内存或批量传递数据。
    • 示例
      arr = Array(\"i\", [0] * size)
  4. 异常处理

    • 在子进程中捕获异常,通过 Queue 或日志返回。
    • 示例
      def worker(queue): try: # 工作代码 except Exception as e: queue.put(str(e))
  5. 测试代码

    • 使用 pytest 测试多进程行为。
    • 示例
      import pytestfrom multiprocessing import Processdef test_process(): def worker(): print(\"Test\") p = Process(target=worker) p.start() p.join() assert p.exitcode == 0
  6. 进程数选择

    • 默认使用 CPU 核心数(multiprocessing.cpu_count())。
    • 示例
      processes = min(len(tasks), multiprocessing.cpu_count())

6. 注意事项

  1. GIL 限制

    • multiprocessing 绕过 GIL,适合 CPU 密集型任务;I/O 密集型任务考虑 threadingasyncio
    • 示例
      # I/O 密集型:使用 asyncioimport asyncioasync def fetch(): pass
  2. Windows 兼容性

    • Windows 使用 spawn,需确保代码在 if __name__ == \"__main__\": 中。
    • 示例
      if __name__ == \"__main__\": main()
  3. 资源管理

    • 及时关闭进程和池,释放资源。
    • 示例
      with Pool() as pool: pool.map(func, data)
  4. 序列化开销

    • 传递大数据到子进程(如通过 Queue)可能慢,使用共享内存。
    • 示例
      shared_data = Value(\"d\", 0.0)
  5. 调试难度

    • 子进程错误可能不易捕获,使用日志或 Queue 返回错误。
    • 示例
      import logginglogging.basicConfig(level=logging.INFO)

7. 总结

Python 的 multiprocessing 模块是实现多进程并行的强大工具,绕过 GIL,适合 CPU 密集型任务。其核心特点包括:

  • 定义:提供进程创建、通信、同步和共享内存的 API。
  • 功能:支持 ProcessPoolQueuePipeLock 等。
  • 应用:数值计算、图像处理、机器学习、数据处理、爬虫。
  • 最佳实践:使用 if __name__ == \"__main__\":、优化通信、测试代码。

参考文献

  • Python 官方文档:https://docs.python.org/3/library/multiprocessing.html
  • Real Python 教程:https://realpython.com/python-multiprocessing/
  • PyPI 页面:https://pypi.org/project/multiprocessing/