> 技术文档 > Python-多进程编程 (multiprocessing 模块)_python 多进程

Python-多进程编程 (multiprocessing 模块)_python 多进程


目录

  • 一、创建进程
    • 1. Process 的语法结构
    • 2. 进程不共享全局变量
  • 二、进程间通信
    • 1. 队列通信
    • 2. 管道通信
  • 三、进程池
    • 1. 常用函数
    • 2. 进程池中的 Queue
  • 四、应用:复制文件夹(多进程版)
  • 五、守护进程和进程同步
  • 六、注意事项

通过使用 multiprocessing 模块,Python 程序可以在多核处理器上实现并行处理,提高程序的执行效率和响应速度。

一、创建进程

要创建一个新的进程,需要实例化 multiprocessing.Process 类,并调用它的 start() 方法。Process 类的构造函数接受参数 target 作为子进程需要执行的函数,args 和 kwargs 作为传递给 target 函数的参数。

1. Process 的语法结构

创建 Process 对象:Process(group , target , name , args , kwargs)

  • target :表示调用对象,指定子进程要执行的函数。
  • args :传递给 target 函数的参数 (tuple) 。
  • kwargs :传递给 target 函数的关键字参数 (dict) 。
  • name :子进程名称,可以不设定。
  • group :指定进程组,大多数情况下用不到,默认值为 None 。

Process 创建的实例对象的常用方法

  • start() :启动子进程实例。

注:

  1. 创建 Process 对象时,实际上是创建了子进程的 “描述对象” ,此时还没有真正启动子进程。
  2. 子进程真正启动是在调用 p.start() 的时候,操作系统才会分配资源创建子进程,执行对应代码
  3. 之后,子进程执行传入的 target 函数或重写的 run() 方法中的代码。
  4. start() 是非阻塞的,父进程继续执行,而子进程代码则独立运行。
  • is_alive() :判断子进程是否还活着。

  • join(timeout=None) :用于阻塞当前(父)进程,直到子进程执行结束。如果不调用该函数,主进程会继续执行,不管子进程是否完成。

timeout(可选):指定等待子进程结束的最长时间,单位是秒。如果在超时时间内子进程结束,join() 返回,父进程继续,否则超时后父进程继续(子进程仍继续执行)。

  • terminate() :调用后,操作系统会立即杀死子进程,不管子进程是否完成任务。

terminate() 是个强制退出方法,子进程不会正常执行清理工作(如 finally 代码块、关闭文件等可能不被执行)。一般用于子进程 “无响应” 或超时等情况下,强制结束子进程。

时机 行为 创建 Process 对象 创建子进程描述对象,尚未启动 调用 start() 操作系统创建子进程并运行子进程代码 子进程执行传入的 target 函数 子进程代码开始在子进程空间运行 调用 join() 父进程等待子进程运行结束

Python 代码示例:

# !/usr/bin/python# -*- coding:utf-8 -*-from multiprocessing import Processdef run_proc(name, age, **kwargs): \"\"\"子进程要执行的代码\"\"\" print(f\'My name is {name} and my age is {age}\') print(kwargs) # print(1 / 0) # 退出码是 1 exit(12)if __name__ == \'__main__\': p = Process(target=run_proc, args=(\'Alice\', 20), kwargs={\'city\':\'beijing\', \'country\':\'China\'}) p.start() print(f\'1.子进程是否存活:{p.is_alive()}\') # p.terminate() # 退出码是 -15 p.join() # p.join(0) print(f\'2.子进程是否存活:{p.is_alive()}\') print(f\'子进程的退出码是:{p.exitcode}\') print(\'父进程结束\')

Process 创建的实例对象的常用属性

  • name :当前进程的别名,默认为 Process-N ,N 为从 1 开始递增的整数
  • pid :当前进程的 pid(进程号)
  • exitcode :子进程的退出码

其他

  • 孤儿进程 :父进程退出(kill 杀死父进程),子进程变为孤儿。

  • 僵尸进程 :子进程退出,父进程在忙碌,没有回收它,要避免僵尸。

  • Python 的 os 模块封装了常见的系统调用,其中就包括:

    • 创建子进程:os.fork()
    • 获取自身 ID :os.getpid()
    • 获取父进程 ID :os.getppid()

Python 代码示例:

# !/usr/bin/python# -*- coding:utf-8 -*-from multiprocessing import Processimport osimport timedef run_proc(): \"\"\"子进程要执行的代码\"\"\" print(\'运行子进程 : pid = %d , ppid = %d\' % (os.getpid(), os.getppid())) print(\'子进程运行结束\')if __name__ == \'__main__\': print(\'运行父进程 : pid = %d\' % os.getpid()) # os.getpid 获取当前进程的进程号 p = Process(target=run_proc) p.start() time.sleep(1) print(\'父进程运行结束\')

2. 进程不共享全局变量

进程不共享全局变量或者列表,主要原因是每个进程拥有独立的内存空间。具体来说:

  • 独立的地址空间:每个进程在操作系统中都有自己独立的虚拟地址空间,全局变量和列表都存储在该进程的内存空间内。不同进程的内存空间相互隔离,不能直接访问对方的变量。

  • 进程隔离保证安全和稳定:这种隔离防止一个进程意外(或恶意)修改另一个进程的数据,从而保证系统的稳定性和安全性。

  • 复制而非共享:当创建新进程(如使用 fork),父进程的内存会被复制给子进程,这样变量看似 “相同” ,但其实是不同地址的独立副本,修改一个进程的变量不会影响另一个进程。

  • 共享数据需显式机制:如果需要进程间通信(IPC)和共享数据,必须使用特殊机制,如共享内存(shm)、消息队列、管道(pipe)、信号量、套接字等,而不能直接共享普通的全局变量或数据结构。

总结:进程间的内存隔离使得全局变量、列表等数据结构不被共享,确保各进程运行互不干扰。

Python 代码示例:

# !/usr/bin/python# -*- coding:utf-8 -*-import osimport timefrom multiprocessing import Processnums = [11, 22]n = 10def work1(): \"\"\"子进程要执行的代码\"\"\" global n n = 100 print(\"in process1 pid=%d , nums=%s\" % (os.getpid(), nums)) # nums=[11, 22] for i in range(3): nums.append(i) time.sleep(1) print(\"in process1 pid=%d , nums=%s\" % (os.getpid(), nums)) # nums=[11, 22, 0, 1, 2] print(\"in process1 pid=%d , n=%d\" % (os.getpid(), n)) # n=100def work2(): \"\"\"子进程要执行的代码\"\"\" print(\"in process2 pid=%d , nums=%s\" % (os.getpid(), nums)) # nums=[11, 22] print(\"in process2 pid=%d , n=%d\" % (os.getpid(), n)) # n=10if __name__ == \'__main__\': p1 = Process(target=work1) p1.start() p1.join() print(\'-\' * 50) p2 = Process(target=work2) p2.start()

二、进程间通信

进程间的通信通常通过 Queue 或 Pipe 实现。Queue 是一个进程安全的队列,可以用于多个进程之间的数据传递。Pipe 则提供了两个连接对象,通过管道连接,允许两个进程间的双向通信。

1. 队列通信

初始化 Queue() 对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)。

  • Queue.qsize() :返回当前队列包含的消息数量。

  • Queue.empty() :如果队列为空,返回 True ,反之返回 False 。

  • Queue.full() :如果队列满了,返回 True ,反之返回 False 。

  • Queue.get([block[, timeout]]) :获取队列中的一条消息,然后将其从列队中移除,block 默认值为 True 。

    • 如果 block 使用默认值,且没有设置 timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了 timeout ,则会等待 timeout 秒,若还没读取到任何消息,则抛出 “Queue.Empty” 异常。
    • 如果 block 值为 False,消息列队如果为空,则会立刻抛出 “Queue.Empty” 异常。
  • Queue.get_nowait() :相当于 Queue.get(block=False)

  • Queue.put(item,[block[, timeout]]) :将 item 消息写入队列,block 默认值为 True 。

    • 如果 block 使用默认值,且没有设置 timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了 timeout ,则会等待 timeout 秒,若还没空间,则抛出 “Queue.Full” 异常。
    • 如果 block 值为 False,消息列队如果没有空间可写入,则会立刻抛出 “Queue.Full” 异常。
  • Queue.put_nowait(item) :相当于 Queue.put(item, False)

Python 代码示例:

# !/usr/bin/python# -*- coding:utf-8 -*-from multiprocessing import Process, Queueimport timedef write(q): for i in range(10): print(\'Put %d to queue...\' % i) q.put(i) time.sleep(0.1)def read(q): while True: if not q.empty(): print(\'Get %d from queue.\' % q.get()) time.sleep(0.2) else: breakif __name__ == \'__main__\': q = Queue() p_w = Process(target=write, args=(q,)) p_r = Process(target=read, args=(q,)) p_w.start() p_r.start() p_w.join() p_r.join()

2. 管道通信

通常情况下,管道有 2 个口,而 Pipe 也常用来实现 2 个进程之间的通信,这 2 个进程分别位于管道的两端,一端用来发送数据,另一端用来接收数据。

使用 Pipe 实现进程通信,首先需要调用 multiprocessing.Pipe() 函数来创建一个管道。该函
数的语法格式为:conn1, conn2 = multiprocessing.Pipe([duplex=True])

其中,conn1 和 conn2 分别用来接收 Pipe 函数返回的 2 个端口;duplex 参数默认为 True ,表示该管道是双向的,即位于 2 个端口的进程既可以发送数据,也可以接受数据,而如果将 duplex 值设为 False ,则表示管道是单向的,conn1 只能用来接收数据,而 conn2 只能用来发送数据。另外值得一提的是,conn1 和 conn2 都属于 PipeConnection 对象。

三、进程池

当需要创建的子进程数量不多时,可以直接利用 multiprocessing 中的 Process 动态成生多个进程,但当有大量的任务(上百甚至上千个)需要并行处理时,手动的去创建进程的工作量巨大,此时就可以用到 multiprocessing 模块提供的 Pool 方法。

进程池允许将任务分配给池中的工作进程执行,这样可以有效管理进程的创建和销毁,避免系统资源的浪费。初始化 Pool 时,可以指定一个最大进程数,当有新的请求提交到 Pool 中时:

  • 如果池中的进程数没有达到指定的最大值,那么就会创建一个新的进程用来执行该请求;

  • 如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,该进程会被用来执行新的任务。

1. 常用函数

multiprocessing.Pool 常用函数解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式调用 func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args 为传递给 func 的参数列表,kwds 为传递给 func 的关键字参数列表。

  • close() :关闭 Pool ,使其不再接受新的任务。

  • terminate() :不管任务是否完成,立即终止。

  • join() :主进程阻塞,等待子进程的退出, 必须在 close 或 terminate 之后使用。

Python 代码示例:

# !/usr/bin/python# -*- coding:utf-8 -*-from multiprocessing.pool import Poolimport os, time, randomdef worker(msg): t_start = time.time() print(f\'{os.getpid()} 开始执行任务 {msg}\') # random.random()随机生成 0~1 之间的浮点数 time.sleep(random.random() * 2) t_stop = time.time() print(\"任务 %d 执行完毕,%d 释放,耗时%0.2f\" % (msg, os.getpid(), t_stop - t_start))if __name__ == \'__main__\': po = Pool(3) # 定义一个进程池,最大进程数 3(注意这句必须放在 main 下面) for i in range(0, 10): # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,)) # 每次循环将会用空闲出来的子进程去调用目标 po.apply_async(worker, (i,)) # 这里的 worker 不能是对象方法 print(\"------start------\") po.close() # 关闭进程池,关闭后 po 不再接收新的请求 po.join() # 等待 po 中所有子进程执行完成,必须放在 close 语句之后 print(\"-------end-------\")

2. 进程池中的 Queue

如果要使用 Pool 创建进程,就需要使用 multiprocessing.Manager().Queue() ,而不是 multiprocessing.Queue() ,否则会得到一条如下的错误信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

Python 代码示例:

# !/usr/bin/python# -*- coding:utf-8 -*-from multiprocessing import Manager, Poolimport time, osdef reader(q): print(\"reader 启动 (%s) , 父进程为 (%s)\" % (os.getpid(), os.getppid())) for i in range(q.qsize()): print(\"reader 从 Queue 获取到消息:%s\" % q.get())def writer(q): print(\"writer 启动 (%s) , 父进程为 (%s)\" % (os.getpid(), os.getppid())) for i in \"hello\": q.put(i) time.sleep(1)if __name__ == \"__main__\": print(\"(%s) start\" % os.getpid()) q = Manager().Queue() # 使用 Manager 中的 Queue po = Pool() po.apply_async(writer, (q,)) time.sleep(1) # 先让上面的任务向 Queue 存入数据,然后再让下面的任务开始从中取数据 po.apply_async(reader, (q,)) po.close() po.join() print(\"(%s) End\" % os.getpid())

四、应用:复制文件夹(多进程版)

# !/usr/bin/python# -*- coding:utf-8 -*-import multiprocessingimport osimport timeimport randomdef copy_file(queue, file_name, source_folder_name, dest_folder_name): \"\"\"copy 文件到指定的路径\"\"\" f_read = open(source_folder_name + \"/\" + file_name, \"rb\") f_write = open(dest_folder_name + \"/\" + file_name, \"wb\") while True: time.sleep(random.random()) content = f_read.read(1024) if content: f_write.write(content) else: break f_read.close() f_write.close() # 发送已经拷贝完毕的文件名字 queue.put(file_name)def main(): # 获取要复制的文件夹 source_folder_name = input(\"请输入要复制文件夹名字:\") # 整理目标文件夹 dest_folder_name = source_folder_name + \"[副本]\" # 创建目标文件夹 try: os.mkdir(dest_folder_name) except FileExistsError: print(\'该文件夹已存在\') # 如果文件夹已经存在,那么创建会失败 # 获取这个文件夹中所有的普通文件名 file_names = os.listdir(source_folder_name) # 创建 Queue queue = multiprocessing.Manager().Queue() # 创建进程池 pool = multiprocessing.Pool(3) for file_name in file_names: # 向进程池中添加任务 pool.apply_async(copy_file, args=(queue, file_name, source_folder_name, dest_folder_name)) # 主进程显示进度 pool.close() all_file_num = len(file_names) while True: file_name = queue.get() if file_name in file_names: file_names.remove(file_name) copy_rate = (all_file_num - len(file_names)) * 100 / all_file_num print(\"\\r%.2f...(%s)\" % (copy_rate, file_name) + \" \"*50, end=\"\") if copy_rate >= 100: break print()if __name__ == \"__main__\": main()

五、守护进程和进程同步

multiprocessing 模块还提供了守护进程(daemon process)的概念,守护进程会在主进程代码执行结束后自动终止。此外,模块中还包含了锁(Lock)和信号量(Semaphore)等同步原语,用于在进程间同步操作。

六、注意事项

  • 使用 multiprocessing 时,应避免在多个进程间共享状态。

  • 确保传递给 Process 类的所有参数都是可序列化的。

  • 在主模块中使用 if __name__ == \"__main__\": 来保护程序的入口点。

  • 尽量不要使用 Process.terminate() 来终止进程,因为这可能会导致共享资源变得不可用。

参考文章:【Python多进程编程详解】