python-并发编程
python-并发编程
- 前言
- 一、多线程(threading)
-
- 1.1 多线程的使用
- 1.2 自定义线程类
- 1.3 线程锁
-
- 1.3.1 互斥锁
- 1.3.2 信号量
- 1.4 全局解释器锁(GIL)
- 二、多进程(multiprocessing)
-
- 2.1 使用os库创建多进程
- 2.2 多进程的使用
- 2.3 自定义进程类
- 2.4 进程通信方式
-
- 2.4.1 管道(pipe)
- 2.4.2 消息队列(message queues)
- 2.4.3 信号量(semaphores)
- 2.4.4 共享内存(shared memory)
- 2.4.5 信号(signal)
- 2.4.5 socket通信
- 2.5 进程池
- 三、协程(coroutine)
- 总结
前言
进程
(Process):正在运行的程序,是系统进行资源分配的最小单位。每个进程都有自己独立的内存空间和系统资源线程
(Thread):运行在进程之上,系统进行调度的最小单位。一个进程可以包含多个线程,它们共享进程的内存空间和资源协程
(Coroutine):协程是一种用户态的轻量级线程,又称微线程。协程的调度完全由用户控制,不需要上下文切换的开销,因此执行效率极高并行
(Parallelism):指多个任务在同一时刻同时执行,需要多核 CPU 的支持并发
(Concurrency):指多个任务在同一时间段内交替执行,通过时间片轮转或协作式调度实现
进程和线程
- 一个进程可以有一个以上的线程,进程之间都是独立的,一个进程内的线程共享这个进程空间
- 同一个进程内的线程是可以直接通信的,进程要想通信,必须通过内核空间实现
- 创建新的线程很简单,创建新的进程需要对父进程进行克隆,所有的进程都是由另外一个进程创建的
- 一个线程可以控制和操作同一个进程内的其他线程,而进程只能操作子进程
- 一个主线程的改变可能会影响其他进程,而父进程不会影响子进程
多进程 安全性高 开销大 占用空间大 上下文切换开销大 分布式支持
多线程 安全性低 开销小 占用空间小 上下文切换开销小 不支持
一、多线程(threading)
1.1 多线程的使用
以从网页中下载图片为例
下载一张图片
def download_image(url, path): response = requests.get(url) with open(path, \'wb\') as f: f.write(response.content)url = \'https://pic35.photophoto.cn/20150511/0034034892281415_b.jpg\'path = \'1.jpg\'download_image(url, path)
下载5张图片
import requestsimport timedef runtime(func): def runtime_inner(*args, **kwargs): start = time.time() result = func(*args, **kwargs) end = time.time() print(f\"执行函数{func.__name__}花费了{end - start}s\") return result return runtime_innerdef download_image(url, path): response = requests.get(url) with open(path, \'wb\') as f: f.write(response.content) @runtimedef main(): for i in range(5): download_image(url, str(i) + \".jpg\")url = \'https://pic35.photophoto.cn/20150511/0034034892281415_b.jpg\'main()# 执行函数main花费了1.8841361999511719s
使用多线程下载5张图片
import threading@runtimedef main(): print(\"start...\") t_list = [] for i in range(5): # 创建线程对象 # target --> 指定传入一个callable对象 做什么 # args --> 指定方法需要传入的参数 元组类型 (1,) t = threading.Thread(target=download_image, args=(url, str(i) + \".jpg\")) # 启动线程 start --> run <--target # 默认情况为前台线程 主线程要等待子线程结束才退出 # 设置后台线程 主线程执行结束,子线程也要退出 t.daemon = True # t.setDaemon(True) # 设置为后台线程,在start之前设置 t.start() t_list.append(t) for t in t_list: t.join() # 阻塞当前环境上下文,直到t的线程执行完成 # 去掉t.jon() 执行函数main花费了0.0039768218994140625sprint(\"start...\")main()print(\"end...\")# start...# start...# 执行函数main花费了0.6438114643096924s# end...
1.2 自定义线程类
class MyThread(threading.Thread): def __init__(self, num): super().__init__() self.num = num def run(self): print(f\"running...{self.num}\") t1 = MyThread(1)t2 = MyThread(2)t1.start()t2.start()
使用自定义线程类,创建多线程下载图片
class MyThread(threading.Thread): def __init__(self, url, path): super().__init__() self.url = url self.path = path def run(self): response = requests.get(self.url) with open(self.path, \'wb\') as f: f.write(response.content) url = \'https://pic35.photophoto.cn/20150511/0034034892281415_b.jpg\'for i in range(5): t = MyThread(url, str(i) + \".jpg\") t.start()
1.3 线程锁
为什么需要线程锁?
公共资源进行访问修改,存在资源争抢,造成脏数据
解决公共资源竞争
限制同一时刻只有一个线程可以访问公共资源
1.3.1 互斥锁
import threadingimport timefrom threading import Lock, RLocknum = 0def sum_num(i): # lock.acquire() # 获取锁 with lock: global num time.sleep(1) num += i print(num) # lock.release() # 释放锁# 创建锁对象lock = Lock()# lock = RLock()t_list = []for i in range(10): t = threading.Thread(target=sum_num, args=(1,)) t.start() t_list.append(t)[t.join() for t in t_list]print(\"end...\")# Lock 原始锁 获取锁之前不做判断,直到获取到锁为止# RLock 重入锁 获取锁之前先判断,如果有这把锁了就 立即返回r1 = Lock()r2 = RLock()r1.acquire()# print(\"lock1 acquired 1\")# r1.acquire() # 死锁# print(\"lock1 acquired 2\")r2.acquire()print(\"lock1 acquired 1\")r2.acquire()print(\"lock1 acquired 2\")
死锁
程序设计不到位
尽量避免产生死锁
- 尽量避免同一个线程对多lock进行锁定
- 多个线程需要对多个lock进行锁定,尽量保证他们以相同的顺序获取锁
- 设置超时
# 当两个线程以相反顺序调用 transfer时,会发生死锁class Account: def __init__(self, id, balance, lock): self.id = id self.balance = balance self.lock = lock # 取钱 def withdraw(self, amount): self.balance -= amount # 存钱 def deposit(self, amount): self.balance += amount # 查看余额 def get_balance(self): return self.balancedef transfer(from_id, to_id, amount): if from_id.lock.acquire(): from_id.withdraw(amount) time.sleep(1) print(\"wait...end\") if to_id.lock.acquire(): to_id.deposit(amount) to_id.lock.release() from_id.lock.release() print(f\"{from_id.id}向{to_id.id}转了{amount}元\")huang = Account(\"huang\", 10000, RLock())zhang = Account(\"zhang\", 20000, RLock())t1 = threading.Thread(target=transfer, args=(huang, zhang, 5000))t2 = threading.Thread(target=transfer, args=(zhang, huang, 2000))t1.start()t2.start()t1.join()t2.join()print(huang.get_balance())print(zhang.get_balance())
1.3.2 信号量
信号量允许指定数量的线程同时执行
from threading import BoundedSemaphorenum = 0def sum_num(i): # lock.acquire() # 获取锁 with lock: global num time.sleep(1) num += i print(num) # lock.release() # 释放锁# 信号量锁对象,最多允许2和线程同时执行lock = BoundedSemaphore(2)t_list = []for i in range(10): t = threading.Thread(target=sum_num, args=(1,)) t.start() t_list.append(t)[t.join() for t in t_list]print(\"end...\")
1.4 全局解释器锁(GIL)
GIL
全称Global Interpreter Lock
GIL 和 Python 语言没有任何关系,只是因为历史原因导致在官方推荐的解释器Cpython
中遗留的问题(Jpython无此类问题)
每个线程在执行的过程中都需要先获取GIL,保证同一时刻同一个进程内只有一个线程可以执行代码
GIL最基本的行为只有两个:
- 当前执行的线程持有GIL
- 当线程遇到io阻塞时,会释放GIL
计算密集型(cpu) 使用多进程
io密集型(频繁阻塞等待) 使用多线程
二、多进程(multiprocessing)
2.1 使用os库创建多进程
import os, time# linux系统中result = os.fork()# 父进程运行时result为子进程的pid# 子进程运行时这个result就是0print(\"outerside pid is:\", result)if result == 0: print(\"child process\") #time.sleep(60) print(\"child pid is:\", os.getpid()) print(\"child-parent pid is:\", os.getppid())else: print(\"parent process\") #time.sleep(60) print(\"parent pid is:\", os.getpid())
僵尸进程
: 子进程退出,父进程没有调用wait或者waitpid取获取子进程的状态 --> 无time.sleep()
那么这个子进程的进程描述符就依然存在系统中,这种进程称之为僵尸进程
孤儿进程
: 父进程退出,子进程还在运行,那么这个子进程就会称为孤儿进程 --> 取消父进程的sleep(60)注释,保留子进程的sleep(60)
孤儿进程会被pid为1的进程所收养
2.2 多进程的使用
import multiprocessingfrom multiprocessing import Process, current_processimport timelst = []def task(i): print(current_process().name, i, \'start...\') time.sleep(2) lst.append(i) print(lst) print(current_process().name, i, \'end...\')if __name__ == \'__main__\': for i in range(10): p = Process(target=task, args=(i,)) p.start() # 进程之间资源隔离# Process-4 3 start...# Process-5 4 start...# Process-2 1 start...# Process-1 0 start...# Process-6 5 start...# Process-8 7 start...# Process-10 9 start...# Process-3 2 start...# Process-9 8 start...# Process-7 6 start...# [3]# Process-4 3 end...# [4]# Process-5 4 end...# [1]# Process-2 1 end...# [0]# Process-1 0 end...# [5]# Process-6 5 end...# [7]# Process-8 7 end...# [2][9]## Process-10 9 end...# Process-3 2 end...# [8]# Process-9 8 end...# [6]# Process-7 6 end...
2.3 自定义进程类
import multiprocessingclass MyProcess(multiprocessing.Process): def __init__(self, i): super().__init__() self.i = i def run(self): print(self.name, self.i, \'start...\') print(self.name, self.i, \'end...\')if __name__ == \'__main__\': for i in range(10): p = MyProcess(i) p.start()
2.4 进程通信方式
2.4.1 管道(pipe)
传递二进制数据流,消息之间没有明确界限
半双工的通信方式,本质上就是内核空间中固定大小的缓冲区 (只能从一边到另一边)
匿名管道
适用有亲缘关系的进程
命名管道
无亲缘关系也可以进行访问
2.4.2 消息队列(message queues)
消息队列是保存在内核中的消息链表,有明确的界限,支持多种数据类型传入
发送方和接收方不需要同时存在,消息可持久化
2.4.3 信号量(semaphores)
信号量就是一个计数器,用于控制最多n个进程对共享资源访问
p
操作(申请资源)-> 将信号量的值减 1
v
操作(释放资源) -> 将信号量的值加 1
若信号量 ≥ 0,表示资源可用,进程继续执行
若信号量 < 0,表示资源已被耗尽,进程被阻塞并放入等待队列
2.4.4 共享内存(shared memory)
多个进程通过映射共享同一片物理内存区域,这是最快的进程通信(IPC)方式
直接读写速度最快
配合信号量或者互斥锁来使用
2.4.5 信号(signal)
信号是最古老的进程通信方式,是一种异步通知机制,用来通知进程,控制进程的一些行为
ctrl + c 停止信号
ctrl + z 终止信号
2.4.5 socket通信
套接字,通常用于不同主机之间的通信
支持全双工通信,数据按字节流传输
分布式系统,跨网络通信
2.5 进程池
有效的降低频繁创建销毁线程多带来的额外开销
from multiprocessing import Pool, current_processimport timelst = []def task(i): print(current_process().name, i, \'start...\') time.sleep(1) lst.append(i) print(lst) print(current_process().name, i, \'end...\')# 每个进程独立空间,互相隔离if __name__ == \"__main__\": # 创建进程池,建议进程数和cpu核数一致 # maxtaskperchild 指定每个子进程最多可以处理多少任务,防止过多的内存占用 p = Pool(processes=4, maxtasksperchild=3) for i in range(20): # 进程池接受任务 p.apply_async(func=task, args=(i,)) # 关闭进程池,不接受任务 p.close() # 阻塞当前环境,直到p子进程执行完成。如果没有join,父进程退出,子进程也会退出 p.join() print(\"end...\")
三、协程(coroutine)
协程是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。又称为微线程,纤程
import asyncioasync def func1(): print(1) await asyncio.sleep(1) print(2)async def func2(): print(3) await asyncio.sleep(1) print(4)# 创建任务列表tasks = [asyncio.ensure_future(func1()), asyncio.ensure_future(func2())]# 生成事件循环 -- 监听loop = asyncio.get_event_loop()# 运行loop.run_until_complete(asyncio.wait(tasks))# 依次输出:1 3 2 4
总结
并发编程是提升 Python 程序效率的核心手段
选择方式需遵循 “任务类型优先” 原则:
- CPU 密集型→多进程
- I/O 密集型→协程(高并发)或多线程(简单场景)
同时,需注意同步机制(避免资源竞争)、GIL 限制(多线程的局限性)、进程 / 线程开销(控制数量)等问题,才能写出高效、可靠的并发程序