Linux——进程间通信,匿名管道,进程池
文章目录
- 一、进程间通信(IPC)的理解
-
- 1.为什么进程间要通信(IPC)
- 2.如何进行通信
- 二、匿名管道
-
- 1.管道的理解
- 2.匿名管道的使用
- 3.管道的五种特性
- 4.管道的四种通信情况
- 5.管道缓冲区容量
- 三、进程池
-
- 1.进程池的理解
- 2.进程池的制作
- 四、源码
-
- ProcessPool.hpp
- Task.hpp
- Main.cc
一、进程间通信(IPC)的理解
1.为什么进程间要通信(IPC)
首先进程之间是相互独立的,尽管是父子进程之间,它们虽然资源共享,但当子进程需要修改数据时仍然需要 进行写时拷贝,保持独立性。
而让进程间通信可以实现数据之间的交互,资源共享,事件通知,又或者是让一个进程对另一个进程进行控制。
进程间通信是操作系统中实现进程间协作和数据交换的重要机制 ,它使得多个进程能够共同完成任务,提高系统的效率和可靠性。
2.如何进行通信
进程间通信的原理其实很简单,只需要两个进程共同访问一个资源,而一个进程对资源的更改能被另一进程感知到,从而做出相应的操作。
所以通信的前提是进程之间能够访问同一个资源,而且该资源是公共的,而不是某进程内部的。
IPC 的典型方式对比
二、匿名管道
1.管道的理解
我们把进程之间通信的介质(资源)叫作管道。
开发者在设计管道技术时文件系统已经比较成熟,所以为了方便管理该资源就使用文件来实现, 而对文件的读写就是通信的过程 ,但它与一般的文件还是有些区别,文件都是储存到磁盘上的,而进程之间通信用的文件并不需要把它储存到磁盘上,它只是作为一个传输介质。
它比较特殊,所以起名为管道。管道其实是一个内存级的文件。
注意:父子进程之间的管道叫作匿名管道
,顾名思义就是没有名字,也不需要名字,因为子进程能够继承下来父进程开辟的管道资源。
2.匿名管道的使用
创建匿名管道常用的接口是:
int pipe(int pipefd[2]);
需要包含头文件:
#include
- 返回值:创建成功返回0,失败返回-1
- 参数:这个是一个输出型参数,传入一个int类型长度为2的数组,然后得到
pipefd[0]:以读的方式打开的文件描述符
pipefd[1]:以写的方式打开的文件描述符。
示例:
#include #include #include int main(){ int pipefd[2]; pipe(pipefd); int rfd = pipefd[0],wfd = pipefd[1]; pid_t id = fork(); if(id == 0) { close(wfd);//关闭子进程的写文件,只让它读 int k=0; while(true) { read(rfd,&k,sizeof(k)); printf(\"read:%d\\n\",k); } } else { close(rfd);//关闭父进程的读文件,只让它写。 int num=0; while(true) { write(wfd,&num,sizeof(num)); num++; sleep(1); } } return 0;}
要记住pipefd[2]中哪个是读哪个是写有一个小技巧,0像嘴巴,所以下标为0的是读,1像钢笔,所以1下标是写。
3.管道的五种特性
- 匿名管道,只能用来进行具有血缘关系的进程间通信(用于父与子)。
管道文件,自带同步机制
。如上代码示例,父进程写一次休眠一秒,而子进程是一直不断地读,快的一端会迁就于慢的一端,最后实现同步。- 管道是面向字节流的。怎么读与怎么写并没有联系,比如写入“hello world”,但可能读到“hel”,这取决于你要读多少字节。
管道是单向通信的
。也就是a(表示进程)写的时候b读。b写的时候a在读。而不是既在写同时也在读。- 管道(文件)的生命周期是随进程的。进程结束管道也随之销毁。
4.管道的四种通信情况
- 写慢,读快 — 读端就要阻塞(等待写端写入)。
- 写快,读慢 —到管道容量满了后,写端就要阻塞(等待读端读取数据,然后就可以覆盖式地继续往管道写入)。
- 写关闭,读继续 — read就会返回0,表示文件结尾。
- 写继续,读关闭 — 写端不再有意义,系统会杀掉写端进程。
5.管道缓冲区容量
管道缓冲区容量为64kb,大家可以根据管道的性质与通信特点,自行进行测试。
三、进程池
1.进程池的理解
在程序使用内存的时候,比如vector扩容机制,会提前给你开辟一块空间供你使用,尽管现在用不到,相当于做一下预备。减少开辟空间的频次,从而达到提高效率的效果。
那么进程池也同样,给父进程提前开辟一些子进程,提供父进程使用。其中我们使用匿名管道建立联系。
在父进程给子进程派发任务时,为了提高效率会让每个子进程均匀地分配到任务(称为负载均匀),而不是把大部分的任务都派发到一个子进程上,通常会有以下策略:
- 轮询:按顺序一一分配。
- 随机:随机进行分配。
- 负载因子:设计一个负载因子,让子进程按负载因子的大小排成一个小根堆,每次取出堆头的子进程派发任务,然后更新负载因子插回到堆中。
2.进程池的制作
在面向对象的编程中最重要的就是对对象的描述与组织,这里我们的核心就是对管道进行管理。那么首先需要一个类对管道进行描述。
class Channel{public: Channel(int fd,pid_t id): _wfd(fd),_subid(id) { } //... ... ~Channel() {}private: int _wfd; int _subid;};
_wfd是该管道对应写端的fd,_subid是该管道对应的子进程的pid。
这里我们不必把rfd(读端fd)加入,因为我们现在对管道的描述组织,目的是方便父进程管理,而rfd是给子进程用的,所以不用添加为变量。
这里我们就以轮询
的方式派发任务,刚才创建的Channel相当于对管道的描述,接下来创建ChannelManage进行组织。这里选择使用数组来管理,派发任务方式选择轮询,所以需要记录下一个需要派发到的管道的下标。
class ChannelManage{public: ChannelManage():_next(0) {} //... ... ~ChannelManage() {}private: vector<Channel> _channels; int _next;};
接下来还需要创建一个类对整体的进程池做管理。
class ProcessPool{public: ProcessPool(int num) : __process_num(num) {} // ... ... ~ProcessPool() {}private: ChannelManage _cm; int _process_num;};
其中_process_num表示需要创建多少子进程,这是由使用者来决定的。
在ProcessPool中我们准备实现这些方法
-
bool Start():用于创建子进程。
由于我们是要生成多个通道所以需要循环来进行,而单趟循环需要做以下这些操作:1.创建管道,然后创建子进程。(这样能让子进程继承到管道信息)
2.关于子进程:写端关闭,然后执行Work(),最后把读端关闭,并exit退出。
3.关于父进程:读端关闭,然后把wfd,pid存入_cm中。
-
void Work(int rfd):用于子进程读取任务码并执行命令。
-
void Run():用于获取并派发任务。
-
void Stop():用于关闭写端并回收子进程。
最后为方便测试我们还需要一个管理任务的类和方法。我们可以单独创建一个Task.hpp文件。
typedef void (*task_t)();class TaskManage{public: TaskManage() { //随机数种子 srand((unsigned int)time(nullptr)); } int Code() { //随机生成任务码(数组下标) return rand()%_tasks.size(); } void Execute(int code) { //执行任务 _tasks[code](); } // ... ... ~TaskManage() {}private: vector<function<task_t>> _tasks;//用于储存任务的数组};
然后需要在ProcessPool中放入TaskManage成员变量,并在ProcessPool的构造函数中完成对_tasks中内容的插入。具体操作参考下面源码。
四、源码
ProcessPool.hpp
#ifndef _PROCESS_POOL_HPP_#define _PROCESS_POOL_HPP_#include #include #include #include #include \"Task.hpp\"using namespace std;//先描述class Channel{public: Channel(int fd,pid_t id): _wfd(fd),_subid(id) { _name = \"channel-\" + to_string(_wfd) + \"-\" + to_string(_subid); } ~Channel() { } void Send(int code) { int n = write(_wfd,&code,sizeof(code)); (void)n; } void Close() { close(_wfd); } void Wait() { pid_t rid = waitpid(_subid, nullptr, 0); (void)rid; } int Fd() { return _wfd; } pid_t SubId() { return _subid; } string Name() { return _name; }private: int _wfd; pid_t _subid; string _name; //int _loadnum;};//再组织class ChannelManager{public: ChannelManager(): _next(0) { } ~ChannelManager() { } void Insert(int wfd,pid_t subid) { _channels.emplace_back(wfd,subid); // Channel c(wfd,subid); // _channels.push_back(move(c)); } Channel& Select() { auto& c = _channels[_next]; _next++; _next %= _channels.size(); return c; } void PrintChannel() { for(auto& channel : _channels) { cout << channel.Name() << endl; } } void CloseAll() { for(auto& channel: _channels) { channel.Close(); } } void StopSubProcess() { for(auto& channel: _channels) { channel.Close(); cout << \"关闭: \" << channel.Name() << endl; } } void WaitSubProcess() { for(auto& channel: _channels) { channel.Wait(); cout << \"回收: \" << channel.Name() << endl; } } void CloseAndWait() { for(auto& channel: _channels) { channel.Close(); cout << \"关闭: \" << channel.Name() << endl; channel.Wait(); cout << \"回收: \" << channel.Name() << endl; } //解决方法1 倒着关闭 // for(int i = _channels.size() - 1;i >= 0;i--) // { // _channels[i].Close(); // cout << \"关闭: \" << _channels[i].Name() << endl; // _channels[i].Wait(); // cout << \"回收: \" << _channels[i].Name() << endl; // } }private: vector<Channel> _channels; int _next;};const int gdefaultnum = 5;class ProcessPool{public: ProcessPool(int num): _process_num(num) { _tm.Register(PrintLog); _tm.Register(DownLoad); _tm.Register(UpLoad); } ~ProcessPool() { } void Work(int rfd) { while(true) { int code = 0; size_t n = read(rfd,&code,sizeof(code)); if(n > 0) { if(n != sizeof(code)) { continue; } cout << \"子进程[\" << getpid() << \"]收到一个任务码: \" << code << endl; _tm.Execute(code); } else if(n == 0) { cout << \"子进程退出\" << endl; break; } else { cout << \"读取错误\" << endl; break; } } } bool Start() { for(int i = 0;i < _process_num;i++) { //1.创建管道 int pipefd[2] = { 0 }; int n = pipe(pipefd); if(n < 0) { return false; } //2.创建子进程 pid_t subid = fork(); if(subid < 0) { return false; } else if(subid == 0) { //子进程 //关闭子进程继承的哥哥的w端 _cm.CloseAll(); //3.关闭不需要的文件描述符 close(pipefd[1]); Work(pipefd[0]); close(pipefd[0]); exit(0); } else { //父进程 //3.关闭不需要的文件描述符 close(pipefd[0]); _cm.Insert(pipefd[1],subid); } } return true; } void Debug() { _cm.PrintChannel(); } void Run() { //1.选择一个任务 int taskcode = _tm.Code(); //2.选择一个信道[子进程],负载均衡的选择一个子进程,完成任务 auto& c = _cm.Select(); cout << \"选择了一个子进程: \" << c.Name() << endl; //3.发送任务 c.Send(taskcode); cout << \"发送了一个任务码: \" << taskcode << endl; } void Stop() { //关闭父进程 //_cm.StopSubProcess(); //回收所有的子进程 //_cm.WaitSubProcess(); _cm.CloseAndWait(); }private: ChannelManager _cm; int _process_num; TaskManager _tm;};#endif
Task.hpp
#pragma once#include #include #include using namespace std;typedef void (*task_t)();void PrintLog(){ cout << \"我是一个打印日志的任务\" << endl;}void DownLoad(){ cout << \"我是一个下载的任务\" << endl;}void UpLoad(){ cout << \"我是一个上传的任务\" << endl;}class TaskManager{public: TaskManager() { srand((unsigned int)time(nullptr)); } ~TaskManager() { } void Register(task_t t) { _tasks.push_back(t); } int Code() { return rand() % _tasks.size(); } void Execute(int code) { if(code >= 0 && code < _tasks.size()) { _tasks[code](); } }private: vector<task_t> _tasks;};
Main.cc
#include \"ProcessPool.hpp\"int main(){ //创建进程池对象 ProcessPool pp(gdefaultnum); //启动进程池 pp.Start(); //自动派发任务 int cnt = 10; while(cnt--) { pp.Run(); sleep(1); } //回收,结束进程池 pp.Stop(); return 0;}